unordered.js 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539
  1. "use strict";
  2. var common = require('./common')
  3. , utils = require('../utils')
  4. , toError = require('../utils').toError
  5. , f = require('util').format
  6. , handleCallback = require('../utils').handleCallback
  7. , shallowClone = utils.shallowClone
  8. , WriteError = common.WriteError
  9. , BulkWriteResult = common.BulkWriteResult
  10. , LegacyOp = common.LegacyOp
  11. , ObjectID = require('mongodb-core').BSON.ObjectID
  12. , Define = require('../metadata')
  13. , Batch = common.Batch
  14. , mergeBatchResults = common.mergeBatchResults;
  /**
   * Holder for the operators that follow a find() on an unordered bulk operation
   * (INTERNAL TYPE, do not instantiate directly).
   *
   * The new instance keeps a reference to the `s` state object of the object passed in,
   * so both objects read and write the same state.
   * @class
   * @param {object} self object whose `s` state is shared with the new instance
   * @return {FindOperatorsUnordered} a FindOperatorsUnordered instance.
   */
  var FindOperatorsUnordered = function(self) {
    var sharedState = self.s;
    this.s = sharedState;
  };
  /**
   * Queue an update with `multi: true` for the selector stored by the preceding find()
   *
   * @method
   * @param {object} updateDocument update operations
   * @throws {MongoError}
   * @return {FindOperatorsUnordered} the value returned by addToOperationsList (this instance)
   */
  FindOperatorsUnordered.prototype.update = function(updateDocument) {
    var pending = this.s.currentOp;
    // Only an explicit boolean flag is honoured, anything else means no upsert
    var upsert = false;
    if(typeof pending.upsert == 'boolean') upsert = pending.upsert;
    // Build the update command
    var document = {q: pending.selector, u: updateDocument, multi: true, upsert: upsert};
    // The pending find has been consumed
    this.s.currentOp = null;
    // Queue the command on the update batch
    return addToOperationsList(this, common.UPDATE, document);
  };
  /**
   * Queue an update with `multi: false` for the selector stored by the preceding find()
   *
   * @method
   * @param {object} updateDocument update operations
   * @throws {MongoError}
   * @return {FindOperatorsUnordered} the value returned by addToOperationsList (this instance)
   */
  FindOperatorsUnordered.prototype.updateOne = function(updateDocument) {
    var state = this.s;
    var selector = state.currentOp.selector;
    // The upsert flag counts only when it is an actual boolean
    var flag = state.currentOp.upsert;
    var upsert = typeof flag == 'boolean' ? flag : false;
    // Reset the pending find before queueing
    state.currentOp = null;
    // Queue the single-document update command
    return addToOperationsList(this, common.UPDATE, {
        q: selector
      , u: updateDocument
      , multi: false
      , upsert: upsert
    });
  };
  /**
   * Add a replace one operation to the bulk operation
   *
   * Delegates to updateOne, so the replacement is queued as a `multi: false` update.
   *
   * @method
   * @param {object} updateDocument the new document to replace the existing one with
   * @throws {MongoError}
   * @return {FindOperatorsUnordered} the value returned by updateOne, so calls can be chained
   */
  FindOperatorsUnordered.prototype.replaceOne = function(updateDocument) {
    // Hand back the result of updateOne; dropping it returned undefined and broke chaining
    return this.updateOne(updateDocument);
  };
  /**
   * Flag the pending find() so that the update operator which follows is queued as an upsert
   *
   * @method
   * @throws {TypeError} if there is no pending find (`s.currentOp` is null)
   * @return {FindOperatorsUnordered} this instance, for chaining
   */
  FindOperatorsUnordered.prototype.upsert = function() {
    var pending = this.s.currentOp;
    pending.upsert = true;
    return this;
  };
  /**
   * Queue a remove with `limit: 1` for the selector stored by the preceding find()
   *
   * @method
   * @throws {MongoError}
   * @return {FindOperatorsUnordered} the value returned by addToOperationsList (this instance)
   */
  FindOperatorsUnordered.prototype.removeOne = function() {
    var selector = this.s.currentOp.selector;
    // The pending find has been consumed
    this.s.currentOp = null;
    // Queue the remove command on the remove batch
    return addToOperationsList(this, common.REMOVE, {q: selector, limit: 1});
  };
  /**
   * Queue a remove with `limit: 0` for the selector stored by the preceding find()
   *
   * @method
   * @throws {MongoError}
   * @return {FindOperatorsUnordered} the value returned by addToOperationsList (this instance)
   */
  FindOperatorsUnordered.prototype.remove = function() {
    var state = this.s;
    // Build the remove command from the pending selector
    var document = {};
    document.q = state.currentOp.selector;
    document.limit = 0;
    // Reset the pending find
    state.currentOp = null;
    // Queue the command on the remove batch
    return addToOperationsList(this, common.REMOVE, document);
  };
  /**
   * Queue a single operation on the running batch that matches its type.
   *
   * Inserts, updates and removes are collected in separate running batches
   * (`s.currentInsertBatch`, `s.currentUpdateBatch`, `s.currentRemoveBatch`). A running
   * batch is moved to `s.batches` and replaced by a fresh one when adding the operation
   * would reach `s.maxWriteBatchSize` operations or `s.maxBatchSizeBytes` bytes.
   *
   * @param {object} _self object exposing the shared bulk state as `_self.s`
   * @param {*} docType one of common.INSERT, common.UPDATE or common.REMOVE
   * @param {object} document the operation (for inserts, the document) to queue
   * @throws error built by toError when `document` is an Array or when its BSON size reaches `s.maxBatchSizeBytes`
   * @return {object} `_self`, so calls can be chained
   */
  var addToOperationsList = function(_self, docType, document) {
    var state = _self.s;
    // Reject arrays before touching any state so a bad call cannot leave a
    // half-updated batch list behind
    if(Array.isArray(document)) {
      throw toError("operation passed in cannot be an Array");
    }
    // Get the bsonSize
    var bsonSize = state.bson.calculateObjectSize(document, false);
    // Throw error if the doc is bigger than the max BSON size
    if(bsonSize >= state.maxBatchSizeBytes) throw toError("document is larger than the maximum size " + state.maxBatchSizeBytes);

    // Pick the running batch for this type of operation
    var batch = null;
    if(docType == common.INSERT) {
      batch = state.currentInsertBatch;
    } else if(docType == common.UPDATE) {
      batch = state.currentUpdateBatch;
    } else if(docType == common.REMOVE) {
      batch = state.currentRemoveBatch;
    }
    // Create a new batch object if we don't have a current one
    if(batch == null) batch = new Batch(docType, state.currentIndex);

    // Start a new batch when this operation would hit one of the limits
    if(((batch.size + 1) >= state.maxWriteBatchSize)
      || ((batch.sizeBytes + bsonSize) >= state.maxBatchSizeBytes)
      || (batch.batchType != docType)) {
      // Save the batch to the execution stack
      state.batches.push(batch);
      // Create a new batch
      batch = new Batch(docType, state.currentIndex);
    }

    // Position of this operation within the whole bulk; captured before the
    // counter moves so every record of the operation uses the same index
    var operationIndex = state.currentIndex;
    batch.operations.push(document);
    batch.originalIndexes.push(operationIndex);
    state.currentIndex = operationIndex + 1;

    // Save back the current batch to the right type
    state.currentBatch = batch;
    if(docType == common.INSERT) {
      state.currentInsertBatch = batch;
      // Use the operation's own index (previously the already-incremented
      // counter was stored, which was off by one)
      state.bulkResult.insertedIds.push({index: operationIndex, _id: document._id});
    } else if(docType == common.UPDATE) {
      state.currentUpdateBatch = batch;
    } else if(docType == common.REMOVE) {
      state.currentRemoveBatch = batch;
    }

    // Update current batch size
    batch.size = batch.size + 1;
    batch.sizeBytes = batch.sizeBytes + bsonSize;
    // Return self
    return _self;
  };
  /**
   * Create a new UnorderedBulkOperation instance (INTERNAL TYPE, do not instantiate directly)
   *
   * All state lives on `this.s`: the accumulated bulk result, the running batch per
   * operation type, the size limits, the write concern and the handles that were passed in.
   *
   * @class
   * @param {object} topology topology handle; `topology.bson` is used for size calculations and
   *   `topology.isMasterDoc` (when present) supplies `maxBsonObjectSize` and `maxWriteBatchSize`
   * @param {object} collection collection the operations apply to; `collectionName` is stored as the namespace
   * @param {object} [options] optional settings, also handed to common.writeConcern
   * @param {function} [options.promiseLibrary] Promise constructor to use; falls back to the global Promise, then es6-promise
   * @param {boolean} [options.bypassDocumentValidation=false] stored on the state only when it is a boolean
   * @return {UnorderedBulkOperation} a UnorderedBulkOperation instance.
   */
  var UnorderedBulkOperation = function(topology, collection, options) {
    options = options == null ? {} : options;

    // Limits reported by the server, when an isMaster document is available
    var isMasterDoc = topology.isMasterDoc;
    // Fall back to 16 MiB (the original fallback read 1024*1025*16, a typo)
    var maxBatchSizeBytes = isMasterDoc && isMasterDoc.maxBsonObjectSize
      ? isMasterDoc.maxBsonObjectSize : (1024*1024*16);
    var maxWriteBatchSize = isMasterDoc && isMasterDoc.maxWriteBatchSize
      ? isMasterDoc.maxWriteBatchSize : 1000;

    // Get the write concern
    var writeConcern = common.writeConcern(shallowClone(options), collection, options);

    // Get the promiseLibrary, falling back when none was selected
    var promiseLibrary = options.promiseLibrary;
    if(!promiseLibrary) {
      promiseLibrary = typeof global.Promise == 'function' ?
        global.Promise : require('es6-promise').Promise;
    }

    // Internal state
    this.s = {
      // Final result
        bulkResult: {
            ok: 1
          , writeErrors: []
          , writeConcernErrors: []
          , insertedIds: []
          , nInserted: 0
          , nUpserted: 0
          , nMatched: 0
          , nModified: 0
          , nRemoved: 0
          , upserted: []
        }
      // Current batch state
      , currentInsertBatch: null
      , currentUpdateBatch: null
      , currentRemoveBatch: null
      , currentBatch: null
      , currentIndex: 0
      , batches: []
      // Write concern
      , writeConcern: writeConcern
      // Max batch size options
      , maxBatchSizeBytes: maxBatchSizeBytes
      , maxWriteBatchSize: maxWriteBatchSize
      // Namespace for the write operations
      , namespace: collection.collectionName
      // Handle to the bson serializer, used to calculate running sizes
      , bson: topology.bson
      // Topology
      , topology: topology
      // Options
      , options: options
      // Selector saved by find() until an operator consumes it
      , currentOp: null
      // Used to mark operation as executed
      , executed: false
      // Collection
      , collection: collection
      // Promise Library
      , promiseLibrary: promiseLibrary
      // Bypass validation
      , bypassDocumentValidation: typeof options.bypassDocumentValidation == 'boolean' ? options.bypassDocumentValidation : false
    };
  };
  // Build the metadata registration for the class and expose it as the static `define` property
  var define = new Define('UnorderedBulkOperation', UnorderedBulkOperation, false);
  UnorderedBulkOperation.define = define;
  /**
   * Queue a single document for insertion
   *
   * Unless the db option `forceServerObjectId` is exactly true, a document without an
   * `_id` gets a new ObjectID assigned (the passed document itself is modified).
   *
   * @param {object} document the document to insert
   * @throws {MongoError}
   * @return {UnorderedBulkOperation} the value returned by addToOperationsList (this instance)
   */
  UnorderedBulkOperation.prototype.insert = function(document) {
    var serverAssignsId = this.s.collection.s.db.options.forceServerObjectId === true;
    if(!serverAssignsId && document._id == null) {
      document._id = new ObjectID();
    }
    return addToOperationsList(this, common.INSERT, document);
  };
  /**
   * Start an update/updateOne/remove/removeOne/replaceOne by storing the selector it applies to
   *
   * @method
   * @param {object} selector The selector for the bulk operation.
   * @throws {MongoError} when no selector is given
   * @return {FindOperatorsUnordered} operators bound to this bulk operation's state
   */
  UnorderedBulkOperation.prototype.find = function(selector) {
    if(selector) {
      // Remember the selector until one of the operators consumes it
      this.s.currentOp = {selector: selector};
      return new FindOperatorsUnordered(this);
    }
    throw toError("Bulk find operation must specify a selector");
  };
  // `length` reports the running operation counter kept in `s.currentIndex`
  var lengthDescriptor = {
    get: function() {
      var state = this.s;
      return state.currentIndex;
    },
    enumerable: true
  };
  Object.defineProperty(UnorderedBulkOperation.prototype, 'length', lengthDescriptor);
  /**
   * Add one operation, described in bulkWrite form, to the bulk operation.
   *
   * `op` is keyed by operation name and the first key selects the payload. Shapes handled:
   *  - updateOne / updateMany / replaceOne with a `q` field: queued as is after `multi` is set
   *  - updateOne / updateMany / replaceOne with `filter` and `update` or `replacement` (optional `upsert`)
   *  - removeOne / removeMany, or deleteOne / deleteMany with a `q` field: queued as is after `limit` is set
   *  - deleteOne / deleteMany with `filter`
   *  - insertOne (the document itself, or wrapped as `{document: ...}`) and insertMany (array of documents)
   *
   * The payloads that are queued as is are modified in place (`multi`, `limit` and a
   * generated `_id` are written to the caller's objects).
   *
   * @method
   * @param {object} op the operation to add
   * @throws {MongoError} when `op` matches none of the supported shapes
   * @return {UnorderedBulkOperation} this instance, for chaining
   */
  UnorderedBulkOperation.prototype.raw = function(op) {
    var key = Object.keys(op)[0];

    // Set up the force server object id, the bulk options take precedence over the db options
    var forceServerObjectId = typeof this.s.options.forceServerObjectId == 'boolean'
      ? this.s.options.forceServerObjectId : this.s.collection.s.db.options.forceServerObjectId;

    // Update operations
    if((op.updateOne && op.updateOne.q)
      || (op.updateMany && op.updateMany.q)
      || (op.replaceOne && op.replaceOne.q)) {
      op[key].multi = op.updateOne || op.replaceOne ? false : true;
      return addToOperationsList(this, common.UPDATE, op[key]);
    }

    // Crud spec update format
    if(op.updateOne || op.updateMany || op.replaceOne) {
      var multi = op.updateOne || op.replaceOne ? false : true;
      var updateOperation = {q: op[key].filter, u: op[key].update || op[key].replacement, multi: multi};
      if(op[key].upsert) updateOperation.upsert = true;
      return addToOperationsList(this, common.UPDATE, updateOperation);
    }

    // Remove operations
    if(op.removeOne || op.removeMany || (op.deleteOne && op.deleteOne.q) || (op.deleteMany && op.deleteMany.q)) {
      // deleteOne must be limited to one document as well; testing only removeOne
      // here made a deleteOne carrying `q` remove every match
      op[key].limit = op.removeOne || op.deleteOne ? 1 : 0;
      return addToOperationsList(this, common.REMOVE, op[key]);
    }

    // Crud spec delete operations, less efficient
    if(op.deleteOne || op.deleteMany) {
      var limit = op.deleteOne ? 1 : 0;
      var removeOperation = {q: op[key].filter, limit: limit};
      return addToOperationsList(this, common.REMOVE, removeOperation);
    }

    // Insert operations
    if(op.insertOne && op.insertOne.document == null) {
      if(forceServerObjectId !== true && op.insertOne._id == null) op.insertOne._id = new ObjectID();
      return addToOperationsList(this, common.INSERT, op.insertOne);
    } else if(op.insertOne && op.insertOne.document) {
      if(forceServerObjectId !== true && op.insertOne.document._id == null) op.insertOne.document._id = new ObjectID();
      return addToOperationsList(this, common.INSERT, op.insertOne.document);
    }

    if(op.insertMany) {
      for(var i = 0; i < op.insertMany.length; i++) {
        if(forceServerObjectId !== true && op.insertMany[i]._id == null) op.insertMany[i]._id = new ObjectID();
        addToOperationsList(this, common.INSERT, op.insertMany[i]);
      }
      // Return the bulk operation like every other branch (this used to return undefined)
      return this;
    }

    // No valid type of operation
    throw toError("bulkWrite only supports insertOne, insertMany, updateOne, updateMany, removeOne, removeMany, deleteOne, deleteMany");
  };
  /**
   * Send one batch to the topology and merge its outcome into the shared bulk result.
   *
   * @param {object} self the bulk operation owning the batch
   * @param {object} batch the batch to send; `batchType` selects insert, update or remove
   * @param {function} callback called through handleCallback with `(err)` when the reported
   *   error carries a `driver` or `message` field, otherwise with `(null, mergedResult)`
   */
  var executeBatch = function(self, batch, callback) {
    // Unordered execution, plus whatever optional settings apply
    var finalOptions = {ordered: false};
    if(self.s.writeConcern != null) finalOptions.writeConcern = self.s.writeConcern;

    var resultHandler = function(err, result) {
      // Error is a driver related error not a bulk op error, terminate
      if(err && (err.driver || err.message)) {
        return handleCallback(callback, err);
      }
      // Any other error is flagged and merged into the bulk result
      if(err) err.ok = 0;
      var merged = mergeBatchResults(false, batch, self.s.bulkResult, err, result);
      handleCallback(callback, null, merged);
    };

    // Carry the operation id along on the handler when one is set
    if(self.operationId) resultHandler.operationId = self.operationId;
    // Serialize functions
    if(self.s.options.serializeFunctions) finalOptions.serializeFunctions = true;
    // Was the bypassDocumentValidation option specified
    if(self.s.bypassDocumentValidation == true) finalOptions.bypassDocumentValidation = true;

    try {
      // Pick the topology method that matches the type of the batch
      var method = null;
      if(batch.batchType == common.INSERT) {
        method = 'insert';
      } else if(batch.batchType == common.UPDATE) {
        method = 'update';
      } else if(batch.batchType == common.REMOVE) {
        method = 'remove';
      }
      if(method != null) {
        self.s.topology[method](self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
      }
    } catch(err) {
      // Force top level error
      err.ok = 0;
      // Merge top level error and return
      handleCallback(callback, null, mergeBatchResults(false, batch, self.s.bulkResult, err, null));
    }
  };
  /**
   * Execute every queued batch and report once all of them have called back.
   *
   * @param {object} self the bulk operation whose `s.batches` are executed
   * @param {function} callback called through handleCallback, after the last batch finished, with
   *   `(driverError)` when any batch reported an error (the most recent one is passed on), otherwise with
   *   `(writeError, bulkWriteResult)` where `writeError` is built from the first collected write error, or null
   */
  var executeBatches = function(self, callback) {
    var numberOfCommandsToExecute = self.s.batches.length;
    // Shared by all batch callbacks. The original declared a second `var error`
    // inside the callback, which shadowed this accumulator, so an error from any
    // batch other than the last one to finish was silently dropped.
    var driverError = null;

    var onBatchExecuted = function(err) {
      // Driver layer error capture it
      if(err) driverError = err;
      // Count down the number of commands left to execute
      numberOfCommandsToExecute = numberOfCommandsToExecute - 1;
      if(numberOfCommandsToExecute != 0) return;
      // Driver level error
      if(driverError) return handleCallback(callback, driverError);
      // Treat write errors
      var writeErrors = self.s.bulkResult.writeErrors;
      var writeError = writeErrors.length > 0 ? toError(writeErrors[0]) : null;
      handleCallback(callback, writeError, new BulkWriteResult(self.s.bulkResult));
    };

    // Execute over all the batches
    for(var i = 0; i < self.s.batches.length; i++) {
      executeBatch(self, self.s.batches[i], onBatchExecuted);
    }
  };
  /**
   * The callback format for results
   * @callback UnorderedBulkOperation~resultCallback
   * @param {MongoError} error An error instance representing the error during the execution.
   * @param {BulkWriteResult} result The bulk write result.
   */

  /**
   * Execute the unordered bulk operation
   *
   * A bulk operation can be executed only once; a second call throws. The write concern
   * argument is optional: when it is left out, the write concern derived at construction
   * time is kept.
   *
   * @method
   * @param {object} [_writeConcern=null] Optional write concern that replaces the configured one.
   * @param {(number|string)} [_writeConcern.w=null] The write concern.
   * @param {number} [_writeConcern.wtimeout=null] The write concern timeout.
   * @param {boolean} [_writeConcern.j=false] Specify a journal write concern.
   * @param {boolean} [_writeConcern.fsync=false] Specify a file sync write concern.
   * @param {UnorderedBulkOperation~resultCallback} [callback] The result callback
   * @throws {MongoError} when the bulk was already executed or contains no operations
   * @return {Promise} returns Promise if no callback passed
   */
  UnorderedBulkOperation.prototype.execute = function(_writeConcern, callback) {
    var self = this;
    if(this.s.executed) throw toError("batch cannot be re-executed");

    if(typeof _writeConcern == 'function') {
      callback = _writeConcern;
    } else if(_writeConcern && typeof _writeConcern == 'object') {
      // Only an actual write concern object may replace the configured one; calling
      // execute() without arguments used to overwrite it with undefined
      this.s.writeConcern = _writeConcern;
    }

    // Queue the running batches for execution
    if(this.s.currentInsertBatch) this.s.batches.push(this.s.currentInsertBatch);
    if(this.s.currentUpdateBatch) this.s.batches.push(this.s.currentUpdateBatch);
    if(this.s.currentRemoveBatch) this.s.batches.push(this.s.currentRemoveBatch);

    // If we have no operations in the bulk raise an error
    if(this.s.batches.length == 0) {
      throw toError("Invalid Operation, No operations in bulk");
    }

    // Arm the re-execution guard; the flag was checked above but never set, so a
    // second call queued the running batches again and re-sent every operation
    this.s.executed = true;

    // Execute using callback
    if(typeof callback == 'function') return executeBatches(this, callback);

    // Return a Promise
    return new this.s.promiseLibrary(function(resolve, reject) {
      executeBatches(self, function(err, r) {
        if(err) return reject(err);
        resolve(r);
      });
    });
  };

  define.classMethod('execute', {callback: true, promise:false});
  /**
   * Factory that builds an UnorderedBulkOperation from the given arguments
   * @ignore
   */
  var initializeUnorderedBulkOp = function(topology, collection, options) {
    var bulkOperation = new UnorderedBulkOperation(topology, collection, options);
    return bulkOperation;
  };
  470. initializeUnorderedBulkOp.UnorderedBulkOperation = UnorderedBulkOperation;
  471. module.exports = initializeUnorderedBulkOp;
  472. module.exports.Bulk = UnorderedBulkOperation;