index.js 43 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284
  1. /*global Buffer require exports console setTimeout */
  2. var net = require("net"),
  3. util = require("./lib/util"),
  4. Queue = require("./lib/queue"),
  5. to_array = require("./lib/to_array"),
  6. events = require("events"),
  7. crypto = require("crypto"),
  8. parsers = [], commands,
  9. connection_id = 0,
  10. default_port = 6379,
  11. default_host = "127.0.0.1";
  12. // can set this to true to enable for all connections
  13. exports.debug_mode = false;
  14. var arraySlice = Array.prototype.slice
  15. function trace() {
  16. if (!exports.debug_mode) return;
  17. console.log.apply(null, arraySlice.call(arguments))
  18. }
  19. // hiredis might not be installed
  20. try {
  21. require("./lib/parser/hiredis");
  22. parsers.push(require("./lib/parser/hiredis"));
  23. } catch (err) {
  24. if (exports.debug_mode) {
  25. console.warn("hiredis parser not installed.");
  26. }
  27. }
  28. parsers.push(require("./lib/parser/javascript"));
  29. function RedisClient(stream, options) {
  30. this.stream = stream;
  31. this.options = options = options || {};
  32. this.connection_id = ++connection_id;
  33. this.connected = false;
  34. this.ready = false;
  35. this.connections = 0;
  36. if (this.options.socket_nodelay === undefined) {
  37. this.options.socket_nodelay = true;
  38. }
  39. if (this.options.socket_keepalive === undefined) {
  40. this.options.socket_keepalive = true;
  41. }
  42. this.should_buffer = false;
  43. this.command_queue_high_water = this.options.command_queue_high_water || 1000;
  44. this.command_queue_low_water = this.options.command_queue_low_water || 0;
  45. this.max_attempts = null;
  46. if (options.max_attempts && !isNaN(options.max_attempts) && options.max_attempts > 0) {
  47. this.max_attempts = +options.max_attempts;
  48. }
  49. this.command_queue = new Queue(); // holds sent commands to de-pipeline them
  50. this.offline_queue = new Queue(); // holds commands issued but not able to be sent
  51. this.commands_sent = 0;
  52. this.connect_timeout = false;
  53. if (options.connect_timeout && !isNaN(options.connect_timeout) && options.connect_timeout > 0) {
  54. this.connect_timeout = +options.connect_timeout;
  55. }
  56. this.enable_offline_queue = true;
  57. if (typeof this.options.enable_offline_queue === "boolean") {
  58. this.enable_offline_queue = this.options.enable_offline_queue;
  59. }
  60. this.retry_max_delay = null;
  61. if (options.retry_max_delay !== undefined && !isNaN(options.retry_max_delay) && options.retry_max_delay > 0) {
  62. this.retry_max_delay = options.retry_max_delay;
  63. }
  64. this.initialize_retry_vars();
  65. this.pub_sub_mode = false;
  66. this.subscription_set = {};
  67. this.monitoring = false;
  68. this.closing = false;
  69. this.server_info = {};
  70. this.auth_pass = null;
  71. if (options.auth_pass !== undefined) {
  72. this.auth_pass = options.auth_pass;
  73. }
  74. this.parser_module = null;
  75. this.selected_db = null; // save the selected db here, used when reconnecting
  76. this.old_state = null;
  77. this.install_stream_listeners();
  78. events.EventEmitter.call(this);
  79. }
  80. util.inherits(RedisClient, events.EventEmitter);
  81. exports.RedisClient = RedisClient;
  82. RedisClient.prototype.install_stream_listeners = function() {
  83. var self = this;
  84. this.stream.on("connect", function () {
  85. self.on_connect();
  86. });
  87. this.stream.on("data", function (buffer_from_socket) {
  88. self.on_data(buffer_from_socket);
  89. });
  90. this.stream.on("error", function (msg) {
  91. self.on_error(msg.message);
  92. });
  93. this.stream.on("close", function () {
  94. self.connection_gone("close");
  95. });
  96. this.stream.on("end", function () {
  97. self.connection_gone("end");
  98. });
  99. this.stream.on("drain", function () {
  100. self.should_buffer = false;
  101. self.emit("drain");
  102. });
  103. };
  104. RedisClient.prototype.initialize_retry_vars = function () {
  105. this.retry_timer = null;
  106. this.retry_totaltime = 0;
  107. this.retry_delay = 150;
  108. this.retry_backoff = 1.7;
  109. this.attempts = 1;
  110. };
  111. RedisClient.prototype.unref = function () {
  112. trace("User requesting to unref the connection");
  113. if (this.connected) {
  114. trace("unref'ing the socket connection");
  115. this.stream.unref();
  116. }
  117. else {
  118. trace("Not connected yet, will unref later");
  119. this.once("connect", function () {
  120. this.unref();
  121. })
  122. }
  123. };
  124. // flush offline_queue and command_queue, erroring any items with a callback first
  125. RedisClient.prototype.flush_and_error = function (message) {
  126. var command_obj, error;
  127. error = new Error(message);
  128. while (this.offline_queue.length > 0) {
  129. command_obj = this.offline_queue.shift();
  130. if (typeof command_obj.callback === "function") {
  131. try {
  132. command_obj.callback(error);
  133. } catch (callback_err) {
  134. process.nextTick(function () {
  135. throw callback_err;
  136. });
  137. }
  138. }
  139. }
  140. this.offline_queue = new Queue();
  141. while (this.command_queue.length > 0) {
  142. command_obj = this.command_queue.shift();
  143. if (typeof command_obj.callback === "function") {
  144. try {
  145. command_obj.callback(error);
  146. } catch (callback_err) {
  147. process.nextTick(function () {
  148. throw callback_err;
  149. });
  150. }
  151. }
  152. }
  153. this.command_queue = new Queue();
  154. };
  155. RedisClient.prototype.on_error = function (msg) {
  156. var message = "Redis connection to " + this.address + " failed - " + msg;
  157. if (this.closing) {
  158. return;
  159. }
  160. if (exports.debug_mode) {
  161. console.warn(message);
  162. }
  163. this.flush_and_error(message);
  164. this.connected = false;
  165. this.ready = false;
  166. this.emit("error", new Error(message));
  167. // "error" events get turned into exceptions if they aren't listened for. If the user handled this error
  168. // then we should try to reconnect.
  169. this.connection_gone("error");
  170. };
  171. RedisClient.prototype.do_auth = function () {
  172. var self = this;
  173. if (exports.debug_mode) {
  174. console.log("Sending auth to " + self.address + " id " + self.connection_id);
  175. }
  176. self.send_anyway = true;
  177. self.send_command("auth", [this.auth_pass], function (err, res) {
  178. if (err) {
  179. if (err.toString().match("LOADING")) {
  180. // if redis is still loading the db, it will not authenticate and everything else will fail
  181. console.log("Redis still loading, trying to authenticate later");
  182. setTimeout(function () {
  183. self.do_auth();
  184. }, 2000); // TODO - magic number alert
  185. return;
  186. } else if (err.toString().match("no password is set")) {
  187. console.log("Warning: Redis server does not require a password, but a password was supplied.")
  188. err = null;
  189. res = "OK";
  190. } else {
  191. return self.emit("error", new Error("Auth error: " + err.message));
  192. }
  193. }
  194. if (res.toString() !== "OK") {
  195. return self.emit("error", new Error("Auth failed: " + res.toString()));
  196. }
  197. if (exports.debug_mode) {
  198. console.log("Auth succeeded " + self.address + " id " + self.connection_id);
  199. }
  200. if (self.auth_callback) {
  201. self.auth_callback(err, res);
  202. self.auth_callback = null;
  203. }
  204. // now we are really connected
  205. self.emit("connect");
  206. self.initialize_retry_vars();
  207. if (self.options.no_ready_check) {
  208. self.on_ready();
  209. } else {
  210. self.ready_check();
  211. }
  212. });
  213. self.send_anyway = false;
  214. };
  215. RedisClient.prototype.on_connect = function () {
  216. if (exports.debug_mode) {
  217. console.log("Stream connected " + this.address + " id " + this.connection_id);
  218. }
  219. this.connected = true;
  220. this.ready = false;
  221. this.connections += 1;
  222. this.command_queue = new Queue();
  223. this.emitted_end = false;
  224. if (this.options.socket_nodelay) {
  225. this.stream.setNoDelay();
  226. }
  227. this.stream.setKeepAlive(this.options.socket_keepalive);
  228. this.stream.setTimeout(0);
  229. this.init_parser();
  230. if (this.auth_pass) {
  231. this.do_auth();
  232. } else {
  233. this.emit("connect");
  234. this.initialize_retry_vars();
  235. if (this.options.no_ready_check) {
  236. this.on_ready();
  237. } else {
  238. this.ready_check();
  239. }
  240. }
  241. };
  242. RedisClient.prototype.init_parser = function () {
  243. var self = this;
  244. if (this.options.parser) {
  245. if (! parsers.some(function (parser) {
  246. if (parser.name === self.options.parser) {
  247. self.parser_module = parser;
  248. if (exports.debug_mode) {
  249. console.log("Using parser module: " + self.parser_module.name);
  250. }
  251. return true;
  252. }
  253. })) {
  254. throw new Error("Couldn't find named parser " + self.options.parser + " on this system");
  255. }
  256. } else {
  257. if (exports.debug_mode) {
  258. console.log("Using default parser module: " + parsers[0].name);
  259. }
  260. this.parser_module = parsers[0];
  261. }
  262. this.parser_module.debug_mode = exports.debug_mode;
  263. // return_buffers sends back Buffers from parser to callback. detect_buffers sends back Buffers from parser, but
  264. // converts to Strings if the input arguments are not Buffers.
  265. this.reply_parser = new this.parser_module.Parser({
  266. return_buffers: self.options.return_buffers || self.options.detect_buffers || false
  267. });
  268. // "reply error" is an error sent back by Redis
  269. this.reply_parser.on("reply error", function (reply) {
  270. if (reply instanceof Error) {
  271. self.return_error(reply);
  272. } else {
  273. self.return_error(new Error(reply));
  274. }
  275. });
  276. this.reply_parser.on("reply", function (reply) {
  277. self.return_reply(reply);
  278. });
  279. // "error" is bad. Somehow the parser got confused. It'll try to reset and continue.
  280. this.reply_parser.on("error", function (err) {
  281. self.emit("error", new Error("Redis reply parser error: " + err.stack));
  282. });
  283. };
  284. RedisClient.prototype.on_ready = function () {
  285. var self = this;
  286. this.ready = true;
  287. if (this.old_state !== null) {
  288. this.monitoring = this.old_state.monitoring;
  289. this.pub_sub_mode = this.old_state.pub_sub_mode;
  290. this.selected_db = this.old_state.selected_db;
  291. this.old_state = null;
  292. }
  293. // magically restore any modal commands from a previous connection
  294. if (this.selected_db !== null) {
  295. // this trick works if and only if the following send_command
  296. // never goes into the offline queue
  297. var pub_sub_mode = this.pub_sub_mode;
  298. this.pub_sub_mode = false;
  299. this.send_command('select', [this.selected_db]);
  300. this.pub_sub_mode = pub_sub_mode;
  301. }
  302. if (this.pub_sub_mode === true) {
  303. // only emit "ready" when all subscriptions were made again
  304. var callback_count = 0;
  305. var callback = function () {
  306. callback_count--;
  307. if (callback_count === 0) {
  308. self.emit("ready");
  309. }
  310. };
  311. Object.keys(this.subscription_set).forEach(function (key) {
  312. var parts = key.split(" ");
  313. if (exports.debug_mode) {
  314. console.warn("sending pub/sub on_ready " + parts[0] + ", " + parts[1]);
  315. }
  316. callback_count++;
  317. self.send_command(parts[0] + "scribe", [parts[1]], callback);
  318. });
  319. return;
  320. } else if (this.monitoring) {
  321. this.send_command("monitor");
  322. } else {
  323. this.send_offline_queue();
  324. }
  325. this.emit("ready");
  326. };
  327. RedisClient.prototype.on_info_cmd = function (err, res) {
  328. var self = this, obj = {}, lines, retry_time;
  329. if (err) {
  330. return self.emit("error", new Error("Ready check failed: " + err.message));
  331. }
  332. lines = res.toString().split("\r\n");
  333. lines.forEach(function (line) {
  334. var parts = line.split(':');
  335. if (parts[1]) {
  336. obj[parts[0]] = parts[1];
  337. }
  338. });
  339. obj.versions = [];
  340. if( obj.redis_version ){
  341. obj.redis_version.split('.').forEach(function (num) {
  342. obj.versions.push(+num);
  343. });
  344. }
  345. // expose info key/vals to users
  346. this.server_info = obj;
  347. if (!obj.loading || (obj.loading && obj.loading === "0")) {
  348. if (exports.debug_mode) {
  349. console.log("Redis server ready.");
  350. }
  351. this.on_ready();
  352. } else {
  353. retry_time = obj.loading_eta_seconds * 1000;
  354. if (retry_time > 1000) {
  355. retry_time = 1000;
  356. }
  357. if (exports.debug_mode) {
  358. console.log("Redis server still loading, trying again in " + retry_time);
  359. }
  360. setTimeout(function () {
  361. self.ready_check();
  362. }, retry_time);
  363. }
  364. };
  365. RedisClient.prototype.ready_check = function () {
  366. var self = this;
  367. if (exports.debug_mode) {
  368. console.log("checking server ready state...");
  369. }
  370. this.send_anyway = true; // secret flag to send_command to send something even if not "ready"
  371. this.info(function (err, res) {
  372. self.on_info_cmd(err, res);
  373. });
  374. this.send_anyway = false;
  375. };
  376. RedisClient.prototype.send_offline_queue = function () {
  377. var command_obj, buffered_writes = 0;
  378. while (this.offline_queue.length > 0) {
  379. command_obj = this.offline_queue.shift();
  380. if (exports.debug_mode) {
  381. console.log("Sending offline command: " + command_obj.command);
  382. }
  383. buffered_writes += !this.send_command(command_obj.command, command_obj.args, command_obj.callback);
  384. }
  385. this.offline_queue = new Queue();
  386. // Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue
  387. if (!buffered_writes) {
  388. this.should_buffer = false;
  389. this.emit("drain");
  390. }
  391. };
  392. RedisClient.prototype.connection_gone = function (why) {
  393. var self = this;
  394. // If a retry is already in progress, just let that happen
  395. if (this.retry_timer) {
  396. return;
  397. }
  398. if (exports.debug_mode) {
  399. console.warn("Redis connection is gone from " + why + " event.");
  400. }
  401. this.connected = false;
  402. this.ready = false;
  403. if (this.old_state === null) {
  404. var state = {
  405. monitoring: this.monitoring,
  406. pub_sub_mode: this.pub_sub_mode,
  407. selected_db: this.selected_db
  408. };
  409. this.old_state = state;
  410. this.monitoring = false;
  411. this.pub_sub_mode = false;
  412. this.selected_db = null;
  413. }
  414. // since we are collapsing end and close, users don't expect to be called twice
  415. if (! this.emitted_end) {
  416. this.emit("end");
  417. this.emitted_end = true;
  418. }
  419. this.flush_and_error("Redis connection gone from " + why + " event.");
  420. // If this is a requested shutdown, then don't retry
  421. if (this.closing) {
  422. this.retry_timer = null;
  423. if (exports.debug_mode) {
  424. console.warn("connection ended from quit command, not retrying.");
  425. }
  426. return;
  427. }
  428. var nextDelay = Math.floor(this.retry_delay * this.retry_backoff);
  429. if (this.retry_max_delay !== null && nextDelay > this.retry_max_delay) {
  430. this.retry_delay = this.retry_max_delay;
  431. } else {
  432. this.retry_delay = nextDelay;
  433. }
  434. if (exports.debug_mode) {
  435. console.log("Retry connection in " + this.retry_delay + " ms");
  436. }
  437. if (this.max_attempts && this.attempts >= this.max_attempts) {
  438. this.retry_timer = null;
  439. // TODO - some people need a "Redis is Broken mode" for future commands that errors immediately, and others
  440. // want the program to exit. Right now, we just log, which doesn't really help in either case.
  441. console.error("node_redis: Couldn't get Redis connection after " + this.max_attempts + " attempts.");
  442. return;
  443. }
  444. this.attempts += 1;
  445. this.emit("reconnecting", {
  446. delay: self.retry_delay,
  447. attempt: self.attempts
  448. });
  449. this.retry_timer = setTimeout(function () {
  450. if (exports.debug_mode) {
  451. console.log("Retrying connection...");
  452. }
  453. self.retry_totaltime += self.retry_delay;
  454. if (self.connect_timeout && self.retry_totaltime >= self.connect_timeout) {
  455. self.retry_timer = null;
  456. // TODO - engage Redis is Broken mode for future commands, or whatever
  457. console.error("node_redis: Couldn't get Redis connection after " + self.retry_totaltime + "ms.");
  458. return;
  459. }
  460. self.stream = net.createConnection(self.connectionOption);
  461. self.install_stream_listeners();
  462. self.retry_timer = null;
  463. }, this.retry_delay);
  464. };
  465. RedisClient.prototype.on_data = function (data) {
  466. if (exports.debug_mode) {
  467. console.log("net read " + this.address + " id " + this.connection_id + ": " + data.toString());
  468. }
  469. try {
  470. this.reply_parser.execute(data);
  471. } catch (err) {
  472. // This is an unexpected parser problem, an exception that came from the parser code itself.
  473. // Parser should emit "error" events if it notices things are out of whack.
  474. // Callbacks that throw exceptions will land in return_reply(), below.
  475. // TODO - it might be nice to have a different "error" event for different types of errors
  476. this.emit("error", err);
  477. }
  478. };
  479. RedisClient.prototype.return_error = function (err) {
  480. var command_obj = this.command_queue.shift(), queue_len = this.command_queue.getLength();
  481. if (this.pub_sub_mode === false && queue_len === 0) {
  482. this.command_queue = new Queue();
  483. this.emit("idle");
  484. }
  485. if (this.should_buffer && queue_len <= this.command_queue_low_water) {
  486. this.emit("drain");
  487. this.should_buffer = false;
  488. }
  489. if (command_obj && typeof command_obj.callback === "function") {
  490. try {
  491. command_obj.callback(err);
  492. } catch (callback_err) {
  493. // if a callback throws an exception, re-throw it on a new stack so the parser can keep going
  494. process.nextTick(function () {
  495. throw callback_err;
  496. });
  497. }
  498. } else {
  499. console.log("node_redis: no callback to send error: " + err.message);
  500. // this will probably not make it anywhere useful, but we might as well throw
  501. process.nextTick(function () {
  502. throw err;
  503. });
  504. }
  505. };
  506. // if a callback throws an exception, re-throw it on a new stack so the parser can keep going.
  507. // if a domain is active, emit the error on the domain, which will serve the same function.
  508. // put this try/catch in its own function because V8 doesn't optimize this well yet.
  509. function try_callback(callback, reply) {
  510. try {
  511. callback(null, reply);
  512. } catch (err) {
  513. if (process.domain) {
  514. var currDomain = process.domain;
  515. currDomain.emit('error', err);
  516. if (process.domain === currDomain) {
  517. currDomain.exit();
  518. }
  519. } else {
  520. process.nextTick(function () {
  521. throw err;
  522. });
  523. }
  524. }
  525. }
  526. // hgetall converts its replies to an Object. If the reply is empty, null is returned.
  527. function reply_to_object(reply) {
  528. var obj = {}, j, jl, key, val;
  529. if (reply.length === 0) {
  530. return null;
  531. }
  532. for (j = 0, jl = reply.length; j < jl; j += 2) {
  533. key = reply[j].toString('binary');
  534. val = reply[j + 1];
  535. obj[key] = val;
  536. }
  537. return obj;
  538. }
  539. function reply_to_strings(reply) {
  540. var i;
  541. if (Buffer.isBuffer(reply)) {
  542. return reply.toString();
  543. }
  544. if (Array.isArray(reply)) {
  545. for (i = 0; i < reply.length; i++) {
  546. if (reply[i] !== null && reply[i] !== undefined) {
  547. reply[i] = reply[i].toString();
  548. }
  549. }
  550. return reply;
  551. }
  552. return reply;
  553. }
  554. RedisClient.prototype.return_reply = function (reply) {
  555. var command_obj, len, type, timestamp, argindex, args, queue_len;
  556. // If the "reply" here is actually a message received asynchronously due to a
  557. // pubsub subscription, don't pop the command queue as we'll only be consuming
  558. // the head command prematurely.
  559. if (Array.isArray(reply) && reply.length > 0 && reply[0]) {
  560. type = reply[0].toString();
  561. }
  562. if (this.pub_sub_mode && (type == 'message' || type == 'pmessage')) {
  563. trace("received pubsub message");
  564. }
  565. else {
  566. command_obj = this.command_queue.shift();
  567. }
  568. queue_len = this.command_queue.getLength();
  569. if (this.pub_sub_mode === false && queue_len === 0) {
  570. this.command_queue = new Queue(); // explicitly reclaim storage from old Queue
  571. this.emit("idle");
  572. }
  573. if (this.should_buffer && queue_len <= this.command_queue_low_water) {
  574. this.emit("drain");
  575. this.should_buffer = false;
  576. }
  577. if (command_obj && !command_obj.sub_command) {
  578. if (typeof command_obj.callback === "function") {
  579. if (this.options.detect_buffers && command_obj.buffer_args === false) {
  580. // If detect_buffers option was specified, then the reply from the parser will be Buffers.
  581. // If this command did not use Buffer arguments, then convert the reply to Strings here.
  582. reply = reply_to_strings(reply);
  583. }
  584. // TODO - confusing and error-prone that hgetall is special cased in two places
  585. if (reply && 'hgetall' === command_obj.command.toLowerCase()) {
  586. reply = reply_to_object(reply);
  587. }
  588. try_callback(command_obj.callback, reply);
  589. } else if (exports.debug_mode) {
  590. console.log("no callback for reply: " + (reply && reply.toString && reply.toString()));
  591. }
  592. } else if (this.pub_sub_mode || (command_obj && command_obj.sub_command)) {
  593. if (Array.isArray(reply)) {
  594. type = reply[0].toString();
  595. if (type === "message") {
  596. this.emit("message", reply[1].toString(), reply[2]); // channel, message
  597. } else if (type === "pmessage") {
  598. this.emit("pmessage", reply[1].toString(), reply[2].toString(), reply[3]); // pattern, channel, message
  599. } else if (type === "subscribe" || type === "unsubscribe" || type === "psubscribe" || type === "punsubscribe") {
  600. if (reply[2] === 0) {
  601. this.pub_sub_mode = false;
  602. if (this.debug_mode) {
  603. console.log("All subscriptions removed, exiting pub/sub mode");
  604. }
  605. } else {
  606. this.pub_sub_mode = true;
  607. }
  608. // subscribe commands take an optional callback and also emit an event, but only the first response is included in the callback
  609. // TODO - document this or fix it so it works in a more obvious way
  610. // reply[1] can be null
  611. var reply1String = (reply[1] === null) ? null : reply[1].toString();
  612. if (command_obj && typeof command_obj.callback === "function") {
  613. try_callback(command_obj.callback, reply1String);
  614. }
  615. this.emit(type, reply1String, reply[2]); // channel, count
  616. } else {
  617. throw new Error("subscriptions are active but got unknown reply type " + type);
  618. }
  619. } else if (! this.closing) {
  620. throw new Error("subscriptions are active but got an invalid reply: " + reply);
  621. }
  622. } else if (this.monitoring) {
  623. len = reply.indexOf(" ");
  624. timestamp = reply.slice(0, len);
  625. argindex = reply.indexOf('"');
  626. args = reply.slice(argindex + 1, -1).split('" "').map(function (elem) {
  627. return elem.replace(/\\"/g, '"');
  628. });
  629. this.emit("monitor", timestamp, args);
  630. } else {
  631. throw new Error("node_redis command queue state error. If you can reproduce this, please report it.");
  632. }
  633. };
  634. // This Command constructor is ever so slightly faster than using an object literal, but more importantly, using
  635. // a named constructor helps it show up meaningfully in the V8 CPU profiler and in heap snapshots.
  636. function Command(command, args, sub_command, buffer_args, callback) {
  637. this.command = command;
  638. this.args = args;
  639. this.sub_command = sub_command;
  640. this.buffer_args = buffer_args;
  641. this.callback = callback;
  642. }
  643. RedisClient.prototype.send_command = function (command, args, callback) {
  644. var arg, command_obj, i, il, elem_count, buffer_args, stream = this.stream, command_str = "", buffered_writes = 0, last_arg_type, lcaseCommand;
  645. if (typeof command !== "string") {
  646. throw new Error("First argument to send_command must be the command name string, not " + typeof command);
  647. }
  648. if (Array.isArray(args)) {
  649. if (typeof callback === "function") {
  650. // probably the fastest way:
  651. // client.command([arg1, arg2], cb); (straight passthrough)
  652. // send_command(command, [arg1, arg2], cb);
  653. } else if (! callback) {
  654. // most people find this variable argument length form more convenient, but it uses arguments, which is slower
  655. // client.command(arg1, arg2, cb); (wraps up arguments into an array)
  656. // send_command(command, [arg1, arg2, cb]);
  657. // client.command(arg1, arg2); (callback is optional)
  658. // send_command(command, [arg1, arg2]);
  659. // client.command(arg1, arg2, undefined); (callback is undefined)
  660. // send_command(command, [arg1, arg2, undefined]);
  661. last_arg_type = typeof args[args.length - 1];
  662. if (last_arg_type === "function" || last_arg_type === "undefined") {
  663. callback = args.pop();
  664. }
  665. } else {
  666. throw new Error("send_command: last argument must be a callback or undefined");
  667. }
  668. } else {
  669. throw new Error("send_command: second argument must be an array");
  670. }
  671. if (callback && process.domain) callback = process.domain.bind(callback);
  672. // if the last argument is an array and command is sadd or srem, expand it out:
  673. // client.sadd(arg1, [arg2, arg3, arg4], cb);
  674. // converts to:
  675. // client.sadd(arg1, arg2, arg3, arg4, cb);
  676. lcaseCommand = command.toLowerCase();
  677. if ((lcaseCommand === 'sadd' || lcaseCommand === 'srem') && args.length > 0 && Array.isArray(args[args.length - 1])) {
  678. args = args.slice(0, -1).concat(args[args.length - 1]);
  679. }
  680. // if the value is undefined or null and command is set or setx, need not to send message to redis
  681. if (command === 'set' || command === 'setex') {
  682. if(args[args.length - 1] === undefined || args[args.length - 1] === null) {
  683. var err = new Error('send_command: ' + command + ' value must not be undefined or null');
  684. return callback && callback(err);
  685. }
  686. }
  687. buffer_args = false;
  688. for (i = 0, il = args.length, arg; i < il; i += 1) {
  689. if (Buffer.isBuffer(args[i])) {
  690. buffer_args = true;
  691. }
  692. }
  693. command_obj = new Command(command, args, false, buffer_args, callback);
  694. if ((!this.ready && !this.send_anyway) || !stream.writable) {
  695. if (exports.debug_mode) {
  696. if (!stream.writable) {
  697. console.log("send command: stream is not writeable.");
  698. }
  699. }
  700. if (this.enable_offline_queue) {
  701. if (exports.debug_mode) {
  702. console.log("Queueing " + command + " for next server connection.");
  703. }
  704. this.offline_queue.push(command_obj);
  705. this.should_buffer = true;
  706. } else {
  707. var not_writeable_error = new Error('send_command: stream not writeable. enable_offline_queue is false');
  708. if (command_obj.callback) {
  709. command_obj.callback(not_writeable_error);
  710. } else {
  711. throw not_writeable_error;
  712. }
  713. }
  714. return false;
  715. }
  716. if (command === "subscribe" || command === "psubscribe" || command === "unsubscribe" || command === "punsubscribe") {
  717. this.pub_sub_command(command_obj);
  718. } else if (command === "monitor") {
  719. this.monitoring = true;
  720. } else if (command === "quit") {
  721. this.closing = true;
  722. } else if (this.pub_sub_mode === true) {
  723. throw new Error("Connection in subscriber mode, only subscriber commands may be used");
  724. }
  725. this.command_queue.push(command_obj);
  726. this.commands_sent += 1;
  727. elem_count = args.length + 1;
  728. // Always use "Multi bulk commands", but if passed any Buffer args, then do multiple writes, one for each arg.
  729. // This means that using Buffers in commands is going to be slower, so use Strings if you don't already have a Buffer.
  730. command_str = "*" + elem_count + "\r\n$" + command.length + "\r\n" + command + "\r\n";
  731. if (! buffer_args) { // Build up a string and send entire command in one write
  732. for (i = 0, il = args.length, arg; i < il; i += 1) {
  733. arg = args[i];
  734. if (typeof arg !== "string") {
  735. arg = String(arg);
  736. }
  737. command_str += "$" + Buffer.byteLength(arg) + "\r\n" + arg + "\r\n";
  738. }
  739. if (exports.debug_mode) {
  740. console.log("send " + this.address + " id " + this.connection_id + ": " + command_str);
  741. }
  742. buffered_writes += !stream.write(command_str);
  743. } else {
  744. if (exports.debug_mode) {
  745. console.log("send command (" + command_str + ") has Buffer arguments");
  746. }
  747. buffered_writes += !stream.write(command_str);
  748. for (i = 0, il = args.length, arg; i < il; i += 1) {
  749. arg = args[i];
  750. if (!(Buffer.isBuffer(arg) || arg instanceof String)) {
  751. arg = String(arg);
  752. }
  753. if (Buffer.isBuffer(arg)) {
  754. if (arg.length === 0) {
  755. if (exports.debug_mode) {
  756. console.log("send_command: using empty string for 0 length buffer");
  757. }
  758. buffered_writes += !stream.write("$0\r\n\r\n");
  759. } else {
  760. buffered_writes += !stream.write("$" + arg.length + "\r\n");
  761. buffered_writes += !stream.write(arg);
  762. buffered_writes += !stream.write("\r\n");
  763. if (exports.debug_mode) {
  764. console.log("send_command: buffer send " + arg.length + " bytes");
  765. }
  766. }
  767. } else {
  768. if (exports.debug_mode) {
  769. console.log("send_command: string send " + Buffer.byteLength(arg) + " bytes: " + arg);
  770. }
  771. buffered_writes += !stream.write("$" + Buffer.byteLength(arg) + "\r\n" + arg + "\r\n");
  772. }
  773. }
  774. }
  775. if (exports.debug_mode) {
  776. console.log("send_command buffered_writes: " + buffered_writes, " should_buffer: " + this.should_buffer);
  777. }
  778. if (buffered_writes || this.command_queue.getLength() >= this.command_queue_high_water) {
  779. this.should_buffer = true;
  780. }
  781. return !this.should_buffer;
  782. };
  783. RedisClient.prototype.pub_sub_command = function (command_obj) {
  784. var i, key, command, args;
  785. if (this.pub_sub_mode === false && exports.debug_mode) {
  786. console.log("Entering pub/sub mode from " + command_obj.command);
  787. }
  788. this.pub_sub_mode = true;
  789. command_obj.sub_command = true;
  790. command = command_obj.command;
  791. args = command_obj.args;
  792. if (command === "subscribe" || command === "psubscribe") {
  793. if (command === "subscribe") {
  794. key = "sub";
  795. } else {
  796. key = "psub";
  797. }
  798. for (i = 0; i < args.length; i++) {
  799. this.subscription_set[key + " " + args[i]] = true;
  800. }
  801. } else {
  802. if (command === "unsubscribe") {
  803. key = "sub";
  804. } else {
  805. key = "psub";
  806. }
  807. for (i = 0; i < args.length; i++) {
  808. delete this.subscription_set[key + " " + args[i]];
  809. }
  810. }
  811. };
  812. RedisClient.prototype.end = function () {
  813. this.stream._events = {};
  814. //clear retry_timer
  815. if(this.retry_timer){
  816. clearTimeout(this.retry_timer);
  817. this.retry_timer=null;
  818. }
  819. this.stream.on("error", function(){});
  820. this.connected = false;
  821. this.ready = false;
  822. this.closing = true;
  823. return this.stream.destroySoon();
  824. };
  825. function Multi(client, args) {
  826. this._client = client;
  827. this.queue = [["MULTI"]];
  828. if (Array.isArray(args)) {
  829. this.queue = this.queue.concat(args);
  830. }
  831. }
  832. exports.Multi = Multi;
  833. // take 2 arrays and return the union of their elements
  834. function set_union(seta, setb) {
  835. var obj = {};
  836. seta.forEach(function (val) {
  837. obj[val] = true;
  838. });
  839. setb.forEach(function (val) {
  840. obj[val] = true;
  841. });
  842. return Object.keys(obj);
  843. }
  844. // This static list of commands is updated from time to time. ./lib/commands.js can be updated with generate_commands.js
  845. commands = set_union(["get", "set", "setnx", "setex", "append", "strlen", "del", "exists", "setbit", "getbit", "setrange", "getrange", "substr",
  846. "incr", "decr", "mget", "rpush", "lpush", "rpushx", "lpushx", "linsert", "rpop", "lpop", "brpop", "brpoplpush", "blpop", "llen", "lindex",
  847. "lset", "lrange", "ltrim", "lrem", "rpoplpush", "sadd", "srem", "smove", "sismember", "scard", "spop", "srandmember", "sinter", "sinterstore",
  848. "sunion", "sunionstore", "sdiff", "sdiffstore", "smembers", "zadd", "zincrby", "zrem", "zremrangebyscore", "zremrangebyrank", "zunionstore",
  849. "zinterstore", "zrange", "zrangebyscore", "zrevrangebyscore", "zcount", "zrevrange", "zcard", "zscore", "zrank", "zrevrank", "hset", "hsetnx",
  850. "hget", "hmset", "hmget", "hincrby", "hdel", "hlen", "hkeys", "hvals", "hgetall", "hexists", "incrby", "decrby", "getset", "mset", "msetnx",
  851. "randomkey", "select", "move", "rename", "renamenx", "expire", "expireat", "keys", "dbsize", "auth", "ping", "echo", "save", "bgsave",
  852. "bgrewriteaof", "shutdown", "lastsave", "type", "multi", "exec", "discard", "sync", "flushdb", "flushall", "sort", "info", "monitor", "ttl",
  853. "persist", "slaveof", "debug", "config", "subscribe", "unsubscribe", "psubscribe", "punsubscribe", "publish", "watch", "unwatch", "cluster",
  854. "restore", "migrate", "dump", "object", "client", "eval", "evalsha"], require("./lib/commands"));
  855. commands.forEach(function (fullCommand) {
  856. var command = fullCommand.split(' ')[0];
  857. RedisClient.prototype[command] = function (args, callback) {
  858. if (Array.isArray(args) && typeof callback === "function") {
  859. return this.send_command(command, args, callback);
  860. } else {
  861. return this.send_command(command, to_array(arguments));
  862. }
  863. };
  864. RedisClient.prototype[command.toUpperCase()] = RedisClient.prototype[command];
  865. Multi.prototype[command] = function () {
  866. this.queue.push([command].concat(to_array(arguments)));
  867. return this;
  868. };
  869. Multi.prototype[command.toUpperCase()] = Multi.prototype[command];
  870. });
  871. // store db in this.select_db to restore it on reconnect
  872. RedisClient.prototype.select = function (db, callback) {
  873. var self = this;
  874. this.send_command('select', [db], function (err, res) {
  875. if (err === null) {
  876. self.selected_db = db;
  877. }
  878. if (typeof(callback) === 'function') {
  879. callback(err, res);
  880. } else if (err) {
  881. self.emit('error', err);
  882. }
  883. });
  884. };
  885. RedisClient.prototype.SELECT = RedisClient.prototype.select;
  886. // Stash auth for connect and reconnect. Send immediately if already connected.
  887. RedisClient.prototype.auth = function () {
  888. var args = to_array(arguments);
  889. this.auth_pass = args[0];
  890. this.auth_callback = args[1];
  891. if (exports.debug_mode) {
  892. console.log("Saving auth as " + this.auth_pass);
  893. }
  894. if (this.connected) {
  895. this.send_command("auth", args);
  896. }
  897. };
  898. RedisClient.prototype.AUTH = RedisClient.prototype.auth;
  899. RedisClient.prototype.hmget = function (arg1, arg2, arg3) {
  900. if (Array.isArray(arg2) && typeof arg3 === "function") {
  901. return this.send_command("hmget", [arg1].concat(arg2), arg3);
  902. } else if (Array.isArray(arg1) && typeof arg2 === "function") {
  903. return this.send_command("hmget", arg1, arg2);
  904. } else {
  905. return this.send_command("hmget", to_array(arguments));
  906. }
  907. };
  908. RedisClient.prototype.HMGET = RedisClient.prototype.hmget;
  909. RedisClient.prototype.hmset = function (args, callback) {
  910. var tmp_args, tmp_keys, i, il, key;
  911. if (Array.isArray(args) && typeof callback === "function") {
  912. return this.send_command("hmset", args, callback);
  913. }
  914. args = to_array(arguments);
  915. if (typeof args[args.length - 1] === "function") {
  916. callback = args[args.length - 1];
  917. args.length -= 1;
  918. } else {
  919. callback = null;
  920. }
  921. if (args.length === 2 && (typeof args[0] === "string" || typeof args[0] === "number") && typeof args[1] === "object") {
  922. // User does: client.hmset(key, {key1: val1, key2: val2})
  923. // assuming key is a string, i.e. email address
  924. // if key is a number, i.e. timestamp, convert to string
  925. if (typeof args[0] === "number") {
  926. args[0] = args[0].toString();
  927. }
  928. tmp_args = [ args[0] ];
  929. tmp_keys = Object.keys(args[1]);
  930. for (i = 0, il = tmp_keys.length; i < il ; i++) {
  931. key = tmp_keys[i];
  932. tmp_args.push(key);
  933. tmp_args.push(args[1][key]);
  934. }
  935. args = tmp_args;
  936. }
  937. return this.send_command("hmset", args, callback);
  938. };
  939. RedisClient.prototype.HMSET = RedisClient.prototype.hmset;
  940. Multi.prototype.hmset = function () {
  941. var args = to_array(arguments), tmp_args;
  942. if (args.length >= 2 && typeof args[0] === "string" && typeof args[1] === "object") {
  943. tmp_args = [ "hmset", args[0] ];
  944. Object.keys(args[1]).map(function (key) {
  945. tmp_args.push(key);
  946. tmp_args.push(args[1][key]);
  947. });
  948. if (args[2]) {
  949. tmp_args.push(args[2]);
  950. }
  951. args = tmp_args;
  952. } else {
  953. args.unshift("hmset");
  954. }
  955. this.queue.push(args);
  956. return this;
  957. };
  958. Multi.prototype.HMSET = Multi.prototype.hmset;
  959. Multi.prototype.exec = function (callback) {
  960. var self = this;
  961. var errors = [];
  962. // drain queue, callback will catch "QUEUED" or error
  963. // TODO - get rid of all of these anonymous functions which are elegant but slow
  964. this.queue.forEach(function (args, index) {
  965. var command = args[0], obj;
  966. if (typeof args[args.length - 1] === "function") {
  967. args = args.slice(1, -1);
  968. } else {
  969. args = args.slice(1);
  970. }
  971. if (args.length === 1 && Array.isArray(args[0])) {
  972. args = args[0];
  973. }
  974. if (command.toLowerCase() === 'hmset' && typeof args[1] === 'object') {
  975. obj = args.pop();
  976. Object.keys(obj).forEach(function (key) {
  977. args.push(key);
  978. args.push(obj[key]);
  979. });
  980. }
  981. this._client.send_command(command, args, function (err, reply) {
  982. if (err) {
  983. var cur = self.queue[index];
  984. if (typeof cur[cur.length - 1] === "function") {
  985. cur[cur.length - 1](err);
  986. } else {
  987. errors.push(new Error(err));
  988. }
  989. }
  990. });
  991. }, this);
  992. // TODO - make this callback part of Multi.prototype instead of creating it each time
  993. return this._client.send_command("EXEC", [], function (err, replies) {
  994. if (err) {
  995. if (callback) {
  996. errors.push(new Error(err));
  997. callback(errors);
  998. return;
  999. } else {
  1000. throw new Error(err);
  1001. }
  1002. }
  1003. var i, il, reply, args;
  1004. if (replies) {
  1005. for (i = 1, il = self.queue.length; i < il; i += 1) {
  1006. reply = replies[i - 1];
  1007. args = self.queue[i];
  1008. // TODO - confusing and error-prone that hgetall is special cased in two places
  1009. if (reply && args[0].toLowerCase() === "hgetall") {
  1010. replies[i - 1] = reply = reply_to_object(reply);
  1011. }
  1012. if (typeof args[args.length - 1] === "function") {
  1013. args[args.length - 1](null, reply);
  1014. }
  1015. }
  1016. }
  1017. if (callback) {
  1018. callback(null, replies);
  1019. }
  1020. });
  1021. };
  1022. Multi.prototype.EXEC = Multi.prototype.exec;
  1023. RedisClient.prototype.multi = function (args) {
  1024. return new Multi(this, args);
  1025. };
  1026. RedisClient.prototype.MULTI = function (args) {
  1027. return new Multi(this, args);
  1028. };
  1029. // stash original eval method
  1030. var eval_orig = RedisClient.prototype.eval;
  1031. // hook eval with an attempt to evalsha for cached scripts
  1032. RedisClient.prototype.eval = RedisClient.prototype.EVAL = function () {
  1033. var self = this,
  1034. args = to_array(arguments),
  1035. callback;
  1036. if (typeof args[args.length - 1] === "function") {
  1037. callback = args.pop();
  1038. }
  1039. if (Array.isArray(args[0])) {
  1040. args = args[0];
  1041. }
  1042. // replace script source with sha value
  1043. var source = args[0];
  1044. args[0] = crypto.createHash("sha1").update(source).digest("hex");
  1045. self.evalsha(args, function (err, reply) {
  1046. if (err && /NOSCRIPT/.test(err.message)) {
  1047. args[0] = source;
  1048. eval_orig.call(self, args, callback);
  1049. } else if (callback) {
  1050. callback(err, reply);
  1051. }
  1052. });
  1053. };
  1054. exports.createClient = function(arg0, arg1, arg2){
  1055. if( arguments.length === 0 ){
  1056. // createClient()
  1057. return createClient_tcp(default_port, default_host, {});
  1058. } else if( typeof arg0 === 'number' ||
  1059. typeof arg0 === 'string' && arg0.match(/^\d+$/) ){
  1060. // createClient( 3000, host, options)
  1061. // createClient('3000', host, options)
  1062. return createClient_tcp(arg0, arg1, arg2);
  1063. } else if( typeof arg0 === 'string' ){
  1064. // createClient( '/tmp/redis.sock', options)
  1065. return createClient_unix(arg0,arg1);
  1066. } else if( arg0 !== null && typeof arg0 === 'object' ){
  1067. // createClient(options)
  1068. return createClient_tcp(default_port, default_host, arg0 );
  1069. } else if( arg0 === null && arg1 === null ){
  1070. // for backward compatibility
  1071. // createClient(null,null,options)
  1072. return createClient_tcp(default_port, default_host, arg2);
  1073. } else {
  1074. throw new Error('unknown type of connection in createClient()');
  1075. }
  1076. }
  1077. var createClient_unix = function(path, options){
  1078. var cnxOptions = {
  1079. path: path
  1080. };
  1081. var net_client = net.createConnection(cnxOptions);
  1082. var redis_client = new RedisClient(net_client, options || {});
  1083. redis_client.connectionOption = cnxOptions;
  1084. redis_client.address = path;
  1085. return redis_client;
  1086. }
  1087. var createClient_tcp = function (port_arg, host_arg, options) {
  1088. var cnxOptions = {
  1089. 'port' : port_arg || default_port,
  1090. 'host' : host_arg || default_host,
  1091. 'family' : (options && options.family === 'IPv6') ? 6 : 4
  1092. };
  1093. var net_client = net.createConnection(cnxOptions);
  1094. var redis_client = new RedisClient(net_client, options || {});
  1095. redis_client.connectionOption = cnxOptions;
  1096. redis_client.address = cnxOptions.host + ':' + cnxOptions.port;
  1097. return redis_client;
  1098. };
  1099. exports.print = function (err, reply) {
  1100. if (err) {
  1101. console.log("Error: " + err);
  1102. } else {
  1103. console.log("Reply: " + reply);
  1104. }
  1105. };