| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- // Copyright 2015 The MemDB Authors.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- // implied. See the License for the specific language governing
- // permissions and limitations under the License. See the AUTHORS file
- // for names of contributors.
- 'use strict';
- var P = require('bluebird');
- var util = require('util');
- var utils = require('./utils');
- var AsyncLock = require('async-lock');
- var EventEmitter = require('events').EventEmitter;
- var modifier = require('./modifier');
- var logger = require('memdb-logger').getLogger('memdb', __filename);
- var DEFAULT_LOCK_TIMEOUT = 10 * 1000;
- var Document = function(opts){ //jshint ignore:line
- opts = opts || {};
- if(!opts.hasOwnProperty('_id')){
- throw new Error('_id is not specified');
- }
- this._id = opts._id;
- var doc = opts.doc || null;
- if(typeof(doc) !== 'object'){
- throw new Error('doc must be object');
- }
- if(!!doc){
- doc._id = this._id;
- }
- this.commited = doc;
- this.changed = undefined; // undefined means no change, while null means removed
- this.connId = null; // Connection that hold the document lock
- this.locker = opts.locker;
- this.lockKey = opts.lockKey;
- if(!this.locker){
- this.locker = new AsyncLock({
- Promise : P,
- timeout : opts.lockTimeout || DEFAULT_LOCK_TIMEOUT
- });
- this.lockKey = '';
- }
- this.releaseCallback = null;
- this.indexes = opts.indexes || {};
- this.savedIndexValues = {}; //{indexKey : indexValue}
- EventEmitter.call(this);
- };
- util.inherits(Document, EventEmitter);
- var proto = Document.prototype;
- proto.find = function(connId, fields){
- var doc = this.isLocked(connId) ? this._getChanged() : this.commited;
- if(doc === null){
- return null;
- }
- if(!fields){
- return doc;
- }
- var includeFields = [], excludeFields = [];
- if(typeof(fields) === 'string'){
- includeFields = fields.split(' ');
- }
- else if(typeof(fields) === 'object'){
- for(var field in fields){
- if(!!fields[field]){
- includeFields.push(field);
- }
- else{
- excludeFields.push(field);
- }
- }
- if(includeFields.length > 0 && excludeFields.length > 0){
- throw new Error('Can not specify both include and exclude fields');
- }
- }
- var ret = null;
- if(includeFields.length > 0){
- ret = {};
- includeFields.forEach(function(field){
- if(doc.hasOwnProperty(field)){
- ret[field] = doc[field];
- }
- });
- ret._id = this._id;
- }
- else if(excludeFields.length > 0){
- ret = {};
- for(var key in doc){
- ret[key] = doc[key];
- }
- excludeFields.forEach(function(key){
- delete ret[key];
- });
- }
- else{
- ret = doc;
- }
- return ret;
- };
- proto.exists = function(connId){
- return this.isLocked(connId) ? this._getChanged() !== null: this.commited !== null;
- };
- proto.insert = function(connId, doc){
- this.modify(connId, '$insert', doc);
- };
- proto.remove = function(connId){
- this.modify(connId, '$remove');
- };
- proto.update = function(connId, modifier, opts){
- opts = opts || {};
- if(!modifier){
- throw new Error('modifier is empty');
- }
- modifier = modifier || {};
- var isModify = false;
- for(var field in modifier){
- isModify = (field[0] === '$');
- break;
- }
- if(!isModify){
- this.modify(connId, '$replace', modifier);
- }
- else{
- for(var cmd in modifier){
- this.modify(connId, cmd, modifier[cmd]);
- }
- }
- };
- proto.modify = function(connId, cmd, param){
- this.ensureLocked(connId);
- for(var indexKey in this.indexes){
- if(!this.savedIndexValues.hasOwnProperty(indexKey)){
- this.savedIndexValues[indexKey] = this._getIndexValue(indexKey, this.indexes[indexKey]);
- }
- }
- var modifyFunc = modifier[cmd];
- if(typeof(modifyFunc) !== 'function'){
- throw new Error('invalid modifier - ' + cmd);
- }
- if(this.changed === undefined){ //copy on write
- this.changed = utils.clone(this.commited);
- }
- this.changed = modifyFunc(this.changed, param);
- // id is immutable
- if(!!this.changed){
- this.changed._id = this._id;
- }
- for(indexKey in this.indexes){
- var value = this._getIndexValue(indexKey, this.indexes[indexKey]);
- if(value !== this.savedIndexValues[indexKey]){
- logger.trace('%s.updateIndex(%s, %s, %s)', this._id, indexKey, this.savedIndexValues[indexKey], value);
- this.emit('updateIndex', connId, indexKey, this.savedIndexValues[indexKey], value);
- this.savedIndexValues[indexKey] = value;
- }
- }
- logger.trace('%s.modify(%s, %j) => %j', this._id, cmd, param, this.changed);
- };
- proto.lock = function(connId){
- if(connId === null || connId === undefined){
- throw new Error('connId is null');
- }
- var deferred = P.defer();
- if(this.isLocked(connId)){
- deferred.resolve();
- }
- else{
- var self = this;
- this.locker.acquire(this.lockKey, function(release){
- self.connId = connId;
- self.releaseCallback = release;
- self.emit('lock');
- deferred.resolve();
- })
- .catch(function(err){
- if(!deferred.isResolved()){
- deferred.reject(new Error('doc.lock failed - ' + self.lockKey));
- }
- });
- }
- return deferred.promise;
- };
- // Wait existing lock release (not create new lock)
- proto._waitUnlock = function(){
- var deferred = P.defer();
- var self = this;
- this.locker.acquire(this.lockKey, function(){
- deferred.resolve();
- })
- .catch(function(err){
- deferred.reject(new Error('doc._waitUnlock failed - ' + self.lockKey));
- });
- return deferred.promise;
- };
- proto._unlock = function(){
- if(this.connId === null){
- return;
- }
- this.connId = null;
- var releaseCallback = this.releaseCallback;
- this.releaseCallback = null;
- releaseCallback();
- this.emit('unlock');
- };
- proto._getChanged = function(){
- return this.changed !== undefined ? this.changed : this.commited;
- };
- proto._getCommited = function(){
- return this.commited;
- };
- proto.commit = function(connId){
- this.ensureLocked(connId);
- if(this.changed !== undefined){
- this.commited = this.changed;
- }
- this.changed = undefined;
- this.emit('commit');
- this._unlock();
- };
- proto.rollback = function(connId){
- this.ensureLocked(connId);
- this.changed = undefined;
- this.savedIndexValues = {};
- this.emit('rollback');
- this._unlock();
- };
- proto.ensureLocked = function(connId){
- if(!this.isLocked(connId)){
- throw new Error('doc not locked by ' + connId);
- }
- };
- proto.isLocked = function(connId){
- return this.connId === connId && connId !== null && connId !== undefined;
- };
- proto.isFree = function(){
- return this.connId === null;
- };
- proto._getIndexValue = function(indexKey, opts){
- opts = opts || {};
- var self = this;
- var indexValue = JSON.parse(indexKey).sort().map(function(key){
- var doc = self._getChanged();
- var value = !!doc ? doc[key] : undefined;
- // null and undefined is not included in index
- if(value === null || value === undefined){
- return undefined;
- }
- if(['number', 'string', 'boolean'].indexOf(typeof(value)) === -1){
- throw new Error('invalid value for indexed key ' + indexKey);
- }
- var ignores = opts.valueIgnore ? opts.valueIgnore[key] || [] : [];
- if(ignores.indexOf(value) !== -1){
- return undefined;
- }
- return value;
- });
- // Return null if one of the value is undefined
- if(indexValue.indexOf(undefined) !== -1){
- return null;
- }
- return JSON.stringify(indexValue);
- };
- module.exports = Document;
|