apm.js 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608
  1. var EventEmitter = require('events').EventEmitter,
  2. inherits = require('util').inherits;
  3. // Get prototypes
  4. var AggregationCursor = require('./aggregation_cursor'),
  5. CommandCursor = require('./command_cursor'),
  6. OrderedBulkOperation = require('./bulk/ordered').OrderedBulkOperation,
  7. UnorderedBulkOperation = require('./bulk/unordered').UnorderedBulkOperation,
  8. GridStore = require('./gridfs/grid_store'),
  9. Server = require('./server'),
  10. ReplSet = require('./replset'),
  11. Mongos = require('./mongos'),
  12. Cursor = require('./cursor'),
  13. Collection = require('./collection'),
  14. Db = require('./db'),
  15. Admin = require('./admin');
  /**
   * Default operation id source, used when the caller supplies no
   * `options.operationIdGenerator`.
   *
   * `operationId` holds the next id to hand out and starts at 1; `next()`
   * returns that value and then advances the counter by one.
   */
  var basicOperationIdGenerator = {
    operationId: 1,

    /**
     * @returns {number} The id that was current before this call.
     */
    next: function() {
      var issued = this.operationId++;
      return issued;
    }
  };
  /**
   * Default timestamp source, used when the caller supplies no
   * `options.timestampGenerator`.
   */
  var basicTimestampGenerator = {
    /**
     * @returns {number} Current wall-clock time in milliseconds since the epoch.
     */
    current: function() {
      return Date.now();
    },

    /**
     * @param {number} start Value previously returned by `current()`.
     * @param {number} end Later value returned by `current()`.
     * @returns {number} `end - start`.
     */
    duration: function(start, end) {
      return end - start;
    }
  };
  // Commands whose payloads must not be exposed through monitoring events.
  // Entries are stored in lower case because every lookup in this file compares
  // against `commandName.toLowerCase()`; mixed-case entries would never match.
  var senstiveCommands = ['authenticate', 'saslstart', 'saslcontinue', 'getnonce',
    'createuser', 'updateuser', 'copydbgetnonce', 'copydbsaslstart', 'copydb'];
  /**
   * Command monitoring hook.
   *
   * Constructing an instance replaces `command`, `insert`, `update` and `remove`
   * on `core.Server.prototype`, `execute` on both bulk prototypes, and `_find`,
   * `_getmore` and `_killcursor` on the three cursor prototypes with wrappers
   * that emit `started`, `succeeded` and `failed` events on this instance.
   * Every replaced function is recorded in `this.overloads` as
   * `{proto, name, func}`.
   *
   * @param {object} core Object exposing `Server.prototype` and
   *   `Query.nextRequestId()`.
   * @param {object} [options]
   * @param {object} [options.operationIdGenerator] Object with a `next()` method;
   *   defaults to `basicOperationIdGenerator`.
   * @param {object} [options.timestampGenerator] Object with `current()` and
   *   `duration(start, end)` methods; defaults to `basicTimestampGenerator`.
   * @param {function} [callback] When a function, it is called synchronously as
   *   `callback(null, instrumentations)`, where `instrumentations` holds the
   *   result of `define.generate()` for each supported class that has a
   *   `define` property.
   */
  var Instrumentation = function(core, options, callback) {
    options = options || {};

    // Optional id generators
    var operationIdGenerator = options.operationIdGenerator || basicOperationIdGenerator;
    // Optional timestamp generator
    var timestampGenerator = options.timestampGenerator || basicTimestampGenerator;

    // Extend with event emitter functionality
    EventEmitter.call(this);

    // Contains all the instrumentation overloads
    this.overloads = [];

    // Reference used by the wrappers, whose own `this` is the wrapped object
    var self = this;

    // ---------------------------------------------------------
    //
    // Local helpers
    //
    // ---------------------------------------------------------

    // True when `name` is listed in senstiveCommands. Both sides are lower-cased
    // so the outcome does not depend on the casing used in the list.
    var isSensitive = function(name) {
      var lowered = String(name).toLowerCase();
      for(var i = 0; i < senstiveCommands.length; i++) {
        if(String(senstiveCommands[i]).toLowerCase() == lowered) return true;
      }
      return false;
    };

    // Everything after the first '.' of a namespace, i.e. the collection name.
    var collectionFromNs = function(ns) {
      var parts = ns.split('.');
      parts.shift();
      return parts.join('.');
    };

    // Command name and payload field used when a legacy write is reported as a
    // write command.
    var legacyWrites = {
      insert: {name: 'insert', field: 'documents'},
      update: {name: 'update', field: 'updates'},
      remove: {name: 'delete', field: 'deletes'}
    };

    // Builds the write-command document reported for a legacy insert/update/remove.
    var buildLegacyWriteCommand = function(method, ns, ops, opts) {
      var spec = legacyWrites[method];
      var doc = {};
      doc[spec.name] = collectionFromNs(ns);
      doc[spec.field] = ops;

      if(opts.writeConcern && Object.keys(opts.writeConcern).length > 0) {
        doc.writeConcern = opts.writeConcern;
      }

      doc.ordered = opts.ordered != undefined ? opts.ordered : true;
      return doc;
    };

    // Builds the find command document reported for a cursor's `cmd` settings.
    var buildFindCommand = function(collection, cmd) {
      var doc = {find: collection, filter: cmd.query};

      var copyIfSet = function(names) {
        names.forEach(function(name) {
          if(cmd[name]) doc[name] = cmd[name];
        });
      };

      var copyIfBoolean = function(names) {
        names.forEach(function(name) {
          if(typeof cmd[name] == 'boolean') doc[name] = cmd[name];
        });
      };

      if(cmd.sort) doc.sort = cmd.sort;
      if(cmd.fields) doc.projection = cmd.fields;

      // A negative limit is reported as a single batch of |limit| documents
      if(cmd.limit) {
        doc.limit = Math.abs(cmd.limit);
        if(cmd.limit < 0) doc.singleBatch = true;
      }

      // Options
      copyIfSet(['skip', 'hint', 'batchSize']);
      copyIfBoolean(['returnKey']);
      copyIfSet(['comment', 'min', 'max', 'maxScan', 'maxTimeMS']);
      // Flags
      copyIfBoolean(['awaitData', 'snapshot', 'tailable', 'oplogReplay', 'noCursorTimeout', 'partial']);
      if(typeof cmd.showDiskLoc == 'boolean') doc.showRecordId = cmd.showDiskLoc;
      // Read concern and overrides
      copyIfSet(['readConcern', 'explain', 'exhaust']);

      return doc;
    };

    // ---------------------------------------------------------
    //
    // Instrument prototype
    //
    // ---------------------------------------------------------
    var instrumentPrototype = function(callback) {
      // Classes to support
      var classes = [GridStore, OrderedBulkOperation, UnorderedBulkOperation,
        CommandCursor, AggregationCursor, Cursor, Collection, Db];
      var instrumentations = [];

      // Collect the instrumentation points of every class that defines them
      for(var i = 0; i < classes.length; i++) {
        if(classes[i].define) {
          instrumentations.push(classes[i].define.generate());
        }
      }

      // Return the list of instrumentation points
      callback(null, instrumentations);
    };

    // Did the user want to instrument the prototype
    if(typeof callback == 'function') {
      instrumentPrototype(callback);
    }

    // ---------------------------------------------------------
    //
    // Server
    //
    // ---------------------------------------------------------
    var serverMethods = ['command', 'insert', 'update', 'remove'];
    var serverProto = core.Server.prototype;

    serverMethods.forEach(function(x) {
      var func = serverProto[x];
      // Remember the original so uninstrument() can restore it
      self.overloads.push({proto: serverProto, name: x, func: func});

      serverProto[x] = function() {
        var requestId = core.Query.nextRequestId();
        var args = Array.prototype.slice.call(arguments, 0);
        var ns = args[0];
        var commandObj = args[1];
        var options = args[2] || {};
        var commandName = Object.keys(commandObj)[0];
        var db = ns.split('.')[0];

        if(legacyWrites[x]) {
          var maxWireVersion = this.lastIsMaster().maxWireVersion;

          if(!maxWireVersion) {
            // Legacy insert/update/remove: report it as the equivalent write command
            commandName = legacyWrites[x].name;
            commandObj = buildLegacyWriteCommand(x, ns, commandObj, options);
          } else if(x != 'remove' || maxWireVersion >= 2) {
            // Skip the insert/update/remove commands as they are executed as
            // actual write commands in 2.6 or higher.
            // NOTE(review): this keeps the original operator precedence, under
            // which only `remove` is subject to the `>= 2` test - confirm intent.
            return func.apply(this, args);
          }
        }

        // Get the callback
        var callback = args.pop();
        // Reuse the operation id attached to the callback, or create a new one
        var ourOpId = callback.operationId || operationIdGenerator.next();
        // Get a connection reference for this server instance
        var connection = this.s.pool.get();
        var sensitive = isSensitive(commandName);

        var startedEvent = {
          // The command document
          command: commandObj,
          // The database name
          databaseName: db,
          // The command name
          commandName: commandName,
          // The driver generated request id
          requestId: requestId,
          // The driver generated operation id, used to link related events
          operationId: ourOpId,
          // The connection the command is associated with
          connectionId: connection
        };

        // Never expose the payload of a sensitive command
        if(sensitive) {
          var redacted = {};
          redacted[commandName] = true;
          startedEvent.command = redacted;
          // Kept for listeners that read the field the redaction used to be written to
          startedEvent.commandObj = redacted;
        }

        // Emit the started event
        self.emit('started', startedEvent);
        // Start time
        var startTime = timestampGenerator.current();

        // Push our handler callback
        args.push(function(err, r) {
          var endTime = timestampGenerator.current();
          var finishedEvent = {
            duration: timestampGenerator.duration(startTime, endTime),
            commandName: commandName,
            requestId: requestId,
            operationId: ourOpId,
            connectionId: connection
          };

          if(err || (r && r.result && r.result.ok == 0)) {
            finishedEvent.failure = err || r.result.writeErrors || r.result;
            // Filter out any sensitive commands
            if(sensitive) finishedEvent.failure = {};
            self.emit('failed', finishedEvent);
          } else if(commandObj && commandObj.writeConcern
            && commandObj.writeConcern.w == 0) {
            // Write concern 0 is always reported as a plain acknowledgement
            finishedEvent.reply = {ok:1};
            self.emit('succeeded', finishedEvent);
          } else {
            finishedEvent.reply = r && r.result ? r.result : r;
            // Filter out any sensitive commands
            if(sensitive) finishedEvent.reply = {};
            self.emit('succeeded', finishedEvent);
          }

          // Return to caller
          callback(err, r);
        });

        // Apply the call
        return func.apply(this, args);
      };
    });

    // ---------------------------------------------------------
    //
    // Bulk Operations
    //
    // ---------------------------------------------------------
    var bulkMethods = ['execute'];
    var bulkPrototypes = [
      require('./bulk/ordered').Bulk.prototype,
      require('./bulk/unordered').Bulk.prototype
    ];

    bulkPrototypes.forEach(function(proto) {
      bulkMethods.forEach(function(x) {
        var func = proto[x];
        // Remember the original so uninstrument() can restore it
        self.overloads.push({proto: proto, name: x, func: func});

        proto[x] = function() {
          var args = Array.prototype.slice.call(arguments, 0);
          // Set an operation id on the bulk object
          this.operationId = operationIdGenerator.next();

          var callback = args.pop();

          if(typeof callback == 'function') {
            args.push(function(err, r) {
              // Return to caller
              callback(err, r);
            });
          } else if(arguments.length > 0) {
            // The last argument was not a callback: hand it back to the wrapped
            // function instead of silently dropping it
            args.push(callback);
          }

          return func.apply(this, args);
        };
      });
    });

    // ---------------------------------------------------------
    //
    // Cursor
    //
    // ---------------------------------------------------------
    var cursorMethods = ['_find', '_getmore', '_killcursor'];
    var cursorPrototypes = [
      require('./cursor').prototype,
      require('./command_cursor').prototype,
      require('./aggregation_cursor').prototype
    ];

    // Command name translation
    var commandTranslation = {
      '_find': 'find', '_getmore': 'getMore', '_killcursor': 'killCursors', '_explain': 'explain'
    };

    cursorPrototypes.forEach(function(proto) {
      cursorMethods.forEach(function(x) {
        var func = proto[x];
        // Remember the original so uninstrument() can restore it
        self.overloads.push({proto: proto, name: x, func: func});

        proto[x] = function() {
          var cursor = this;
          var argCount = arguments.length;
          // Per-call copy of the method name: the explain rewrite below renames
          // it, and that must not leak into later calls through the closure
          var method = x;
          var requestId = core.Query.nextRequestId();
          var ourOpId = operationIdGenerator.next();
          var db = this.ns.split('.')[0];
          var collection = collectionFromNs(this.ns);
          var commandDoc = this.query;
          var cmd = this.s.cmd;

          // A find starts a new operation; later calls on the cursor reuse its id
          if(method == '_find') {
            cursor.operationId = ourOpId;
          }

          if(method == '_getmore') {
            commandDoc = {
              getMore: this.cursorState.cursorId,
              collection: collection,
              batchSize: cmd.batchSize
            };
            if(cmd.maxTimeMS) commandDoc.maxTimeMS = cmd.maxTimeMS;
          } else if(method == '_killcursor') {
            commandDoc = {
              killCursors: collection,
              cursors: [this.cursorState.cursorId]
            };
          } else if(cmd.find) {
            commandDoc = buildFindCommand(collection, cmd);

            // If we have an explain flag, report a fake explain command
            if(cmd.explain) {
              commandDoc = {
                explain: commandDoc,
                verbosity: 'allPlansExecution'
              };
              // Set readConcern on the command if available
              if(cmd.readConcern) commandDoc.readConcern = cmd.readConcern;
              method = '_explain';
            }
          } else {
            commandDoc = cmd;
          }

          // Prefer the cursor's own connection, else ask its server for one
          var connectionId = null;
          if(this.connection) connectionId = this.connection;
          if(!connectionId && this.server && this.server.getConnection) connectionId = this.server.getConnection();

          // Get the command name
          var commandName = method == '_find' ? Object.keys(commandDoc)[0] : commandTranslation[method];

          var startedEvent = {
            // The command document
            command: commandDoc,
            // The database name
            databaseName: db,
            // The command name
            commandName: commandName,
            // The driver generated request id
            requestId: requestId,
            // The driver generated operation id, used to link related events
            operationId: this.operationId,
            // The connection the command is associated with
            connectionId: connectionId
          };

          // Fields shared by succeeded and failed events. The operation id is the
          // cursor's, so a completion links to the started event emitted above.
          var finished = function(startTime) {
            return {
              duration: timestampGenerator.duration(startTime, timestampGenerator.current()),
              commandName: commandName,
              requestId: requestId,
              operationId: cursor.operationId,
              connectionId: cursor.server.getConnection()
            };
          };

          var args = Array.prototype.slice.call(arguments, 0);
          var callback = args.pop();

          if(typeof callback == 'function' || commandName == 'killCursors') {
            var startTime = timestampGenerator.current();
            // Emit the started event
            self.emit('started', startedEvent);

            // Emit succeeded right away for killCursors on a legacy protocol
            if(commandName == 'killCursors'
              && this.server.lastIsMaster()
              && this.server.lastIsMaster().maxWireVersion < 4) {
              var legacyEvent = finished(startTime);
              legacyEvent.reply = [{ok:1}];

              // The wrapped function still has to run
              if(argCount > 0) args.push(callback);
              func.apply(this, args);

              return self.emit('succeeded', legacyEvent);
            }

            // Add our callback handler
            args.push(function(err, r) {
              var finishedEvent;

              if(err) {
                finishedEvent = finished(startTime);
                finishedEvent.failure = err;
                self.emit('failed', finishedEvent);
              } else {
                // Synthesize a command-style reply when none was passed back
                if(r == null) {
                  var lowered = commandName.toLowerCase();

                  if(lowered == 'getmore') {
                    r = {
                      cursor: {
                        id: cursor.cursorState.cursorId,
                        ns: cursor.ns,
                        nextBatch: cursor.cursorState.documents
                      }, ok:1
                    };
                  } else if(lowered == 'find') {
                    r = {
                      cursor: {
                        id: cursor.cursorState.cursorId,
                        ns: cursor.ns,
                        firstBatch: cursor.cursorState.documents
                      }, ok:1
                    };
                  } else if(lowered == 'killcursors') {
                    r = {
                      cursorsUnknown:[cursor.cursorState.lastCursorId],
                      ok:1
                    };
                  }
                }

                finishedEvent = finished(startTime);
                finishedEvent.reply = r && r.result ? r.result : r;
                self.emit('succeeded', finishedEvent);
              }

              // killCursors may have been called without a callback
              if(typeof callback == 'function') callback(err, r);
            });

            // Apply the call
            return func.apply(this, args);
          }

          // No callback: assume a promise, hand back the value popped above
          if(argCount > 0) args.push(callback);
          var promise = func.apply(this, args);

          // Return a new promise that settles the same way as the wrapped one
          return new cursor.s.promiseLibrary(function(resolve, reject) {
            var startTime = timestampGenerator.current();
            // Emit the started event
            self.emit('started', startedEvent);

            promise.then(function(r) {
              var finishedEvent = finished(startTime);
              finishedEvent.reply = cursor.cursorState.documents;
              self.emit('succeeded', finishedEvent);
              // Without this the caller's promise would never settle on success
              resolve(r);
            }).catch(function(err) {
              var finishedEvent = finished(startTime);
              finishedEvent.failure = err;
              self.emit('failed', finishedEvent);
              // reject the promise
              reject(err);
            });
          });
        };
      });
    });
  };

  inherits(Instrumentation, EventEmitter);
  /**
   * Undo the instrumentation: put every function recorded in `this.overloads`
   * back on the prototype it was taken from, then drop all `started`,
   * `succeeded` and `failed` listeners registered on this instance.
   */
  Instrumentation.prototype.uninstrument = function() {
    var recorded = this.overloads;
    var position = 0;

    // Restore in the order the overloads were recorded
    while(position < recorded.length) {
      var entry = recorded[position];
      entry.proto[entry.name] = entry.func;
      position += 1;
    }

    // Remove all listeners
    var emitter = this;
    ['started', 'succeeded', 'failed'].forEach(function(eventName) {
      emitter.removeAllListeners(eventName);
    });
  };
  513. module.exports = Instrumentation;