server.js 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. // Copyright 2015 The MemDB Authors.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. // implied. See the License for the specific language governing
  13. // permissions and limitations under the License. See the AUTHORS file
  14. // for names of contributors.
  15. 'use strict';
  16. var Database = require('./database');
  17. var memdbLogger = require('memdb-logger');
  18. var net = require('net');
  19. var http = require('http');
  20. var P = require('bluebird');
  21. var uuid = require('node-uuid');
  22. var Protocol = require('./protocol');
  23. var utils = require('./utils');
  24. var consts = require('./consts');
  25. var DEFAULT_PORT = 31017;
  26. exports.start = function(opts){
  27. var deferred = P.defer();
  28. var logger = memdbLogger.getLogger('memdb', __filename, 'shard:' + opts.shardId);
  29. logger.warn('starting %s...', opts.shardId);
  30. var bind = opts.bind || '0.0.0.0';
  31. var port = opts.port || DEFAULT_PORT;
  32. var db = new Database(opts);
  33. var sockets = utils.forceHashMap();
  34. var _isShutingDown = false;
  35. var server = net.createServer(function(socket){
  36. var clientId = uuid.v4();
  37. sockets[clientId] = socket;
  38. var connIds = utils.forceHashMap();
  39. var remoteAddress = socket.remoteAddress;
  40. var protocol = new Protocol({socket : socket});
  41. protocol.on('msg', function(msg){
  42. logger.debug('[conn:%s] %s => %j', msg.connId, remoteAddress, msg);
  43. var resp = {seq : msg.seq};
  44. P.try(function(){
  45. if(msg.method === 'connect'){
  46. var clientVersion = msg.args[0];
  47. if(parseFloat(clientVersion) < parseFloat(consts.minClientVersion)){
  48. throw new Error('client version not supported, please upgrade');
  49. }
  50. var connId = db.connect().connId;
  51. connIds[connId] = true;
  52. return {
  53. connId : connId,
  54. };
  55. }
  56. if(!msg.connId){
  57. throw new Error('connId is required');
  58. }
  59. if(msg.method === 'disconnect'){
  60. return db.disconnect(msg.connId)
  61. .then(function(){
  62. delete connIds[msg.connId];
  63. });
  64. }
  65. return db.execute(msg.connId, msg.method, msg.args);
  66. })
  67. .then(function(ret){
  68. resp.err = null;
  69. resp.data = ret;
  70. }, function(err){
  71. resp.err = {
  72. message : err.message,
  73. stack : err.stack,
  74. };
  75. resp.data = null;
  76. })
  77. .then(function(){
  78. protocol.send(resp);
  79. logger.debug('[conn:%s] %s <= %j', msg.connId, remoteAddress, resp);
  80. })
  81. .catch(function(e){
  82. logger.error(e.stack);
  83. });
  84. });
  85. protocol.on('close', function(){
  86. P.map(Object.keys(connIds), function(connId){
  87. return db.disconnect(connId);
  88. })
  89. .then(function(){
  90. connIds = utils.forceHashMap();
  91. delete sockets[clientId];
  92. logger.info('client %s disconnected', remoteAddress);
  93. })
  94. .catch(function(e){
  95. logger.error(e.stack);
  96. });
  97. });
  98. protocol.on('error', function(e){
  99. logger.error(e.stack);
  100. });
  101. logger.info('client %s connected', remoteAddress);
  102. });
  103. server.on('error', function(err){
  104. logger.error(err.stack);
  105. if(!deferred.isResolved()){
  106. deferred.reject(err);
  107. }
  108. });
  109. P.try(function(){
  110. return P.promisify(server.listen, server)(port, bind);
  111. })
  112. .then(function(){
  113. return db.start();
  114. })
  115. .then(function(){
  116. logger.warn('server started on %s:%s', bind, port);
  117. deferred.resolve();
  118. })
  119. .catch(function(err){
  120. logger.error(err.stack);
  121. deferred.reject(err);
  122. });
  123. var shutdown = function(){
  124. logger.warn('receive shutdown signal');
  125. if(_isShutingDown){
  126. return;
  127. }
  128. _isShutingDown = true;
  129. return P.try(function(){
  130. var deferred = P.defer();
  131. server.once('close', function(){
  132. logger.debug('on server close');
  133. deferred.resolve();
  134. });
  135. server.close();
  136. Object.keys(sockets).forEach(function(id){
  137. try{
  138. sockets[id].end();
  139. sockets[id].destroy();
  140. }
  141. catch(e){
  142. logger.error(e.stack);
  143. }
  144. });
  145. return deferred.promise;
  146. })
  147. .then(function(){
  148. return db.stop();
  149. })
  150. .catch(function(e){
  151. logger.error(e.stack);
  152. })
  153. .finally(function(){
  154. logger.warn('server closed');
  155. memdbLogger.shutdown(function(){
  156. process.exit(0);
  157. });
  158. });
  159. };
  160. process.on('SIGTERM', shutdown);
  161. process.on('SIGINT', shutdown);
  162. process.on('uncaughtException', function(err) {
  163. logger.error('Uncaught exception: %s', err.stack);
  164. });
  165. return deferred.promise;
  166. };