connection.js 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. // Copyright 2015 The MemDB Authors.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. // implied. See the License for the specific language governing
  13. // permissions and limitations under the License. See the AUTHORS file
  14. // for names of contributors.
  15. /* jshint ignore:start */
  16. /*!
  17. * Module dependencies.
  18. */
  19. var MongooseConnection = require('mongoose/lib/connection')
  20. , mongo = require('mongodb')
  21. , Db = mongo.Db
  22. , Server = mongo.Server
  23. , Mongos = mongo.Mongos
  24. , STATES = require('mongoose/lib/connectionstate')
  25. , ReplSetServers = mongo.ReplSet
  26. , utils = require('mongoose/lib/utils');
  27. /**
  28. * A [node-mongodb-native](https://github.com/mongodb/node-mongodb-native) connection implementation.
  29. *
  30. * @inherits Connection
  31. * @api private
  32. */
  33. function MemdbConnection() {
  34. MongooseConnection.apply(this, arguments);
  35. this._listening = false;
  36. };
  37. /**
  38. * Expose the possible connection states.
  39. * @api public
  40. */
  41. MemdbConnection.STATES = STATES;
  42. /*!
  43. * Inherits from Connection.
  44. */
  45. MemdbConnection.prototype.__proto__ = MongooseConnection.prototype;
  46. /**
  47. * Opens the connection to MongoDB.
  48. *
  49. * @param {Function} fn
  50. * @return {Connection} this
  51. * @api private
  52. */
  53. MemdbConnection.prototype.doOpen = function (fn) {
  54. if (this.db) {
  55. mute(this);
  56. }
  57. var server = new Server(this.host, this.port, this.options.server);
  58. this.db = new Db(this.name, server, this.options.db);
  59. var self = this;
  60. this.db.open(function (err) {
  61. if (err) return fn(err);
  62. listen(self);
  63. fn();
  64. });
  65. return this;
  66. };
  67. /**
  68. * Switches to a different database using the same connection pool.
  69. *
  70. * Returns a new connection object, with the new db.
  71. *
  72. * @param {String} name The database name
  73. * @return {Connection} New Connection Object
  74. * @api public
  75. */
  76. MemdbConnection.prototype.useDb = function (name) {
  77. // we have to manually copy all of the attributes...
  78. var newConn = new this.constructor();
  79. newConn.name = name;
  80. newConn.base = this.base;
  81. newConn.collections = {};
  82. newConn.models = {};
  83. newConn.replica = this.replica;
  84. newConn.hosts = this.hosts;
  85. newConn.host = this.host;
  86. newConn.port = this.port;
  87. newConn.user = this.user;
  88. newConn.pass = this.pass;
  89. newConn.options = this.options;
  90. newConn._readyState = this._readyState;
  91. newConn._closeCalled = this._closeCalled;
  92. newConn._hasOpened = this._hasOpened;
  93. newConn._listening = false;
  94. // First, when we create another db object, we are not guaranteed to have a
  95. // db object to work with. So, in the case where we have a db object and it
  96. // is connected, we can just proceed with setting everything up. However, if
  97. // we do not have a db or the state is not connected, then we need to wait on
  98. // the 'open' event of the connection before doing the rest of the setup
  99. // the 'connected' event is the first time we'll have access to the db object
  100. var self = this;
  101. if (this.db && this._readyState === STATES.connected) {
  102. wireup();
  103. } else {
  104. this.once('connected', wireup);
  105. }
  106. function wireup () {
  107. newConn.db = self.db.db(name);
  108. newConn.onOpen();
  109. // setup the events appropriately
  110. listen(newConn);
  111. }
  112. newConn.name = name;
  113. // push onto the otherDbs stack, this is used when state changes
  114. this.otherDbs.push(newConn);
  115. newConn.otherDbs.push(this);
  116. return newConn;
  117. };
  118. /*!
  119. * Register listeners for important events and bubble appropriately.
  120. */
  121. function listen (conn) {
  122. if (conn._listening) return;
  123. conn._listening = true;
  124. conn.db.on('close', function(){
  125. if (conn._closeCalled) return;
  126. // the driver never emits an `open` event. auto_reconnect still
  127. // emits a `close` event but since we never get another
  128. // `open` we can't emit close
  129. if (conn.db.serverConfig.autoReconnect) {
  130. conn.readyState = STATES.disconnected;
  131. conn.emit('close');
  132. return;
  133. }
  134. conn.onClose();
  135. });
  136. conn.db.on('error', function(err){
  137. conn.emit('error', err);
  138. });
  139. conn.db.on('reconnect', function() {
  140. conn.readyState = STATES.connected;
  141. conn.emit('reconnected');
  142. });
  143. conn.db.on('timeout', function(err){
  144. var error = new Error(err && err.err || 'connection timeout');
  145. conn.emit('error', error);
  146. });
  147. conn.db.on('open', function (err, db) {
  148. if (STATES.disconnected === conn.readyState && db && db.databaseName) {
  149. conn.readyState = STATES.connected;
  150. conn.emit('reconnected');
  151. }
  152. });
  153. conn.db.on('parseError', function(err) {
  154. conn.emit('parseError', err);
  155. });
  156. }
  157. /*!
  158. * Remove listeners registered in `listen`
  159. */
  160. function mute (conn) {
  161. if (!conn.db) throw new Error('missing db');
  162. conn.db.removeAllListeners("close");
  163. conn.db.removeAllListeners("error");
  164. conn.db.removeAllListeners("timeout");
  165. conn.db.removeAllListeners("open");
  166. conn.db.removeAllListeners("fullsetup");
  167. conn._listening = false;
  168. }
  169. /**
  170. * Opens a connection to a MongoDB ReplicaSet.
  171. *
  172. * See description of [doOpen](#MemdbConnection-doOpen) for server options. In this case `options.replset` is also passed to ReplSetServers.
  173. *
  174. * @param {Function} fn
  175. * @api private
  176. * @return {Connection} this
  177. */
  178. MemdbConnection.prototype.doOpenSet = function (fn) {
  179. if (this.db) {
  180. mute(this);
  181. }
  182. var servers = []
  183. , self = this;
  184. this.hosts.forEach(function (server) {
  185. var host = server.host || server.ipc;
  186. var port = server.port || 27017;
  187. servers.push(new Server(host, port, self.options.server));
  188. })
  189. var server = this.options.mongos
  190. ? new Mongos(servers, this.options.mongos)
  191. : new ReplSetServers(servers, this.options.replset);
  192. this.db = new Db(this.name, server, this.options.db);
  193. this.db.on('fullsetup', function () {
  194. self.emit('fullsetup')
  195. });
  196. this.db.open(function (err) {
  197. if (err) return fn(err);
  198. fn();
  199. listen(self);
  200. });
  201. return this;
  202. };
  203. /**
  204. * Closes the connection
  205. *
  206. * @param {Function} fn
  207. * @return {Connection} this
  208. * @api private
  209. */
  210. MemdbConnection.prototype.doClose = function (fn) {
  211. this.db.close();
  212. if (fn) fn();
  213. return this;
  214. }
  215. /**
  216. * Prepares default connection options for the node-mongodb-native driver.
  217. *
  218. * _NOTE: `passed` options take precedence over connection string options._
  219. *
  220. * @param {Object} passed options that were passed directly during connection
  221. * @param {Object} [connStrOptions] options that were passed in the connection string
  222. * @api private
  223. */
  224. MemdbConnection.prototype.parseOptions = function (passed, connStrOpts) {
  225. var o = passed || {};
  226. o.db || (o.db = {});
  227. o.auth || (o.auth = {});
  228. o.server || (o.server = {});
  229. o.replset || (o.replset = {});
  230. o.server.socketOptions || (o.server.socketOptions = {});
  231. o.replset.socketOptions || (o.replset.socketOptions = {});
  232. var opts = connStrOpts || {};
  233. Object.keys(opts).forEach(function (name) {
  234. switch (name) {
  235. case 'ssl':
  236. case 'poolSize':
  237. if ('undefined' == typeof o.server[name]) {
  238. o.server[name] = o.replset[name] = opts[name];
  239. }
  240. break;
  241. case 'slaveOk':
  242. if ('undefined' == typeof o.server.slave_ok) {
  243. o.server.slave_ok = opts[name];
  244. }
  245. break;
  246. case 'autoReconnect':
  247. if ('undefined' == typeof o.server.auto_reconnect) {
  248. o.server.auto_reconnect = opts[name];
  249. }
  250. break;
  251. case 'socketTimeoutMS':
  252. case 'connectTimeoutMS':
  253. if ('undefined' == typeof o.server.socketOptions[name]) {
  254. o.server.socketOptions[name] = o.replset.socketOptions[name] = opts[name];
  255. }
  256. break;
  257. case 'authdb':
  258. if ('undefined' == typeof o.auth.authdb) {
  259. o.auth.authdb = opts[name];
  260. }
  261. break;
  262. case 'authSource':
  263. if ('undefined' == typeof o.auth.authSource) {
  264. o.auth.authSource = opts[name];
  265. }
  266. break;
  267. case 'retries':
  268. case 'reconnectWait':
  269. case 'rs_name':
  270. if ('undefined' == typeof o.replset[name]) {
  271. o.replset[name] = opts[name];
  272. }
  273. break;
  274. case 'replicaSet':
  275. if ('undefined' == typeof o.replset.rs_name) {
  276. o.replset.rs_name = opts[name];
  277. }
  278. break;
  279. case 'readSecondary':
  280. if ('undefined' == typeof o.replset.read_secondary) {
  281. o.replset.read_secondary = opts[name];
  282. }
  283. break;
  284. case 'nativeParser':
  285. if ('undefined' == typeof o.db.native_parser) {
  286. o.db.native_parser = opts[name];
  287. }
  288. break;
  289. case 'w':
  290. case 'safe':
  291. case 'fsync':
  292. case 'journal':
  293. case 'wtimeoutMS':
  294. if ('undefined' == typeof o.db[name]) {
  295. o.db[name] = opts[name];
  296. }
  297. break;
  298. case 'readPreference':
  299. if ('undefined' == typeof o.db.read_preference) {
  300. o.db.read_preference = opts[name];
  301. }
  302. break;
  303. case 'readPreferenceTags':
  304. if ('undefined' == typeof o.db.read_preference_tags) {
  305. o.db.read_preference_tags = opts[name];
  306. }
  307. break;
  308. }
  309. })
  310. if (!('auto_reconnect' in o.server)) {
  311. o.server.auto_reconnect = true;
  312. }
  313. if (!o.db.read_preference) {
  314. // read from primaries by default
  315. o.db.read_preference = 'primary';
  316. }
  317. // mongoose creates its own ObjectIds
  318. o.db.forceServerObjectId = false;
  319. // default safe using new nomenclature
  320. if (!('journal' in o.db || 'j' in o.db ||
  321. 'fsync' in o.db || 'safe' in o.db || 'w' in o.db)) {
  322. o.db.w = 1;
  323. }
  324. validate(o);
  325. return o;
  326. }
  327. /*!
  328. * Validates the driver db options.
  329. *
  330. * @param {Object} o
  331. */
  332. function validate (o) {
  333. if (-1 === o.db.w || 0 === o.db.w) {
  334. if (o.db.journal || o.db.fsync || o.db.safe) {
  335. throw new Error(
  336. 'Invalid writeConcern: '
  337. + 'w set to -1 or 0 cannot be combined with safe|fsync|journal');
  338. }
  339. }
  340. }
  341. /*!
  342. * Module exports.
  343. */
  344. module.exports = MemdbConnection;