push.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. // Copyright 2015 rain1017.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. // implied. See the License for the specific language governing
  13. // permissions and limitations under the License. See the AUTHORS file
  14. // for names of contributors.
  15. 'use strict';
  16. var logger = require('memdb-client').logger.getLogger('push', __filename);
  17. var P = require('memdb-client').Promise;
  18. // Maximum number of messages kept in a channel's history
  19. var DEFAULT_MAX_MSG_COUNT = 100;
  20. // Maximum number of players allowed in one channel
  21. var DEFAULT_MAX_PLAYER_COUNT = 1000;
  /**
   * Push controller.
   *
   * Reads the optional 'pushConfig' entry from the app, keeps a per-connector
   * buffer of outgoing messages, and subscribes to the app's transaction
   * events so the buffer is flushed on success and discarded on failure.
   *
   * @param {Object} app - application object; app.get() and app.event are used here
   */
  var Controller = function(app){
      this.app = app;

      var pushOpts = app.get('pushConfig') || {};
      this.config = {
          maxMsgCount : pushOpts.maxMsgCount || DEFAULT_MAX_MSG_COUNT,
          maxPlayerCount : pushOpts.maxPlayerCount || DEFAULT_MAX_PLAYER_COUNT
      };

      // {connectorId : [{uids : [uid], route : 'route', msg : msg}] }
      this.msgBuff = {};

      var controller = this;

      // Build an event listener that calls the named controller method and
      // logs (rather than propagates) anything it throws.
      var guarded = function(methodName){
          return function(){
              try{
                  controller[methodName]();
              }
              catch(err){
                  logger.error(err.stack);
              }
          };
      };

      this.app.event.on('transactionSuccess', guarded('onTransactionSuccess'));
      this.app.event.on('transactionFail', guarded('onTransactionFail'));
  };

  var proto = Controller.prototype;
  49. /**
  50. * ChannelIds:
  51. *
  52. * a:areaId - channel for an area
  53. * t:teamId - channel for a team
  54. * p:playerId - channel for a player
  55. * g:groupId - channel for a discussion group
  56. */
  /**
   * Add a player to a channel.
   *
   * The channel (and its ChannelMsg record) is created automatically when it
   * does not exist yet. If the player is already a member, only the stored
   * connector id is refreshed. The channel id is also recorded in the
   * player's PlayerChannel document.
   *
   * @param {String} channelId
   * @param {String} playerId
   * @param {String} [connectorId] - falsy values are stored as ''
   * @returns {Promise} resolves with no value once both documents are saved;
   *          rejects with Error when a new player would exceed
   *          config.maxPlayerCount
   */
  proto.joinAsync = function(channelId, playerId, connectorId){
      var models = this.app.models;
      if(!connectorId){
          connectorId = '';
      }

      return P.bind(this)
      .then(function(){
          return models.Channel.findByIdAsync(channelId);
      })
      .then(function(found){
          if(found){
              return found;
          }
          // First join: create the channel together with its message record
          var created = new models.Channel({_id : channelId});
          logger.info('create channel %s', channelId);
          var channelMsg = new models.ChannelMsg({_id : channelId});
          return channelMsg.saveAsync()
          .then(function(){
              return created;
          });
      })
      .then(function(channel){
          var index = channel.players.indexOf(playerId);
          if(index === -1){
              // The limit only applies to new members; an existing member
              // re-joining a full channel must still be able to update
              // its connector.
              if(channel.players.length >= this.config.maxPlayerCount){
                  throw new Error('player count in channel ' + channelId + ' exceed limit');
              }
              channel.players.push(playerId);
              channel.connectors.push(connectorId);
          }
          else{
              channel.connectors[index] = connectorId;
              // In-place index assignment: flag the array as changed, as
              // connectAsync/disconnectAsync do for the same kind of write.
              channel.markModified('connectors');
          }
          return channel.saveAsync();
      })
      .then(function(){
          return models.PlayerChannel.findByIdAsync(playerId);
      })
      .then(function(playerChannel){
          if(!playerChannel){
              playerChannel = new models.PlayerChannel({_id : playerId});
          }
          if(playerChannel.channels.indexOf(channelId) === -1){
              playerChannel.channels.push(channelId);
          }
          return playerChannel.saveAsync();
      })
      .then(function(){
          logger.info('joinAsync %j', [channelId, playerId, connectorId]);
      });
  };
  /**
   * Remove a player from a channel.
   *
   * A channel left without players is removed together with its ChannelMsg
   * record; a PlayerChannel document left without channels is removed too.
   *
   * @param {String} channelId
   * @param {String} playerId
   * @returns {Promise} resolves with no value; rejects with Error when the
   *          channel or the player's PlayerChannel document does not exist
   */
  proto.quitAsync = function(channelId, playerId){
      var models = this.app.models;

      // Drop the player (and the matching connector slot) from the channel
      var leaveChannel = function(channel){
          if(!channel){
              throw new Error('channel ' + channelId + ' not exist');
          }
          var pos = channel.players.indexOf(playerId);
          if(pos !== -1){
              channel.players.splice(pos, 1);
              channel.connectors.splice(pos, 1);
          }
          if(channel.players.length !== 0){
              return channel.saveAsync();
          }
          logger.info('remove channel %s', channelId);
          return channel.removeAsync()
          .then(function(){
              return models.ChannelMsg.removeAsync({_id : channelId});
          });
      };

      // Drop the channel id from the player's channel list
      var forgetChannel = function(playerChannel){
          if(!playerChannel){
              throw new Error('playerChannel ' + playerId + ' not exist');
          }
          var pos = playerChannel.channels.indexOf(channelId);
          if(pos !== -1){
              playerChannel.channels.splice(pos, 1);
          }
          if(playerChannel.channels.length !== 0){
              return playerChannel.saveAsync();
          }
          return playerChannel.removeAsync();
      };

      return P.bind(this)
      .then(function(){
          return models.Channel.findByIdAsync(channelId);
      })
      .then(leaveChannel)
      .then(function(){
          return models.PlayerChannel.findByIdAsync(playerId);
      })
      .then(forgetChannel)
      .then(function(){
          logger.info('quitAsync %j', [channelId, playerId]);
      });
  };
  /**
   * Record the connector a player is now attached to in every channel the
   * player has joined.
   *
   * Does nothing when the player has no PlayerChannel document. Channels are
   * processed one after another.
   *
   * @param {String} playerId
   * @param {String} [connectorId] - falsy values are stored as ''
   * @returns {Promise} resolves with no value; rejects with Error when a
   *          channel listed for the player does not exist
   */
  proto.connectAsync = function(playerId, connectorId){
      if(!connectorId){
          connectorId = '';
      }
      var models = this.app.models;

      return P.bind(this)
      .then(function(){
          return models.PlayerChannel.findByIdAsync(playerId);
      })
      .then(function(playerChannel){
          if(!playerChannel){
              return;
          }
          return P.each(playerChannel.channels, function(channelId){
              return P.try(function(){
                  return models.Channel.findByIdAsync(channelId);
              })
              .then(function(channel){
                  if(!channel){
                      throw new Error('channel ' + channelId + ' not exist');
                  }
                  var index = channel.players.indexOf(playerId);
                  if(index === -1){
                      // Player is listed for this channel but is not in its
                      // member list. Writing connectors[-1] would only add a
                      // bogus property to the array, so skip the channel.
                      logger.warn('connectAsync: player %s not found in channel %s', playerId, channelId);
                      return;
                  }
                  channel.connectors[index] = connectorId;
                  channel.markModified('connectors');
                  return channel.saveAsync();
              });
          });
      })
      .then(function(){
          logger.info('connectAsync %j', [playerId, connectorId]);
      });
  };
  /**
   * Clear the stored connector id (set it to '') for a player in every
   * channel the player has joined.
   *
   * Does nothing when the player has no PlayerChannel document. Channels are
   * processed one after another.
   *
   * @param {String} playerId
   * @returns {Promise} resolves with no value; rejects with Error when a
   *          channel listed for the player does not exist
   */
  proto.disconnectAsync = function(playerId){
      var models = this.app.models;

      return P.bind(this)
      .then(function(){
          return models.PlayerChannel.findByIdAsync(playerId);
      })
      .then(function(playerChannel){
          if(!playerChannel){
              return;
          }
          return P.each(playerChannel.channels, function(channelId){
              return P.try(function(){
                  return models.Channel.findByIdAsync(channelId);
              })
              .then(function(channel){
                  if(!channel){
                      throw new Error('channel ' + channelId + ' not exist');
                  }
                  var index = channel.players.indexOf(playerId);
                  if(index === -1){
                      // Player is listed for this channel but is not in its
                      // member list. Writing connectors[-1] would only add a
                      // bogus property to the array, so skip the channel.
                      logger.warn('disconnectAsync: player %s not found in channel %s', playerId, channelId);
                      return;
                  }
                  channel.connectors[index] = '';
                  channel.markModified('connectors');
                  return channel.saveAsync();
              });
          });
      })
      .then(function(){
          logger.info('disconnectAsync %j', [playerId]);
      });
  };
  /**
   * Queue a message for the players of a channel.
   *
   * Messages are not sent here: they are grouped by connector id and stored
   * in this.msgBuff, which onTransactionSuccess later flushes. Players whose
   * stored connector id is empty are skipped.
   *
   * When `persistent` is truthy the message is also appended to the
   * channel's ChannelMsg history with the next sequence number, and the
   * history is trimmed once it grows beyond config.maxMsgCount.
   *
   * @param {String} channelId
   * @param {Array} [playerIds] - restrict delivery to these channel members;
   *        falsy means every member of the channel
   * @param {String} route
   * @param {*} msg
   * @param {Boolean} [persistent] - also store the message in channel history
   * @returns {Promise} resolves with no value; rejects with Error when the
   *          channel does not exist, when a persistent message targets
   *          specific players, or when the channel has no ChannelMsg record
   */
  proto.pushAsync = function(channelId, playerIds, route, msg, persistent){
      var args = Array.prototype.slice.call(arguments);
      var channelMsg = null;
      var self = this;

      return P.try(function(){
          if(persistent){
              return self.app.models.ChannelMsg.findByIdAsync(channelId);
          }
      })
      .then(function(ret){
          channelMsg = ret;
          return self.app.models.Channel.findByIdAsync(channelId);
      })
      .then(function(channel){
          if(!channel){
              throw new Error('channel ' + channelId + ' not exist');
          }
          // Validate before touching channelMsg so a rejected call leaves
          // the in-memory history untouched.
          if(persistent && !!playerIds){
              throw new Error('can not send persistent message to specific players');
          }
          if(persistent && !channelMsg){
              throw new Error('channelMsg ' + channelId + ' not exist');
          }

          var pushMsg = {msg : msg, route : route};
          if(persistent){
              pushMsg.seq = channelMsg.seq;
              channelMsg.msgs.push(pushMsg);
              channelMsg.seq++;
              if(channelMsg.msgs.length > self.config.maxMsgCount){
                  // Drop the oldest half. Round up so that maxMsgCount === 1
                  // still trims (slice(0.5) would drop nothing).
                  channelMsg.msgs = channelMsg.msgs.slice(Math.ceil(self.config.maxMsgCount / 2));
              }
              channelMsg.markModified('msgs');
          }

          // {connectorId : [playerId]}
          var connectorUids = {};
          var addTarget = function(connectorId, playerId){
              if(!connectorId){
                  // No connector stored for this player: nothing to deliver to
                  return;
              }
              if(!Object.prototype.hasOwnProperty.call(connectorUids, connectorId)){
                  connectorUids[connectorId] = [];
              }
              connectorUids[connectorId].push(playerId);
          };

          if(!!playerIds){
              playerIds.forEach(function(playerId){
                  var index = channel.players.indexOf(playerId);
                  if(index !== -1){
                      addTarget(channel.connectors[index], playerId);
                  }
              });
          }
          else{
              for(var i=0; i<channel.players.length; i++){
                  addTarget(channel.connectors[i], channel.players[i]);
              }
          }

          // Save msg in buffer (will send on transaction success)
          Object.keys(connectorUids).forEach(function(connectorId){
              if(!self.msgBuff.hasOwnProperty(connectorId)){
                  self.msgBuff[connectorId] = [];
              }
              self.msgBuff[connectorId].push({
                  uids : connectorUids[connectorId],
                  route : route,
                  msg : pushMsg
              });
          });

          if(persistent){
              return channelMsg.saveAsync();
          }
      })
      .then(function(){
          logger.info('push %j', args);
      });
  };
  /**
   * Flush the message buffer: every buffered item is sent to its connector
   * through app.rpcInvoke (sys / channelRemote / pushMessage).
   *
   * The buffer is swapped for an empty one before sending. Sends are fired
   * without waiting for their results; a failed send is logged as a warning.
   */
  proto.onTransactionSuccess = function(){
      logger.debug('onTransactionSuccess');

      var controller = this;
      var pending = this.msgBuff;
      this.msgBuff = {};

      // Fire one push RPC; returns immediately, the result is ignored
      var deliver = function(connectorId, item){
          var opts = {type : 'push', userOptions: {}, isPush : true};
          var rpcInvokeAsync = P.promisify(controller.app.rpcInvoke, controller.app);
          var request = {
              namespace : 'sys',
              service : 'channelRemote',
              method : 'pushMessage',
              args : [item.route, item.msg, item.uids, opts]
          };
          rpcInvokeAsync(connectorId, request)
          .catch(function(e){
              logger.warn(e.stack);
          })
          .then(function(){
              logger.info('pushToConnectors %s %j %j %j', connectorId, item.uids, item.route, item.msg);
          });
      };

      Object.keys(pending).forEach(function(connectorId){
          pending[connectorId].forEach(function(item){
              deliver(connectorId, item);
          });
      });
  };
  /**
   * Discard every buffered message without sending it.
   */
  proto.onTransactionFail = function(){
      logger.debug('onTransactionFail');
      // Replace the buffer with a fresh empty object
      this.msgBuff = {};
  };
  /**
   * Read messages from a channel's stored history.
   *
   * @param {String} channelId
   * @param {Number} [seq] - sequence number to start from; falsy means 0
   * @param {Number} [count] - how many sequence numbers to cover; falsy means
   *        config.maxMsgCount
   * @returns {Promise} resolves with the matching slice of the stored
   *          messages; rejects with Error when the channel has no
   *          ChannelMsg record
   */
  proto.getMsgsAsync = function(channelId, seq, count){
      seq = seq || 0;
      count = count || this.config.maxMsgCount;

      return P.bind(this)
      .then(function(){
          return this.app.models.ChannelMsg.findByIdAsync(channelId);
      })
      .then(function(channelMsg){
          if(!channelMsg){
              throw new Error('channel ' + channelId + ' not exist');
          }
          // Translate the requested sequence number into an index of msgs.
          // The end index is derived from the unclamped start on purpose.
          var from = seq - channelMsg.seq + channelMsg.msgs.length;
          var to = from + count;
          var msgs = channelMsg.msgs.slice(Math.max(from, 0), Math.max(to, 0));
          logger.info('getMsgsAsync %j => %j', [channelId, seq, count], msgs);
          return msgs;
      });
  };
  368. module.exports = function(app){
  369. return new Controller(app);
  370. };