replset.js 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561
  1. "use strict";
  2. var EventEmitter = require('events').EventEmitter
  3. , inherits = require('util').inherits
  4. , f = require('util').format
  5. , Server = require('./server')
  6. , Mongos = require('./mongos')
  7. , Cursor = require('./cursor')
  8. , AggregationCursor = require('./aggregation_cursor')
  9. , CommandCursor = require('./command_cursor')
  10. , ReadPreference = require('./read_preference')
  11. , MongoCR = require('mongodb-core').MongoCR
  12. , MongoError = require('mongodb-core').MongoError
  13. , ServerCapabilities = require('./topology_base').ServerCapabilities
  14. , Store = require('./topology_base').Store
  15. , Define = require('./metadata')
  16. , CServer = require('mongodb-core').Server
  17. , CReplSet = require('mongodb-core').ReplSet
  18. , CoreReadPreference = require('mongodb-core').ReadPreference
  19. , shallowClone = require('./utils').shallowClone;
  20. /**
  21. * @fileOverview The **ReplSet** class is a class that represents a Replicaset topology and is
  22. * used to construct connections.
  23. *
  24. * **ReplSet Should not be used, use MongoClient.connect**
  25. * @example
  26. * var Db = require('mongodb').Db,
  27. * ReplSet = require('mongodb').ReplSet,
  28. * Server = require('mongodb').Server,
  29. * test = require('assert');
  30. * // Connect using ReplSet
  31. * var server = new Server('localhost', 27017);
  32. * var db = new Db('test', new ReplSet([server]));
  33. * db.open(function(err, db) {
  34. * // Get an additional db
  35. * db.close();
  36. * });
  37. */
  38. /**
  39. * Creates a new ReplSet instance
  40. * @class
  41. * @deprecated
  42. * @param {Server[]} servers A seedlist of servers participating in the replicaset.
  43. * @param {object} [options=null] Optional settings.
  44. * @param {booelan} [options.ha=true] Turn on high availability monitoring.
  45. * @param {number} [options.haInterval=5000] Time between each replicaset status check.
  46. * @param {string} options.replicaSet The name of the replicaset to connect to.
  47. * @param {number} [options.secondaryAcceptableLatencyMS=15] Sets the range of servers to pick when using NEAREST (lowest ping ms + the latency fence, ex: range of 1 to (1 + 15) ms)
  48. * @param {boolean} [options.connectWithNoPrimary=false] Sets if the driver should connect even if no primary is available
  49. * @param {number} [options.poolSize=5] Number of connections in the connection pool for each server instance, set to 5 as default for legacy reasons.
  50. * @param {boolean} [options.ssl=false] Use ssl connection (needs to have a mongod server with ssl support)
  51. * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
  52. * @param {object} [options.sslValidate=true] Validate mongod server certificate against ca (needs to have a mongod server with ssl support, 2.4 or higher)
  53. * @param {array} [options.sslCA=null] Array of valid certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher)
  54. * @param {(Buffer|string)} [options.sslCert=null] String or buffer containing the certificate we wish to present (needs to have a mongod server with ssl support, 2.4 or higher)
  55. * @param {(Buffer|string)} [options.sslKey=null] String or buffer containing the certificate private key we wish to present (needs to have a mongod server with ssl support, 2.4 or higher)
  56. * @param {(Buffer|string)} [options.sslPass=null] String or buffer containing the certificate password (needs to have a mongod server with ssl support, 2.4 or higher)
  57. * @param {object} [options.socketOptions=null] Socket options
  58. * @param {boolean} [options.socketOptions.noDelay=true] TCP Socket NoDelay option.
  59. * @param {number} [options.socketOptions.keepAlive=0] TCP KeepAlive on the socket with a X ms delay before start.
  60. * @param {number} [options.socketOptions.connectTimeoutMS=0] TCP Connection timeout setting
  61. * @param {number} [options.socketOptions.socketTimeoutMS=0] TCP Socket timeout setting
  62. * @fires ReplSet#connect
  63. * @fires ReplSet#ha
  64. * @fires ReplSet#joined
  65. * @fires ReplSet#left
  66. * @fires ReplSet#fullsetup
  67. * @fires ReplSet#open
  68. * @fires ReplSet#close
  69. * @fires ReplSet#error
  70. * @fires ReplSet#timeout
  71. * @fires ReplSet#parseError
  72. * @return {ReplSet} a ReplSet instance.
  73. */
  74. var ReplSet = function(servers, options) {
  75. if(!(this instanceof ReplSet)) return new ReplSet(servers, options);
  76. options = options || {};
  77. var self = this;
  78. // Ensure all the instances are Server
  79. for(var i = 0; i < servers.length; i++) {
  80. if(!(servers[i] instanceof Server)) {
  81. throw MongoError.create({message: "all seed list instances must be of the Server type", driver:true});
  82. }
  83. }
  84. // Store option defaults
  85. var storeOptions = {
  86. force: false
  87. , bufferMaxEntries: -1
  88. }
  89. // Shared global store
  90. var store = options.store || new Store(self, storeOptions);
  91. // Set up event emitter
  92. EventEmitter.call(this);
  93. // Debug tag
  94. var tag = options.tag;
  95. // Build seed list
  96. var seedlist = servers.map(function(x) {
  97. return {host: x.host, port: x.port}
  98. });
  99. // Final options
  100. var finalOptions = shallowClone(options);
  101. // Default values
  102. finalOptions.size = typeof options.poolSize == 'number' ? options.poolSize : 5;
  103. finalOptions.reconnect = typeof options.auto_reconnect == 'boolean' ? options.auto_reconnect : true;
  104. finalOptions.emitError = typeof options.emitError == 'boolean' ? options.emitError : true;
  105. finalOptions.cursorFactory = Cursor;
  106. // Add the store
  107. finalOptions.disconnectHandler = store;
  108. // Socket options passed down
  109. if(options.socketOptions) {
  110. if(options.socketOptions.connectTimeoutMS) {
  111. this.connectTimeoutMS = options.socketOptions.connectTimeoutMS;
  112. finalOptions.connectionTimeout = options.socketOptions.connectTimeoutMS;
  113. }
  114. if(options.socketOptions.socketTimeoutMS) {
  115. finalOptions.socketTimeout = options.socketOptions.socketTimeoutMS;
  116. }
  117. }
  118. // Get the name
  119. var replicaSet = options.replicaSet || options.rs_name;
  120. // Set up options
  121. finalOptions.setName = replicaSet;
  122. // Are we running in debug mode
  123. var debug = typeof options.debug == 'boolean' ? options.debug : false;
  124. if(debug) {
  125. finalOptions.debug = debug;
  126. }
  127. // Map keep alive setting
  128. if(options.socketOptions && typeof options.socketOptions.keepAlive == 'number') {
  129. finalOptions.keepAlive = true;
  130. if(typeof options.socketOptions.keepAlive == 'number') {
  131. finalOptions.keepAliveInitialDelay = options.socketOptions.keepAlive;
  132. }
  133. }
  134. // Connection timeout
  135. if(options.socketOptions && typeof options.socketOptions.connectionTimeout == 'number') {
  136. finalOptions.connectionTimeout = options.socketOptions.connectionTimeout;
  137. }
  138. // Socket timeout
  139. if(options.socketOptions && typeof options.socketOptions.socketTimeout == 'number') {
  140. finalOptions.socketTimeout = options.socketOptions.socketTimeout;
  141. }
  142. // noDelay
  143. if(options.socketOptions && typeof options.socketOptions.noDelay == 'boolean') {
  144. finalOptions.noDelay = options.socketOptions.noDelay;
  145. }
  146. if(typeof options.secondaryAcceptableLatencyMS == 'number') {
  147. finalOptions.acceptableLatency = options.secondaryAcceptableLatencyMS;
  148. }
  149. if(options.connectWithNoPrimary == true) {
  150. finalOptions.secondaryOnlyConnectionAllowed = true;
  151. }
  152. // Add the non connection store
  153. finalOptions.disconnectHandler = store;
  154. // Translate the options
  155. if(options.sslCA) finalOptions.ca = options.sslCA;
  156. if(typeof options.sslValidate == 'boolean') finalOptions.rejectUnauthorized = options.sslValidate;
  157. if(options.sslKey) finalOptions.key = options.sslKey;
  158. if(options.sslCert) finalOptions.cert = options.sslCert;
  159. if(options.sslPass) finalOptions.passphrase = options.sslPass;
  160. if(options.checkServerIdentity) finalOptions.checkServerIdentity = options.checkServerIdentity;
  161. // Create the ReplSet
  162. var replset = new CReplSet(seedlist, finalOptions)
  163. // Server capabilities
  164. var sCapabilities = null;
  165. // Add auth prbufferMaxEntriesoviders
  166. replset.addAuthProvider('mongocr', new MongoCR());
  167. // Listen to reconnect event
  168. replset.on('reconnect', function() {
  169. self.emit('reconnect');
  170. store.execute();
  171. });
  172. // Internal state
  173. this.s = {
  174. // Replicaset
  175. replset: replset
  176. // Server capabilities
  177. , sCapabilities: null
  178. // Debug tag
  179. , tag: options.tag
  180. // Store options
  181. , storeOptions: storeOptions
  182. // Cloned options
  183. , clonedOptions: finalOptions
  184. // Store
  185. , store: store
  186. // Options
  187. , options: options
  188. }
  189. // Debug
  190. if(debug) {
  191. // Last ismaster
  192. Object.defineProperty(this, 'replset', {
  193. enumerable:true, get: function() { return replset; }
  194. });
  195. }
  196. // Last ismaster
  197. Object.defineProperty(this, 'isMasterDoc', {
  198. enumerable:true, get: function() { return replset.lastIsMaster(); }
  199. });
  200. // BSON property
  201. Object.defineProperty(this, 'bson', {
  202. enumerable: true, get: function() {
  203. return replset.bson;
  204. }
  205. });
  206. Object.defineProperty(this, 'haInterval', {
  207. enumerable:true, get: function() { return replset.haInterval; }
  208. });
  209. }
  210. /**
  211. * @ignore
  212. */
  213. inherits(ReplSet, EventEmitter);
  214. var define = ReplSet.define = new Define('ReplSet', ReplSet, false);
  215. // Ensure the right read Preference object
  216. var translateReadPreference = function(options) {
  217. if(typeof options.readPreference == 'string') {
  218. options.readPreference = new CoreReadPreference(options.readPreference);
  219. } else if(options.readPreference instanceof ReadPreference) {
  220. options.readPreference = new CoreReadPreference(options.readPreference.mode
  221. , options.readPreference.tags);
  222. }
  223. return options;
  224. }
  225. ReplSet.prototype.parserType = function() {
  226. return this.s.replset.parserType();
  227. }
  228. define.classMethod('parserType', {callback: false, promise:false, returns: [String]});
  229. // Connect method
  230. ReplSet.prototype.connect = function(db, _options, callback) {
  231. var self = this;
  232. if('function' === typeof _options) callback = _options, _options = {};
  233. if(_options == null) _options = {};
  234. if(!('function' === typeof callback)) callback = null;
  235. self.s.options = _options;
  236. // Update bufferMaxEntries
  237. self.s.storeOptions.bufferMaxEntries = db.bufferMaxEntries;
  238. // Actual handler
  239. var errorHandler = function(event) {
  240. return function(err) {
  241. if(event != 'error') {
  242. self.emit(event, err);
  243. }
  244. }
  245. }
  246. // Connect handler
  247. var connectHandler = function() {
  248. // Clear out all the current handlers left over
  249. ["timeout", "error", "close"].forEach(function(e) {
  250. self.s.replset.removeAllListeners(e);
  251. });
  252. // Set up listeners
  253. self.s.replset.once('timeout', errorHandler('timeout'));
  254. self.s.replset.once('error', errorHandler('error'));
  255. self.s.replset.once('close', errorHandler('close'));
  256. // relay the event
  257. var relay = function(event) {
  258. return function(t, server) {
  259. self.emit(event, t, server);
  260. }
  261. }
  262. // Replset events relay
  263. var replsetRelay = function(event) {
  264. return function(t, server) {
  265. self.emit(event, t, server.lastIsMaster(), server);
  266. }
  267. }
  268. // Relay ha
  269. var relayHa = function(t, state) {
  270. self.emit('ha', t, state);
  271. if(t == 'start') {
  272. self.emit('ha_connect', t, state);
  273. } else if(t == 'end') {
  274. self.emit('ha_ismaster', t, state);
  275. }
  276. }
  277. // Set up serverConfig listeners
  278. self.s.replset.on('joined', replsetRelay('joined'));
  279. self.s.replset.on('left', relay('left'));
  280. self.s.replset.on('ping', relay('ping'));
  281. self.s.replset.on('ha', relayHa);
  282. self.s.replset.on('fullsetup', function(topology) {
  283. self.emit('fullsetup', null, self);
  284. });
  285. self.s.replset.on('all', function(topology) {
  286. self.emit('all', null, self);
  287. });
  288. // Emit open event
  289. self.emit('open', null, self);
  290. // Return correctly
  291. try {
  292. callback(null, self);
  293. } catch(err) {
  294. process.nextTick(function() { throw err; })
  295. }
  296. }
  297. // Error handler
  298. var connectErrorHandler = function(event) {
  299. return function(err) {
  300. ['timeout', 'error', 'close'].forEach(function(e) {
  301. self.s.replset.removeListener(e, connectErrorHandler);
  302. });
  303. self.s.replset.removeListener('connect', connectErrorHandler);
  304. // Destroy the replset
  305. self.s.replset.destroy();
  306. // Try to callback
  307. try {
  308. callback(err);
  309. } catch(err) {
  310. if(!self.s.replset.isConnected())
  311. process.nextTick(function() { throw err; })
  312. }
  313. }
  314. }
  315. // Set up listeners
  316. self.s.replset.once('timeout', connectErrorHandler('timeout'));
  317. self.s.replset.once('error', connectErrorHandler('error'));
  318. self.s.replset.once('close', connectErrorHandler('close'));
  319. self.s.replset.once('connect', connectHandler);
  320. // Start connection
  321. self.s.replset.connect(_options);
  322. }
  323. // Server capabilities
  324. ReplSet.prototype.capabilities = function() {
  325. if(this.s.sCapabilities) return this.s.sCapabilities;
  326. if(this.s.replset.lastIsMaster() == null) return null;
  327. this.s.sCapabilities = new ServerCapabilities(this.s.replset.lastIsMaster());
  328. return this.s.sCapabilities;
  329. }
  330. define.classMethod('capabilities', {callback: false, promise:false, returns: [ServerCapabilities]});
  331. // Command
  332. ReplSet.prototype.command = function(ns, cmd, options, callback) {
  333. options = translateReadPreference(options);
  334. this.s.replset.command(ns, cmd, options, callback);
  335. }
  336. define.classMethod('command', {callback: true, promise:false});
  337. // Insert
  338. ReplSet.prototype.insert = function(ns, ops, options, callback) {
  339. this.s.replset.insert(ns, ops, options, callback);
  340. }
  341. define.classMethod('insert', {callback: true, promise:false});
  342. // Update
  343. ReplSet.prototype.update = function(ns, ops, options, callback) {
  344. this.s.replset.update(ns, ops, options, callback);
  345. }
  346. define.classMethod('update', {callback: true, promise:false});
  347. // Remove
  348. ReplSet.prototype.remove = function(ns, ops, options, callback) {
  349. this.s.replset.remove(ns, ops, options, callback);
  350. }
  351. define.classMethod('remove', {callback: true, promise:false});
  352. // Destroyed
  353. ReplSet.prototype.isDestroyed = function() {
  354. return this.s.replset.isDestroyed();
  355. }
  356. // IsConnected
  357. ReplSet.prototype.isConnected = function() {
  358. return this.s.replset.isConnected();
  359. }
  360. define.classMethod('isConnected', {callback: false, promise:false, returns: [Boolean]});
  361. ReplSet.prototype.setBSONParserType = function(type) {
  362. return this.s.replset.setBSONParserType(type);
  363. }
  364. // Insert
  365. ReplSet.prototype.cursor = function(ns, cmd, options) {
  366. options = translateReadPreference(options);
  367. options.disconnectHandler = this.s.store;
  368. return this.s.replset.cursor(ns, cmd, options);
  369. }
  370. define.classMethod('cursor', {callback: false, promise:false, returns: [Cursor, AggregationCursor, CommandCursor]});
  371. ReplSet.prototype.lastIsMaster = function() {
  372. return this.s.replset.lastIsMaster();
  373. }
  374. ReplSet.prototype.close = function(forceClosed) {
  375. var self = this;
  376. this.s.replset.destroy();
  377. // We need to wash out all stored processes
  378. if(forceClosed == true) {
  379. this.s.storeOptions.force = forceClosed;
  380. this.s.store.flush();
  381. }
  382. var events = ['timeout', 'error', 'close', 'joined', 'left'];
  383. events.forEach(function(e) {
  384. self.removeAllListeners(e);
  385. });
  386. }
  387. define.classMethod('close', {callback: false, promise:false});
  388. ReplSet.prototype.auth = function() {
  389. var args = Array.prototype.slice.call(arguments, 0);
  390. this.s.replset.auth.apply(this.s.replset, args);
  391. }
  392. define.classMethod('auth', {callback: true, promise:false});
  393. /**
  394. * All raw connections
  395. * @method
  396. * @return {array}
  397. */
  398. ReplSet.prototype.connections = function() {
  399. return this.s.replset.connections();
  400. }
  401. define.classMethod('connections', {callback: false, promise:false, returns:[Array]});
  402. /**
  403. * A replset connect event, used to verify that the connection is up and running
  404. *
  405. * @event ReplSet#connect
  406. * @type {ReplSet}
  407. */
  408. /**
  409. * The replset high availability event
  410. *
  411. * @event ReplSet#ha
  412. * @type {function}
  413. * @param {string} type The stage in the high availability event (start|end)
  414. * @param {boolean} data.norepeat This is a repeating high availability process or a single execution only
  415. * @param {number} data.id The id for this high availability request
  416. * @param {object} data.state An object containing the information about the current replicaset
  417. */
  418. /**
  419. * A server member left the replicaset
  420. *
  421. * @event ReplSet#left
  422. * @type {function}
  423. * @param {string} type The type of member that left (primary|secondary|arbiter)
  424. * @param {Server} server The server object that left
  425. */
  426. /**
  427. * A server member joined the replicaset
  428. *
  429. * @event ReplSet#joined
  430. * @type {function}
  431. * @param {string} type The type of member that joined (primary|secondary|arbiter)
  432. * @param {Server} server The server object that joined
  433. */
  434. /**
  435. * ReplSet open event, emitted when replicaset can start processing commands.
  436. *
  437. * @event ReplSet#open
  438. * @type {Replset}
  439. */
  440. /**
  441. * ReplSet fullsetup event, emitted when all servers in the topology have been connected to.
  442. *
  443. * @event ReplSet#fullsetup
  444. * @type {Replset}
  445. */
  446. /**
  447. * ReplSet close event
  448. *
  449. * @event ReplSet#close
  450. * @type {object}
  451. */
  452. /**
  453. * ReplSet error event, emitted if there is an error listener.
  454. *
  455. * @event ReplSet#error
  456. * @type {MongoError}
  457. */
  458. /**
  459. * ReplSet timeout event
  460. *
  461. * @event ReplSet#timeout
  462. * @type {object}
  463. */
  464. /**
  465. * ReplSet parseError event
  466. *
  467. * @event ReplSet#parseError
  468. * @type {object}
  469. */
  470. module.exports = ReplSet;