client.js 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. // Copyright 2015 The MemDB Authors.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. // implied. See the License for the specific language governing
  13. // permissions and limitations under the License. See the AUTHORS file
  14. // for names of contributors.
  15. 'use strict';
  16. var P = require('bluebird');
  17. var util = require('util');
  18. var net = require('net');
  19. var EventEmitter = require('events').EventEmitter;
  20. var Protocol = require('./protocol');
  21. var logger = require('memdb-logger').getLogger('memdb-client', __filename);
  22. var Client = function(){
  23. EventEmitter.call(this);
  24. this.protocol = null;
  25. this.seq = 1;
  26. this.requests = {}; //{seq : deferred}
  27. this.domains = {}; //{seq : domain} saved domains
  28. this.disconnectDeferred = null;
  29. };
  30. util.inherits(Client, EventEmitter);
  31. var proto = Client.prototype;
  32. proto.connect = function(host, port){
  33. if(!!this.protocol){
  34. throw new Error('connect already called');
  35. }
  36. var self = this;
  37. logger.debug('start connect to %s:%s', host, port);
  38. var connectDeferred = P.defer();
  39. var socket = net.createConnection(port, host);
  40. this.protocol = new Protocol({socket : socket});
  41. this.protocol.once('connect', function(){
  42. logger.info('connected to %s:%s', host, port);
  43. connectDeferred.resolve();
  44. });
  45. this.protocol.on('close', function(){
  46. logger.info('disconnected from %s:%s', host, port);
  47. // reject all remaining requests
  48. for(var seq in self.requests){
  49. process.domain = self.domains[seq];
  50. self.requests[seq].reject(new Error('connection closed'));
  51. }
  52. self.requests = {};
  53. self.domains = {};
  54. // Server will not disconnect if the client process exit immediately
  55. // So delay resolve promise
  56. if(self.disconnectDeferred){
  57. setTimeout(function(){
  58. self.disconnectDeferred.resolve();
  59. }, 1);
  60. }
  61. self.protocol = null;
  62. self.emit('close');
  63. });
  64. this.protocol.on('msg', function(msg){
  65. var request = self.requests[msg.seq];
  66. if(!request){
  67. return;
  68. }
  69. // restore saved domain
  70. process.domain = self.domains[msg.seq];
  71. if(!msg.err){
  72. logger.info('%s:%s => %j', host, port, msg);
  73. request.resolve(msg.data);
  74. }
  75. else{
  76. logger.error('%s:%s => %j', host, port, msg);
  77. request.reject(msg.err);
  78. }
  79. delete self.requests[msg.seq];
  80. delete self.domains[msg.seq];
  81. });
  82. this.protocol.on('error', function(err){
  83. if(!connectDeferred.isResolved()){
  84. connectDeferred.reject(err);
  85. }
  86. // Reject all pending requests
  87. Object.keys(self.requests).forEach(function(seq){
  88. // restore saved domain
  89. process.domain = self.domains[seq];
  90. self.requests[seq].reject(err);
  91. delete self.domains[seq];
  92. delete self.requests[seq];
  93. });
  94. });
  95. this.protocol.on('timeout', function(){
  96. self.disconnect();
  97. });
  98. return connectDeferred.promise;
  99. };
  100. proto.disconnect = function(){
  101. if(!this.protocol){
  102. return;
  103. }
  104. this.disconnectDeferred = P.defer();
  105. this.protocol.disconnect();
  106. return this.disconnectDeferred.promise;
  107. };
  108. proto.request = function(connId, method, args){
  109. if(!this.protocol){
  110. throw new Error('not connected');
  111. }
  112. var seq = this.seq++;
  113. var deferred = P.defer();
  114. this.requests[seq] = deferred;
  115. var msg = {
  116. seq : seq,
  117. connId : connId,
  118. method : method,
  119. args : args,
  120. };
  121. this.protocol.send(msg);
  122. // save domain
  123. this.domains[seq] = process.domain;
  124. logger.info('%s:%s <= %j', this.protocol.socket.remoteAddress, this.protocol.socket.remotePort, msg);
  125. return deferred.promise;
  126. };
  127. module.exports = Client;