mqttconnector.js 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. var util = require('util');
  2. var EventEmitter = require('events').EventEmitter;
  3. var mqtt = require('mqtt');
  4. var constants = require('../util/constants');
  5. var MQTTSocket = require('./mqttsocket');
  6. var Adaptor = require('./mqtt/mqttadaptor');
  7. var generate = require('./mqtt/generate');
  8. var logger = require('pomelo-logger').getLogger('pomelo', __filename);
  9. var curId = 1;
  /**
   * Connector that manages the low-level connection and protocol between
   * server and client. Developers can provide their own connector to switch
   * the low-level protocol, such as tcp or protobuf.
   *
   * May be called with or without `new`.
   *
   * @param {Number} port port stored on the instance; start() listens on it
   * @param {String} host host stored on the instance
   * @param {Object} [opts] connector options; defaults to an empty object and
   *                        is handed to the MQTT adaptor
   */
  var Connector = function(port, host, opts) {
    // Support factory-style invocation: Connector(port, host, opts).
    if (!(this instanceof Connector)) {
      return new Connector(port, host, opts);
    }

    EventEmitter.call(this);

    this.port = port;
    this.host = host;
    this.opts = opts || {};
    this.adaptor = new Adaptor(this.opts);
  };
  24. util.inherits(Connector, EventEmitter);
  25. module.exports = Connector;
  /**
   * Start the connector: create the MQTT server, register the per-client
   * handlers and listen on the configured port.
   *
   * For every client reported by the server:
   *  - 'error', 'close' and 'disconnect' all destroy the client's stream;
   *  - when opts.disconnectOnTimeout is set, a stream timeout is installed
   *    that emits 'close' on the client (opts.timeout is multiplied by 1000,
   *    presumably seconds -> milliseconds; falls back to
   *    constants.TIME.DEFAULT_MQTT_HEARTBEAT_TIMEOUT);
   *  - on 'connect' the client is acknowledged with returnCode 0 and a new
   *    MQTTSocket is emitted through the connector's 'connection' event.
   *
   * @param {Function} [cb] invoked on the next tick after listen() was called
   */
  Connector.prototype.start = function(cb) {
    var self = this;

    this.mqttServer = mqtt.createServer();

    this.mqttServer.on('client', function(client) {
      // Every terminal event is handled the same way: tear down the stream.
      var destroyStream = function() {
        client.stream.destroy();
      };

      client.on('error', destroyStream);
      client.on('close', destroyStream);
      client.on('disconnect', destroyStream);

      if (self.opts.disconnectOnTimeout) {
        var timeout = self.opts.timeout * 1000 || constants.TIME.DEFAULT_MQTT_HEARTBEAT_TIMEOUT;
        client.stream.setTimeout(timeout, function() {
          // Reuse the 'close' path so an idle client is cleaned up like a closed one.
          client.emit('close');
        });
      }

      client.on('connect', function(packet) {
        client.connack({returnCode: 0});
        var mqttsocket = new MQTTSocket(curId++, client, self.adaptor);
        self.emit('connection', mqttsocket);
      });
    });

    this.mqttServer.listen(this.port);

    // process.nextTick throws when handed a non-function, so only schedule
    // the callback when the caller actually supplied one.
    if (typeof cb === 'function') {
      process.nextTick(cb);
    }
  };
  /**
   * Stop the connector: close the MQTT server if start() created one, then
   * terminate the current process with exit code 0.
   */
  Connector.prototype.stop = function() {
    // stop() can be reached before start(); there is no server to close then.
    if (this.mqttServer) {
      this.mqttServer.close();
    }
    // NOTE(review): exiting the whole process from a connector is drastic;
    // kept as-is because callers may rely on it.
    process.exit(0);
  };
  /**
   * Build the response object for a request.
   *
   * @param {*} msgId   request id, returned as `id`
   * @param {*} route   accepted but not used
   * @param {*} msgBody payload, returned as `body`
   * @return {Object} `{id: msgId, body: msgBody}`
   */
  var composeResponse = function(msgId, route, msgBody) {
    return {
      id: msgId,
      body: msgBody
    };
  };
  /**
   * Build a push message by passing the body to generate.publish().
   * A falsy result is logged as an error and still returned to the caller.
   *
   * @param {*} route   accepted but not used
   * @param {*} msgBody payload handed to generate.publish()
   * @return {*} whatever generate.publish() returned
   */
  var composePush = function(route, msgBody) {
    var msg = generate.publish(msgBody);
    if (!msg) {
      logger.error('invalid mqtt publish message: %j', msgBody);
    }
    return msg;
  };
  /**
   * Encode an outgoing message.
   *
   * A truthy reqId produces a response via composeResponse(); otherwise the
   * message is treated as a push and built via composePush().
   *
   * @param {*} reqId   request id; falsy (e.g. 0, null, undefined) means push
   * @param {*} route   forwarded to the compose helper
   * @param {*} msgBody payload forwarded to the compose helper
   * @return {*} result of the selected compose helper
   */
  Connector.prototype.encode = function(reqId, route, msgBody) {
    if (reqId) {
      return composeResponse(reqId, route, msgBody);
    }
    return composePush(route, msgBody);
  };
  /**
   * Close the MQTT server created by start(). Does nothing when the
   * connector was never started.
   */
  Connector.prototype.close = function() {
    // mqttServer is only assigned inside start().
    if (this.mqttServer) {
      this.mqttServer.close();
    }
  };