connector.js 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. var logger = require('pomelo-logger').getLogger('pomelo', __filename);
  2. var taskManager = require('../common/manager/taskManager');
  3. var pomelo = require('../pomelo');
  4. var rsa = require("node-bignumber");
  5. var events = require('../util/events');
  6. var utils = require('../util/utils');
  7. module.exports = function(app, opts) {
  8. return new Component(app, opts);
  9. };
  10. /**
  11. * Connector component. Receive client requests and attach session with socket.
  12. *
  13. * @param {Object} app current application context
  14. * @param {Object} opts attach parameters
  15. * opts.connector {Object} provides low level network and protocol details implementation between server and clients.
  16. */
  17. var Component = function(app, opts) {
  18. opts = opts || {};
  19. this.app = app;
  20. this.connector = getConnector(app, opts);
  21. this.encode = opts.encode;
  22. this.decode = opts.decode;
  23. this.useCrypto = opts.useCrypto;
  24. this.blacklistFun = opts.blacklistFun;
  25. this.keys = {};
  26. this.blacklist = [];
  27. if(opts.useDict) {
  28. app.load(pomelo.dictionary, app.get('dictionaryConfig'));
  29. }
  30. if(opts.useProtobuf) {
  31. app.load(pomelo.protobuf, app.get('protobufConfig'));
  32. }
  33. // component dependencies
  34. this.server = null;
  35. this.session = null;
  36. this.connection = null;
  37. };
  38. var pro = Component.prototype;
  39. pro.name = '__connector__';
  40. pro.start = function(cb) {
  41. this.server = this.app.components.__server__;
  42. this.session = this.app.components.__session__;
  43. this.connection = this.app.components.__connection__;
  44. // check component dependencies
  45. if(!this.server) {
  46. process.nextTick(function() {
  47. utils.invokeCallback(cb, new Error('fail to start connector component for no server component loaded'));
  48. });
  49. return;
  50. }
  51. if(!this.session) {
  52. process.nextTick(function() {
  53. utils.invokeCallback(cb, new Error('fail to start connector component for no session component loaded'));
  54. });
  55. return;
  56. }
  57. process.nextTick(cb);
  58. };
  59. pro.afterStart = function(cb) {
  60. this.connector.start(cb);
  61. this.connector.on('connection', hostFilter.bind(this, bindEvents));
  62. };
  63. pro.stop = function(force, cb) {
  64. if(this.connector) {
  65. this.connector.stop(force, cb);
  66. this.connector = null;
  67. return;
  68. } else {
  69. process.nextTick(cb);
  70. }
  71. };
  72. pro.send = function(reqId, route, msg, recvs, opts, cb) {
  73. logger.debug('[%s] send message reqId: %s, route: %s, msg: %j, receivers: %j, opts: %j', this.app.serverId, reqId, route, msg, recvs, opts);
  74. var emsg = msg;
  75. if(this.encode) {
  76. // use costumized encode
  77. emsg = this.encode.call(this, reqId, route, msg);
  78. } else if(this.connector.encode) {
  79. // use connector default encode
  80. emsg = this.connector.encode(reqId, route, msg);
  81. }
  82. if(!emsg) {
  83. process.nextTick(function() {
  84. utils.invokeCallback(cb, new Error('fail to send message for encode result is empty.'));
  85. return;
  86. });
  87. }
  88. this.app.components.__pushScheduler__.schedule(reqId, route, emsg,
  89. recvs, opts, cb);
  90. };
  91. pro.setPubKey = function(id, key) {
  92. var pubKey = new rsa.Key();
  93. pubKey.n = new rsa.BigInteger(key.rsa_n, 16);
  94. pubKey.e = key.rsa_e;
  95. this.keys[id] = pubKey;
  96. };
  97. pro.getPubKey = function(id) {
  98. return this.keys[id];
  99. };
  100. var getConnector = function(app, opts) {
  101. var connector = opts.connector;
  102. if(!connector) {
  103. return getDefaultConnector(app, opts);
  104. }
  105. if(typeof connector !== 'function') {
  106. return connector;
  107. }
  108. var curServer = app.getCurServer();
  109. return connector(curServer.clientPort, curServer.host, opts);
  110. };
  111. var getDefaultConnector = function(app, opts) {
  112. var DefaultConnector = require('../connectors/sioconnector');
  113. var curServer = app.getCurServer();
  114. return new DefaultConnector(curServer.clientPort, curServer.host, opts);
  115. };
  116. var hostFilter = function(cb, socket) {
  117. var ip = socket.remoteAddress.ip;
  118. var check = function(list) {
  119. for(var address in list) {
  120. var exp = new RegExp(list[address]);
  121. if(exp.test(ip)) {
  122. socket.disconnect();
  123. return true;
  124. }
  125. }
  126. return false;
  127. };
  128. // dynamical check
  129. if(this.blacklist.length !== 0 && !!check(this.blacklist)) {
  130. return;
  131. }
  132. // static check
  133. if(!!this.blacklistFun && typeof this.blacklistFun === 'function') {
  134. var self = this;
  135. self.blacklistFun(function(err, list) {
  136. if(!!err) {
  137. logger.error('connector blacklist error: %j', err.stack);
  138. utils.invokeCallback(cb, self, socket);
  139. return;
  140. }
  141. if(!Array.isArray(list)) {
  142. logger.error('connector blacklist is not array: %j', list);
  143. utils.invokeCallback(cb, self, socket);
  144. return;
  145. }
  146. if(!!check(list)) {
  147. return;
  148. } else {
  149. utils.invokeCallback(cb, self, socket);
  150. return;
  151. }
  152. });
  153. } else {
  154. utils.invokeCallback(cb, this, socket);
  155. }
  156. };
  157. var bindEvents = function(self, socket) {
  158. if(self.connection) {
  159. self.connection.increaseConnectionCount();
  160. var statisticInfo = self.connection.getStatisticsInfo();
  161. var curServer = self.app.getCurServer();
  162. if(statisticInfo.totalConnCount > curServer['max-connections']) {
  163. logger.warn('the server %s has reached the max connections %s', curServer.id, curServer['max-connections']);
  164. socket.disconnect();
  165. return;
  166. }
  167. }
  168. //create session for connection
  169. var session = getSession(self, socket);
  170. var closed = false;
  171. socket.on('disconnect', function() {
  172. if(closed) {
  173. return;
  174. }
  175. closed = true;
  176. if(self.connection) {
  177. self.connection.decreaseConnectionCount(session.uid);
  178. }
  179. });
  180. socket.on('error', function() {
  181. if(closed) {
  182. return;
  183. }
  184. closed = true;
  185. if(self.connection) {
  186. self.connection.decreaseConnectionCount(session.uid);
  187. }
  188. });
  189. // new message
  190. socket.on('message', function(msg) {
  191. var dmsg = msg;
  192. if(self.decode) {
  193. dmsg = self.decode.call(self, msg, session);
  194. } else if(self.connector.decode) {
  195. dmsg = self.connector.decode(msg);
  196. }
  197. if(!dmsg) {
  198. // discard invalid message
  199. return;
  200. }
  201. // use rsa crypto
  202. if(self.useCrypto) {
  203. var verified = verifyMessage(self, session, dmsg);
  204. if(!verified) {
  205. logger.error('fail to verify the data received from client.');
  206. return;
  207. }
  208. }
  209. handleMessage(self, session, dmsg);
  210. }); //on message end
  211. };
  212. /**
  213. * get session for current connection
  214. */
  215. var getSession = function(self, socket) {
  216. var app = self.app, sid = socket.id;
  217. var session = self.session.get(sid);
  218. if(session) {
  219. return session;
  220. }
  221. session = self.session.create(sid, app.getServerId(), socket);
  222. logger.debug('[%s] getSession session is created with session id: %s', app.getServerId(), sid);
  223. // bind events for session
  224. socket.on('disconnect', session.closed.bind(session));
  225. socket.on('error', session.closed.bind(session));
  226. session.on('closed', onSessionClose.bind(null, app));
  227. session.on('bind', function(uid) {
  228. logger.debug('session on [%s] bind with uid: %s', self.app.serverId, uid);
  229. // update connection statistics if necessary
  230. if(self.connection) {
  231. self.connection.addLoginedUser(uid, {
  232. loginTime: Date.now(),
  233. uid: uid,
  234. address: socket.remoteAddress.ip + ':' + socket.remoteAddress.port
  235. });
  236. }
  237. self.app.event.emit(events.BIND_SESSION, session);
  238. });
  239. session.on('unbind', function(uid) {
  240. if(self.connection) {
  241. self.connection.removeLoginedUser(uid);
  242. }
  243. self.app.event.emit(events.UNBIND_SESSION, session);
  244. });
  245. return session;
  246. };
  247. var onSessionClose = function(app, session, reason) {
  248. taskManager.closeQueue(session.id, true);
  249. app.event.emit(events.CLOSE_SESSION, session);
  250. };
  251. var handleMessage = function(self, session, msg) {
  252. logger.debug('[%s] handleMessage session id: %s, msg: %j', self.app.serverId, session.id, msg);
  253. var type = checkServerType(msg.route);
  254. if(!type) {
  255. logger.error('invalid route string. route : %j', msg.route);
  256. return;
  257. }
  258. self.server.globalHandle(msg, session.toFrontendSession(), function(err, resp, opts) {
  259. if(resp && !msg.id) {
  260. logger.warn('try to response to a notify: %j', msg.route);
  261. return;
  262. }
  263. if (!msg.id && !resp) return;
  264. if (!resp) resp = {};
  265. if (!!err && !resp.code){
  266. resp.code = 500;
  267. }
  268. opts = {type: 'response', userOptions: opts || {}};
  269. // for compatiablity
  270. opts.isResponse = true;
  271. self.send(msg.id, msg.route, resp, [session.id], opts,
  272. function() {});
  273. });
  274. };
  275. /**
  276. * Get server type form request message.
  277. */
  278. var checkServerType = function (route) {
  279. if(!route) {
  280. return null;
  281. }
  282. var idx = route.indexOf('.');
  283. if(idx < 0) {
  284. return null;
  285. }
  286. return route.substring(0, idx);
  287. };
  288. var verifyMessage = function (self, session, msg) {
  289. var sig = msg.body.__crypto__;
  290. if(!sig) {
  291. logger.error('receive data from client has no signature [%s]', self.app.serverId);
  292. return false;
  293. }
  294. var pubKey;
  295. if(!session) {
  296. logger.error('could not find session.');
  297. return false;
  298. }
  299. if(!session.get('pubKey')) {
  300. pubKey = self.getPubKey(session.id);
  301. if(!!pubKey) {
  302. delete self.keys[session.id];
  303. session.set('pubKey', pubKey);
  304. }
  305. else {
  306. logger.error('could not get public key, session id is %s', session.id);
  307. return false;
  308. }
  309. }
  310. else {
  311. pubKey = session.get('pubKey');
  312. }
  313. if(!pubKey.n || !pubKey.e) {
  314. logger.error('could not verify message without public key [%s]', self.app.serverId);
  315. return false;
  316. }
  317. delete msg.body.__crypto__;
  318. var message = JSON.stringify(msg.body);
  319. if(utils.hasChineseChar(message))
  320. message = utils.unicodeToUtf8(message);
  321. return pubKey.verifyString(message, sig);
  322. };