From fdaca8b95e6932d4c29677304df446b34bf7f6c4 Mon Sep 17 00:00:00 2001 From: roam Date: Sun, 17 Feb 2013 14:47:11 +0800 Subject: [PATCH 1/8] Make Transport close safe to call multiple times This is to make it safe to call close() as many times as you wish. Right now it might be guaranteed in Socket, I think it's better to have it check here to avoid subtle problems in future. --- lib/transport.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/transport.js b/lib/transport.js index 54e47778a..9fa7107b6 100644 --- a/lib/transport.js +++ b/lib/transport.js @@ -61,6 +61,11 @@ Transport.prototype.onRequest = function (req) { */ Transport.prototype.close = function (fn) { + if ('closed' === this.readyState) { + 'function' == typeof fn && fn(); + return; + } + this.readyState = 'closing'; this.doClose(fn || noop); }; From de209976cde952f2f6808e75d84a846c265809e4 Mon Sep 17 00:00:00 2001 From: roam Date: Sun, 17 Feb 2013 15:31:46 +0800 Subject: [PATCH 2/8] Add a new Transport readyState 'open' which is used in maybeUpgrade() in socket.js Also check readyState against 'closed' to avoid wrong state transition. --- lib/transports/polling.js | 18 ++++++++++++++++++ lib/transports/websocket.js | 3 +++ 2 files changed, 21 insertions(+) diff --git a/lib/transports/polling.js b/lib/transports/polling.js index 4d5a56127..6d7546513 100644 --- a/lib/transports/polling.js +++ b/lib/transports/polling.js @@ -49,6 +49,18 @@ Polling.prototype.name = 'polling'; Polling.prototype.onRequest = function (req) { var res = req.res; + if ('closed' === this.readyState) { + /* this should be handled in the upper tier */ + debug('incoming request after transport closed'); + res.writeHead(500); + res.end(); + return; + } + + if ('opening' === this.readyState) { + this.readyState = 'open'; + } + if ('GET' == req.method) { this.onPollRequest(req, res); } else if ('POST' == req.method) { @@ -189,6 +201,12 @@ Polling.prototype.onData = function (data) { */ Polling.prototype.send = function (packets) { + if (!this.writable || 'closed' === this.readyState) { + /* it's more like a bug to reach here */ + debug('send while closed or not writable'); + return; + } + if (this.shouldClose) { debug('appending close packet to payload'); packets.push({ type: 'close' }); diff --git a/lib/transports/websocket.js b/lib/transports/websocket.js index ccafa74f4..b52f6d1ac 100644 --- a/lib/transports/websocket.js +++ b/lib/transports/websocket.js @@ -72,6 +72,9 @@ WebSocket.prototype.supportsFraming = true; WebSocket.prototype.onData = function (data) { debug('received "%s"', data); + if ('opening' === this.readyState) { + this.readyState = 'open'; + } Transport.prototype.onData.call(this, data); }; From 512b64d06158000745115d88a5fb648008e62729 Mon Sep 17 00:00:00 2001 From: roam Date: Sun, 17 Feb 2013 15:46:40 +0800 Subject: [PATCH 3/8] End request when detecting overlap polling/data request --- lib/transports/polling.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/transports/polling.js b/lib/transports/polling.js index 6d7546513..6f95751fd 100644 --- a/lib/transports/polling.js +++ b/lib/transports/polling.js @@ -83,6 +83,7 @@ Polling.prototype.onPollRequest = function (req, res) { // assert: this.res, '.req and .res should be (un)set together' this.onError('overlap from client'); res.writeHead(500); + res.end(); } else { debug('setting request'); if (undefined === this.req) { @@ -129,6 +130,7 @@ Polling.prototype.onDataRequest = function (req, res) { // assert: this.dataRes, '.dataReq and .dataRes should be (un)set together' this.onError('data request overlap from client'); res.writeHead(500); + res.end(); } else { this.dataReq = req; this.dataRes = res; From e7834fa69f2dd69f3e142e1fed8ec941e03f5045 Mon Sep 17 00:00:00 2001 From: roam Date: Sun, 17 Feb 2013 15:58:05 +0800 Subject: [PATCH 4/8] Make transport not writable in case of errors, between polling cycle and when closing. --- lib/transports/polling.js | 2 +- lib/transports/websocket.js | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/transports/polling.js b/lib/transports/polling.js index 6f95751fd..08d28669f 100644 --- a/lib/transports/polling.js +++ b/lib/transports/polling.js @@ -103,6 +103,7 @@ Polling.prototype.onPollRequest = function (req, res) { function cleanup () { req.removeListener('close', onClose); self.req = self.res = null; + self.writable = false; } req.cleanup = cleanup; @@ -230,7 +231,6 @@ Polling.prototype.write = function (data) { debug('writing "%s"', data); this.doWrite(data); this.req.cleanup(); - this.writable = false; }; /** diff --git a/lib/transports/websocket.js b/lib/transports/websocket.js index b52f6d1ac..6862ff0da 100644 --- a/lib/transports/websocket.js +++ b/lib/transports/websocket.js @@ -120,5 +120,6 @@ WebSocket.prototype.send = function (data){ WebSocket.prototype.doClose = function (fn) { debug('closing'); this.socket.close(); + this.writable = false; fn && fn(); }; From c172abb25a56571df8bc53fd5038360422c2a9ff Mon Sep 17 00:00:00 2001 From: roam Date: Sun, 17 Feb 2013 17:03:31 +0800 Subject: [PATCH 5/8] Reliable cleanup & close & close callback --- lib/transports/polling.js | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/lib/transports/polling.js b/lib/transports/polling.js index 08d28669f..d859a34c8 100644 --- a/lib/transports/polling.js +++ b/lib/transports/polling.js @@ -97,6 +97,7 @@ Polling.prototype.onPollRequest = function (req, res) { var self = this; function onClose () { + cleanup(); self.onError('poll connection closed prematurely'); } @@ -189,6 +190,9 @@ Polling.prototype.onData = function (data) { for (var i = 0, l = packets.length; i < l; i++) { if ('close' == packets[i].type) { debug('got xhr close packet'); + this.req && this.req.cleanup(); + this.res && this.res.end(); + this.request = null; return this.onClose(); } @@ -213,11 +217,18 @@ Polling.prototype.send = function (packets) { if (this.shouldClose) { debug('appending close packet to payload'); packets.push({ type: 'close' }); - this.shouldClose(); - this.shouldClose = null; } this.write(parser.encodePayload(packets)); + + if (this.shouldClose) { + for (var i in this.shouldClose) { + 'function' == typeof this.shouldClose[i] && this.shouldClose[i](); + } + this.shouldClose = null; + this.request = null; + this.onClose(); + } }; /** @@ -248,12 +259,15 @@ Polling.prototype.doClose = function (fn) { this.dataReq.abort(); } + if (!this.shouldClose) { + this.shouldClose = []; + } + this.shouldClose.push(fn); + if (this.writable) { debug('transport writable - closing right away'); this.send([{ type: 'close' }]); - fn(); } else { debug('transport not writable - buffering orderly close'); - this.shouldClose = fn; } }; From aa0209f03238b0422b73d51c7f59b4f7490edce9 Mon Sep 17 00:00:00 2001 From: roam Date: Mon, 18 Feb 2013 17:43:49 +0800 Subject: [PATCH 6/8] Added test cases to verify transport in a consistent state --- test/server.js | 205 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 205 insertions(+) diff --git a/test/server.js b/test/server.js index 739ecf460..133bf2356 100644 --- a/test/server.js +++ b/test/server.js @@ -580,6 +580,211 @@ describe('server', function () { }); }); }); + + it('should leave transport in a consistent state after ' + + '`forced close` (polling)', function (done) { + var engine = listen({ allowUpgrades: false }, function (port) { + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] }); + + engine.on('connection', function (conn) { + conn.on('close', function () { + setTimeout(function () { + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + expect(conn.request).to.be(null); + conn.transport.close(function () { + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + done(); + }); + }, 10); + }); + setTimeout(function () { + conn.close(); + }, 10); + }); + }); + }); + + it('should leave transport in a consistent state after ' + + 'forced close` (websocket)', function (done) { + var engine = listen({ allowUpgrades: false }, function (port) { + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['websocket'] }); + + engine.on('connection', function (conn) { + conn.on('close', function () { + setTimeout(function () { + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + conn.transport.close(function () { + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + done(); + }); + }, 10); + }); + setTimeout(function () { + conn.close(); + }, 10); + }); + }); + }); + + it('should leave transport in a consistent state after ' + + '`transport close` (polling)', function (done) { + var engine = listen({ allowUpgrades: false }, function (port) { + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] }); + + engine.on('connection', function (conn) { + conn.on('close', function () { + setTimeout(function () { + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + expect(conn.request).to.be(null); + conn.transport.close(function () { + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + done(); + }); + }, 10); + }); + + socket.transport.on('pollComplete', function() { + socket.close(); + }); + }); + }); + }); + + it('should leave transport in a consistent state after ' + + '`transport close` (websocket)', function (done) { + var engine = listen({ allowUpgrades: false }, function (port) { + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['websocket'] }); + + engine.on('connection', function (conn) { + conn.on('close', function () { + setTimeout(function () { + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + conn.transport.close(function () { + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + done(); + }); + }, 10); + }); + }); + + socket.on('open', function () { + setTimeout(function () { + socket.close(); + }, 10); + }); + }); + }); + + it('should leave transport in a consistent state after ' + + '`transport error` (polling)', function (done) { + var engine = listen({ allowUpgrades: false }, function (port) { + // hack to access the sockets created by node-xmlhttprequest + // see: https://github.com/driverdan/node-XMLHttpRequest/issues/44 + var request = require('http').request; + var sockets = []; + http.request = function(opts) { + var req = request.apply(null, arguments); + req.on('socket', function(socket) { + sockets.push(socket); + }); + return req; + }; + + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] }); + + engine.on('connection', function(conn){ + conn.on('close', function () { + setTimeout(function () { + http.request = request; + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + expect(conn.request).to.be(null); + conn.transport.close(function () { + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + done(); + }); + }, 10); + }); + }); + + socket.transport.on('poll', function() { + // we set a timer to wait for the request to actually reach + setTimeout(function(){ + // kill the underlying connection + sockets[1].end(); + }, 50); + }); + }); + }); + + it('should leave transport in a consistent state after ' + + '`transport error` (websocket)', function (done) { + var engine = listen({ allowUpgrades: false }, function (port) { + // hack to access the sockets created by node-xmlhttprequest + // see: https://github.com/driverdan/node-XMLHttpRequest/issues/44 + var request = require('http').request; + var sockets = []; + http.request = function(opts) { + var req = request.apply(null, arguments); + req.on('socket', function(socket) { + sockets.push(socket); + }); + return req; + }; + + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['websocket'] }); + + engine.on('connection', function(conn){ + conn.on('close', function () { + setTimeout(function () { + http.request = request; + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + conn.transport.close(function () { + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + done(); + }); + }, 10); + }); + }); + + socket.on('open', function () { + setTimeout(function () { + // kill the underlying connection + sockets[0].end(); + }, 50); + }); + }); + }); + + it('should trigger transport close callback for `forced close`, ' + + 'even between polling cycle', function (done) { + var engine = listen({ allowUpgrades: false }, function (port) { + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] }); + + engine.on('connection', function (conn) { + socket.transport.on('pollComplete', function() { + conn.close(); + }); + conn.on('close', function (reason) { + expect(reason).to.be('forced close'); + conn.transport.close(function() { + done(); + }); + }); + }); + }); + }); }); describe('messages', function () { From 79704b86a62030460e1ef828f00638726ef44fe0 Mon Sep 17 00:00:00 2001 From: roam Date: Mon, 18 Feb 2013 17:45:10 +0800 Subject: [PATCH 7/8] Fixed issues found in the test failures --- lib/transports/polling.js | 5 +++-- lib/transports/websocket.js | 6 +++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/transports/polling.js b/lib/transports/polling.js index d859a34c8..4bf267c32 100644 --- a/lib/transports/polling.js +++ b/lib/transports/polling.js @@ -222,8 +222,9 @@ Polling.prototype.send = function (packets) { this.write(parser.encodePayload(packets)); if (this.shouldClose) { - for (var i in this.shouldClose) { - 'function' == typeof this.shouldClose[i] && this.shouldClose[i](); + while (this.shouldClose.length > 0) { + var fn = this.shouldClose.shift(); + 'function' == typeof fn && fn(); } this.shouldClose = null; this.request = null; diff --git a/lib/transports/websocket.js b/lib/transports/websocket.js index 6862ff0da..3b9c202e2 100644 --- a/lib/transports/websocket.js +++ b/lib/transports/websocket.js @@ -25,8 +25,8 @@ function WebSocket (req) { var self = this; this.socket = req.websocket; this.socket.on('message', this.onData.bind(this)); - this.socket.once('close', this.onClose.bind(this)); - this.socket.on('error', this.onError.bind(this)); + this.socket.once('close', function() { self.writable = false; self.onClose.apply(self, arguments); }); + this.socket.on('error', function() { self.writable = false; self.onError.apply(self, arguments); }); this.socket.on('headers', function (headers) { self.emit('headers', headers); }); @@ -119,7 +119,7 @@ WebSocket.prototype.send = function (data){ WebSocket.prototype.doClose = function (fn) { debug('closing'); - this.socket.close(); this.writable = false; fn && fn(); + this.socket.close(); }; From 7aa0ced95c8abc005b946890852cca39c8ece230 Mon Sep 17 00:00:00 2001 From: roam Date: Wed, 20 Feb 2013 14:48:29 +0800 Subject: [PATCH 8/8] Make sure transport closed in clearTransport() --- lib/socket.js | 14 ++++++++++---- lib/transports/polling.js | 28 +++++++++++++++++++++------- lib/transports/websocket.js | 2 +- 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/lib/socket.js b/lib/socket.js index 316eabbd1..87b0f3e2c 100644 --- a/lib/socket.js +++ b/lib/socket.js @@ -210,26 +210,32 @@ Socket.prototype.maybeUpgrade = function (transport) { */ Socket.prototype.clearTransport = function () { + // Don't receive any events from this transport any longer + // to avoid wrong 'close' event for upgrade + this.transport.removeAllListeners(); + // For polling, transport's error event doesn't necessarily imply a close event + // It's now safe to close() multiple times, and only first time onClose() counts + this.transport.close(true); // silence further transport errors and prevent uncaught exceptions this.transport.on('error', function(){ debug('error triggered by discarded transport'); }); - clearTimeout(this.pingIntervalTimer); clearTimeout(this.pingTimeoutTimer); }; /** * Called upon transport considered closed. - * Possible reasons: `ping timeout`, `client error`, `parse error`, - * `transport error`, `server close`, `transport close` + * Possible reasons: `ping timeout`, `parse error`, + * `transport error`, `transport close`, `forced close` */ Socket.prototype.onClose = function (reason, description) { if ('closed' != this.readyState) { + // keep out others as clearTransport() may cause re-entrance + this.readyState = 'closed'; this.packetsFn = []; this.sentCallbackFn = []; this.clearTransport(); - this.readyState = 'closed'; this.emit('close', reason, description); } }; diff --git a/lib/transports/polling.js b/lib/transports/polling.js index 4bf267c32..4cac30eb1 100644 --- a/lib/transports/polling.js +++ b/lib/transports/polling.js @@ -222,13 +222,7 @@ Polling.prototype.send = function (packets) { this.write(parser.encodePayload(packets)); if (this.shouldClose) { - while (this.shouldClose.length > 0) { - var fn = this.shouldClose.shift(); - 'function' == typeof fn && fn(); - } - this.shouldClose = null; - this.request = null; - this.onClose(); + this.onForcedClose(); } }; @@ -268,7 +262,27 @@ Polling.prototype.doClose = function (fn) { if (this.writable) { debug('transport writable - closing right away'); this.send([{ type: 'close' }]); + } else if (fn === true) { + debug('transport not writable - forcibly close'); + this.onForcedClose(); } else { debug('transport not writable - buffering orderly close'); } }; + +/** + * Called on a forced close. + * + * @api private + */ + +Polling.prototype.onForcedClose = function () { + while (Array.isArray(this.shouldClose) + && this.shouldClose.length > 0) { + var fn = this.shouldClose.shift(); + 'function' == typeof fn && fn(); + } + this.shouldClose = null; + this.request = null; + this.onClose(); +}; diff --git a/lib/transports/websocket.js b/lib/transports/websocket.js index 3b9c202e2..707f1577e 100644 --- a/lib/transports/websocket.js +++ b/lib/transports/websocket.js @@ -120,6 +120,6 @@ WebSocket.prototype.send = function (data){ WebSocket.prototype.doClose = function (fn) { debug('closing'); this.writable = false; - fn && fn(); + 'function' == typeof fn && fn(); this.socket.close(); };