collection.js 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556
  1. // Copyright 2015 The MemDB Authors.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. // implied. See the License for the specific language governing
  13. // permissions and limitations under the License. See the AUTHORS file
  14. // for names of contributors.
  15. 'use strict';
  16. var P = require('bluebird');
  17. var Logger = require('memdb-logger');
  18. var util = require('util');
  19. var utils = require('./utils');
  20. var EventEmitter = require('events').EventEmitter;
  // Used for config.maxCollision when the collection config does not provide one.
  var DEFAULT_MAX_COLLISION = 10000;

  /**
   * A named collection bound to one connection and one shard.
   *
   * The constructor subscribes to the shard's per-collection, per-connection
   * 'updateIndex$<name>$<connId>' event; call close() to unsubscribe.
   *
   * @param {Object} opts
   * @param {String} opts.name - collection name, validated by _checkName
   * @param {Object} opts.shard - emitter exposing on/removeListener and an _id
   * @param {Object} opts.conn - owning connection; its _id is part of the event name
   * @param {Object} [opts.config] - collection config; maxCollision is defaulted
   *     in place on this object when it is missing or falsy
   */
  var Collection = function(opts){
      opts = opts || {};

      this.name = opts.name;
      this._checkName(this.name);

      this.shard = opts.shard;
      this.conn = opts.conn;

      this.config = opts.config || {};
      this.config.maxCollision = this.config.maxCollision || DEFAULT_MAX_COLLISION;

      // {indexKey : {indexValue : {id1 : 1, id2 : -1}}}
      this.changedIndexes = utils.forceHashMap();
      this.pendingIndexTasks = utils.forceHashMap(); //{id, [Promise]}

      this.updateIndexEvent = 'updateIndex$' + this.name + '$' + this.conn._id;

      // Keep a reference to the bound handler: removeListener() only removes the
      // exact function object that was registered, and bind() returns a new
      // function on every call.
      this._boundOnUpdateIndex = this.onUpdateIndex.bind(this);
      this.shard.on(this.updateIndexEvent, this._boundOnUpdateIndex);

      this.logger = Logger.getLogger('memdb', __filename, 'shard:' + this.shard._id);

      EventEmitter.call(this);
  };

  util.inherits(Collection, EventEmitter);

  var proto = Collection.prototype;

  /**
   * Unsubscribe from the shard's update-index event.
   * Safe to call more than once.
   */
  proto.close = function(){
      this.shard.removeListener(this.updateIndexEvent, this._boundOnUpdateIndex);
  };
  /**
   * Insert a single document or an array of documents.
   *
   * @param {Object|Array} docs - one document, or a list of documents
   * @returns {Promise} resolves to the inserted id, or to the list of ids
   *     when an array was given
   */
  proto.insert = function(docs){
      var collection = this;

      if(Array.isArray(docs)){
          // Documents are inserted strictly one after another; running them
          // concurrently could race.
          return P.mapSeries(docs, function(item){
              return collection._insertById(item && item._id, item);
          });
      }

      return this._insertById(docs && docs._id, docs);
  };
  /**
   * Insert one document under the given id.
   *
   * A missing id (null/undefined) is replaced by utils.uuid(). The normalized
   * id is written back to doc._id before the insert.
   *
   * @param {String|Number} [id]
   * @param {Object} doc - must satisfy utils.isDict
   * @returns {Promise} resolves to the normalized id
   * @throws {Error} synchronously, when doc is not a dictionary or the id is invalid
   */
  proto._insertById = function(id, doc){
      if(!utils.isDict(doc)){
          throw new Error('doc must be a dictionary');
      }

      var docId = (id === undefined || id === null) ? utils.uuid() : id;
      docId = this._checkId(docId);
      doc._id = docId;

      var collection = this;
      var acquireLock = function(){
          return collection.lock(docId);
      };
      var insertIntoShard = function(){
          return collection.shard.insert(collection.conn._id, collection._key(docId), doc);
      };
      var waitForIndexTasks = function(){
          return collection._finishIndexTasks(docId);
      };

      return P.try(acquireLock)
      .then(insertIntoShard)
      .then(waitForIndexTasks)
      .then(function(){
          return docId;
      });
  };
  /**
   * Find documents by id or by an indexed set of keys.
   *
   * - string/number query: delegates to findById and resolves to whatever it yields.
   * - query containing _id: looked up by id; resolves to [] or [doc].
   * - any other dictionary: its sorted keys must match a configured index;
   *   the lookup is delegated to _findByIndex.
   *
   * @param {String|Number|Object} query
   * @param {*} fields - passed through to the underlying lookup
   * @param {Object} [opts] - passed through to the underlying lookup
   * @returns {Promise}
   * @throws {Error} synchronously, on an invalid query, an empty query, a
   *     missing index, a null/undefined query value or an ignored value
   */
  proto.find = function(query, fields, opts){
      var queryType = typeof(query);
      if(queryType === 'string' || queryType === 'number'){
          return this.findById(query, fields, opts);
      }

      if(!utils.isDict(query)){
          throw new Error('invalid query');
      }

      if(query.hasOwnProperty('_id')){
          return this.findById(query._id, fields, opts)
          .then(function(found){
              return found ? [found] : [];
          });
      }

      var keys = Object.keys(query).sort();
      if(keys.length === 0){
          throw new Error('You must specify query key');
      }

      // The index is identified by the JSON-encoded, sorted key list.
      var indexKey = JSON.stringify(keys);
      var indexes = this.config.indexes;
      var indexConfig = indexes && indexes[indexKey];
      if(!indexConfig){
          throw new Error('No index configured for keys - ' + indexKey);
      }

      var valueIgnore = indexConfig.valueIgnore || {};
      var values = [];
      keys.forEach(function(key){
          var value = query[key];
          if(value === null || value === undefined){
              throw new Error('query value can not be null or undefined');
          }
          var ignoredValues = valueIgnore[key] || [];
          if(ignoredValues.indexOf(value) !== -1){
              throw new Error('value ' + value + ' for key ' + key + ' is ignored in index');
          }
          values.push(value);
      });

      return this._findByIndex(indexKey, JSON.stringify(values), fields, opts);
  };
  /**
   * Like find(), but resolves to a single document.
   *
   * Sets opts.limit = 1 on the (possibly caller-supplied) opts object.
   *
   * @returns {Promise} the first element when find() yields an array (null for
   *     an empty array); otherwise the find() result unchanged
   */
  proto.findOne = function(query, fields, opts){
      opts = opts || {};
      opts.limit = 1;

      return this.find(query, fields, opts)
      .then(function(result){
          if(Array.isArray(result)){
              return result.length === 0 ? null : result[0];
          }
          return result;
      });
  };
  /**
   * Find one document by id.
   *
   * With opts.readonly the shard's findReadOnly is used and no lock is taken.
   * Otherwise the document is locked first (unless opts.nolock is set) and then
   * read through shard.find.
   *
   * @param {String|Number} id
   * @param {*} fields - passed through to the shard
   * @param {Object} [opts] - readonly / nolock flags; also forwarded to shard.find
   * @returns {Promise} the shard's result
   * @throws {Error} synchronously, when the id is neither string nor number
   */
  proto.findById = function(id, fields, opts){
      var collection = this;
      var docId = this._checkId(id);

      return P.try(function(){
          var readonly = opts && opts.readonly;
          if(readonly){
              return collection.shard.findReadOnly(collection.conn._id, collection._key(docId), fields);
          }

          var skipLock = opts && opts.nolock;
          return P.try(function(){
              if(!skipLock){
                  return collection.lock(docId);
              }
          })
          .then(function(){
              return collection.shard.find(collection.conn._id, collection._key(docId), fields, opts);
          });
      });
  };
  /**
   * find() with opts.readonly forced to true.
   * The flag is set on the caller's opts object when one is supplied.
   */
  proto.findReadOnly = function(query, fields, opts){
      var options = opts || {};
      options.readonly = true;
      return this.find(query, fields, options);
  };
  /**
   * findOne() with opts.readonly forced to true.
   * The flag is set on the caller's opts object when one is supplied.
   */
  proto.findOneReadOnly = function(query, fields, opts){
      var options = opts || {};
      options.readonly = true;
      return this.findOne(query, fields, options);
  };
  /**
   * findById() with opts.readonly forced to true.
   * The flag is set on the caller's opts object when one is supplied.
   */
  proto.findByIdReadOnly = function(id, fields, opts){
      var options = opts || {};
      options.readonly = true;
      return this.findById(id, fields, options);
  };
  /**
   * Count the documents matching a query.
   *
   * Sets opts.count = true (on the caller's opts object when one is supplied)
   * and runs find() with no field selection.
   *
   * @param {String|Number|Object} query - same forms as find()
   * @param {Object} [opts]
   * @returns {Promise} resolves to a number
   */
  proto.count = function(query, opts){
      opts = opts || {};
      opts.count = true;

      var self = this;
      return P.try(function(){
          return self.find(query, null, opts);
      })
      .then(function(ret){
          if(typeof(ret) === 'number'){
              return ret;
          }
          if(Array.isArray(ret)){
              return ret.length;
          }
          if(!ret){
              return 0;
          }
          // A string/number query goes through findById, which yields a single
          // document rather than an array. update() and remove() treat that
          // result as one match, so count() does the same instead of failing.
          if(utils.isDict(ret)){
              return 1;
          }
          throw new Error('Unexpected find result - ' + ret);
      });
  };
  /**
   * Find documents through an index collection.
   *
   * The stored id list for indexValue is read without locking, then merged with
   * the changes recorded in this.changedIndexes (1 = added, anything else =
   * removed) before the documents themselves are fetched.
   *
   * @param {String} indexKey - JSON-encoded sorted key list
   * @param {String} indexValue - JSON-encoded values for those keys
   * @param {*} fields - forwarded to findById for each document
   * @param {Object} [opts] - count: resolve to the number of ids only;
   *     limit: keep at most that many ids; also forwarded to findById
   * @returns {Promise} a number when opts.count is set, otherwise an array of documents
   */
  proto._findByIndex = function(indexKey, indexValue, fields, opts){
      opts = opts || {};
      var self = this;

      var indexCollection = this.conn.getCollection(this._indexCollectionName(indexKey), true);

      var nolock = opts.nolock;
      return P.try(function(){
          opts.nolock = true; // force not using lock
          return indexCollection.findById(indexValue, 'format ids', opts);
      })
      .finally(function(){
          // Restore the caller's flag on failure as well as on success, so a
          // failed index lookup does not leave opts.nolock forced to true.
          opts.nolock = nolock;
      })
      .then(function(doc){
          var ids = utils.forceHashMap();
          if(doc){
              doc.ids.forEach(function(id){
                  ids[id] = 1;
              });
          }

          // Overlay the index changes that have been recorded but not committed.
          var changedIds = (self.changedIndexes[indexKey] && self.changedIndexes[indexKey][indexValue]) || {};
          for(var id in changedIds){
              if(changedIds[id] === 1){
                  ids[id] = 1;
              }
              else{
                  delete ids[id];
              }
          }

          ids = Object.keys(ids);
          if(opts.count){ // return count only
              return ids.length;
          }
          if(opts.limit){
              ids = ids.slice(0, opts.limit);
          }

          var docs = [];
          ids.sort(); // keep id in order, avoid deadlock

          return P.each(ids, function(id){
              return self.findById(id, fields, opts)
              .then(function(doc){
                  // WARN: doc may be null here because the index collection is not locked
                  if(!!doc){
                      docs.push(doc);
                  }
              });
          })
          .thenReturn(docs);
      });
  };
  /**
   * Update every document matching a query.
   *
   * @param {String|Number|Object} query - same forms as find()
   * @param {Object} modifier - forwarded to the shard's update
   * @param {Object} [opts] - upsert: when nothing matches, insert the query as
   *     a new document and apply the modifier to it; also forwarded to the shard
   * @returns {Promise} resolves to the number of documents updated
   */
  proto.update = function(query, modifier, opts){
      opts = opts || {};
      var collection = this;

      var updateOne = function(id){
          return collection._updateById(id, modifier, opts);
      };

      return P.try(function(){
          return collection.find(query, '_id');
      })
      .then(function(found){
          var nothingFound = !found || found.length === 0;

          if(nothingFound){
              if(!opts.upsert){
                  return 0;
              }
              // upsert: a bare id becomes the seed document {_id : id}
              var queryType = typeof(query);
              var seed = (queryType === 'string' || queryType === 'number') ? {_id : query} : query;
              return collection.insert(seed)
              .then(function(id){
                  return updateOne(id);
              })
              .thenReturn(1);
          }

          if(Array.isArray(found)){
              return P.each(found, function(doc){
                  return updateOne(doc._id);
              })
              .thenReturn(found.length);
          }

          // A lookup by id yields a single document rather than an array.
          return updateOne(found._id)
          .thenReturn(1);
      });
  };
  /**
   * Apply a modifier to one document, then wait for the index tasks that are
   * pending for that id.
   *
   * @param {String|Number} id
   * @param {Object} modifier - forwarded to shard.update
   * @param {Object} [opts] - forwarded to shard.update
   * @returns {Promise}
   * @throws {Error} synchronously, when the id is neither string nor number
   */
  proto._updateById = function(id, modifier, opts){
      var collection = this;
      var docId = this._checkId(id);

      var applyModifier = function(){
          return collection.shard.update(collection.conn._id, collection._key(docId), modifier, opts);
      };
      var waitForIndexTasks = function(){
          return collection._finishIndexTasks(docId);
      };

      return P.try(applyModifier).then(waitForIndexTasks);
  };
  /**
   * Remove every document matching a query.
   *
   * @param {String|Number|Object} query - same forms as find()
   * @param {Object} [opts] - forwarded to the shard's remove
   * @returns {Promise} resolves to the number of documents removed
   */
  proto.remove = function(query, opts){
      var collection = this;

      var removeOne = function(id){
          return collection._removeById(id, opts);
      };

      return P.try(function(){
          return collection.find(query, '_id');
      })
      .then(function(found){
          if(!found || found.length === 0){
              return 0;
          }

          if(Array.isArray(found)){
              return P.each(found, function(doc){
                  return removeOne(doc._id);
              })
              .thenReturn(found.length);
          }

          // A lookup by id yields a single document rather than an array.
          return removeOne(found._id)
          .thenReturn(1);
      });
  };
  /**
   * Remove one document, then wait for the index tasks that are pending for
   * that id.
   *
   * @param {String|Number} id
   * @param {Object} [opts] - forwarded to shard.remove
   * @returns {Promise}
   * @throws {Error} synchronously, when the id is neither string nor number
   */
  proto._removeById = function(id, opts){
      var collection = this;
      var docId = this._checkId(id);

      var removeFromShard = function(){
          return collection.shard.remove(collection.conn._id, collection._key(docId), opts);
      };
      var waitForIndexTasks = function(){
          return collection._finishIndexTasks(docId);
      };

      return P.try(removeFromShard).then(waitForIndexTasks);
  };
  /**
   * Lock a document for this connection.
   *
   * @param {String|Number} id
   * @returns {Promise|undefined} undefined when the shard reports the key as
   *     already locked; otherwise a promise for the shard's lock result. A
   *     'lock' event carrying the id is emitted once the lock is acquired.
   * @throws {Error} synchronously, when the id is neither string nor number
   */
  proto.lock = function(id){
      var collection = this;
      var docId = this._checkId(id);
      var key = this._key(docId);

      if(this.shard.isLocked(this.conn._id, key)){
          return;
      }

      return this.shard.lock(this.conn._id, key)
      .then(function(result){
          collection.emit('lock', docId);
          return result;
      });
  };
  /**
   * Handler for the shard's update-index event: records that a document moved
   * from one index value to another.
   *
   * Changes are accumulated in this.changedIndexes as
   * {indexKey : {indexValue : {id : 1 | -1}}}, where 1 means "id added to this
   * value" and -1 means "id removed from this value". An add and a remove of
   * the same id under the same value cancel each other out. For unique indexes
   * the affected values are committed immediately.
   *
   * The resulting promise is queued in this.pendingIndexTasks[id]; failures
   * surface when _finishIndexTasks(id) waits on it.
   *
   * @param {String} id - document id
   * @param {String} indexKey - JSON-encoded sorted key list
   * @param {String|null} oldValue - previous index value, null when there was none
   * @param {String|null} newValue - new index value, null when there is none
   */
  proto.onUpdateIndex = function(id, indexKey, oldValue, newValue){
      this.logger.debug('onUpdateIndex(%s, %s, %s, %s)', id, indexKey, oldValue, newValue);

      var self = this;

      var promise = P.try(function(){
          var config = self.config.indexes[indexKey];
          if(!config){
              throw new Error('index - ' + indexKey + ' not configured');
          }

          if(!self.changedIndexes[indexKey]){
              self.changedIndexes[indexKey] = utils.forceHashMap();
          }
          var changedIndex = self.changedIndexes[indexKey];

          // Record flag (1 = add, -1 = remove) for this id under indexValue.
          // When the opposite flag is already recorded the two cancel out and
          // the entry is dropped from that same bucket.
          var record = function(indexValue, flag){
              if(!changedIndex[indexValue]){
                  changedIndex[indexValue] = utils.forceHashMap();
              }
              var changedIds = changedIndex[indexValue];
              if(changedIds[id] === -flag){
                  delete changedIds[id];
              }
              else{
                  changedIds[id] = flag;
              }
          };

          if(oldValue !== null){
              record(oldValue, -1);
          }
          if(newValue !== null){
              record(newValue, 1);
          }

          if(!config.unique){
              return;
          }

          // Unique index: commit right away, old value first, then new value.
          return P.try(function(){
              if(oldValue !== null){
                  return self.commitOneIndex(indexKey, oldValue, changedIndex[oldValue], config)
                  .then(function(){
                      delete changedIndex[oldValue];
                  });
              }
          })
          .then(function(){
              if(newValue !== null){
                  return self.commitOneIndex(indexKey, newValue, changedIndex[newValue], config)
                  .then(function(){
                      delete changedIndex[newValue];
                  });
              }
          });
      });

      if(!this.pendingIndexTasks[id]){
          this.pendingIndexTasks[id] = [];
      }
      this.pendingIndexTasks[id].push(promise);
  };
  /**
   * Commit every recorded index change, then clear the change set.
   *
   * Index keys, and the values within each key, are processed sequentially in
   * sorted order; the fixed order is there to avoid deadlock.
   *
   * @returns {Promise}
   */
  proto.commitIndex = function(){
      var collection = this;
      var indexKeys = Object.keys(this.changedIndexes).sort();

      return P.each(indexKeys, function(indexKey){
          var changesByValue = collection.changedIndexes[indexKey];
          var indexConfig = collection.config.indexes[indexKey];
          var indexValues = Object.keys(changesByValue).sort();

          return P.each(indexValues, function(indexValue){
              return collection.commitOneIndex(indexKey, indexValue, changesByValue[indexValue], indexConfig);
          });
      })
      .then(function(){
          collection.changedIndexes = utils.forceHashMap();
      });
  };
  /**
   * Discard every recorded index change that has not been committed.
   */
  proto.rollbackIndex = function(){
      var emptyChanges = utils.forceHashMap();
      this.changedIndexes = emptyChanges;
  };
  /**
   * Apply the recorded id changes for one index value to the index collection.
   *
   * @param {String} indexKey - JSON-encoded sorted field list
   * @param {String} indexValue - JSON-encoded values of those fields
   * @param {Object} changedIds - {id : 1 | -1}; 1 adds the id, anything else removes it
   * @param {Object} config - index config; unique and maxCollision are read
   * @returns {Promise}
   *
   * The returned promise rejects when a unique index would hold more than one
   * id, when the id count would exceed the collision limit, or when the count
   * would become negative (index corrupted).
   */
  proto.commitOneIndex = function(indexKey, indexValue, changedIds, config){
      var indexCollection = this.conn.getCollection(this._indexCollectionName(indexKey), true);

      var modifier = {$pushAll : {ids : []}, $pullAll : {ids : []}};
      var countDelta = 0;
      for(var id in changedIds){
          if(changedIds[id] === 1){
              modifier.$pushAll.ids.push(id);
              countDelta++;
          }
          else{
              modifier.$pullAll.ids.push(id);
              countDelta--;
          }
      }

      // The constructor stores the default limit on the collection config, not
      // on each index config, so fall back to it when the index has no limit
      // of its own.
      // NOTE(review): assumes index configs are not given a maxCollision
      // elsewhere before reaching this point - confirm.
      var maxCollision = config.maxCollision || this.config.maxCollision;

      var self = this;
      return P.try(function(){
          return indexCollection.find(indexValue, 'ids');
      })
      .then(function(ret){
          var oldCount = ret ? ret.ids.length : 0;
          var newCount = oldCount + countDelta;

          if(config.unique && newCount > 1){
              throw new Error('duplicate value - ' + indexValue + ' for unique index - ' + indexKey);
          }
          if(newCount > maxCollision){
              throw new Error('too many documents have value - ' + indexValue + ' for index - ' + indexKey);
          }

          if(newCount > 0){
              return indexCollection.update(indexValue, modifier, {upsert : true});
          }
          else if(newCount === 0){
              // No ids left for this value: drop the index entry.
              return indexCollection.remove(indexValue);
          }
          else{
              throw new Error(util.format('index corrupted: %s %s, please rebuild index', self.name, indexKey));
          }
      });
  };
  /**
   * Wait for the index tasks queued for a document id, then forget them.
   *
   * @param {String} id
   * @returns {Promise|undefined} undefined when nothing is queued for the id.
   *     The queue entry is deleted whether the tasks succeed or fail.
   */
  proto._finishIndexTasks = function(id){
      var tasks = this.pendingIndexTasks[id];
      if(!tasks){
          return;
      }

      // Remember the active domain so it can be put back once the tasks settle.
      var savedDomain = process.domain;
      var collection = this;

      return P.each(tasks, function(task){
          return task;
      })
      .finally(function(){
          delete collection.pendingIndexTasks[id];
          process.domain = savedDomain;
      });
  };
  /**
   * Build the name of the collection that stores an index:
   * 'index.<name>.<key1>.<key2>...', each part passed through utils.escapeField.
   *
   * @param {String} indexKey - JSON-encoded array of field names
   * @returns {String}
   */
  proto._indexCollectionName = function(indexKey){
      var fields = JSON.parse(indexKey);
      var escapedFields = fields.map(function(field){
          return utils.escapeField(field);
      });
      var prefix = 'index.' + utils.escapeField(this.name) + '.';
      return prefix + escapedFields.join('.');
  };
  /**
   * Build the shard key for a document: '<collection name>$<id>'.
   *
   * @param {String} id
   * @returns {String}
   */
  proto._key = function(id){
      return [this.name, '$', id].join('');
  };
  /**
   * Normalize a document id to a string.
   *
   * @param {String|Number} id
   * @returns {String} the id itself, or the number converted with toString()
   * @throws {Error} when the id is neither a string nor a number
   */
  proto._checkId = function(id){
      var idType = typeof(id);

      if(idType === 'number'){
          return id.toString();
      }
      if(idType === 'string'){
          return id;
      }

      throw new Error('id must be number or string');
  };
  /**
   * Validate a collection name.
   * See http://docs.mongodb.org/manual/reference/limits/#Restriction-on-Collection-Names
   *
   * @param {String} name
   * @throws {Error} when the name is empty, not a string, contains "$" or
   *     starts with "system."
   */
  proto._checkName = function(name){
      if(!name){
          throw new Error('Collection name can not empty');
      }

      var isString = typeof(name) === 'string';
      if(!isString){
          throw new Error('Collection name must be string');
      }

      var hasDollar = name.indexOf('$') !== -1;
      if(hasDollar){
          throw new Error('Collection name can not contain "$"');
      }

      var hasSystemPrefix = name.indexOf('system.') === 0;
      if(hasSystemPrefix){
          throw new Error('Collection name can not begin with "system."');
      }
  };

  module.exports = Collection;