autoconnection.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. // Copyright 2015 The MemDB Authors.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. // implied. See the License for the specific language governing
  13. // permissions and limitations under the License. See the AUTHORS file
  14. // for names of contributors.
  15. 'use strict';
  16. var _ = require('lodash');
  17. var domain = require('domain');
  18. var P = require('bluebird');
  19. var Connection = require('./connection');
  20. var AsyncLock = require('async-lock');
  21. var consts = require('./consts');
  22. var uuid = require('node-uuid');
  23. var logger = require('memdb-logger').getLogger('memdb-client', __filename);
  // Upper bound on connections per shard (also the max concurrent transactions per shard)
  var DEFAULT_MAX_CONNECTION = 32;

  // How long a connection may stay idle before it is closed
  // (handed to Connection as idleTimeout; presumably milliseconds - confirm in ./connection)
  var DEFAULT_CONNECTION_IDLE_TIMEOUT = 600 * 1000;

  // Upper bound on queued tasks per shard
  var DEFAULT_MAX_PENDING_TASK = 2048;

  // Delay between reconnect attempts, used as a setInterval period (ms)
  var DEFAULT_RECONNECT_INTERVAL = 2000;

  // AutoConnection uses one connection per transaction and routes requests to shards.
  //
  // opts.db                     - stored unchanged on the instance
  // opts.maxConnection          - connections per shard (default DEFAULT_MAX_CONNECTION)
  // opts.connectionIdleTimeout  - idle timeout passed to each Connection
  // opts.maxPendingTask         - queued tasks allowed per shard
  // opts.reconnectInterval      - period of the reconnect timer
  // opts.concurrentInConnection - internal use only; forces maxConnection to 1
  // opts.shards                 - {shardId : {host : '127.0.0.1', port : 31017}}, required
  //
  // Throws an Error when opts.shards has no keys.
  var AutoConnection = function(opts){
      var options = opts || {};

      var settings = {
          maxConnection : options.maxConnection || DEFAULT_MAX_CONNECTION,
          connectionIdleTimeout : options.connectionIdleTimeout || DEFAULT_CONNECTION_IDLE_TIMEOUT,
          maxPendingTask : options.maxPendingTask || DEFAULT_MAX_PENDING_TASK,
          reconnectInterval : options.reconnectInterval || DEFAULT_RECONNECT_INTERVAL,
          // allow concurrent request inside one connection, internal use only
          concurrentInConnection : options.concurrentInConnection || false,
          // {shardId : {host : '127.0.0.1', port : 31017}}
          shards : options.shards || {},
      };
      if(settings.concurrentInConnection){
          // Concurrent mode multiplexes every request over a single connection
          settings.maxConnection = 1;
      }

      var knownShardIds = Object.keys(settings.shards);
      if(knownShardIds.length === 0){
          throw new Error('please specify opts.shards');
      }

      this.db = options.db;
      this.config = settings;

      // Per-shard runtime state, keyed by shard id
      this.shards = knownShardIds.reduce(function(table, shardId){
          table[shardId] = {
              connections : {}, // {connId : connection}
              freeConnections : {}, // {connId : true}
              connectionTimeouts : {}, // {connId : timeout}
              pendingTasks : [],
              reconnectInterval : null,
          };
          return table;
      }, {});

      this.openConnectionLock = new AsyncLock({Promise : P, maxPending : 10000});
      this.collections = {};
  };

  var proto = AutoConnection.prototype;
  // Shut down every shard: stop its reconnect timer, reject all queued tasks
  // with 'connection closed', and close each open connection.
  // Returns a promise; any failure is logged rather than propagated.
  proto.close = function(){
      var self = this;

      var closeShard = function(shardId){
          var shard = self.shards[shardId];

          // Stop any reconnect loop for this shard
          clearInterval(shard.reconnectInterval);
          shard.reconnectInterval = null;

          // reject all pending tasks
          var queued = shard.pendingTasks;
          for(var i = 0; i < queued.length; i++){
              queued[i].deferred.reject(new Error('connection closed'));
          }
          shard.pendingTasks = [];

          // close all connections
          var openConnections = shard.connections;
          return P.map(Object.keys(openConnections), function(connId){
              var conn = openConnections[connId];
              // The entry may have been removed by a 'close' event in the meantime
              return conn ? conn.close() : undefined;
          });
      };

      return P.map(Object.keys(this.shards), closeShard)
      .then(function(){
          logger.info('autoConnection closed');
      })
      .catch(function(err){
          logger.error(err.stack);
      });
  };
  // Try to open one more connection to the given shard.
  // The attempt runs under openConnectionLock, keyed by shardId.
  // Nothing happens when the shard already holds maxConnection connections.
  // On success the connection is registered as free and a task run is scheduled.
  // On failure a reconnect timer is started (if not already running) and, when the
  // shard has no connection at all, every pending task is rejected with the error.
  // Returns a promise; errors are logged, not propagated.
  proto.openConnection = function(shardId){
      var self = this;

      var connectOnce = function(){
          var shard = self._shard(shardId);
          var openCount = Object.keys(shard.connections).length;
          if(openCount >= self.config.maxConnection){
              return;
          }

          var target = self.config.shards[shardId];
          var conn = new Connection({
              host : target.host,
              port : target.port,
              idleTimeout : self.config.connectionIdleTimeout
          });

          var onConnected = function(connId){
              // Connected again, so the retry loop is no longer needed
              clearInterval(shard.reconnectInterval);
              shard.reconnectInterval = null;

              shard.connections[connId] = conn;
              logger.info('[shard:%s][conn:%s] open connection', shardId, connId);
              shard.freeConnections[connId] = true;

              // Forget the connection as soon as it reports being closed
              conn.on('close', function(){
                  logger.info('[shard:%s][conn:%s] connection closed', shardId, connId);
                  delete shard.connections[connId];
                  delete shard.freeConnections[connId];
              });

              setImmediate(function(){
                  return self._runTask(shardId);
              });
          };

          var onConnectFailed = function(e){
              if(!shard.reconnectInterval){
                  shard.reconnectInterval = setInterval(function(){
                      return self.openConnection(shardId);
                  }, self.config.reconnectInterval);
              }
              logger.error(e.stack);

              if(Object.keys(shard.connections).length !== 0){
                  // Other connections can still serve the queue
                  return;
              }
              logger.error('No connection available for shard %s', shardId);
              // no available connection, reject all pending tasks
              shard.pendingTasks.forEach(function(task){
                  task.deferred.reject(e);
              });
              shard.pendingTasks = [];
          };

          return conn.connect().then(onConnected, onConnectFailed);
      };

      return this.openConnectionLock.acquire(shardId, connectOnce)
      .catch(function(e){
          logger.error(e.stack);
      });
  };
  // Queue a task for a shard and return a promise for its result.
  //
  // method  - connection method name, or '__t' for a transaction
  // args    - argument array for that method
  // shardId - target shard; may be omitted only when exactly one shard is configured
  //
  // Never throws synchronously from the queuing step: an unknown shard, a missing
  // shardId with several shards, or a full queue all reject the returned promise.
  proto._task = function(method, args, shardId){
      var self = this;
      var deferred = P.defer();

      try{
          var targetId = shardId;
          if(!targetId){
              var allIds = Object.keys(this.shards);
              if(allIds.length > 1){
                  throw new Error('You must specify shardId');
              }
              targetId = allIds[0];
          }

          var queue = this._shard(targetId).pendingTasks;
          if(queue.length >= this.config.maxPendingTask){
              throw new Error('Too much pending tasks');
          }

          queue.push({
              method : method,
              args : args,
              deferred : deferred
          });

          // Kick the scheduler on a later tick
          setImmediate(function(){
              return self._runTask(targetId);
          });
      }
      catch(err){
          deferred.reject(err);
      }

      return deferred.promise;
  };
  // Take the next pending task of a shard and run it on a free connection.
  // Does nothing when the queue is empty. When no connection is free it asks
  // openConnection for one and returns. Otherwise the task's deferred is settled
  // with the outcome, the connection is marked free again if it still exists,
  // and another run is scheduled.
  proto._runTask = function(shardId){
      var self = this;
      var shard = this._shard(shardId);
      var concurrent = this.config.concurrentInConnection;

      if(shard.pendingTasks.length === 0){
          return;
      }

      var freeIds = Object.keys(shard.freeConnections);
      if(freeIds.length === 0){
          return this.openConnection(shardId);
      }

      var connId = freeIds[0];
      var conn = shard.connections[connId];
      var task = shard.pendingTasks.shift();

      var scheduleNext = function(){
          setImmediate(function(){
              self._runTask(shardId);
          });
      };

      if(concurrent){
          // start run next before current task finish
          scheduleNext();
      }
      else{
          // Exclusive mode: the connection is busy until this task settles
          delete shard.freeConnections[connId];
      }

      var execute = function(){
          if(task.method === '__t'){
              return self._runTransaction(task.args[0], conn, shardId);
          }
          var method = conn[task.method];
          if(typeof(method) !== 'function'){
              throw new Error('invalid method - ' + task.method);
          }
          return method.apply(conn, task.args);
      };

      return P.try(execute)
      .then(function(ret){
          task.deferred.resolve(ret);
      }, function(err){
          task.deferred.reject(err);
      })
      .finally(function(){
          // Only hand the connection back if it was not closed meanwhile
          if(shard.connections.hasOwnProperty(connId)){
              shard.freeConnections[connId] = true;
          }
          scheduleNext();
      })
      .catch(function(e){
          logger.error(e.stack);
      });
  };
  // Run func as one transaction on conn, inside a fresh domain.
  //
  // The domain carries __memdb__ = {shard, conn, trans}; _connection() reads it
  // from process.domain to find the connection bound to the running transaction.
  //
  // func    - user function; its result (or the value of the promise it returns)
  //           becomes the result of the transaction
  // conn    - connection used for commit/rollback
  // shardId - shard the connection belongs to
  //
  // Returns a promise that
  //   - resolves with func's result after conn.commit() succeeded,
  //   - rejects with func's error after conn.rollback() succeeded,
  //   - rejects with the commit/rollback error when that step itself failed.
  // Throws synchronously when func is not a function.
  proto._runTransaction = function(func, conn, shardId){
      if(typeof(func) !== 'function'){
          throw new Error('Function is required');
      }

      var deferred = P.defer();

      var scope = domain.create();
      scope.__memdb__ = {shard: shardId, conn: conn._connId, trans : uuid.v4()};

      scope.run(function(){
          logger.info('[shard:%s][conn:%s] transaction start', shardId, conn._connId);
          var startTick = Date.now();

          return P.try(function(){
              return func();
          })
          .then(function(ret){
              return conn.commit()
              .then(function(){
                  logger.info('[shard:%s][conn:%s] transaction done (%sms)', shardId, conn._connId, Date.now() - startTick);
                  delete scope.__memdb__;
                  deferred.resolve(ret);
              });
          }, function(err){
              return conn.rollback()
              .then(function(){
                  logger.error('[shard:%s][conn:%s] transaction error %s', shardId, conn._connId, err.stack);
                  delete scope.__memdb__;
                  deferred.reject(err);
              });
          })
          .catch(function(e){
              // Reached when commit() or rollback() itself failed. The transaction
              // is over either way, so detach the scope; otherwise callbacks still
              // bound to this domain could keep resolving the connection through
              // _connection() after it has been handed to another task.
              logger.error(e.stack);
              delete scope.__memdb__;
              deferred.reject(e);
          });
      });

      return deferred.promise;
  };
  // Look up the connection bound to the transaction scope we are running in.
  // The scope is the __memdb__ record attached to process.domain.
  // Throws when there is no such scope, when its shard is unknown, or when the
  // recorded connection is no longer registered on that shard.
  proto._connection = function(){
      var activeDomain = process.domain;
      var scopeInfo = activeDomain && activeDomain.__memdb__;
      if(!scopeInfo){
          throw new Error('You are not in any transaction scope');
      }

      var conn = this._shard(scopeInfo.shard).connections[scopeInfo.conn];
      if(conn){
          return conn;
      }
      throw new Error('connection ' + scopeInfo.conn + ' not exist');
  };
  // Return the runtime state record of a shard.
  //
  // shardId - key of the shard in this.shards
  //
  // Throws an Error ('shard <id> not exist') when the id is not a configured shard.
  // Only own properties count: ids such as 'toString' or 'constructor' would
  // otherwise resolve to members inherited from Object.prototype and be returned
  // as if they were shard records.
  proto._shard = function(shardId){
      var isKnown = Object.prototype.hasOwnProperty.call(this.shards, shardId);
      var shard = isKnown ? this.shards[shardId] : undefined;
      if(!shard){
          throw new Error('shard ' + shardId + ' not exist');
      }
      return shard;
  };
  // Return the accessor object for a named collection, creating and caching it
  // on first use. It exposes every method listed in consts.collMethods; each call
  // is forwarded to the current transaction's connection with the collection
  // name prepended to the arguments.
  proto.collection = function(name){
      var cached = this.collections[name];
      if(cached){
          return cached;
      }

      var self = this;
      var accessor = {};
      // Method must be called inside transaction
      consts.collMethods.forEach(function(method){
          accessor[method] = function(){
              var conn = self._connection();
              var callArgs = Array.prototype.slice.call(arguments);
              callArgs.unshift(name);
              return conn[method].apply(conn, callArgs);
          };
      });

      this.collections[name] = accessor;
      return accessor;
  };
  // Expose every connection method except commit/rollback on AutoConnection.
  // These methods are not bound to a transaction: the first argument is the
  // shard id, the remaining ones are passed on to the connection method.
  consts.connMethods.forEach(function(method){
      var isTransactionControl = (method === 'commit' || method === 'rollback');
      if(!isTransactionControl){
          proto[method] = function(){
              var callArgs = Array.prototype.slice.call(arguments);
              var shardId = callArgs.shift();
              return this._task(method, callArgs, shardId);
          };
      }
  });
  // Queue func to run as a transaction on the given shard.
  // Returns the promise produced by _task for the '__t' pseudo-method.
  proto.transaction = function(func, shardId){
      var taskArgs = [func];
      return this._task('__t', taskArgs, shardId);
  };

  module.exports = AutoConnection;