database.js 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. // Copyright 2015 The MemDB Authors.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. // implied. See the License for the specific language governing
  13. // permissions and limitations under the License. See the AUTHORS file
  14. // for names of contributors.
  15. 'use strict';
  16. var P = require('bluebird');
  17. var Logger = require('memdb-logger');
  18. var utils = require('./utils');
  19. var util = require('util');
  20. var os = require('os');
  21. var EventEmitter = require('events').EventEmitter;
  22. var Connection = require('./connection');
  23. var Shard = require('./shard');
  24. var consts = require('./consts');
  25. var vm = require('vm');
  26. var AsyncLock = require('async-lock');
  27. var _ = require('lodash');
  28. var DEFAULT_SLOWQUERY = 2000;
  29. // Extend promise
  30. utils.extendPromise(P);
  31. var Database = function(opts){
  32. // clone since we want to modify it
  33. opts = utils.clone(opts) || {};
  34. this.logger = Logger.getLogger('memdb', __filename, 'shard:' + opts.shardId);
  35. this.connections = utils.forceHashMap();
  36. this.connectionLock = new AsyncLock({Promise : P});
  37. this.dbWrappers = utils.forceHashMap(); //{connId : dbWrapper}
  38. this.opsCounter = utils.rateCounter();
  39. this.tpsCounter = utils.rateCounter();
  40. opts.slowQuery = opts.slowQuery || DEFAULT_SLOWQUERY;
  41. // Parse index config
  42. opts.collections = opts.collections || {};
  43. Object.keys(opts.collections).forEach(function(name){
  44. var collection = opts.collections[name];
  45. var indexes = {};
  46. (collection.indexes || []).forEach(function(index){
  47. if(!Array.isArray(index.keys)){
  48. index.keys = [index.keys];
  49. }
  50. var indexKey = JSON.stringify(index.keys.sort());
  51. if(indexes[indexKey]){
  52. throw new Error('Duplicate index keys - ' + indexKey);
  53. }
  54. delete index.keys;
  55. indexes[indexKey] = index;
  56. });
  57. collection.indexes = indexes;
  58. });
  59. this.logger.info('parsed opts: %j', opts);
  60. this.shard = new Shard(opts);
  61. this.config = opts;
  62. this.timeCounter = utils.timeCounter();
  63. };
  64. util.inherits(Database, EventEmitter);
  65. var proto = Database.prototype;
  66. proto.start = function(){
  67. var self = this;
  68. return this.shard.start()
  69. .then(function(){
  70. self.logger.info('database started');
  71. });
  72. };
  73. proto.stop = function(force){
  74. var self = this;
  75. this.opsCounter.stop();
  76. this.tpsCounter.stop();
  77. return P.try(function(){
  78. // Make sure no new request come anymore
  79. // Wait for all operations finish
  80. return utils.waitUntil(function(){
  81. return !self.connectionLock.isBusy();
  82. });
  83. })
  84. .then(function(){
  85. self.logger.debug('all requests finished');
  86. return self.shard.stop(force);
  87. })
  88. .then(function(){
  89. self.logger.info('database stoped');
  90. });
  91. };
  92. proto.connect = function(){
  93. var connId = utils.uuid();
  94. var opts = {
  95. _id : connId,
  96. shard : this.shard,
  97. config : this.config,
  98. logger : this.logger
  99. };
  100. var conn = new Connection(opts);
  101. this.connections[connId] = conn;
  102. var self = this;
  103. var dbWrapper = {};
  104. consts.collMethods.concat(consts.connMethods).forEach(function(method){
  105. dbWrapper[method] = function(){
  106. return self.execute(connId, method, [].slice.call(arguments));
  107. };
  108. });
  109. this.dbWrappers[connId] = dbWrapper;
  110. this.logger.info('[conn:%s] connection created', connId);
  111. return {
  112. connId : connId,
  113. };
  114. };
  115. proto.disconnect = function(connId){
  116. var self = this;
  117. return P.try(function(){
  118. var conn = self.getConnection(connId);
  119. return self.execute(connId, 'close', [], {ignoreConcurrent : true});
  120. })
  121. .then(function(){
  122. delete self.connections[connId];
  123. delete self.dbWrappers[connId];
  124. self.logger.info('[conn:%s] connection closed', connId);
  125. });
  126. };
  127. // Execute a command
  128. proto.execute = function(connId, method, args, opts){
  129. opts = opts || {};
  130. var self = this;
  131. if(method[0] === '$'){ // Internal method (allow concurrent call)
  132. var conn = this.getConnection(connId);
  133. return conn[method].apply(conn, args);
  134. }
  135. if(method === 'info'){
  136. return {
  137. connId : connId,
  138. ver : consts.version,
  139. uptime : process.uptime(),
  140. mem : process.memoryUsage(),
  141. // rate for last 1, 5, 15 minutes
  142. ops : [this.opsCounter.rate(60), this.opsCounter.rate(300), this.opsCounter.rate(900)],
  143. tps : [this.tpsCounter.rate(60), this.tpsCounter.rate(300), this.tpsCounter.rate(900)],
  144. lps : [this.shard.loadCounter.rate(60), this.shard.loadCounter.rate(300), this.shard.loadCounter.rate(900)],
  145. ups : [this.shard.unloadCounter.rate(60), this.shard.unloadCounter.rate(300), this.shard.unloadCounter.rate(900)],
  146. pps : [this.shard.persistentCounter.rate(60), this.shard.persistentCounter.rate(300), this.shard.persistentCounter.rate(900)],
  147. counter : this.timeCounter.getCounts(),
  148. };
  149. }
  150. else if(method === 'resetCounter'){
  151. this.opsCounter.reset();
  152. this.tpsCounter.reset();
  153. this.shard.loadCounter.reset();
  154. this.shard.unloadCounter.reset();
  155. this.shard.persistentCounter.reset();
  156. this.timeCounter.reset();
  157. return;
  158. }
  159. else if(method === 'eval'){
  160. var script = args[0] || '';
  161. var sandbox = args[1] || {};
  162. sandbox.require = require;
  163. sandbox.P = P;
  164. sandbox._ = _;
  165. sandbox.db = this.dbWrappers[connId];
  166. var context = vm.createContext(sandbox);
  167. return vm.runInContext(script, context);
  168. }
  169. // Query in the same connection must execute in series
  170. // This is usually a client bug here
  171. if(this.connectionLock.isBusy(connId) && !opts.ignoreConcurrent){
  172. var err = new Error(util.format('[conn:%s] concurrent query on same connection. %s(%j)', connId, method, args));
  173. this.logger.error(err);
  174. throw err;
  175. }
  176. // Ensure series execution in same connection
  177. return this.connectionLock.acquire(connId, function(cb){
  178. self.logger.debug('[conn:%s] start %s(%j)...', connId, method, args);
  179. if(method === 'commit' || method === 'rollback'){
  180. self.tpsCounter.inc();
  181. }
  182. else{
  183. self.opsCounter.inc();
  184. }
  185. var hrtimer = utils.hrtimer(true);
  186. var conn = null;
  187. return P.try(function(){
  188. conn = self.getConnection(connId);
  189. var func = conn[method];
  190. if(typeof(func) !== 'function'){
  191. throw new Error('unsupported command - ' + method);
  192. }
  193. return func.apply(conn, args);
  194. })
  195. .then(function(ret){
  196. var timespan = hrtimer.stop();
  197. var level = timespan < self.config.slowQuery ? 'info' : 'warn'; // warn slow query
  198. self.logger[level]('[conn:%s] %s(%j) => %j (%sms)', connId, method, args, ret, timespan);
  199. var category = method;
  200. if(consts.collMethods.indexOf(method) !== -1){
  201. category += ':' + args[0];
  202. }
  203. self.timeCounter.add(category, timespan);
  204. return ret;
  205. }, function(err){
  206. var timespan = hrtimer.stop();
  207. self.logger.error('[conn:%s] %s(%j) => %s (%sms)', connId, method, args, err.stack ? err.stack : err, timespan);
  208. if(conn){
  209. conn.rollback();
  210. }
  211. // Rethrow to client
  212. throw err;
  213. })
  214. .nodeify(cb);
  215. });
  216. };
  217. proto.getConnection = function(id){
  218. var conn = this.connections[id];
  219. if(!conn){
  220. throw new Error('connection ' + id + ' not exist');
  221. }
  222. return conn;
  223. };
  224. module.exports = Database;