| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556 |
- // Copyright 2015 The MemDB Authors.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- // implied. See the License for the specific language governing
- // permissions and limitations under the License. See the AUTHORS file
- // for names of contributors.
- 'use strict';
- var P = require('bluebird');
- var Logger = require('memdb-logger');
- var util = require('util');
- var utils = require('./utils');
- var EventEmitter = require('events').EventEmitter;
- var DEFAULT_MAX_COLLISION = 10000;
- var Collection = function(opts){
- opts = opts || {};
- this.name = opts.name;
- this._checkName(this.name);
- this.shard = opts.shard;
- this.conn = opts.conn;
- this.config = opts.config || {};
- this.config.maxCollision = this.config.maxCollision || DEFAULT_MAX_COLLISION;
- // {indexKey : {indexValue : {id1 : 1, id2 : -1}}}
- this.changedIndexes = utils.forceHashMap();
- this.pendingIndexTasks = utils.forceHashMap(); //{id, [Promise]}
- this.updateIndexEvent = 'updateIndex$' + this.name + '$' + this.conn._id;
- this.shard.on(this.updateIndexEvent, this.onUpdateIndex.bind(this));
- this.logger = Logger.getLogger('memdb', __filename, 'shard:' + this.shard._id);
- EventEmitter.call(this);
- };
- util.inherits(Collection, EventEmitter);
- var proto = Collection.prototype;
- proto.close = function(){
- this.shard.removeListener(this.updateIndexEvent, this.onUpdateIndex);
- };
- proto.insert = function(docs){
- if(!Array.isArray(docs)){
- return this._insertById(docs && docs._id, docs);
- }
- var self = this;
- return P.mapSeries(docs, function(doc){ //disable concurrent to avoid race condition
- return self._insertById(doc && doc._id, doc);
- });
- };
- proto._insertById = function(id, doc){
- if(!utils.isDict(doc)){
- throw new Error('doc must be a dictionary');
- }
- if(id === null || id === undefined){
- id = utils.uuid();
- }
- id = this._checkId(id);
- doc._id = id;
- var self = this;
- return P.try(function(){
- return self.lock(id);
- })
- .then(function(){
- return self.shard.insert(self.conn._id, self._key(id), doc);
- })
- .then(function(){
- return self._finishIndexTasks(id);
- })
- .thenReturn(id);
- };
- proto.find = function(query, fields, opts){
- if(typeof(query) === 'number' || typeof(query) === 'string'){
- return this.findById(query, fields, opts);
- }
- if(!utils.isDict(query)){
- throw new Error('invalid query');
- }
- if(query.hasOwnProperty('_id')){
- return this.findById(query._id, fields, opts)
- .then(function(doc){
- if(!doc){
- return [];
- }
- return [doc];
- });
- }
- var keys = Object.keys(query).sort();
- if(keys.length === 0){
- throw new Error('You must specify query key');
- }
- var indexKey = JSON.stringify(keys);
- var indexConfig = this.config.indexes && this.config.indexes[indexKey];
- if(!indexConfig){
- throw new Error('No index configured for keys - ' + indexKey);
- }
- var valueIgnore = indexConfig.valueIgnore || {};
- var values = keys.map(function(key){
- var value = query[key];
- if(value === null || value === undefined){
- throw new Error('query value can not be null or undefined');
- }
- var ignores = valueIgnore[key] || [];
- if(ignores.indexOf(value) !== -1){
- throw new Error('value ' + value + ' for key ' + key + ' is ignored in index');
- }
- return value;
- });
- var indexValue = JSON.stringify(values);
- return this._findByIndex(indexKey, indexValue, fields, opts);
- };
- proto.findOne = function(query, fields, opts){
- opts = opts || {};
- opts.limit = 1;
- return this.find(query, fields, opts)
- .then(function(docs){
- if(!Array.isArray(docs)){
- return docs;
- }
- if(docs.length === 0){
- return null;
- }
- return docs[0];
- });
- };
- proto.findById = function(id, fields, opts){
- id = this._checkId(id);
- var self = this;
- return P.try(function(){
- if(opts && opts.readonly){
- return self.shard.findReadOnly(self.conn._id, self._key(id), fields);
- }
- return P.try(function(){
- if(opts && opts.nolock){
- return;
- }
- return self.lock(id);
- })
- .then(function(){
- return self.shard.find(self.conn._id, self._key(id), fields, opts);
- });
- });
- };
- proto.findReadOnly = function(query, fields, opts){
- opts = opts || {};
- opts.readonly = true;
- return this.find(query, fields, opts);
- };
- proto.findOneReadOnly = function(query, fields, opts){
- opts = opts || {};
- opts.readonly = true;
- return this.findOne(query, fields, opts);
- };
- proto.findByIdReadOnly = function(id, fields, opts){
- opts = opts || {};
- opts.readonly = true;
- return this.findById(id, fields, opts);
- };
- proto.count = function(query, opts){
- opts = opts || {};
- opts.count = true;
- var self = this;
- return P.try(function(){
- return self.find(query, null, opts);
- })
- .then(function(ret){
- if(typeof(ret) === 'number'){
- return ret;
- }
- else if(Array.isArray(ret)){
- return ret.length;
- }
- else if(!ret){
- return 0;
- }
- throw new Error('Unexpected find result - ' + ret);
- });
- };
- proto._findByIndex = function(indexKey, indexValue, fields, opts){
- opts = opts || {};
- var self = this;
- var indexCollection = this.conn.getCollection(this._indexCollectionName(indexKey), true);
- var nolock = opts.nolock;
- return P.try(function(){
- opts.nolock = true; // force not using lock
- return indexCollection.findById(indexValue, 'format ids', opts);
- })
- .then(function(doc){
- opts.nolock = nolock; // restore param
- var ids = utils.forceHashMap();
- if(doc){
- doc.ids.forEach(function(id){
- ids[id] = 1;
- });
- }
- var changedIds = (self.changedIndexes[indexKey] && self.changedIndexes[indexKey][indexValue]) || {};
- for(var id in changedIds){
- if(changedIds[id] === 1){
- ids[id] = 1;
- }
- else{
- delete ids[id];
- }
- }
- ids = Object.keys(ids);
- if(opts && opts.count){ // return count only
- return ids.length;
- }
- if(opts && opts.limit){
- ids = ids.slice(0, opts.limit);
- }
- var docs = [];
- ids.sort(); // keep id in order, avoid deadlock
- return P.each(ids, function(id){
- return self.findById(id, fields, opts)
- .then(function(doc){
- // WARN: This is possible that doc is null due to index collection is not locked
- if(!!doc){
- docs.push(doc);
- }
- });
- })
- .thenReturn(docs);
- });
- };
- proto.update = function(query, modifier, opts){
- opts = opts || {};
- var self = this;
- return P.try(function(){
- return self.find(query, '_id');
- })
- .then(function(ret){
- if(!ret || ret.length === 0){
- if(!opts.upsert){
- return 0;
- }
- // upsert
- if(typeof(query) === 'string' || typeof(query) === 'number'){
- query = {_id : query};
- }
- return self.insert(query)
- .then(function(id){
- return self._updateById(id, modifier, opts);
- })
- .thenReturn(1);
- }
- if(!Array.isArray(ret)){
- return self._updateById(ret._id, modifier, opts)
- .thenReturn(1);
- }
- return P.each(ret, function(doc){
- return self._updateById(doc._id, modifier, opts);
- })
- .thenReturn(ret.length);
- });
- };
- proto._updateById = function(id, modifier, opts){
- id = this._checkId(id);
- var self = this;
- return P.try(function(){
- return self.shard.update(self.conn._id, self._key(id), modifier, opts);
- })
- .then(function(){
- return self._finishIndexTasks(id);
- });
- };
- proto.remove = function(query, opts){
- var self = this;
- return P.try(function(){
- return self.find(query, '_id');
- })
- .then(function(ret){
- if(!ret || ret.length === 0){
- return 0;
- }
- if(!Array.isArray(ret)){
- return self._removeById(ret._id, opts)
- .thenReturn(1);
- }
- return P.each(ret, function(doc){
- return self._removeById(doc._id, opts);
- })
- .thenReturn(ret.length);
- });
- };
- proto._removeById = function(id, opts){
- id = this._checkId(id);
- var self = this;
- return P.try(function(){
- return self.shard.remove(self.conn._id, self._key(id), opts);
- })
- .then(function(){
- return self._finishIndexTasks(id);
- });
- };
- proto.lock = function(id){
- id = this._checkId(id);
- if(this.shard.isLocked(this.conn._id, this._key(id))){
- return;
- }
- var self = this;
- return this.shard.lock(this.conn._id, this._key(id))
- .then(function(ret){
- self.emit('lock', id);
- return ret;
- });
- };
- proto.onUpdateIndex = function(id, indexKey, oldValue, newValue){
- this.logger.debug('onUpdateIndex(%s, %s, %s, %s)', id, indexKey, oldValue, newValue);
- var self = this;
- var promise = P.try(function(){
- var config = self.config.indexes[indexKey];
- if(!config){
- throw new Error('index - ' + indexKey + ' not configured');
- }
- if(!self.changedIndexes[indexKey]){
- self.changedIndexes[indexKey] = utils.forceHashMap();
- }
- var changedIndex = self.changedIndexes[indexKey];
- if(oldValue !== null){
- if(!changedIndex[oldValue]){
- changedIndex[oldValue] = utils.forceHashMap();
- }
- if(changedIndex[oldValue][id] === 1){
- delete changedIndex[oldValue][id];
- }
- else{
- changedIndex[oldValue][id] = -1;
- }
- }
- if(newValue !== null){
- if(!changedIndex[newValue]){
- changedIndex[newValue] = utils.forceHashMap();
- }
- if(changedIndex[newValue][id] === -1){
- delete changedIndex[oldValue][id];
- }
- else{
- changedIndex[newValue][id] = 1;
- }
- }
- if(!config.unique){
- return;
- }
- return P.try(function(){
- if(oldValue !== null){
- return self.commitOneIndex(indexKey, oldValue, changedIndex[oldValue], config)
- .then(function(){
- delete changedIndex[oldValue];
- });
- }
- })
- .then(function(){
- if(newValue !== null){
- return self.commitOneIndex(indexKey, newValue, changedIndex[newValue], config)
- .then(function(){
- delete changedIndex[newValue];
- });
- }
- });
- });
- if(!this.pendingIndexTasks[id]){
- this.pendingIndexTasks[id] = [];
- }
- this.pendingIndexTasks[id].push(promise);
- };
- proto.commitIndex = function(){
- var self = this;
- // must update in sorted order to avoid dead lock
- return P.each(Object.keys(this.changedIndexes).sort(), function(indexKey){
- var changedIndex = self.changedIndexes[indexKey];
- var config = self.config.indexes[indexKey];
- return P.each(Object.keys(changedIndex).sort(), function(indexValue){
- var changedIds = changedIndex[indexValue];
- return self.commitOneIndex(indexKey, indexValue, changedIds, config);
- });
- })
- .then(function(){
- self.changedIndexes = utils.forceHashMap();
- });
- };
- proto.rollbackIndex = function(){
- this.changedIndexes = utils.forceHashMap();
- };
- // indexKey: json encoded sorted fields array
- // indexValue: json encoded sorted fields value
- proto.commitOneIndex = function(indexKey, indexValue, changedIds, config){
- var indexCollection = this.conn.getCollection(this._indexCollectionName(indexKey), true);
- var modifier = {$pushAll : {ids : []}, $pullAll : {ids : []}};
- var countDelta = 0;
- for(var id in changedIds){
- if(changedIds[id] === 1){
- modifier.$pushAll.ids.push(id);
- countDelta++;
- }
- else{
- modifier.$pullAll.ids.push(id);
- countDelta--;
- }
- }
- var self = this;
- return P.try(function(){
- return indexCollection.find(indexValue, 'ids');
- })
- .then(function(ret){
- var oldCount = ret ? ret.ids.length : 0;
- var newCount = oldCount + countDelta;
- if(config.unique && newCount > 1){
- throw new Error('duplicate value - ' + indexValue + ' for unique index - ' + indexKey);
- }
- if(newCount > config.maxCollision){
- throw new Error('too many documents have value - ' + indexValue + ' for index - ' + indexKey);
- }
- if(newCount > 0){
- return indexCollection.update(indexValue, modifier, {upsert : true});
- }
- else if(newCount === 0){
- return indexCollection.remove(indexValue);
- }
- else{
- throw new Error(util.format('index corrupted: %s %s, please rebuild index', self.name, indexKey));
- }
- });
- };
- proto._finishIndexTasks = function(id){
- if(!this.pendingIndexTasks[id]){
- return;
- }
- // Save domain
- var d = process.domain;
- var self = this;
- return P.each(self.pendingIndexTasks[id], function(promise){
- return promise;
- })
- .finally(function(){
- delete self.pendingIndexTasks[id];
- // Restore domain
- process.domain = d;
- });
- };
- // 'index.name.key1.key2'
- proto._indexCollectionName = function(indexKey){
- var keys = JSON.parse(indexKey).map(function(key){
- return utils.escapeField(key);
- });
- return 'index.' + utils.escapeField(this.name) + '.' + keys.join('.');
- };
- proto._key = function(id){
- return this.name + '$' + id;
- };
- proto._checkId = function(id){
- if(typeof(id) === 'string'){
- return id;
- }
- else if(typeof(id) === 'number'){
- return id.toString();
- }
- throw new Error('id must be number or string');
- };
- //http://docs.mongodb.org/manual/reference/limits/#Restriction-on-Collection-Names
- proto._checkName = function(name){
- if(!name){
- throw new Error('Collection name can not empty');
- }
- if(typeof(name) !== 'string'){
- throw new Error('Collection name must be string');
- }
- if(name.indexOf('$') !== -1){
- throw new Error('Collection name can not contain "$"');
- }
- if(name.indexOf('system.') === 0){
- throw new Error('Collection name can not begin with "system."');
- }
- };
- module.exports = Collection;
|