mongos.js 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499
  1. "use strict";
  2. var EventEmitter = require('events').EventEmitter
  3. , inherits = require('util').inherits
  4. , f = require('util').format
  5. , ServerCapabilities = require('./topology_base').ServerCapabilities
  6. , MongoCR = require('mongodb-core').MongoCR
  7. , MongoError = require('mongodb-core').MongoError
  8. , CMongos = require('mongodb-core').Mongos
  9. , Cursor = require('./cursor')
  10. , AggregationCursor = require('./aggregation_cursor')
  11. , CommandCursor = require('./command_cursor')
  12. , Define = require('./metadata')
  13. , Server = require('./server')
  14. , Store = require('./topology_base').Store
  15. , shallowClone = require('./utils').shallowClone;
  16. /**
  17. * @fileOverview The **Mongos** class is a class that represents a Mongos Proxy topology and is
  18. * used to construct connections.
  19. *
  20. * **Mongos Should not be used, use MongoClient.connect**
  21. * @example
  22. * var Db = require('mongodb').Db,
  23. * Mongos = require('mongodb').Mongos,
  24. * Server = require('mongodb').Server,
  25. * test = require('assert');
  26. * // Connect using Mongos
  27. * var server = new Server('localhost', 27017);
  28. * var db = new Db('test', new Mongos([server]));
  29. * db.open(function(err, db) {
  30. * // Get an additional db
  31. * db.close();
  32. * });
  33. */
  34. /**
  35. * Creates a new Mongos instance
  36. * @class
  37. * @deprecated
  38. * @param {Server[]} servers A seedlist of mongos proxy servers to connect to.
  39. * @param {object} [options=null] Optional settings.
  40. * @param {boolean} [options.ha=true] Turn on high availability monitoring.
  41. * @param {number} [options.haInterval=5000] Time between each replicaset status check.
  42. * @param {number} [options.poolSize=5] Number of connections in the connection pool for each server instance, set to 5 as default for legacy reasons.
  43. * @param {boolean} [options.ssl=false] Use ssl connection (needs to have a mongod server with ssl support)
  44. * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
  45. * @param {boolean} [options.sslValidate=true] Validate mongod server certificate against ca (needs to have a mongod server with ssl support, 2.4 or higher)
  46. * @param {array} [options.sslCA=null] Array of valid certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher)
  47. * @param {(Buffer|string)} [options.sslCert=null] String or buffer containing the certificate we wish to present (needs to have a mongod server with ssl support, 2.4 or higher)
  48. * @param {(Buffer|string)} [options.sslKey=null] String or buffer containing the certificate private key we wish to present (needs to have a mongod server with ssl support, 2.4 or higher)
  49. * @param {(Buffer|string)} [options.sslPass=null] String or buffer containing the certificate password (needs to have a mongod server with ssl support, 2.4 or higher)
  50. * @param {object} [options.socketOptions=null] Socket options
  51. * @param {boolean} [options.socketOptions.noDelay=true] TCP Socket NoDelay option.
  52. * @param {number} [options.socketOptions.keepAlive=0] TCP KeepAlive on the socket with a X ms delay before start.
  53. * @param {number} [options.socketOptions.connectTimeoutMS=0] TCP Connection timeout setting
  54. * @param {number} [options.socketOptions.socketTimeoutMS=0] TCP Socket timeout setting
  55. * @fires Mongos#connect
  56. * @fires Mongos#ha
  57. * @fires Mongos#joined
  58. * @fires Mongos#left
  59. * @fires Mongos#fullsetup
  60. * @fires Mongos#open
  61. * @fires Mongos#close
  62. * @fires Mongos#error
  63. * @fires Mongos#timeout
  64. * @fires Mongos#parseError
  65. * @return {Mongos} a Mongos instance.
  66. */
  var Mongos = function(servers, options) {
    // Allow the constructor to be called without `new`
    if(!(this instanceof Mongos)) return new Mongos(servers, options);
    options = options || {};
    var self = this;

    // Report a missing / non-array seed list as a driver error instead of
    // letting the loop below fail with a raw TypeError on `.length`
    if(!Array.isArray(servers)) {
      throw MongoError.create({message: "seed list must be an array of Server instances", driver:true});
    }

    // Ensure all the instances are Server
    for(var i = 0; i < servers.length; i++) {
      if(!(servers[i] instanceof Server)) {
        throw MongoError.create({message: "all seed list instances must be of the Server type", driver:true});
      }
    }

    // Store option defaults
    var storeOptions = {
        force: false
      , bufferMaxEntries: -1
    }

    // Shared global store (a caller supplied store wins over a fresh one)
    var store = options.store || new Store(self, storeOptions);

    // Set up event emitter
    EventEmitter.call(this);

    // Build seed list of plain {host, port} pairs for the core topology
    var seedlist = servers.map(function(x) {
      return {host: x.host, port: x.port}
    });

    // Final options start as a shallow copy of the user options; the
    // assignments below add the names the core topology is handed
    var finalOptions = shallowClone(options);

    // Default values
    finalOptions.size = typeof options.poolSize === 'number' ? options.poolSize : 5;
    finalOptions.reconnect = typeof options.auto_reconnect === 'boolean' ? options.auto_reconnect : true;
    finalOptions.emitError = typeof options.emitError === 'boolean' ? options.emitError : true;
    finalOptions.cursorFactory = Cursor;

    // Add the store as the disconnect handler
    finalOptions.disconnectHandler = store;

    // Translate the ssl* option names to the names passed down
    if(options.sslCA) finalOptions.ca = options.sslCA;
    if(typeof options.sslValidate === 'boolean') finalOptions.rejectUnauthorized = options.sslValidate;
    if(options.sslKey) finalOptions.key = options.sslKey;
    if(options.sslCert) finalOptions.cert = options.sslCert;
    if(options.sslPass) finalOptions.passphrase = options.sslPass;
    if(options.checkServerIdentity) finalOptions.checkServerIdentity = options.checkServerIdentity;

    // Are we running in debug mode
    var debug = typeof options.debug === 'boolean' ? options.debug : false;
    if(debug) {
      finalOptions.debug = debug;
    }

    // Socket options passed down
    var socketOptions = options.socketOptions;
    if(socketOptions) {
      if(socketOptions.connectTimeoutMS) {
        this.connectTimeoutMS = socketOptions.connectTimeoutMS;
        finalOptions.connectionTimeout = socketOptions.connectTimeoutMS;
      }

      if(socketOptions.socketTimeoutMS) {
        finalOptions.socketTimeout = socketOptions.socketTimeoutMS;
      }

      // Map keep alive setting: a numeric keepAlive both enables keep alive
      // and supplies the initial delay
      if(typeof socketOptions.keepAlive === 'number') {
        finalOptions.keepAlive = true;
        finalOptions.keepAliveInitialDelay = socketOptions.keepAlive;
      }

      // Numeric connectionTimeout / socketTimeout are applied after the *MS
      // variants above, so they take precedence when both are supplied
      if(typeof socketOptions.connectionTimeout === 'number') {
        finalOptions.connectionTimeout = socketOptions.connectionTimeout;
      }

      if(typeof socketOptions.socketTimeout === 'number') {
        finalOptions.socketTimeout = socketOptions.socketTimeout;
      }

      // noDelay
      if(typeof socketOptions.noDelay === 'boolean') {
        finalOptions.noDelay = socketOptions.noDelay;
      }
    }

    if(typeof options.secondaryAcceptableLatencyMS === 'number') {
      finalOptions.acceptableLatency = options.secondaryAcceptableLatencyMS;
    }

    // Create the Mongos
    var mongos = new CMongos(seedlist, finalOptions)
    // Server capabilities (filled in later by capabilities())
    var sCapabilities = null;
    // Add auth providers
    mongos.addAuthProvider('mongocr', new MongoCR());

    // Internal state
    this.s = {
      // Create the Mongos
        mongos: mongos
      // Server capabilities
      , sCapabilities: sCapabilities
      // Debug turned on
      , debug: debug
      // Store option defaults
      , storeOptions: storeOptions
      // Cloned options
      , clonedOptions: finalOptions
      // Actual store of callbacks
      , store: store
      // Options
      , options: options
    }

    // Last ismaster
    Object.defineProperty(this, 'isMasterDoc', {
      enumerable:true, get: function() { return self.s.mongos.lastIsMaster(); }
    });

    // Number of currently connected servers
    Object.defineProperty(this, 'numberOfConnectedServers', {
      enumerable:true, get: function() {
        return self.s.mongos.s.mongosState.connectedServers().length;
      }
    });

    // BSON property
    Object.defineProperty(this, 'bson', {
      enumerable: true, get: function() {
        return self.s.mongos.bson;
      }
    });

    // High availability interval, read from the wrapped topology
    Object.defineProperty(this, 'haInterval', {
      enumerable:true, get: function() { return self.s.mongos.haInterval; }
    });
  }
  189. /**
  190. * @ignore
  191. */
  // Mongos instances get the EventEmitter prototype chain
  inherits(Mongos, EventEmitter);
  // Define instance for this class; exposed as Mongos.define and used further
  // down through define.classMethod(...) to describe the public methods
  var define = new Define('Mongos', Mongos, false);
  Mongos.define = define;
  // Connect
  //
  // Starts the wrapped core topology and reports the outcome of the first
  // connection attempt.
  //
  //   db        object whose `bufferMaxEntries` is copied into the store options
  //   _options  optional; stored on `this.s.options` and passed to the core
  //             topology's connect(). May be omitted, with the callback second.
  //   callback  optional function(err, topology). Called at most once: with the
  //             failure of the first timeout/error/close event seen before a
  //             connection is made, or with (null, this) once connected.
  //
  // If no callback is supplied a failed attempt is emitted on this instance
  // instead ('error' only when an 'error' listener is registered).
  Mongos.prototype.connect = function(db, _options, callback) {
    var self = this;
    if('function' === typeof _options) callback = _options, _options = {};
    if(_options == null) _options = {};
    if(!('function' === typeof callback)) callback = null;
    self.s.options = _options;

    // Update bufferMaxEntries
    self.s.storeOptions.bufferMaxEntries = db.bufferMaxEntries;

    var core = self.s.mongos;
    var failureEvents = ['timeout', 'error', 'close'];
    // event name -> the exact listener registered for the connect attempt,
    // kept so it can be handed back to removeListener
    var pendingFailureHandlers = {};

    // Run the user callback; anything it throws is rethrown on the next tick
    // so it cannot unwind through the emitting topology
    var invokeCallback = function(args) {
      try {
        callback.apply(null, args);
      } catch(thrown) {
        process.nextTick(function() { throw thrown; })
      }
    }

    // Detach every listener that belongs to the initial connection attempt
    var removeConnectListeners = function() {
      failureEvents.forEach(function(e) {
        core.removeListener(e, pendingFailureHandlers[e]);
      });

      core.removeListener('connect', connectHandler);
    }

    // Error handler for the initial connection attempt
    var connectErrorHandler = function(event) {
      return function(err) {
        // Only the first failure is reported; drop the sibling handlers so a
        // follow-up event (e.g. 'close' after 'error') cannot report again
        removeConnectListeners();

        if(callback == null) {
          // Nobody to call back: surface the failure as an event instead
          if(event !== 'error' || self.listeners('error').length > 0) {
            self.emit(event, err);
          }

          return;
        }

        invokeCallback([err]);
      }
    }

    // Handler used once connected: relays timeout/close, never 'error'
    var errorHandler = function(event) {
      return function(err) {
        if(event != 'error') {
          self.emit(event, err);
        }
      }
    }

    // Reconnect handler: announce the reconnect, then run the store
    var reconnectHandler = function() {
      self.emit('reconnect');
      self.s.store.execute();
    }

    // Connect handler
    var connectHandler = function() {
      // Clear out all the current handlers left over
      failureEvents.forEach(function(e) {
        core.removeAllListeners(e);
      });

      // Set up listeners
      core.once('timeout', errorHandler('timeout'));
      core.once('error', errorHandler('error'));
      core.once('close', errorHandler('close'));

      // relay the event
      var relay = function(event) {
        return function(t, server) {
          self.emit(event, t, server);
        }
      }

      // Set up serverConfig listeners
      core.on('joined', relay('joined'));
      core.on('left', relay('left'));
      core.on('fullsetup', relay('fullsetup'));

      // Emit open event
      self.emit('open', null, self);

      // Return correctly
      if(callback != null) invokeCallback([null, self]);
    }

    // Set up listeners
    failureEvents.forEach(function(e) {
      pendingFailureHandlers[e] = connectErrorHandler(e);
      core.once(e, pendingFailureHandlers[e]);
    });
    core.once('connect', connectHandler);

    // Reconnect server
    core.on('reconnect', reconnectHandler);

    // Start connection
    core.connect(_options);
  }
  // Parser type: delegates to parserType() on the wrapped topology
  Mongos.prototype.parserType = function() {
    var core = this.s.mongos;
    return core.parserType();
  }

  define.classMethod('parserType', {callback: false, promise:false, returns: [String]});
  // Server capabilities
  //
  // Returns the cached ServerCapabilities when one exists. Otherwise builds
  // (and caches) one from the last ismaster result, or returns null while no
  // ismaster result is available.
  Mongos.prototype.capabilities = function() {
    var state = this.s;

    if(!state.sCapabilities) {
      if(state.mongos.lastIsMaster() == null) return null;
      state.sCapabilities = new ServerCapabilities(state.mongos.lastIsMaster());
    }

    return state.sCapabilities;
  }

  define.classMethod('capabilities', {callback: false, promise:false, returns: [ServerCapabilities]});
  // Command: hands all arguments to command() on the wrapped topology
  Mongos.prototype.command = function(ns, cmd, options, callback) {
    var core = this.s.mongos;
    core.command(ns, cmd, options, callback);
  }

  define.classMethod('command', {callback: true, promise:false});
  // Insert: forwards to insert() on the wrapped topology; the callback only
  // ever receives the first two arguments (error, result)
  Mongos.prototype.insert = function(ns, ops, options, callback) {
    var core = this.s.mongos;
    var forward = function(err, result) {
      callback(err, result)
    };

    core.insert(ns, ops, options, forward);
  }

  define.classMethod('insert', {callback: true, promise:false});
  // Update: hands all arguments to update() on the wrapped topology
  Mongos.prototype.update = function(ns, ops, options, callback) {
    var core = this.s.mongos;
    core.update(ns, ops, options, callback);
  }

  define.classMethod('update', {callback: true, promise:false});
  // Remove: hands all arguments to remove() on the wrapped topology
  Mongos.prototype.remove = function(ns, ops, options, callback) {
    var core = this.s.mongos;
    core.remove(ns, ops, options, callback);
  }

  define.classMethod('remove', {callback: true, promise:false});
  // Destroyed: result of isDestroyed() on the wrapped topology
  Mongos.prototype.isDestroyed = function() {
    var core = this.s.mongos;
    return core.isDestroyed();
  }
  // IsConnected: result of isConnected() on the wrapped topology
  Mongos.prototype.isConnected = function() {
    var core = this.s.mongos;
    return core.isConnected();
  }

  define.classMethod('isConnected', {callback: false, promise:false, returns: [Boolean]});
  // Cursor
  //
  // Creates a cursor through the wrapped topology. The store is attached to
  // the options as `disconnectHandler` before they are passed on; note that
  // this is written onto the caller's options object when one is supplied.
  //
  //   ns       namespace handed to the wrapped topology
  //   cmd      command / query handed to the wrapped topology
  //   options  optional settings object; defaults to {} when omitted
  Mongos.prototype.cursor = function(ns, cmd, options) {
    // Tolerate a missing options argument instead of throwing a TypeError
    options = options || {};
    options.disconnectHandler = this.s.store;
    return this.s.mongos.cursor(ns, cmd, options);
  }

  define.classMethod('cursor', {callback: false, promise:false, returns: [Cursor, AggregationCursor, CommandCursor]});
  // Passes the requested parser type to setBSONParserType() on the wrapped
  // topology and returns its result
  Mongos.prototype.setBSONParserType = function(type) {
    var core = this.s.mongos;
    return core.setBSONParserType(type);
  }
  // Result of lastIsMaster() on the wrapped topology
  Mongos.prototype.lastIsMaster = function() {
    var core = this.s.mongos;
    return core.lastIsMaster();
  }
  // Destroys the wrapped topology. When forceClosed is (loosely) true the
  // store is additionally marked as forced and flushed.
  Mongos.prototype.close = function(forceClosed) {
    var state = this.s;
    state.mongos.destroy();

    // Nothing more to do unless a forced close was requested
    if(forceClosed != true) return;

    // We need to wash out all stored processes
    state.storeOptions.force = forceClosed;
    state.store.flush();
  }

  define.classMethod('close', {callback: false, promise:false});
  // Forwards every argument unchanged to auth() on the wrapped topology
  Mongos.prototype.auth = function() {
    var forwarded = [];
    for(var i = 0; i < arguments.length; i++) {
      forwarded.push(arguments[i]);
    }

    var core = this.s.mongos;
    core.auth.apply(core, forwarded);
  }

  define.classMethod('auth', {callback: true, promise:false});
  /**
   * All raw connections, as returned by the wrapped topology
   * @method
   * @return {array}
   */
  Mongos.prototype.connections = function() {
    var core = this.s.mongos;
    return core.connections();
  }

  define.classMethod('connections', {callback: false, promise:false, returns:[Array]});
  350. /**
  351. * A mongos connect event, used to verify that the connection is up and running
  352. *
  353. * @event Mongos#connect
  354. * @type {Mongos}
  355. */
  356. /**
  357. * The mongos high availability event
  358. *
  359. * @event Mongos#ha
  360. * @type {function}
  361. * @param {string} type The stage in the high availability event (start|end)
  362. * @param {boolean} data.norepeat This is a repeating high availability process or a single execution only
  363. * @param {number} data.id The id for this high availability request
  364. * @param {object} data.state An object containing the information about the current replicaset
  365. */
  366. /**
  367. * A server member left the mongos set
  368. *
  369. * @event Mongos#left
  370. * @type {function}
  371. * @param {string} type The type of member that left (primary|secondary|arbiter)
  372. * @param {Server} server The server object that left
  373. */
  374. /**
  375. * A server member joined the mongos set
  376. *
  377. * @event Mongos#joined
  378. * @type {function}
  379. * @param {string} type The type of member that joined (primary|secondary|arbiter)
  380. * @param {Server} server The server object that joined
  381. */
  382. /**
  383. * Mongos fullsetup event, emitted when all proxies in the topology have been connected to.
  384. *
  385. * @event Mongos#fullsetup
  386. * @type {Mongos}
  387. */
  388. /**
  389. * Mongos open event, emitted when mongos can start processing commands.
  390. *
  391. * @event Mongos#open
  392. * @type {Mongos}
  393. */
  394. /**
  395. * Mongos close event
  396. *
  397. * @event Mongos#close
  398. * @type {object}
  399. */
  400. /**
  401. * Mongos error event, emitted if there is an error listener.
  402. *
  403. * @event Mongos#error
  404. * @type {MongoError}
  405. */
  406. /**
  407. * Mongos timeout event
  408. *
  409. * @event Mongos#timeout
  410. * @type {object}
  411. */
  412. /**
  413. * Mongos parseError event
  414. *
  415. * @event Mongos#parseError
  416. * @type {object}
  417. */
  418. module.exports = Mongos;