shard.js 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910
  1. // Copyright 2015 The MemDB Authors.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. // implied. See the License for the specific language governing
  13. // permissions and limitations under the License. See the AUTHORS file
  14. // for names of contributors.
  15. 'use strict';
  16. var _ = require('lodash');
  17. var P = require('bluebird');
  18. var Logger = require('memdb-logger');
  19. var util = require('util');
  20. var redis = require('redis');
  21. var AsyncLock = require('async-lock');
  22. var EventEmitter = require('events').EventEmitter;
  23. var backends = require('./backends');
  24. var Document = require('./document'); //jshint ignore:line
  25. var BackendLocker = require('./backendlocker');
  26. var Slave = require('./slave');
  27. var utils = require('./utils');
  28. var AutoConnection = require('../lib/autoconnection');
  // Lifecycle states of a shard, in the order they are normally entered.
  // NOTE: the spellings STOPING / STOPED are referenced by name elsewhere in
  // this file, so they are intentionally left unchanged.
  var STATE = {
      INITED : 0,
      STARTING : 1,
      RUNNING : 2,
      STOPING : 3,
      STOPED : 4
  };

  // Memory limit in MB (gc() compares process heapUsed against this value)
  var DEFAULT_MEMORY_LIMIT = 1024;

  // GC check interval (in ms)
  var DEFAULT_GC_INTERVAL = 1000;

  // Max number of docs unloaded per GC cycle
  var DEFAULT_GC_COUNT = 100;

  // Idle time (in ms) before doc is unloaded
  // tune this to balance memory usage and performance
  // set 0 to never unload on idle
  var DEFAULT_IDLE_TIMEOUT = 1800 * 1000;

  // Persistent delay after doc has commited (in ms)
  // tune this to balance backend data delay and performance
  // 0 schedules the persistent right after the commit (zero-delay timer);
  // a negative value disables the delayed persistent
  var DEFAULT_PERSISTENT_DELAY = 600 * 1000;

  // Timeout (in ms) for locking backend doc
  var DEFAULT_BACKEND_LOCK_TIMEOUT = 30 * 1000;

  // Retry interval (in ms) for backend lock
  var DEFAULT_BACKEND_LOCK_RETRY_INTERVAL = 50;

  // Delay (in ms) between unload and load
  // Can't load again immediately, prevent 'locking hungry' from other shards
  var DEFAULT_RELOAD_DELAY = 20;

  // Timeout (in ms) for locking doc
  var DEFAULT_LOCK_TIMEOUT = 30 * 1000;

  // Heartbeat settings (in ms), must be multiple of 1000
  var DEFAULT_HEARTBEAT_INTERVAL = 2 * 1000;
  var DEFAULT_HEARTBEAT_TIMEOUT = 5 * 1000;
  // Shard constructor.
  //
  // Builds the shard's config from `opts` (falling back to the DEFAULT_*
  // values), creates the backend locker, backend storage, slave and the
  // connection to other shards, and initialises the in-memory bookkeeping.
  // Throws when opts.shardId is empty or contains '$'.
  // The shard is left in STATE.INITED; call start() to run it.
  var Shard = function(opts){
      EventEmitter.call(this);

      var options = opts || {};

      this._id = options.shardId;
      if(!this._id){
          throw new Error('shardId is empty');
      }
      this._id = this._id.toString();
      if(this._id.indexOf('$') !== -1){
          throw new Error('shardId can not contain "$"');
      }

      this.logger = Logger.getLogger('memdb', __filename, 'shard:' + this._id);

      // Option value when truthy, otherwise the fallback
      var pick = function(name, fallback){
          return options[name] || fallback;
      };
      // Option value whenever it was given explicitly (0 is kept), otherwise the fallback
      var pickOwn = function(name, fallback){
          return options.hasOwnProperty(name) ? options[name] : fallback;
      };

      this.config = {
          locking : pick('locking', {}),
          backend : pick('backend', {}),
          slave : pick('slave', {}),
          shards : pick('shards', {}),

          idleTimeout : pickOwn('idleTimeout', DEFAULT_IDLE_TIMEOUT),
          persistentDelay : pickOwn('persistentDelay', DEFAULT_PERSISTENT_DELAY),

          heartbeatInterval : pick('heartbeatInterval', DEFAULT_HEARTBEAT_INTERVAL),
          heartbeatTimeout : pick('heartbeatTimeout', DEFAULT_HEARTBEAT_TIMEOUT),
          backendLockTimeout : pick('backendLockTimeout', DEFAULT_BACKEND_LOCK_TIMEOUT),
          backendLockRetryInterval : pick('backendLockRetryInterval', DEFAULT_BACKEND_LOCK_RETRY_INTERVAL),
          reloadDelay : pick('reloadDelay', DEFAULT_RELOAD_DELAY),
          lockTimeout : pick('lockTimeout', DEFAULT_LOCK_TIMEOUT),
          memoryLimit : pick('memoryLimit', DEFAULT_MEMORY_LIMIT),
          gcCount : pick('gcCount', DEFAULT_GC_COUNT),
          gcInterval : pick('gcInterval', DEFAULT_GC_INTERVAL),

          disableSlave : pick('disableSlave', false),
          collections : pick('collections', {}),
      };

      // Global (cross shard) locking; the locker shares our id and heartbeat settings
      var lockingOpts = this.config.locking;
      lockingOpts.shardId = this._id;
      lockingOpts.heartbeatTimeout = this.config.heartbeatTimeout;
      lockingOpts.heartbeatInterval = this.config.heartbeatInterval;
      this.backendLocker = new BackendLocker(lockingOpts);

      // Backend storage
      var storageOpts = this.config.backend;
      storageOpts.shardId = this._id;
      this.backend = backends.create(storageOpts);

      // Slave redis
      var slaveOpts = this.config.slave;
      slaveOpts.shardId = this._id;
      this.slave = new Slave(slaveOpts);

      // Memdb client used to talk to the other shards
      this.autoconn = new AutoConnection({
          shards : this.config.shards,
          concurrentInConnection : true,
      });

      // Loaded documents {key : doc}
      this.docs = utils.forceHashMap();
      // Docs commited since the last persistent {key : version}
      this.commitedKeys = utils.forceHashMap();
      // Pending idle (unload) timers {key : timeout}
      this.idleTimeouts = utils.forceHashMap();
      // Pending persistent timers {key : timeout}
      this.persistentTimeouts = utils.forceHashMap();
      // Handle of the periodic GC timer (set in start())
      this.gcInterval = null;

      // Serializes async operations per key
      this.keyLock = new AsyncLock({Promise : P});
      // Serializes shard wide tasks
      this.taskLock = new AsyncLock({Promise : P});
      // Lock handed to each document
      this.docLock = new AsyncLock({
          timeout : this.config.lockTimeout,
          Promise : P,
      });

      // Number of commits currently in progress
      this.commitingCount = 0;
      // Keys with an unload request in progress
      this.unloadingKeys = utils.forceHashMap();

      this.loadCounter = utils.rateCounter();
      this.unloadCounter = utils.rateCounter();
      this.persistentCounter = utils.rateCounter();

      this.state = STATE.INITED;
  };

  util.inherits(Shard, EventEmitter);

  var proto = Shard.prototype;
  // Start the shard (INITED -> STARTING -> RUNNING).
  // Starts the backend locker, the backend and (unless disabled) the slave,
  // restores data left in the slave, then starts the GC timer and emits 'start'.
  // Returns a promise bound to the shard.
  proto.start = function(){
      this._ensureState(STATE.INITED);
      this.state = STATE.STARTING;

      // Every step below runs with `this` bound to the shard
      var startLocker = function(){
          return this.backendLocker.start();
      };
      var startBackend = function(){
          return this.backend.start();
      };
      var startSlave = function(){
          if(this.config.disableSlave){
              return;
          }
          return this.slave.start();
      };
      var restoreData = function(){
          if(this.config.disableSlave){
              return;
          }
          return this.restoreFromSlave();
      };
      var enterRunning = function(){
          this.gcInterval = setInterval(this.gc.bind(this), this.config.gcInterval);
          this.state = STATE.RUNNING;
          this.emit('start');
          this.logger.info('shard started');
      };

      return P.bind(this)
      .then(startLocker)
      .then(startBackend)
      .then(startSlave)
      .then(restoreData)
      .then(enterRunning);
  };
  // Stop the shard (RUNNING -> STOPING -> STOPED).
  // Entering STOPING prevents any further requests. All commited data will be
  // saved, while uncommited data will be rolled back.
  // Returns a promise bound to the shard.
  proto.stop = function(){
      this._ensureState(STATE.RUNNING);
      this.state = STATE.STOPING;
      clearInterval(this.gcInterval);

      // Every step below runs with `this` bound to the shard

      // Taking the task lock once means every running task has finished
      var drainTasks = function(){
          return this.taskLock.acquire('', function(){});
      };

      // Poll until no commit is in progress
      var drainCommits = function(){
          this.logger.debug('all running tasks finished');

          var self = this;
          var deferred = P.defer();
          (function poll(){
              if(self.commitingCount > 0){
                  setTimeout(poll, 200);
                  return;
              }
              deferred.resolve();
          })();
          return deferred.promise;
      };

      // WARN: Make sure all connections are closed now
      var unloadEverything = function(){
          this.logger.debug('all commit processes finished');

          var self = this;
          return P.mapLimit(Object.keys(this.docs), function(key){
              return self.keyLock.acquire(key, function(){
                  return self._unload(key);
              })
              .catch(function(e){
                  // A failing doc must not abort the shutdown
                  self.logger.error(e.stack);
              });
          });
      };

      var stopSlave = function(){
          this.logger.debug('all docs unloaded');

          this.loadCounter.stop();
          this.unloadCounter.stop();
          this.persistentCounter.stop();

          if(this.config.disableSlave){
              return;
          }
          return this.slave.stop();
      };
      var stopBackend = function(){
          return this.backend.stop();
      };
      var stopLocker = function(){
          return this.backendLocker.stop();
      };
      var closeConnections = function(){
          return this.autoconn.close();
      };
      var enterStopped = function(){
          this.state = STATE.STOPED;
          this.emit('stop');
          this.logger.info('shard stoped');
      };

      return P.bind(this)
      .then(drainTasks)
      .then(drainCommits)
      .then(unloadEverything)
      .then(stopSlave)
      .then(stopBackend)
      .then(stopLocker)
      .then(closeConnections)
      .then(enterStopped);
  };
  // Find a doc by key, loading it first when necessary.
  // NOTE: returns the result directly when the doc is already loaded,
  // and a promise of the result when the doc has to be loaded.
  proto.find = function(connId, key, fields){
      this._ensureState(STATE.RUNNING);

      var self = this;
      var loadedDoc = this.docs[key];

      if(!loadedDoc){
          // Not loaded yet: load under the key lock, then query
          return this.keyLock.acquire(key, function(){
              return P.try(function(){
                  return self._load(key);
              })
              .then(function(){
                  return self.docs[key].find(connId, fields);
              })
              .then(function(found){
                  self.logger.debug('[conn:%s] find(%s, %j) => %j', connId, key, fields, found);
                  return found;
              });
          });
      }

      if(loadedDoc.isFree()){
          // restart idle timer if doc doesn't locked by anyone
          this._cancelIdleTimeout(key);
          this._startIdleTimeout(key);
      }

      var result = this.docs[key].find(connId, fields);
      self.logger.debug('[conn:%s] find(%s, %j) => %j', connId, key, fields, result);
      return result;
  };
  // Update a loaded doc on behalf of a connection.
  // Since lock is called before, the doc is loaded for sure
  // (otherwise _doc throws).
  proto.update = function(connId, key, doc, opts){
      this._ensureState(STATE.RUNNING);

      var target = this._doc(key);
      var result = target.update(connId, doc, opts);
      this.logger.debug('[conn:%s] update(%s, %j, %j) => %s', connId, key, doc, opts, result);
      return result;
  };
  // Insert content into a loaded doc on behalf of a connection.
  // Throws (via _doc) when the doc is not loaded.
  proto.insert = function(connId, key, doc){
      this._ensureState(STATE.RUNNING);

      var target = this._doc(key);
      var result = target.insert(connId, doc);
      this.logger.debug('[conn:%s] insert(%s, %j) => %s', connId, key, doc, result);
      return result;
  };
  // Remove a loaded doc on behalf of a connection.
  // Throws (via _doc) when the doc is not loaded.
  proto.remove = function(connId, key){
      this._ensureState(STATE.RUNNING);

      var target = this._doc(key);
      var result = target.remove(connId);
      this.logger.debug('[conn:%s] remove(%s) => %s', connId, key, result);
      return result;
  };
  // Roll back the connection's uncommited changes on one key or a list of keys.
  // The shard state is deliberately not checked here.
  proto.rollback = function(connId, keys){
      var keyList = Array.isArray(keys) ? keys : [keys];

      keyList.forEach(function(key){
          this._doc(key).rollback(connId);
      }, this);

      this.logger.debug('[conn:%s] rollback(%j)', connId, keyList);
  };
  // Lock a doc for a connection, loading the doc first when necessary.
  //
  // Returns `true` directly when the connection already holds the lock,
  // otherwise a promise resolved with `true` once the lock is held.
  // The promise is rejected with an Error when the doc lock fails; the
  // underlying failure is kept on the error's `cause` property.
  proto.lock = function(connId, key){
      this._ensureState(STATE.RUNNING);
      if(this.isLocked(connId, key)){
          return true;
      }

      this.logger.debug('[conn:%s] shard.lock(%s) start', connId, key);

      var self = this;
      return this.keyLock.acquire(key, function(){
          return P.try(function(){
              return self._load(key);
          })
          .then(function(){
              return self.docs[key].lock(connId)
              .then(function(){
                  self.logger.debug('[conn:%s] shard.lock(%s) success', connId, key);
                  return true;
              }, function(e){
                  // Keep the message callers already see, but do not lose the reason
                  var err = new Error(util.format('[conn:%s] shard.lock(%s) failed', connId, key));
                  err.cause = e;
                  throw err;
              });
          });
      });
  };
  // Commit the connection's changes on one key or a list of keys.
  //
  // Every key must be locked by the connection, otherwise an Error is thrown
  // synchronously. Unless the slave is disabled, the changed data is synced to
  // the slave before the docs are commited.
  // Returns undefined when `keys` is an empty array, otherwise a promise that
  // settles when the commit has finished.
  proto.commit = function(connId, keys){
      this._ensureState(STATE.RUNNING);

      if(!Array.isArray(keys)){
          keys = [keys];
      }
      if(keys.length === 0){
          return;
      }

      var self = this;
      keys.forEach(function(key){
          if(!self.isLocked(connId, key)){
              // Error() does not interpolate '%s' itself, format explicitly
              throw new Error(util.format('[conn:%s] %s not locked', connId, key));
          }
      });

      // stop() waits until this counter drops back to zero
      this.commitingCount++;

      // commit is not concurrency safe for same connection.
      // but database.js guarantee that every request from same connection are in series.
      return P.try(function(){
          if(self.config.disableSlave){
              return;
          }
          // Sync data to slave
          //TODO: possibly loss consistency
          // if setMulti return failed but actually success
          if(keys.length === 1){
              var key = keys[0];
              var doc = self._doc(key)._getChanged();
              return self.slave.set(key, doc);
          }
          var docs = utils.forceHashMap();
          keys.forEach(function(key){
              docs[key] = self._doc(key)._getChanged();
          });
          return self.slave.setMulti(docs);
      })
      .then(function(){
          // Real Commit
          keys.forEach(function(key){
              self._doc(key).commit(connId);
          });
          self.logger.debug('[conn:%s] commit(%j)', connId, keys);
      })
      .finally(function(){
          self.commitingCount--;
      });
  };
  // Whether the doc is loaded and locked by the given connection.
  // Yields the falsy lookup result itself when the doc is not loaded.
  proto.isLocked = function(connId, key){
      var doc = this.docs[key];
      if(!doc){
          return doc;
      }
      return this.docs[key].isLocked(connId);
  };
  // Read a doc without taking it over from another shard.
  // Served locally when the doc is loaded here, held by nobody or held by
  // this shard; otherwise the request is forwarded to the holding shard.
  proto.findReadOnly = function(connId, key, fields){
      this._ensureState(STATE.RUNNING);

      var self = this;
      if(this._isLoaded(key)){
          return this.find(connId, key, fields);
      }

      var dispatch = function(holderId){
          var servedLocally = !holderId || holderId === self._id;
          if(servedLocally){
              return self.find(connId, key, fields);
          }
          return self.autoconn.$findReadOnly(holderId, key, fields);
      };

      return P.try(function(){
          return self.backendLocker.getHolderId(key);
      })
      .then(dispatch);
  };
  // Called by other shards: ask this shard to give up a doc.
  //
  // Returns `false` directly when the shard is not running or an unload of
  // the same key is already in progress. Otherwise returns a promise that is
  // resolved with `true` once the doc is unloaded (or the redundant backend
  // lock is released), and rejected when that fails.
  proto.$unload = function(key){
      if(this.state !== STATE.RUNNING){
          return false;
      }
      if(this.unloadingKeys[key]){
          return false;
      }
      this.unloadingKeys[key] = true;

      var self = this;
      var deferred = P.defer();

      var onDone = function(){
          deferred.resolve(true);
      };
      var onFail = function(e){
          deferred.reject(e);
          throw e; // let the outer catch log it
      };

      this.keyLock.acquire(key, function(){
          if(!self.docs[key]){
              // possibly timing issue
              // or a redundant backend lock is held caused by unsuccessful unload
              self.logger.warn('this shard does not hold %s', key);

              return P.try(function(){
                  // Same guard as _unload/commit: never touch a disabled slave
                  if(!self.config.disableSlave){
                      return self.slave.del(key);
                  }
              })
              .then(function(){
                  return self._unlockBackend(key);
              })
              .then(onDone, onFail);
          }

          return P.try(function(){
              return self._unload(key);
          })
          .then(onDone, onFail)
          // keep the key lock a little longer so the key is not reloaded immediately
          .delay(self.config.reloadDelay);
      })
      .catch(function(e){
          // When acquiring the key lock itself failed, the task above never ran
          // and nobody settled the deferred; reject it so the caller does not
          // wait forever. This is a no-op when it has been settled already.
          deferred.reject(e);
          self.logger.error(e.stack);
      })
      .finally(function(){
          delete self.unloadingKeys[key];
      });

      return deferred.promise;
  };
  // internal method, not concurrency safe
  // Load a doc into memory: take the backend lock, read the doc from the
  // backend, mirror it to the slave (unless disabled) and register it.
  // Returns undefined when the doc is already loaded, otherwise a promise.
  proto._load = function(key){
      if(this.docs[key]){ // already loaded
          return;
      }

      this.logger.debug('start load %s', key);

      var self = this;
      var content = null;

      var readBackend = function(){
          var res = self._resolveKey(key);
          return self.backend.get(res.name, res.id);
      };
      var syncSlave = function(ret){
          content = ret;
          if(self.config.disableSlave){
              return;
          }
          // Sync data to slave
          return self.slave.set(key, content);
      };
      var register = function(){
          self._addDoc(key, content);
          self.loadCounter.inc();
          self.logger.info('loaded %s', key);
      };

      return P.try(function(){
          // get backend lock
          return self._lockBackend(key);
      })
      .then(readBackend)
      .then(syncSlave)
      .then(register);
  };
  // Wrap raw content in a Document, wire its events to the shard's
  // bookkeeping (idle timer, delayed persistent, index events) and
  // register it under `key`.
  proto._addDoc = function(key, obj){
      var self = this;
      var res = this._resolveKey(key);
      var collConf = this.config.collections[res.name];

      var doc = new Document({
          _id : res.id,
          doc: obj,
          indexes: (collConf && collConf.indexes) || {},
          locker : this.docLock,
          lockKey : key,
      });

      this._startIdleTimeout(key);

      // The idle timer only runs while nobody holds the doc lock
      doc.on('lock', function(){
          self._cancelIdleTimeout(key);
      });
      doc.on('unlock', function(){
          self._startIdleTimeout(key);
      });

      // Timer callback: persistent the doc under its key lock
      var persistentLater = function(){
          delete self.persistentTimeouts[key];
          return self.keyLock.acquire(key, function(){
              return self._persistent(key);
          })
          .catch(function(err){
              self.logger.error(err.stack);
          });
      };

      doc.on('commit', function(){
          self._setCommited(key);

          // delay sometime and persistent to backend
          // (at most one pending timer per key)
          var scheduled = self.persistentTimeouts.hasOwnProperty(key);
          if(scheduled || !(self.config.persistentDelay >= 0)){
              return;
          }
          self.persistentTimeouts[key] = setTimeout(persistentLater, self.config.persistentDelay);
      });

      doc.on('updateIndex', function(connId, indexKey, oldValue, newValue){
          // pass event to collection
          self.emit('updateIndex$' + res.name + '$' + connId, res.id, indexKey, oldValue, newValue);
      });

      // Loaded at this instant
      self.docs[key] = doc;
  };
  // internal method, not concurrency safe
  // Unload a doc: wait for its locks, persistent it, drop it from the slave
  // (unless disabled), detach it from the shard and release the backend lock.
  // Returns undefined when the doc is not loaded, otherwise a promise bound
  // to the shard.
  proto._unload = function(key){
      var doc = this.docs[key];
      if(!doc){ //already unloaded
          return;
      }

      this.logger.debug('start unload %s', key);

      // Every step below runs with `this` bound to the shard
      var waitLocks = function(){
          // Wait all existing lock release
          return doc._waitUnlock();
      };
      var persistentNow = function(){
          // Persistent immediately
          return this._persistent(key);
      };
      var dropFromSlave = function(){
          if(this.config.disableSlave){
              return;
          }
          // sync data to slave
          return this.slave.del(key);
      };
      var detach = function(){
          this._cancelIdleTimeout(key);

          if(this.persistentTimeouts.hasOwnProperty(key)){
              clearTimeout(this.persistentTimeouts[key]);
              delete this.persistentTimeouts[key];
          }

          ['commit', 'updateIndex', 'lock', 'unlock'].forEach(function(event){
              doc.removeAllListeners(event);
          });

          // _unloaded at this instant
          delete this.docs[key];

          // Release backend lock
          return this._unlockBackend(key);
      };
      var report = function(){
          this.unloadCounter.inc();
          this.logger.info('unloaded %s', key);
      };

      return P.bind(this)
      .then(waitLocks)
      .then(persistentNow)
      .then(dropFromSlave)
      .then(detach)
      .then(report);
  };
  // internal method, not concurrency safe
  // Acquire the backend lock of a doc for this shard.
  // Tries once directly; when that fails, repeatedly asks the current holder
  // (if any) to unload the doc and tries again, waiting a randomized interval
  // between rounds. Rejects once config.backendLockTimeout has elapsed.
  proto._lockBackend = function(key){
      var self = this;

      // Ask the holder to unload the doc; nothing to ask when nobody holds it
      var requestUnload = function(holderId){
          if(!holderId){
              return true;
          }
          // notify holder to unload the doc
          return self.autoconn.$unload(holderId, key);
      };

      var retryUntilLocked = function(){
          var startTick = Date.now();

          var attempt = function(wait){
              return P.try(function(){
                  return self.backendLocker.getHolderId(key);
              })
              .then(function(holderId){
                  if(holderId === self._id){
                      // already locked
                      return;
                  }

                  return P.try(function(){
                      return requestUnload(holderId);
                  })
                  .then(function(unloaded){
                      if(!unloaded){
                          return false;
                      }
                      return self.backendLocker.tryLock(key);
                  })
                  .then(function(locked){
                      if(locked){
                          self.logger.debug('locked backend doc - %s (%sms)', key, Date.now() - startTick);
                          return;
                      }

                      if(Date.now() - startTick >= self.config.backendLockTimeout){
                          throw new Error('lock backend doc - ' + key + ' timed out');
                      }

                      // delay some time and try again
                      return P.delay(wait / 2 + _.random(wait))
                      .then(function(){
                          return attempt(wait);
                      });
                  });
              });
          };

          return attempt(self.config.backendLockRetryInterval);
      };

      return P.try(function(){
          return self.backendLocker.tryLock(key);
      })
      .then(function(lockedAtOnce){
          if(lockedAtOnce){
              return;
          }
          return retryUntilLocked();
      });
  };
  // Release the backend lock of a doc; returns whatever the locker returns.
  proto._unlockBackend = function(key){
      var released = this.backendLocker.unlock(key);
      return released;
  };
  // internal method, not concurrency safe
  // Write the commited content of a doc to the backend.
  // Returns undefined when the doc has no commited change, otherwise the
  // promise of the backend write.
  proto._persistent = function(key){
      var pending = this.commitedKeys;
      if(!pending.hasOwnProperty(key)){
          return; // no change
      }

      var self = this;
      var snapshot = this._doc(key)._getCommited();
      var version = pending[key]; // version being written
      var res = this._resolveKey(key);

      return this.backend.set(res.name, res.id, snapshot)
      .then(function(){
          // Only clear the flag when no newer commit arrived during the write
          if(self.commitedKeys[key] === version){
              delete self.commitedKeys[key];
          }
          self.persistentCounter.inc();
          self.logger.debug('persistented %s', key);
      });
  };
  //TODO: setTimeout is slow, takes 1/100000 sec
  // (Re)start the idle timer of a doc. When the timer fires, the doc is
  // unloaded under its key lock (if it is still loaded).
  // Does nothing when config.idleTimeout is 0/falsy (never unload on idle).
  proto._startIdleTimeout = function(key){
      if(!this.config.idleTimeout){
          return;
      }

      // Never keep two timers for one key: the stored handle would be
      // overwritten and the earlier timer could not be cancelled any more
      var pending = this.idleTimeouts[key];
      if(pending){
          clearTimeout(pending);
      }

      var self = this;
      this.idleTimeouts[key] = setTimeout(function(){
          return self.keyLock.acquire(key, function(){
              if(self.docs[key]){
                  self.logger.debug('%s idle timed out, will unload', key);
                  return self._unload(key);
              }
          })
          .catch(function(e){
              self.logger.error(e.stack);
          });
      }, this.config.idleTimeout);
  };
  // Cancel the idle timer of a doc (if any) and forget its handle.
  proto._cancelIdleTimeout = function(key){
      var timers = this.idleTimeouts;
      clearTimeout(timers[key]);
      delete timers[key];
  };
  // Mark a doc as changed since its last persistent by bumping its
  // commit version (starting at 1 for the first commit).
  proto._setCommited = function(key){
      var versions = this.commitedKeys;
      if(versions.hasOwnProperty(key)){
          versions[key]++;
      }
      else{
          versions[key] = 1;
      }
  };
  // Flush changes to backend storage
  // Persistents every doc that has commited changes, holding the task lock
  // for the whole flush. Resolves with `true`.
  proto.flushBackend = function(connId){
      this._ensureState(STATE.RUNNING);

      var self = this;

      var persistentOne = function(key){
          return self.keyLock.acquire(key, function(){
              return self._persistent(key);
          });
      };
      var flushAll = function(){
          return P.mapLimit(Object.keys(self.commitedKeys), persistentOne);
      };

      return this.taskLock.acquire('', flushAll)
      .then(function(){
          self.logger.warn('[conn:%s] flushed Backend', connId);
          return true;
      });
  };
  // Garbage collection
  // When the heap usage has reached config.memoryLimit (MB), unload up to
  // config.gcCount docs and schedule another round on the next tick.
  // Skipped while the shard is not running or the task lock is busy.
  proto.gc = function(){
      if(this.state !== STATE.RUNNING || this.taskLock.isBusy('')){
          return;
      }

      var self = this;

      var unloadOne = function(key){
          return self.keyLock.acquire(key, function(){
              return self._unload(key);
          })
          .catch(function(e){
              self.logger.error(e.stack);
          });
      };

      var collect = function(){
          var usage = process.memoryUsage();
          var limitBytes = self.config.memoryLimit * 1024 * 1024;
          if(usage.heapUsed < limitBytes){
              // Memory not reach limit, no need to gc
              return;
          }

          self.logger.warn('Start GC. Memory usage is too high, please reduce idleTimeout. %j', usage);

          var startTick = Date.now();

          // remove some doc
          var victims = [];
          for(var key in self.docs){
              victims.push(key);
              if(victims.length >= self.config.gcCount){
                  break;
              }
          }

          return P.mapLimit(victims, unloadOne)
          .then(function(){
              self.logger.warn('Finish GC in %s ms. %s docs have been unloaded.', Date.now() - startTick, victims.length);
          })
          .then(function(){
              process.nextTick(self.gc.bind(self));
          });
      };

      return this.taskLock.acquire('', collect)
      .catch(function(e){
          self.logger.error(e.stack);
      });
  };
  // Restore docs left in the slave (only allowed while STARTING).
  // Keys found in the slave mean the server was not stopped properly: every
  // such doc is added back to memory and persistented to the backend.
  // Returns a promise bound to the shard.
  proto.restoreFromSlave = function(){
      this._ensureState(STATE.STARTING);

      // Runs with `this` bound to the shard
      var restore = function(keys){
          if(keys.length === 0){
              return;
          }

          this.logger.error('Server not stopped properly, will restore data from slave');

          var self = this;
          var restoreAll = function(items){
              return P.mapLimit(Object.keys(items), function(key){
                  return self.keyLock.acquire(key, function(){
                      self._addDoc(key, items[key]);
                      // persistent all docs to backend
                      self._setCommited(key);
                      return self._persistent(key);
                  });
              });
          };

          return P.bind(this)
          .then(function(){
              return this.slave.getMulti(keys);
          })
          .then(restoreAll)
          .then(function(){
              this.logger.warn('restored %s keys from slave', keys.length);
          });
      };

      return P.bind(this)
      .then(function(){
          return this.slave.getAllKeys();
      })
      .then(restore);
  };
  // Get a loaded doc by key; throws when the doc is not loaded.
  proto._doc = function(key){
      if(this.docs.hasOwnProperty(key)){
          return this.docs[key];
      }
      throw new Error(key + ' is not loaded');
  };
  // Whether a doc is currently loaded in this shard (always a boolean).
  proto._isLoaded = function(key){
      return Boolean(this.docs[key]);
  };
  // key - collectionName$docId
  // Split a key at its first '$' into {name, id}; throws when there is no '$'.
  proto._resolveKey = function(key){
      var sep = key.indexOf('$');
      if(sep === -1){
          throw new Error('invalid key: ' + key);
      }
      var name = key.slice(0, sep);
      var id = key.slice(sep + 1);
      return {name : name, id : id};
  };
  // Throw unless the shard is currently in the expected state.
  proto._ensureState = function(state){
      if(this.state === state){
          return;
      }
      var message = util.format('Server state is incorrect, expected %s, actual %s', state, this.state);
      throw new Error(message);
  };
  760. module.exports = Shard;