backendlocker.js 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. // Copyright 2015 The MemDB Authors.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. // implied. See the License for the specific language governing
  13. // permissions and limitations under the License. See the AUTHORS file
  14. // for names of contributors.
  15. 'use strict';
  16. var P = require('bluebird');
  17. var Logger = require('memdb-logger');
  18. var util = require('util');
  19. var redis = P.promisifyAll(require('redis'));
  // Redis-backed lock registry for one shard.
  //
  // opts (all optional):
  //   shardId           - id of the shard this locker acts for
  //   host / port / db  - redis location, defaults 127.0.0.1 / 6379 / 0
  //   options           - kept in config.options, defaults to {}
  //   prefix            - key prefix for document locks, default 'bl$'
  //   heartbeatPrefix   - key prefix for shard heartbeats, default 'hb$'
  //   heartbeatTimeout  - heartbeat key lifetime in ms (no default)
  //   heartbeatInterval - delay in ms between heartbeats (no default)
  //
  // No connection is opened here; the client is created by start().
  var BackendLocker = function(opts){
    var settings = opts || {};

    this.shardId = settings.shardId;

    this.config = {
      host : settings.host || '127.0.0.1',
      port : settings.port || 6379,
      db : settings.db || 0,
      options : settings.options || {},
      prefix : settings.prefix || 'bl$',
      heartbeatPrefix : settings.heartbeatPrefix || 'hb$',
      heartbeatTimeout : settings.heartbeatTimeout,
      heartbeatInterval : settings.heartbeatInterval,
    };

    // Both are filled in by start().
    this.client = null;
    this.heartbeatInterval = null;

    this.logger = Logger.getLogger('memdb', __filename, 'shard:' + this.shardId);
  };

  // Shorthand used by every method definition below.
  var proto = BackendLocker.prototype;
  // Connect to redis and bring the locker up.
  //
  // Steps, in order:
  //   1. create the client and select the configured db
  //   2. when a shardId is set, fail if a heartbeat key for it already exists
  //   3. when a shardId is set and heartbeatInterval > 0, schedule the
  //      periodic heartbeat and send the first one right away
  //   4. log that the locker started
  //
  // Returns a promise bound to this locker (P.bind). It rejects with
  // Error('Current shard is running in some other process') when step 2 fails.
  proto.start = function(){
    // Each step below runs with `this` set to the locker by P.bind.
    var connect = function(){
      var locker = this;
      var conf = locker.config;

      locker.client = redis.createClient(conf.port, conf.host, {retry_max_delay : 10 * 1000, enable_offline_queue : true});
      locker.client.on('error', function(err){
        locker.logger.error(err.stack);
      });

      return locker.client.selectAsync(conf.db);
    };

    var ensureNotRunningElsewhere = function(){
      if(!this.shardId){
        return;
      }
      return this.isAlive()
      .then(function(alive){
        if(alive){
          throw new Error('Current shard is running in some other process');
        }
      });
    };

    var beginHeartbeat = function(){
      var interval = this.config.heartbeatInterval;
      if(!this.shardId || !(interval > 0)){
        return;
      }
      this.heartbeatInterval = setInterval(this.heartbeat.bind(this), interval);
      return this.heartbeat();
    };

    var reportStarted = function(){
      var conf = this.config;
      this.logger.info('backendLocker started %s:%s:%s', conf.host, conf.port, conf.db);
    };

    return P.bind(this)
    .then(connect)
    .then(ensureNotRunningElsewhere)
    .then(beginHeartbeat)
    .then(reportStarted);
  };
  // Shut the locker down: cancel the heartbeat timer, delete this shard's
  // heartbeat key, then quit the redis client and log.
  //
  // Returns a promise bound to this locker (P.bind).
  proto.stop = function(){
    // Each step below runs with `this` set to the locker by P.bind.
    var haltHeartbeat = function(){
      clearInterval(this.heartbeatInterval);
      return this.clearHeartbeat();
    };

    var disconnect = function(){
      return this.client.quitAsync();
    };

    var reportStopped = function(){
      this.logger.info('backendLocker stoped');
    };

    return P.bind(this)
    .then(haltHeartbeat)
    .then(disconnect)
    .then(reportStopped);
  };
  // Try to take the lock on docId using SETNX, storing the holder's shard id
  // as the value. The holder is `shardId` when given, else this.shardId.
  //
  // Returns a promise of true when the lock was acquired (reply === 1),
  // false otherwise.
  proto.tryLock = function(docId, shardId){
    var locker = this;
    locker.logger.debug('tryLock %s', docId);

    var lockKey = locker._docKey(docId);
    var holder = shardId || locker.shardId;

    return locker.client.setnxAsync(lockKey, holder)
    .then(function(reply){
      if(reply !== 1){
        return false;
      }
      locker.logger.debug('locked %s', docId);
      return true;
    });
  };
  // Read the value stored under the lock key of docId (the holder's shard id
  // as written by tryLock). Returns the promise from the client's GET.
  proto.getHolderId = function(docId){
    var lockKey = this._docKey(docId);
    return this.client.getAsync(lockKey);
  };
  // Tell whether the lock on docId is currently held by a given shard.
  //
  // docId   - document whose lock is inspected
  // shardId - shard to test for; falls back to this.shardId when falsy
  //
  // Returns a promise of a boolean. It is false when nobody holds the lock
  // or when there is no shard id to compare against.
  proto.isHeld = function(docId, shardId){
    var self = this;
    return this.getHolderId(docId)
    .then(function(holder){
      var expected = shardId || self.shardId;

      if(holder === null || holder === undefined){
        return false;
      }
      if(expected === null || expected === undefined){
        return false;
      }

      // The stored holder comes back from redis as a string, while a shard id
      // may be configured as a number. Compare string forms so that a numeric
      // shard id still recognises its own lock (a strict === never would,
      // which in turn made unlock() a silent no-op).
      return String(holder) === String(expected);
    });
  };
  // Release the lock on docId, but only when this shard is its holder.
  //
  // Safe with respect to other shards. NOT safe against concurrent callers
  // inside the same shard: the holder check and the delete are two separate
  // redis commands.
  //
  // Returns a promise of the DEL reply when the key was deleted, or of
  // undefined when this shard did not hold the lock.
  proto.unlock = function(docId){
    var locker = this;
    locker.logger.debug('unlock %s', docId);

    return locker.isHeld(docId)
    .then(function(isOwner){
      if(!isOwner){
        return;
      }
      return locker.client.delAsync(locker._docKey(docId));
    });
  };
  // Refresh this shard's heartbeat key.
  //
  // The key is written with SETEX, so it expires by itself once the shard
  // stops refreshing it. config.heartbeatTimeout is in milliseconds while
  // SETEX takes whole seconds, hence the conversion; the lifetime is never
  // allowed below one second.
  //
  // Errors from the redis call are logged and not propagated, so a failed
  // heartbeat does not reject the returned promise.
  proto.heartbeat = function(){
    var timeout = Math.floor(this.config.heartbeatTimeout / 1000);
    // Written as "not a positive finite number" instead of `timeout <= 0`:
    // heartbeatTimeout has no default, and a missing or non-numeric value
    // yields NaN, which passes a `<= 0` test and would be sent to redis as
    // an invalid expire time.
    if(!(timeout > 0) || !isFinite(timeout)){
      timeout = 1;
    }

    var self = this;
    return this.client.setexAsync(this._heartbeatKey(this.shardId), timeout, 1)
    .then(function(){
      self.logger.debug('heartbeat');
    })
    .catch(function(err){
      self.logger.error(err.stack);
    });
  };
  // Delete this shard's heartbeat key. Returns the promise from the
  // client's DEL.
  proto.clearHeartbeat = function(){
    var heartbeatKey = this._heartbeatKey(this.shardId);
    return this.client.delAsync(heartbeatKey);
  };
  // Check whether a heartbeat key exists for `shardId` (or for this.shardId
  // when no shardId is given). Returns a promise of a boolean.
  proto.isAlive = function(shardId){
    var target = shardId || this.shardId;
    var heartbeatKey = this._heartbeatKey(target);

    // EXISTS reply is coerced to a strict boolean.
    return this.client.existsAsync(heartbeatKey).then(function(reply){
      return Boolean(reply);
    });
  };
  // List the ids of all shards that currently have a heartbeat key.
  //
  // Fetches every key matching "<heartbeatPrefix>*" and strips the prefix
  // from each. Returns a promise of an array of shard ids.
  proto.getActiveShards = function(){
    var hbPrefix = this.config.heartbeatPrefix;
    var pattern = hbPrefix + '*';

    var toShardId = function(key){
      return key.slice(hbPrefix.length);
    };

    return this.client.keysAsync(pattern)
    .then(function(keys){
      return keys.map(function(key){
        return toShardId(key);
      });
    });
  };
  // Build the redis key under which the lock of docId is stored:
  // config.prefix followed by docId.
  proto._docKey = function(docId){
    var lockPrefix = this.config.prefix;
    return lockPrefix + docId;
  };
  // Build the redis key under which the heartbeat of shardId is stored:
  // config.heartbeatPrefix followed by shardId.
  proto._heartbeatKey = function(shardId){
    var hbPrefix = this.config.heartbeatPrefix;
    return hbPrefix + shardId;
  };
  155. module.exports = BackendLocker;