// protocol.js (2.5 KB)
  1. // Copyright 2015 The MemDB Authors.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. // implied. See the License for the specific language governing
  13. // permissions and limitations under the License. See the AUTHORS file
  14. // for names of contributors.
  15. 'use strict';
  16. var util = require('util');
  17. var EventEmitter = require('events').EventEmitter;
  18. var P = require('bluebird');
  19. var logger = require('memdb-logger').getLogger('memdb', __filename);
  20. var DEFAULT_MAX_MSG_LENGTH = 1024 * 1024;
  /**
   * Newline-delimited JSON message protocol on top of a socket.
   *
   * Each message on the wire is one JSON document terminated by '\n'.
   * Incoming data is re-assembled into complete lines, parsed, and emitted
   * as 'msg' events. The socket's 'close', 'connect', 'error' and 'timeout'
   * events are re-emitted on this object with the same arguments.
   *
   * @constructor
   * @param {Object} [opts]
   * @param {Object} opts.socket - Socket to wrap. It must provide
   *     setEncoding() and on(); presumably a net.Socket (verify against
   *     callers). Although opts itself may be omitted, the constructor
   *     throws when no socket is supplied.
   * @param {number} [opts.maxMsgLength] - Maximum length of an encoded
   *     outgoing message, enforced by send(). Falls back to
   *     DEFAULT_MAX_MSG_LENGTH when missing or falsy.
   */
  var Protocol = function(opts){
      EventEmitter.call(this);

      opts = opts || {};

      this.socket = opts.socket;
      // Decode incoming bytes as utf8 so 'data' chunks can be split as strings.
      this.socket.setEncoding('utf8');

      this.maxMsgLength = opts.maxMsgLength || DEFAULT_MAX_MSG_LENGTH;

      // Unterminated tail of the data received so far (never contains '\n').
      this.remainLine = '';

      var self = this;

      this.socket.on('data', function(data){
          // message is json encoded and split by '\n'
          //
          // Prepend the buffered tail before splitting, so a message that is
          // spread over any number of chunks is re-assembled intact. A chunk
          // without '\n' simply extends the tail.
          var lines = (self.remainLine + data).split('\n');

          // The last element is whatever follows the final '\n' ('' when the
          // chunk ends exactly on a line boundary); keep it for the next chunk.
          self.remainLine = lines.pop();

          for(var i = 0; i < lines.length; i++){
              try{
                  // emit() stays inside the try block: an exception thrown by
                  // a 'msg' listener is logged like a parse error and does not
                  // prevent the remaining lines from being processed.
                  self.emit('msg', JSON.parse(lines[i]));
              }
              catch(err){
                  logger.error(err.stack);
              }
          }
      });

      this.socket.on('close', function(hadError){
          self.emit('close', hadError);
      });

      this.socket.on('connect', function(){
          self.emit('connect');
      });

      this.socket.on('error', function(err){
          self.emit('error', err);
      });

      this.socket.on('timeout', function(){
          self.emit('timeout');
      });
  };

  util.inherits(Protocol, EventEmitter);
  /**
   * Encode a message as a single JSON line and write it to the socket.
   *
   * @param {*} msg - Value to send; must be serializable by JSON.stringify.
   * @throws {Error} If msg has no JSON representation, or if the encoded
   *     line (including the trailing '\n') is longer than this.maxMsgLength
   *     characters.
   */
  Protocol.prototype.send = function(msg){
      var json = JSON.stringify(msg);

      // JSON.stringify returns undefined (not a string) for undefined,
      // functions and symbols. Concatenating that would put the literal line
      // "undefined" on the wire, which the receiving side cannot parse.
      if(json === undefined){
          throw new Error('msg is not JSON serializable');
      }

      var data = json + '\n';
      if(data.length > this.maxMsgLength){
          throw new Error('msg length exceed limit');
      }

      // A falsy return value from write() is only logged here; the data is
      // not retried or otherwise handled by this method.
      var ret = this.socket.write(data);
      if(!ret){
          logger.warn('socket.write return false');
      }
  };
  /**
   * Close the connection by calling end() on the wrapped socket.
   */
  Protocol.prototype.disconnect = function(){
      var sock = this.socket;
      sock.end();
  };
  77. module.exports = Protocol;