diff --git a/lib/socket.js b/lib/socket.js index 316eabbd1..87b0f3e2c 100644 --- a/lib/socket.js +++ b/lib/socket.js @@ -210,26 +210,32 @@ Socket.prototype.maybeUpgrade = function (transport) { */ Socket.prototype.clearTransport = function () { + // Don't receive any events from this transport any longer + // to avoid wrong 'close' event for upgrade + this.transport.removeAllListeners(); + // For polling, transport's error event doesn't necessarily imply a close event + // It's now safe to close() multiple times, and only first time onClose() counts + this.transport.close(true); // silence further transport errors and prevent uncaught exceptions this.transport.on('error', function(){ debug('error triggered by discarded transport'); }); - clearTimeout(this.pingIntervalTimer); clearTimeout(this.pingTimeoutTimer); }; /** * Called upon transport considered closed. - * Possible reasons: `ping timeout`, `client error`, `parse error`, - * `transport error`, `server close`, `transport close` + * Possible reasons: `ping timeout`, `parse error`, + * `transport error`, `transport close`, `forced close` */ Socket.prototype.onClose = function (reason, description) { if ('closed' != this.readyState) { + // keep out others as clearTransport() may cause re-entrance + this.readyState = 'closed'; this.packetsFn = []; this.sentCallbackFn = []; this.clearTransport(); - this.readyState = 'closed'; this.emit('close', reason, description); } }; diff --git a/lib/transport.js b/lib/transport.js index 54e47778a..9fa7107b6 100644 --- a/lib/transport.js +++ b/lib/transport.js @@ -61,6 +61,11 @@ Transport.prototype.onRequest = function (req) { */ Transport.prototype.close = function (fn) { + if ('closed' === this.readyState) { + 'function' == typeof fn && fn(); + return; + } + this.readyState = 'closing'; this.doClose(fn || noop); }; diff --git a/lib/transports/polling.js b/lib/transports/polling.js index 4d5a56127..4cac30eb1 100644 --- a/lib/transports/polling.js +++ b/lib/transports/polling.js @@ -49,6 +49,18 @@ Polling.prototype.name = 'polling'; Polling.prototype.onRequest = function (req) { var res = req.res; + if ('closed' === this.readyState) { + /* this should be handled in the upper tier */ + debug('incoming request after transport closed'); + res.writeHead(500); + res.end(); + return; + } + + if ('opening' === this.readyState) { + this.readyState = 'open'; + } + if ('GET' == req.method) { this.onPollRequest(req, res); } else if ('POST' == req.method) { @@ -71,6 +83,7 @@ Polling.prototype.onPollRequest = function (req, res) { // assert: this.res, '.req and .res should be (un)set together' this.onError('overlap from client'); res.writeHead(500); + res.end(); } else { debug('setting request'); if (undefined === this.req) { @@ -84,12 +97,14 @@ Polling.prototype.onPollRequest = function (req, res) { var self = this; function onClose () { + cleanup(); self.onError('poll connection closed prematurely'); } function cleanup () { req.removeListener('close', onClose); self.req = self.res = null; + self.writable = false; } req.cleanup = cleanup; @@ -117,6 +132,7 @@ Polling.prototype.onDataRequest = function (req, res) { // assert: this.dataRes, '.dataReq and .dataRes should be (un)set together' this.onError('data request overlap from client'); res.writeHead(500); + res.end(); } else { this.dataReq = req; this.dataRes = res; @@ -174,6 +190,9 @@ Polling.prototype.onData = function (data) { for (var i = 0, l = packets.length; i < l; i++) { if ('close' == packets[i].type) { debug('got xhr close packet'); + this.req && this.req.cleanup(); + this.res && this.res.end(); + this.request = null; return this.onClose(); } @@ -189,14 +208,22 @@ Polling.prototype.onData = function (data) { */ Polling.prototype.send = function (packets) { + if (!this.writable || 'closed' === this.readyState) { + /* it's more like a bug to reach here */ + debug('send while closed or not writable'); + return; + } + if (this.shouldClose) { debug('appending close packet to payload'); packets.push({ type: 'close' }); - this.shouldClose(); - this.shouldClose = null; } this.write(parser.encodePayload(packets)); + + if (this.shouldClose) { + this.onForcedClose(); + } }; /** @@ -210,7 +237,6 @@ Polling.prototype.write = function (data) { debug('writing "%s"', data); this.doWrite(data); this.req.cleanup(); - this.writable = false; }; /** @@ -228,12 +254,35 @@ Polling.prototype.doClose = function (fn) { this.dataReq.abort(); } + if (!this.shouldClose) { + this.shouldClose = []; + } + this.shouldClose.push(fn); + if (this.writable) { debug('transport writable - closing right away'); this.send([{ type: 'close' }]); - fn(); + } else if (fn === true) { + debug('transport not writable - forcibly close'); + this.onForcedClose(); } else { debug('transport not writable - buffering orderly close'); - this.shouldClose = fn; } }; + +/** + * Called on a forced close. + * + * @api private + */ + +Polling.prototype.onForcedClose = function () { + while (Array.isArray(this.shouldClose) + && this.shouldClose.length > 0) { + var fn = this.shouldClose.shift(); + 'function' == typeof fn && fn(); + } + this.shouldClose = null; + this.request = null; + this.onClose(); +}; diff --git a/lib/transports/websocket.js b/lib/transports/websocket.js index ccafa74f4..707f1577e 100644 --- a/lib/transports/websocket.js +++ b/lib/transports/websocket.js @@ -25,8 +25,8 @@ function WebSocket (req) { var self = this; this.socket = req.websocket; this.socket.on('message', this.onData.bind(this)); - this.socket.once('close', this.onClose.bind(this)); - this.socket.on('error', this.onError.bind(this)); + this.socket.once('close', function() { self.writable = false; self.onClose.apply(self, arguments); }); + this.socket.on('error', function() { self.writable = false; self.onError.apply(self, arguments); }); this.socket.on('headers', function (headers) { self.emit('headers', headers); }); @@ -72,6 +72,9 @@ WebSocket.prototype.supportsFraming = true; WebSocket.prototype.onData = function (data) { debug('received "%s"', data); + if ('opening' === this.readyState) { + this.readyState = 'open'; + } Transport.prototype.onData.call(this, data); }; @@ -116,6 +119,7 @@ WebSocket.prototype.send = function (data){ WebSocket.prototype.doClose = function (fn) { debug('closing'); + this.writable = false; + 'function' == typeof fn && fn(); this.socket.close(); - fn && fn(); }; diff --git a/test/server.js b/test/server.js index 739ecf460..133bf2356 100644 --- a/test/server.js +++ b/test/server.js @@ -580,6 +580,211 @@ describe('server', function () { }); }); }); + + it('should leave transport in a consistent state after ' + + '`forced close` (polling)', function (done) { + var engine = listen({ allowUpgrades: false }, function (port) { + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] }); + + engine.on('connection', function (conn) { + conn.on('close', function () { + setTimeout(function () { + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + expect(conn.request).to.be(null); + conn.transport.close(function () { + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + done(); + }); + }, 10); + }); + setTimeout(function () { + conn.close(); + }, 10); + }); + }); + }); + + it('should leave transport in a consistent state after ' + + 'forced close` (websocket)', function (done) { + var engine = listen({ allowUpgrades: false }, function (port) { + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['websocket'] }); + + engine.on('connection', function (conn) { + conn.on('close', function () { + setTimeout(function () { + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + conn.transport.close(function () { + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + done(); + }); + }, 10); + }); + setTimeout(function () { + conn.close(); + }, 10); + }); + }); + }); + + it('should leave transport in a consistent state after ' + + '`transport close` (polling)', function (done) { + var engine = listen({ allowUpgrades: false }, function (port) { + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] }); + + engine.on('connection', function (conn) { + conn.on('close', function () { + setTimeout(function () { + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + expect(conn.request).to.be(null); + conn.transport.close(function () { + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + done(); + }); + }, 10); + }); + + socket.transport.on('pollComplete', function() { + socket.close(); + }); + }); + }); + }); + + it('should leave transport in a consistent state after ' + + '`transport close` (websocket)', function (done) { + var engine = listen({ allowUpgrades: false }, function (port) { + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['websocket'] }); + + engine.on('connection', function (conn) { + conn.on('close', function () { + setTimeout(function () { + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + conn.transport.close(function () { + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + done(); + }); + }, 10); + }); + }); + + socket.on('open', function () { + setTimeout(function () { + socket.close(); + }, 10); + }); + }); + }); + + it('should leave transport in a consistent state after ' + + '`transport error` (polling)', function (done) { + var engine = listen({ allowUpgrades: false }, function (port) { + // hack to access the sockets created by node-xmlhttprequest + // see: https://github.com/driverdan/node-XMLHttpRequest/issues/44 + var request = require('http').request; + var sockets = []; + http.request = function(opts) { + var req = request.apply(null, arguments); + req.on('socket', function(socket) { + sockets.push(socket); + }); + return req; + }; + + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] }); + + engine.on('connection', function(conn){ + conn.on('close', function () { + setTimeout(function () { + http.request = request; + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + expect(conn.request).to.be(null); + conn.transport.close(function () { + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + done(); + }); + }, 10); + }); + }); + + socket.transport.on('poll', function() { + // we set a timer to wait for the request to actually reach + setTimeout(function(){ + // kill the underlying connection + sockets[1].end(); + }, 50); + }); + }); + }); + + it('should leave transport in a consistent state after ' + + '`transport error` (websocket)', function (done) { + var engine = listen({ allowUpgrades: false }, function (port) { + // hack to access the sockets created by node-xmlhttprequest + // see: https://github.com/driverdan/node-XMLHttpRequest/issues/44 + var request = require('http').request; + var sockets = []; + http.request = function(opts) { + var req = request.apply(null, arguments); + req.on('socket', function(socket) { + sockets.push(socket); + }); + return req; + }; + + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['websocket'] }); + + engine.on('connection', function(conn){ + conn.on('close', function () { + setTimeout(function () { + http.request = request; + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + conn.transport.close(function () { + expect(conn.transport.readyState).to.be('closed'); + expect(conn.transport.writable).to.be(false); + done(); + }); + }, 10); + }); + }); + + socket.on('open', function () { + setTimeout(function () { + // kill the underlying connection + sockets[0].end(); + }, 50); + }); + }); + }); + + it('should trigger transport close callback for `forced close`, ' + + 'even between polling cycle', function (done) { + var engine = listen({ allowUpgrades: false }, function (port) { + var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] }); + + engine.on('connection', function (conn) { + socket.transport.on('pollComplete', function() { + conn.close(); + }); + conn.on('close', function (reason) { + expect(reason).to.be('forced close'); + conn.transport.close(function() { + done(); + }); + }); + }); + }); + }); }); describe('messages', function () {