From 9bf10ed0514d96f9f2d40fae2274ab1be8464535 Mon Sep 17 00:00:00 2001 From: Daniel Shaw Date: Sun, 10 Jul 2011 17:11:08 -0700 Subject: [PATCH 01/28] Updated to the latest redis release. Lots of important patches. --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 4b000b81f0..f82758cbf2 100644 --- a/package.json +++ b/package.json @@ -17,7 +17,7 @@ , "dependencies": { "socket.io-client": "0.7.3" , "policyfile": "0.0.3" - , "redis": "0.6.0" + , "redis": "0.6.6" } , "devDependencies": { "expresso": "0.7.7" From 59e250b186ed88209f49140bdb336393066fdee8 Mon Sep 17 00:00:00 2001 From: Daniel Shaw Date: Mon, 11 Jul 2011 00:14:37 -0700 Subject: [PATCH 02/28] Run initStore on store setting change. Fixes #367. --- lib/manager.js | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/manager.js b/lib/manager.js index 6d1cc7c50d..9357a0cd2f 100644 --- a/lib/manager.js +++ b/lib/manager.js @@ -83,14 +83,18 @@ function Manager (server) { , 'client store expiration': 15 }; + var self = this; + this.initStore(); + this.on('set:store', function() { + self.initStore(); + }); + // reset listeners this.oldListeners = server.listeners('request'); server.removeAllListeners('request'); - var self = this; - server.on('request', function (req, res) { self.handleRequest(req, res); }); From dfebed6c2f46a151638fb0d5dd9abf7275159438 Mon Sep 17 00:00:00 2001 From: Daniel Shaw Date: Tue, 12 Jul 2011 02:56:48 -0700 Subject: [PATCH 03/28] Fix typo. Also mentioned in @dluxemburg's #341. --- lib/stores/redis.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/stores/redis.js b/lib/stores/redis.js index 689ec8b0f1..97f26719d6 100644 --- a/lib/stores/redis.js +++ b/lib/stores/redis.js @@ -116,7 +116,7 @@ Redis.prototype.subscribe = function (name, consumer, fn) { self.on('unsubscribe', function unsubscribe (ch) { if (name == ch) { self.sub.removeListener('message', message); - self.removeEvent('unsubscribe', unsubscribe); + self.removeListener('unsubscribe', unsubscribe); } }); From bb2e100e7fe49f96464ca7ee0837a1ce261fcc73 Mon Sep 17 00:00:00 2001 From: einaros Date: Tue, 16 Aug 2011 07:46:57 +0200 Subject: [PATCH 04/28] Added http referrer verification to manager.js verifyOrigin + tests for origins setting --- lib/manager.js | 19 ++++++---- test/manager.test.js | 85 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+), 7 deletions(-) diff --git a/lib/manager.js b/lib/manager.js index 1b854b1739..24e75625ab 100644 --- a/lib/manager.js +++ b/lib/manager.js @@ -870,7 +870,7 @@ Manager.prototype.handshakeData = function (data) { */ Manager.prototype.verifyOrigin = function (request) { - var origin = request.headers.origin + var origin = request.headers.origin || request.headers.referer , origins = this.get('origins'); if (origin === 'null') origin = '*'; @@ -882,14 +882,19 @@ Manager.prototype.verifyOrigin = function (request) { if (origin) { try { var parts = url.parse(origin); - - return - ~origins.indexOf(parts.host + ':' + parts.port) || - ~origins.indexOf(parts.host + ':*') || + var ok = + ~origins.indexOf(parts.hostname + ':' + parts.port) || + ~origins.indexOf(parts.hostname + ':*') || ~origins.indexOf('*:' + parts.port); - } catch (ex) {} + if (!ok) this.log.warn('illegal origin: ' + origin); + return ok; + } catch (ex) { + this.log.warn('error parsing origin'); + } + } + else { + this.log.warn('origin missing from handshake, yet required by config'); } - return false; }; diff --git a/test/manager.test.js b/test/manager.test.js index 6b8319f85c..93922d38db 100644 --- a/test/manager.test.js +++ b/test/manager.test.js @@ -472,6 +472,91 @@ module.exports = { }); }, + 'test that a referer is accepted for *:* origin': function (done) { + var port = ++ports + , io = sio.listen(port) + , cl = client(port); + + io.configure(function () { + io.set('origins', '*:*'); + }); + + cl.get('/socket.io/{protocol}', { headers: { referer: 'http://foo.bar.com:82/something' } }, function (res, data) { + res.statusCode.should.eql(200); + cl.end(); + io.server.close(); + done(); + }); + }, + + 'test that valid referer is accepted for addr:* origin': function (done) { + var port = ++ports + , io = sio.listen(port) + , cl = client(port); + + io.configure(function () { + io.set('origins', 'foo.bar.com:*'); + }); + + cl.get('/socket.io/{protocol}', { headers: { referer: 'http://foo.bar.com/something' } }, function (res, data) { + res.statusCode.should.eql(200); + cl.end(); + io.server.close(); + done(); + }); + }, + + 'test that erroneous referer is denied for addr:* origin': function (done) { + var port = ++ports + , io = sio.listen(port) + , cl = client(port); + + io.configure(function () { + io.set('origins', 'foo.bar.com:*'); + }); + + cl.get('/socket.io/{protocol}', { headers: { referer: 'http://baz.bar.com/something' } }, function (res, data) { + res.statusCode.should.eql(403); + cl.end(); + io.server.close(); + done(); + }); + }, + + 'test that valid referer port is accepted for addr:port origin': function (done) { + var port = ++ports + , io = sio.listen(port) + , cl = client(port); + + io.configure(function () { + io.set('origins', 'foo.bar.com:81'); + }); + + cl.get('/socket.io/{protocol}', { headers: { referer: 'http://foo.bar.com:81/something' } }, function (res, data) { + res.statusCode.should.eql(200); + cl.end(); + io.server.close(); + done(); + }); + }, + + 'test that erroneous referer port is denied for addr:port origin': function (done) { + var port = ++ports + , io = sio.listen(port) + , cl = client(port); + + io.configure(function () { + io.set('origins', 'foo.bar.com:81'); + }); + + cl.get('/socket.io/{protocol}', { headers: { referer: 'http://foo.bar.com/something' } }, function (res, data) { + res.statusCode.should.eql(403); + cl.end(); + io.server.close(); + done(); + }); + }, + 'test limiting the supported transports for a manager': function (done) { var port = ++ports , io = sio.listen(port) From 186649102defa4443e464bf884629311184d3d85 Mon Sep 17 00:00:00 2001 From: einaros Date: Sat, 27 Aug 2011 17:41:49 +0200 Subject: [PATCH 05/28] added hybi10 support --- lib/transports/websocket.js | 327 +---------------------- lib/transports/wsver/8.js | 446 ++++++++++++++++++++++++++++++++ lib/transports/wsver/default.js | 350 +++++++++++++++++++++++++ lib/transports/wsver/index.js | 9 + lib/util.js | 25 ++ 5 files changed, 835 insertions(+), 322 deletions(-) create mode 100644 lib/transports/wsver/8.js create mode 100644 lib/transports/wsver/default.js create mode 100644 lib/transports/wsver/index.js diff --git a/lib/transports/websocket.js b/lib/transports/websocket.js index 41f90dc15d..2082bd6fa1 100644 --- a/lib/transports/websocket.js +++ b/lib/transports/websocket.js @@ -9,10 +9,7 @@ * Module requirements. */ -var Transport = require('../transport') - , EventEmitter = process.EventEmitter - , crypto = require('crypto') - , parser = require('../parser'); +var protocolVersions = require('./wsver'); /** * Export the constructor. @@ -28,323 +25,9 @@ exports = module.exports = WebSocket; */ function WebSocket (mng, data, req) { - // parser - var self = this; - - this.parser = new Parser(); - this.parser.on('data', function (packet) { - self.log.debug(self.name + ' received data packet', packet); - self.onMessage(parser.decodePacket(packet)); - }); - this.parser.on('close', function () { - self.end(); - }); - this.parser.on('error', function () { - self.end(); - }); - - Transport.call(this, mng, data, req); -}; - -/** - * Inherits from Transport. - */ - -WebSocket.prototype.__proto__ = Transport.prototype; - -/** - * Transport name - * - * @api public - */ - -WebSocket.prototype.name = 'websocket'; - -/** - * Called when the socket connects. - * - * @api private - */ - -WebSocket.prototype.onSocketConnect = function () { - var self = this; - - this.socket.setNoDelay(true); - - this.buffer = true; - this.buffered = []; - - if (this.req.headers.upgrade !== 'WebSocket') { - this.log.warn(this.name + ' connection invalid'); - this.end(); - return; + var version = req.headers['sec-websocket-version']; + if (typeof version !== 'undefined' && typeof protocolVersions[version] !== 'undefined') { + return new protocolVersions[version](mng, data, req); } - - var origin = this.req.headers.origin - , location = (this.socket.encrypted ? 'wss' : 'ws') - + '://' + this.req.headers.host + this.req.url - , waitingForNonce = false; - - if (this.req.headers['sec-websocket-key1']) { - // If we don't have the nonce yet, wait for it (HAProxy compatibility). - if (! (this.req.head && this.req.head.length >= 8)) { - waitingForNonce = true; - } - - var headers = [ - 'HTTP/1.1 101 WebSocket Protocol Handshake' - , 'Upgrade: WebSocket' - , 'Connection: Upgrade' - , 'Sec-WebSocket-Origin: ' + origin - , 'Sec-WebSocket-Location: ' + location - ]; - - if (this.req.headers['sec-websocket-protocol']){ - headers.push('Sec-WebSocket-Protocol: ' - + this.req.headers['sec-websocket-protocol']); - } - } else { - var headers = [ - 'HTTP/1.1 101 Web Socket Protocol Handshake' - , 'Upgrade: WebSocket' - , 'Connection: Upgrade' - , 'WebSocket-Origin: ' + origin - , 'WebSocket-Location: ' + location - ]; - } - - try { - this.socket.write(headers.concat('', '').join('\r\n')); - this.socket.setTimeout(0); - this.socket.setNoDelay(true); - this.socket.setEncoding('utf8'); - } catch (e) { - this.end(); - return; - } - - if (waitingForNonce) { - this.socket.setEncoding('binary'); - } else if (this.proveReception(headers)) { - self.flush(); - } - - var headBuffer = ''; - - this.socket.on('data', function (data) { - if (waitingForNonce) { - headBuffer += data; - - if (headBuffer.length < 8) { - return; - } - - // Restore the connection to utf8 encoding after receiving the nonce - self.socket.setEncoding('utf8'); - waitingForNonce = false; - - // Stuff the nonce into the location where it's expected to be - self.req.head = headBuffer.substr(0, 8); - headBuffer = ''; - - if (self.proveReception(headers)) { - self.flush(); - } - - return; - } - - self.parser.add(data); - }); -}; - -/** - * Writes to the socket. - * - * @api private - */ - -WebSocket.prototype.write = function (data) { - if (this.open) { - this.drained = false; - - if (this.buffer) { - this.buffered.push(data); - return this; - } - - var length = Buffer.byteLength(data) - , buffer = new Buffer(2 + length); - - buffer.write('\u0000', 'binary'); - buffer.write(data, 1, 'utf8'); - buffer.write('\uffff', 1 + length, 'binary'); - - try { - if (this.socket.write(buffer)) { - this.drained = true; - } - } catch (e) { - this.end(); - } - - this.log.debug(this.name + ' writing', data); - } -}; - -/** - * Flushes the internal buffer - * - * @api private - */ - -WebSocket.prototype.flush = function () { - this.buffer = false; - - for (var i = 0, l = this.buffered.length; i < l; i++) { - this.write(this.buffered.splice(0, 1)[0]); - } -}; - -/** - * Finishes the handshake. - * - * @api private - */ - -WebSocket.prototype.proveReception = function (headers) { - var self = this - , k1 = this.req.headers['sec-websocket-key1'] - , k2 = this.req.headers['sec-websocket-key2']; - - if (k1 && k2){ - var md5 = crypto.createHash('md5'); - - [k1, k2].forEach(function (k) { - var n = parseInt(k.replace(/[^\d]/g, '')) - , spaces = k.replace(/[^ ]/g, '').length; - - if (spaces === 0 || n % spaces !== 0){ - self.log.warn('Invalid ' + self.name + ' key: "' + k + '".'); - self.end(); - return false; - } - - n /= spaces; - - md5.update(String.fromCharCode( - n >> 24 & 0xFF, - n >> 16 & 0xFF, - n >> 8 & 0xFF, - n & 0xFF)); - }); - - md5.update(this.req.head.toString('binary')); - - try { - this.socket.write(md5.digest('binary'), 'binary'); - } catch (e) { - this.end(); - } - } - - return true; -}; - -/** - * Writes a payload. - * - * @api private - */ - -WebSocket.prototype.payload = function (msgs) { - for (var i = 0, l = msgs.length; i < l; i++) { - this.write(msgs[i]); - } - - return this; -}; - -/** - * Closes the connection. - * - * @api private - */ - -WebSocket.prototype.doClose = function () { - this.socket.end(); -}; - -/** - * WebSocket parser - * - * @api public - */ - -function Parser () { - this.buffer = ''; - this.i = 0; -}; - -/** - * Inherits from EventEmitter. - */ - -Parser.prototype.__proto__ = EventEmitter.prototype; - -/** - * Adds data to the buffer. - * - * @api public - */ - -Parser.prototype.add = function (data) { - this.buffer += data; - this.parse(); -}; - -/** - * Parses the buffer. - * - * @api private - */ - -Parser.prototype.parse = function () { - for (var i = this.i, chr, l = this.buffer.length; i < l; i++){ - chr = this.buffer[i]; - - if (this.buffer.length == 2 && this.buffer[1] == '\u0000') { - this.emit('close'); - this.buffer = ''; - this.i = 0; - return; - } - - if (i === 0){ - if (chr != '\u0000') - this.error('Bad framing. Expected null byte as first frame'); - else - continue; - } - - if (chr == '\ufffd'){ - this.emit('data', this.buffer.substr(1, i - 1)); - this.buffer = this.buffer.substr(i + 1); - this.i = 0; - return this.parse(); - } - } -}; - -/** - * Handles an error - * - * @api private - */ - -Parser.prototype.error = function (reason) { - this.buffer = ''; - this.i = 0; - this.emit('error', reason); - return this; + return new protocolVersions['default'](mng, data, req); }; diff --git a/lib/transports/wsver/8.js b/lib/transports/wsver/8.js new file mode 100644 index 0000000000..939ab30180 --- /dev/null +++ b/lib/transports/wsver/8.js @@ -0,0 +1,446 @@ + +/*! + * socket.io-node + * Copyright(c) 2011 LearnBoost + * MIT Licensed + */ + +/** + * Module requirements. + */ + +var Transport = require('../../transport') + , EventEmitter = process.EventEmitter + , crypto = require('crypto') + , parser = require('../../parser') + , util = require('../../util'); + +/** + * Export the constructor. + */ + +exports = module.exports = WebSocket; +exports.Parser = Parser; + +/** + * HTTP interface constructor. Interface compatible with all transports that + * depend on request-response cycles. + * + * @api public + */ + +function WebSocket (mng, data, req) { + // parser + var self = this; + + this.parser = new Parser(); + this.parser.on('data', function (packet) { + self.onMessage(parser.decodePacket(packet)); + }); + this.parser.on('ping', function () { + // version 8 ping => pong + this.socket.write('\u008a\u0000'); + }); + this.parser.on('close', function () { + self.end(); + }); + this.parser.on('error', function () { + self.end(); + }); + + Transport.call(this, mng, data, req); +}; + +/** + * Inherits from Transport. + */ + +WebSocket.prototype.__proto__ = Transport.prototype; + +/** + * Transport name + * + * @api public + */ + +WebSocket.prototype.name = 'websocket'; + +/** + * Called when the socket connects. + * + * @api private + */ + +WebSocket.prototype.onSocketConnect = function () { + var self = this; + + this.socket.setNoDelay(true); + + if (this.req.headers.upgrade !== 'websocket') { + this.log.warn(this.name + ' connection invalid'); + this.end(); + return; + } + + var origin = this.req.headers.origin + , location = (this.socket.encrypted ? 'wss' : 'ws') + + '://' + this.req.headers.host + this.req.url; + + if (!this.req.headers['sec-websocket-key']) { + this.log.warn(this.name + ' connection invalid: received no key'); + this.end(); + return; + } + + // calc key + var key = this.req.headers['sec-websocket-key']; + var shasum = crypto.createHash('sha1'); + shasum.update(key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"); + key = shasum.digest('base64'); + + var headers = [ + 'HTTP/1.1 101 Switching Protocols' + , 'Upgrade: websocket' + , 'Connection: Upgrade' + , 'Sec-WebSocket-Accept: ' + key + ]; + + try { + this.socket.write(headers.concat('', '').join('\r\n')); + this.socket.setTimeout(0); + this.socket.setNoDelay(true); + } catch (e) { + this.end(); + return; + } + + this.socket.on('data', function (data) { + self.parser.add(data); + }); +}; + +/** + * Writes to the socket. + * + * @api private + */ + +WebSocket.prototype.write = function (data) { + if (this.open) { + var buf = this.frame(0x81, data); + this.socket.write(buf, 'binary'); + this.log.debug(this.name + ' writing', data); + } +}; + +WebSocket.prototype.frame = function (opcode, data) { + var startOffset = 2, secondByte = data.length, buf; + if (data.length > 125) { + // opcode = 0x81; + startOffset += 2; + secondByte = 126; + } + if (data.length > 65536) { + startOffset += 6; + secondByte = 127; + } + buf = new Buffer(data.length + startOffset, 'utf8'); + buf[0] = opcode; + buf[1] = secondByte; + switch (secondByte) { + case 126: + buf[2] = data.length >>> 8; + buf[3] = data.length % 256; + break; + case 127: + for (var i = 6; i > 2; i--) { + buf[i] = secondByte % 256; + secondByte >>>= 8; + } + } + buf.write(data, startOffset, 'utf8'); + return buf; +}; + +/** + * Closes the connection. + * + * @api private + */ + +WebSocket.prototype.doClose = function () { + this.socket.end(); +}; + +/** + * WebSocket parser + * + * @api public + */ + +function Parser () { + this.state = { + activeFragmentedOperation: null, + lastFragment: false, + masked: false, + opcode: 0 + }; + this.overflow = null; + this.expectOffset = 0; + this.expectBuffer = null; + this.expectHandler = null; + this.currentMessage = ''; + + var self = this; + this.opcodeHandlers = { + // text + '1': function(data) { + var finish = function(mask, data) { + self.currentMessage += self.unmask(mask, data); + if (self.state.lastFragment) { + self.emit('data', self.currentMessage); + self.currentMessage = ''; + } + self.endPacket(); + } + + var expectData = function(length) { + if (self.state.masked) { + self.expect('Mask', 4, function(data) { + var mask = data; + self.expect('Data', length, function(data) { + finish(mask, data); + }); + }); + } + else { + self.expect('Data', length, function(data) { + finish(null, data); + }); + } + } + + // decode length + var firstLength = data[1] & 0x7f; + if (firstLength < 126) { + expectData(firstLength); + } + else if (firstLength == 126) { + self.expect('Length', 2, function(data) { + expectData(util.unpack(data)); + }); + } + else if (firstLength == 127) { + self.expect('Length', 8, function(data) { + expectData(util.unpack(data)); + }); + } + }, + // ping + '9': function(data) { + if (self.state.lastFragment == false) { + self.error('fragmented ping is not supported'); + return; + } + + var finish = function(mask, data) { + self.emit('ping', self.unmask(mask, data)); + self.endPacket(); + } + + var expectData = function(length) { + if (self.state.masked) { + self.expect('Mask', 4, function(data) { + var mask = data; + self.expect('Data', length, function(data) { + finish(mask, data); + }); + }); + } + else { + self.expect('Data', length, function(data) { + finish(null, data); + }); + } + } + + // decode length + var firstLength = data[1] & 0x7f; + if (firstLength == 0) { + finish(null, null); + } + else if (firstLength < 126) { + expectData(firstLength); + } + else if (firstLength == 126) { + self.expect('Length', 2, function(data) { + expectData(util.unpack(data)); + }); + } + else if (firstLength == 127) { + self.expect('Length', 8, function(data) { + expectData(util.unpack(data)); + }); + } + } + } + + this.expect('Opcode', 2, this.processPacket); +}; + +/** + * Inherits from EventEmitter. + */ + +Parser.prototype.__proto__ = EventEmitter.prototype; + +/** + * Add new data to the parser. + * + * @api public + */ + +Parser.prototype.add = function(data) { + if (this.expectBuffer == null) { + this.addToOverflow(data); + return; + } + var toRead = Math.min(data.length, this.expectBuffer.length - this.expectOffset); + data.copy(this.expectBuffer, this.expectOffset, 0, toRead); + this.expectOffset += toRead; + if (toRead < data.length) { + // at this point the overflow buffer shouldn't at all exist + this.overflow = new Buffer(data.length - toRead); + data.copy(this.overflow, 0, toRead, toRead + this.overflow.length); + } + if (this.expectOffset == this.expectBuffer.length) { + var bufferForHandler = this.expectBuffer; + this.expectBuffer = null; + this.expectOffset = 0; + this.expectHandler.call(this, bufferForHandler); + } +} + +/** + * Adds a piece of data to the overflow. + * + * @api private + */ + +Parser.prototype.addToOverflow = function(data) { + if (this.overflow == null) this.overflow = data; + else { + var prevOverflow = this.overflow; + this.overflow = new Buffer(this.overflow.length + data.length); + prevOverflow.copy(this.overflow, 0); + data.copy(this.overflow, prevOverflow.length); + } +} + +/** + * Waits for a certain amount of bytes to be available, then fires a callback. + * + * @api private + */ + +Parser.prototype.expect = function(what, length, handler) { + this.expectBuffer = new Buffer(length); + this.expectOffset = 0; + this.expectHandler = handler; + if (this.overflow != null) { + var toOverflow = this.overflow; + this.overflow = null; + this.add(toOverflow); + } +} + +/** + * Start processing a new packet. + * + * @api private + */ + +Parser.prototype.processPacket = function (data) { + if ((data[0] & 0x70) != 0) this.error('reserved fields not empty'); + if ((data[0] & 0x80) == 0x80) this.state.lastFragment = true; + this.state.masked = (data[1] & 0x80) == 0x80; + var opcode = data[0] & 0xf; + if (opcode == 0) { + if (this.state.opcode != 1 || this.state.opcode != 2) { + this.error('continuation frame cannot follow current opcode') + return; + } + } + else this.state.opcode = opcode; + this.state.opcode = data[0] & 0xf; + var handler = this.opcodeHandlers[this.state.opcode]; + if (typeof handler == 'undefined') this.error('no handler for opcode ' + this.state.opcode); + else handler(data); +} + +/** + * Endprocessing a packet. + * + * @api private + */ + +Parser.prototype.endPacket = function() { + this.expectOffset = 0; + this.expectBuffer = null; + this.expectHandler = null; + if (this.state.lastFragment && this.state.opcode == this.state.activeFragmentedOperation) { + // end current fragmented operation + this.state.activeFragmentedOperation = null; + } + this.state.lastFragment = false; + this.state.opcode = this.state.activeFragmentedOperation != null ? this.state.activeFragmentedOperation : 0; + this.state.masked = false; + this.expect('Opcode', 2, this.processPacket); +} + +/** + * Reset the parser state. + * + * @api private + */ + +Parser.prototype.reset = function() { + this.state = { + activeFragmentedOperation: null, + lastFragment: false, + masked: false, + opcode: 0 + }; + this.expectOffset = 0; + this.expectBuffer = null; + this.expectHandler = null; + this.overflow = null; + this.currentMessage = ''; +} + +/** + * Unmask received data. + * + * @api private + */ + +Parser.prototype.unmask = function (mask, buf) { + if (mask != null) { + for (var i = 0, ll = buf.length; i < ll; i++) { + buf[i] ^= mask[i % 4]; + } + } + return buf != null ? buf.toString('utf8') : ''; +} + +/** + * Handles an error + * + * @api private + */ + +Parser.prototype.error = function (reason) { + this.reset(); + this.emit('error', reason); + return this; +}; diff --git a/lib/transports/wsver/default.js b/lib/transports/wsver/default.js new file mode 100644 index 0000000000..092729d923 --- /dev/null +++ b/lib/transports/wsver/default.js @@ -0,0 +1,350 @@ + +/*! + * socket.io-node + * Copyright(c) 2011 LearnBoost + * MIT Licensed + */ + +/** + * Module requirements. + */ + +var Transport = require('../../transport') + , EventEmitter = process.EventEmitter + , crypto = require('crypto') + , parser = require('../../parser'); + +/** + * Export the constructor. + */ + +exports = module.exports = WebSocket; + +/** + * HTTP interface constructor. Interface compatible with all transports that + * depend on request-response cycles. + * + * @api public + */ + +function WebSocket (mng, data, req) { + // parser + var self = this; + + this.parser = new Parser(); + this.parser.on('data', function (packet) { + self.log.debug(self.name + ' received data packet', packet); + self.onMessage(parser.decodePacket(packet)); + }); + this.parser.on('close', function () { + self.end(); + }); + this.parser.on('error', function () { + self.end(); + }); + + Transport.call(this, mng, data, req); +}; + +/** + * Inherits from Transport. + */ + +WebSocket.prototype.__proto__ = Transport.prototype; + +/** + * Transport name + * + * @api public + */ + +WebSocket.prototype.name = 'websocket'; + +/** + * Called when the socket connects. + * + * @api private + */ + +WebSocket.prototype.onSocketConnect = function () { + var self = this; + + this.socket.setNoDelay(true); + + this.buffer = true; + this.buffered = []; + + if (this.req.headers.upgrade !== 'WebSocket') { + this.log.warn(this.name + ' connection invalid'); + this.end(); + return; + } + + var origin = this.req.headers.origin + , location = (this.socket.encrypted ? 'wss' : 'ws') + + '://' + this.req.headers.host + this.req.url + , waitingForNonce = false; + + if (this.req.headers['sec-websocket-key1']) { + // If we don't have the nonce yet, wait for it (HAProxy compatibility). + if (! (this.req.head && this.req.head.length >= 8)) { + waitingForNonce = true; + } + + var headers = [ + 'HTTP/1.1 101 WebSocket Protocol Handshake' + , 'Upgrade: WebSocket' + , 'Connection: Upgrade' + , 'Sec-WebSocket-Origin: ' + origin + , 'Sec-WebSocket-Location: ' + location + ]; + + if (this.req.headers['sec-websocket-protocol']){ + headers.push('Sec-WebSocket-Protocol: ' + + this.req.headers['sec-websocket-protocol']); + } + } else { + var headers = [ + 'HTTP/1.1 101 Web Socket Protocol Handshake' + , 'Upgrade: WebSocket' + , 'Connection: Upgrade' + , 'WebSocket-Origin: ' + origin + , 'WebSocket-Location: ' + location + ]; + } + + try { + this.socket.write(headers.concat('', '').join('\r\n')); + this.socket.setTimeout(0); + this.socket.setNoDelay(true); + this.socket.setEncoding('utf8'); + } catch (e) { + this.end(); + return; + } + + if (waitingForNonce) { + this.socket.setEncoding('binary'); + } else if (this.proveReception(headers)) { + self.flush(); + } + + var headBuffer = ''; + + this.socket.on('data', function (data) { + if (waitingForNonce) { + headBuffer += data; + + if (headBuffer.length < 8) { + return; + } + + // Restore the connection to utf8 encoding after receiving the nonce + self.socket.setEncoding('utf8'); + waitingForNonce = false; + + // Stuff the nonce into the location where it's expected to be + self.req.head = headBuffer.substr(0, 8); + headBuffer = ''; + + if (self.proveReception(headers)) { + self.flush(); + } + + return; + } + + self.parser.add(data); + }); +}; + +/** + * Writes to the socket. + * + * @api private + */ + +WebSocket.prototype.write = function (data) { + if (this.open) { + this.drained = false; + + if (this.buffer) { + this.buffered.push(data); + return this; + } + + var length = Buffer.byteLength(data) + , buffer = new Buffer(2 + length); + + buffer.write('\u0000', 'binary'); + buffer.write(data, 1, 'utf8'); + buffer.write('\uffff', 1 + length, 'binary'); + + try { + if (this.socket.write(buffer)) { + this.drained = true; + } + } catch (e) { + this.end(); + } + + this.log.debug(this.name + ' writing', data); + } +}; + +/** + * Flushes the internal buffer + * + * @api private + */ + +WebSocket.prototype.flush = function () { + this.buffer = false; + + for (var i = 0, l = this.buffered.length; i < l; i++) { + this.write(this.buffered.splice(0, 1)[0]); + } +}; + +/** + * Finishes the handshake. + * + * @api private + */ + +WebSocket.prototype.proveReception = function (headers) { + var self = this + , k1 = this.req.headers['sec-websocket-key1'] + , k2 = this.req.headers['sec-websocket-key2']; + + if (k1 && k2){ + var md5 = crypto.createHash('md5'); + + [k1, k2].forEach(function (k) { + var n = parseInt(k.replace(/[^\d]/g, '')) + , spaces = k.replace(/[^ ]/g, '').length; + + if (spaces === 0 || n % spaces !== 0){ + self.log.warn('Invalid ' + self.name + ' key: "' + k + '".'); + self.end(); + return false; + } + + n /= spaces; + + md5.update(String.fromCharCode( + n >> 24 & 0xFF, + n >> 16 & 0xFF, + n >> 8 & 0xFF, + n & 0xFF)); + }); + + md5.update(this.req.head.toString('binary')); + + try { + this.socket.write(md5.digest('binary'), 'binary'); + } catch (e) { + this.end(); + } + } + + return true; +}; + +/** + * Writes a payload. + * + * @api private + */ + +WebSocket.prototype.payload = function (msgs) { + for (var i = 0, l = msgs.length; i < l; i++) { + this.write(msgs[i]); + } + + return this; +}; + +/** + * Closes the connection. + * + * @api private + */ + +WebSocket.prototype.doClose = function () { + this.socket.end(); +}; + +/** + * WebSocket parser + * + * @api public + */ + +function Parser () { + this.buffer = ''; + this.i = 0; +}; + +/** + * Inherits from EventEmitter. + */ + +Parser.prototype.__proto__ = EventEmitter.prototype; + +/** + * Adds data to the buffer. + * + * @api public + */ + +Parser.prototype.add = function (data) { + this.buffer += data; + this.parse(); +}; + +/** + * Parses the buffer. + * + * @api private + */ + +Parser.prototype.parse = function () { + for (var i = this.i, chr, l = this.buffer.length; i < l; i++){ + chr = this.buffer[i]; + + if (this.buffer.length == 2 && this.buffer[1] == '\u0000') { + this.emit('close'); + this.buffer = ''; + this.i = 0; + return; + } + + if (i === 0){ + if (chr != '\u0000') + this.error('Bad framing. Expected null byte as first frame'); + else + continue; + } + + if (chr == '\ufffd'){ + this.emit('data', this.buffer.substr(1, i - 1)); + this.buffer = this.buffer.substr(i + 1); + this.i = 0; + return this.parse(); + } + } +}; + +/** + * Handles an error + * + * @api private + */ + +Parser.prototype.error = function (reason) { + this.buffer = ''; + this.i = 0; + this.emit('error', reason); + return this; +}; diff --git a/lib/transports/wsver/index.js b/lib/transports/wsver/index.js new file mode 100644 index 0000000000..6f94f5975a --- /dev/null +++ b/lib/transports/wsver/index.js @@ -0,0 +1,9 @@ + +/** + * Export websocket versions. + */ + +module.exports = { + 8: require('./8') + , default: require('./default') +}; diff --git a/lib/util.js b/lib/util.js index 25f31f2bd8..f7d9f2b4b4 100644 --- a/lib/util.js +++ b/lib/util.js @@ -23,3 +23,28 @@ exports.toArray = function (enu) { return arr; }; + +/** + * Unpacks a buffer to a number. + * + * @api public + */ + +exports.unpack = function (buffer) { + var n = 0; + for (var i = 0; i < buffer.length; ++i) { + n = (i == 0) ? buffer[i] : (n * 256) + buffer[i]; + } + return n; +} + +/** + * Left pads a string. + * + * @api public + */ + +exports.padl = function (s,n,c) { + return new Array(1 + n - s.length).join(c) + s; +} + From a0758703081ec8663194d2e4102d3659b623a9c8 Mon Sep 17 00:00:00 2001 From: einaros Date: Sat, 27 Aug 2011 17:45:39 +0200 Subject: [PATCH 06/28] added hybi10 parser tests --- ...transports.websocket.hybi10-parser.test.js | 208 ++++++++++++++++++ 1 file changed, 208 insertions(+) create mode 100644 test/transports.websocket.hybi10-parser.test.js diff --git a/test/transports.websocket.hybi10-parser.test.js b/test/transports.websocket.hybi10-parser.test.js new file mode 100644 index 0000000000..b65e999dd4 --- /dev/null +++ b/test/transports.websocket.hybi10-parser.test.js @@ -0,0 +1,208 @@ +var assert = require('assert'); +var Parser = require('../lib/transports/wsver/8.js').Parser; + +function makeBuffer(byteStr) { + var bytes = byteStr.split(' '); + var buf = new Buffer(bytes.length); + for (var i = 0; i < bytes.length; ++i) { + buf[i] = parseInt(bytes[i], 16); + } + return buf; +} + +function splitBuffer(buffer) { + var b1 = new Buffer(Math.ceil(buffer.length / 2)); + var b2 = new Buffer(Math.floor(buffer.length / 2)); + buffer.copy(b1, 0, 0, b1.length); + buffer.copy(b2, 0, b1.length, b1.length + b2.length); + return [b1, b2]; +} + +function mask(str, maskString) { + var buf = new Buffer(str); + var mask = makeBuffer(maskString || '34 83 a8 68'); + for (var i = 0; i < buf.length; ++i) { + buf[i] ^= mask[i % 4]; + } + return buf; +} + +function unpack(buffer) { + var n = 0; + for (var i = 0; i < buffer.length; ++i) { + n = (i == 0) ? buffer[i] : (n * 256) + buffer[i]; + } + return n; +} + +function pack(length, number) { + return padl(number.toString(16), length, '0').replace(/(\d\d)/g, '$1 ').trim(); +} + +function padl(s,n,c) { + return new Array(1 + n - s.length).join(c) + s; +} + +function dump(data) { + var s = ''; + for (var i = 0; i < data.length; ++i) { + s += padl(data[i].toString(16), 2, '0') + ' '; + } + return s.trim(); +} + +module.exports = { + 'can parse unmasked text message': function() { + var p = new Parser(); + var packet = '81 05 48 65 6c 6c 6f'; + + var gotData = false; + p.on('data', function(data) { + gotData = true; + assert.equal('Hello', data); + }); + + p.add(makeBuffer(packet)); + assert.ok(gotData); + }, + 'can parse masked text message': function() { + var p = new Parser(); + var packet = '81 93 34 83 a8 68 01 b9 92 52 4f a1 c6 09 59 e6 8a 52 16 e6 cb 00 5b a1 d5'; + + var gotData = false; + p.on('data', function(data) { + gotData = true; + assert.equal('5:::{"name":"echo"}', data); + }); + + p.add(makeBuffer(packet)); + assert.ok(gotData); + }, + 'can parse a masked text message longer of 300 bytes': function() { + var p = new Parser(); + var message = 'A'; + for (var i = 0; i < 300; ++i) message += (i % 5).toString(); + var packet = '81 FE ' + pack(4, message.length) + ' 34 83 a8 68 ' + dump(mask(message, '34 83 a8 68')); + + var gotData = false; + p.on('data', function(data) { + gotData = true; + assert.equal(message, data); + }); + + p.add(makeBuffer(packet)); + assert.ok(gotData); + }, + 'can parse a fragmented masked text message of 300 bytes': function() { + var p = new Parser(); + var message = 'A'; + for (var i = 0; i < 300; ++i) message += (i % 5).toString(); + var msgpiece1 = message.substr(0, 150); + var msgpiece2 = message.substr(150); + var packet1 = '01 FE ' + pack(4, msgpiece1.length) + ' 34 83 a8 68 ' + dump(mask(msgpiece1, '34 83 a8 68')); + var packet2 = '81 FE ' + pack(4, msgpiece2.length) + ' 34 83 a8 68 ' + dump(mask(msgpiece2, '34 83 a8 68')); + + var gotData = false; + p.on('data', function(data) { + gotData = true; + assert.equal(message, data); + }); + + p.add(makeBuffer(packet1)); + p.add(makeBuffer(packet2)); + assert.ok(gotData); + }, + 'can parse a ping message': function() { + var p = new Parser(); + var message = 'Hello'; + var packet = '89 FE ' + pack(4, message.length) + ' 34 83 a8 68 ' + dump(mask(message, '34 83 a8 68')); + + var gotPing = false; + p.on('ping', function(data) { + gotPing = true; + assert.equal(message, data); + }); + + p.add(makeBuffer(packet)); + assert.ok(gotPing); + }, + 'can parse a ping with no data': function() { + var p = new Parser(); + var packet = '89 00'; + + var gotPing = false; + p.on('ping', function(data) { + gotPing = true; + }); + + p.add(makeBuffer(packet)); + assert.ok(gotPing); + }, + 'can parse a fragmented masked text message of 300 bytes with a ping in the middle': function() { + var p = new Parser(); + var message = 'A'; + for (var i = 0; i < 300; ++i) message += (i % 5).toString(); + + var msgpiece1 = message.substr(0, 150); + var packet1 = '01 FE ' + pack(4, msgpiece1.length) + ' 34 83 a8 68 ' + dump(mask(msgpiece1, '34 83 a8 68')); + + var pingMessage = 'Hello'; + var pingPacket = '89 FE ' + pack(4, pingMessage.length) + ' 34 83 a8 68 ' + dump(mask(pingMessage, '34 83 a8 68')); + + var msgpiece2 = message.substr(150); + var packet2 = '81 FE ' + pack(4, msgpiece2.length) + ' 34 83 a8 68 ' + dump(mask(msgpiece2, '34 83 a8 68')); + + var gotData = false; + p.on('data', function(data) { + gotData = true; + assert.equal(message, data); + }); + var gotPing = false; + p.on('ping', function(data) { + gotPing = true; + assert.equal(pingMessage, data); + }); + + p.add(makeBuffer(packet1)); + p.add(makeBuffer(pingPacket)); + p.add(makeBuffer(packet2)); + assert.ok(gotData); + assert.ok(gotPing); + }, + 'can parse a fragmented masked text message of 300 bytes with a ping in the middle, which is delievered over sevaral tcp packets': function() { + var p = new Parser(); + var message = 'A'; + for (var i = 0; i < 300; ++i) message += (i % 5).toString(); + + var msgpiece1 = message.substr(0, 150); + var packet1 = '01 FE ' + pack(4, msgpiece1.length) + ' 34 83 a8 68 ' + dump(mask(msgpiece1, '34 83 a8 68')); + + var pingMessage = 'Hello'; + var pingPacket = '89 FE ' + pack(4, pingMessage.length) + ' 34 83 a8 68 ' + dump(mask(pingMessage, '34 83 a8 68')); + + var msgpiece2 = message.substr(150); + var packet2 = '81 FE ' + pack(4, msgpiece2.length) + ' 34 83 a8 68 ' + dump(mask(msgpiece2, '34 83 a8 68')); + + var gotData = false; + p.on('data', function(data) { + gotData = true; + assert.equal(message, data); + }); + var gotPing = false; + p.on('ping', function(data) { + gotPing = true; + assert.equal(pingMessage, data); + }); + + var buffers = []; + buffers = buffers.concat(splitBuffer(makeBuffer(packet1))); + buffers = buffers.concat(splitBuffer(makeBuffer(pingPacket))); + buffers = buffers.concat(splitBuffer(makeBuffer(packet2))); + for (var i = 0; i < buffers.length; ++i) { + p.add(buffers[i]); + } + assert.ok(gotData); + assert.ok(gotPing); + }, +}; + From 39ae8d46295c431e150f8e1b3d5301e1c462f9c1 Mon Sep 17 00:00:00 2001 From: einaros Date: Sat, 27 Aug 2011 17:54:13 +0200 Subject: [PATCH 07/28] added hybi10 close operation --- lib/transports/wsver/8.js | 6 ++- ...transports.websocket.hybi10-parser.test.js | 42 ++++++++++++------- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/lib/transports/wsver/8.js b/lib/transports/wsver/8.js index 939ab30180..eb6706cdef 100644 --- a/lib/transports/wsver/8.js +++ b/lib/transports/wsver/8.js @@ -196,7 +196,7 @@ function Parser () { // text '1': function(data) { var finish = function(mask, data) { - self.currentMessage += self.unmask(mask, data); + self.currentMessage += self.unmask(mask, data); if (self.state.lastFragment) { self.emit('data', self.currentMessage); self.currentMessage = ''; @@ -236,6 +236,10 @@ function Parser () { }); } }, + '8': function(data) { + self.emit('close'); + self.reset(); + }, // ping '9': function(data) { if (self.state.lastFragment == false) { diff --git a/test/transports.websocket.hybi10-parser.test.js b/test/transports.websocket.hybi10-parser.test.js index b65e999dd4..78f451f6cb 100644 --- a/test/transports.websocket.hybi10-parser.test.js +++ b/test/transports.websocket.hybi10-parser.test.js @@ -1,7 +1,7 @@ var assert = require('assert'); var Parser = require('../lib/transports/wsver/8.js').Parser; -function makeBuffer(byteStr) { +function makeBufferFromHexString(byteStr) { var bytes = byteStr.split(' '); var buf = new Buffer(bytes.length); for (var i = 0; i < bytes.length; ++i) { @@ -20,7 +20,7 @@ function splitBuffer(buffer) { function mask(str, maskString) { var buf = new Buffer(str); - var mask = makeBuffer(maskString || '34 83 a8 68'); + var mask = makeBufferFromHexString(maskString || '34 83 a8 68'); for (var i = 0; i < buf.length; ++i) { buf[i] ^= mask[i % 4]; } @@ -62,9 +62,21 @@ module.exports = { assert.equal('Hello', data); }); - p.add(makeBuffer(packet)); + p.add(makeBufferFromHexString(packet)); assert.ok(gotData); }, + 'can parse close message': function() { + var p = new Parser(); + var packet = '88 00'; + + var gotClose = false; + p.on('close', function(data) { + gotClose = true; + }); + + p.add(makeBufferFromHexString(packet)); + assert.ok(gotClose); + }, 'can parse masked text message': function() { var p = new Parser(); var packet = '81 93 34 83 a8 68 01 b9 92 52 4f a1 c6 09 59 e6 8a 52 16 e6 cb 00 5b a1 d5'; @@ -75,7 +87,7 @@ module.exports = { assert.equal('5:::{"name":"echo"}', data); }); - p.add(makeBuffer(packet)); + p.add(makeBufferFromHexString(packet)); assert.ok(gotData); }, 'can parse a masked text message longer of 300 bytes': function() { @@ -90,7 +102,7 @@ module.exports = { assert.equal(message, data); }); - p.add(makeBuffer(packet)); + p.add(makeBufferFromHexString(packet)); assert.ok(gotData); }, 'can parse a fragmented masked text message of 300 bytes': function() { @@ -108,8 +120,8 @@ module.exports = { assert.equal(message, data); }); - p.add(makeBuffer(packet1)); - p.add(makeBuffer(packet2)); + p.add(makeBufferFromHexString(packet1)); + p.add(makeBufferFromHexString(packet2)); assert.ok(gotData); }, 'can parse a ping message': function() { @@ -123,7 +135,7 @@ module.exports = { assert.equal(message, data); }); - p.add(makeBuffer(packet)); + p.add(makeBufferFromHexString(packet)); assert.ok(gotPing); }, 'can parse a ping with no data': function() { @@ -135,7 +147,7 @@ module.exports = { gotPing = true; }); - p.add(makeBuffer(packet)); + p.add(makeBufferFromHexString(packet)); assert.ok(gotPing); }, 'can parse a fragmented masked text message of 300 bytes with a ping in the middle': function() { @@ -163,9 +175,9 @@ module.exports = { assert.equal(pingMessage, data); }); - p.add(makeBuffer(packet1)); - p.add(makeBuffer(pingPacket)); - p.add(makeBuffer(packet2)); + p.add(makeBufferFromHexString(packet1)); + p.add(makeBufferFromHexString(pingPacket)); + p.add(makeBufferFromHexString(packet2)); assert.ok(gotData); assert.ok(gotPing); }, @@ -195,9 +207,9 @@ module.exports = { }); var buffers = []; - buffers = buffers.concat(splitBuffer(makeBuffer(packet1))); - buffers = buffers.concat(splitBuffer(makeBuffer(pingPacket))); - buffers = buffers.concat(splitBuffer(makeBuffer(packet2))); + buffers = buffers.concat(splitBuffer(makeBufferFromHexString(packet1))); + buffers = buffers.concat(splitBuffer(makeBufferFromHexString(pingPacket))); + buffers = buffers.concat(splitBuffer(makeBufferFromHexString(packet2))); for (var i = 0; i < buffers.length; ++i) { p.add(buffers[i]); } From a9e9e64eab7a868eb322bb03b62709e1541fba25 Mon Sep 17 00:00:00 2001 From: einaros Date: Sat, 27 Aug 2011 18:27:52 +0200 Subject: [PATCH 08/28] updated to work with two-level websocket versioning --- lib/transports/flashsocket.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/transports/flashsocket.js b/lib/transports/flashsocket.js index 48b3d95906..16768196ac 100644 --- a/lib/transports/flashsocket.js +++ b/lib/transports/flashsocket.js @@ -24,7 +24,7 @@ exports = module.exports = FlashSocket; */ function FlashSocket (mng, data, req) { - WebSocket.call(this, mng, data, req); + return WebSocket.call(this, mng, data, req); } /** From 7e60d3717159b3d0d84aabd6731e523ae6f7b5a8 Mon Sep 17 00:00:00 2001 From: einaros Date: Sat, 27 Aug 2011 18:42:17 +0200 Subject: [PATCH 09/28] minor cleanups --- lib/transports/wsver/8.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/transports/wsver/8.js b/lib/transports/wsver/8.js index eb6706cdef..467d8d9b2c 100644 --- a/lib/transports/wsver/8.js +++ b/lib/transports/wsver/8.js @@ -366,10 +366,11 @@ Parser.prototype.expect = function(what, length, handler) { Parser.prototype.processPacket = function (data) { if ((data[0] & 0x70) != 0) this.error('reserved fields not empty'); - if ((data[0] & 0x80) == 0x80) this.state.lastFragment = true; + this.state.lastFragment = (data[0] & 0x80) == 0x80; this.state.masked = (data[1] & 0x80) == 0x80; var opcode = data[0] & 0xf; if (opcode == 0) { + // continuation frame if (this.state.opcode != 1 || this.state.opcode != 2) { this.error('continuation frame cannot follow current opcode') return; From 71e013a1972aa0e090dff414f4ad6a61ecc7fbe0 Mon Sep 17 00:00:00 2001 From: einaros Date: Sat, 27 Aug 2011 19:21:01 +0200 Subject: [PATCH 10/28] added test and support for really long messages, but capped by 32 bit --- lib/transports/wsver/8.js | 5 +++++ test/transports.websocket.hybi10-parser.test.js | 17 ++++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/lib/transports/wsver/8.js b/lib/transports/wsver/8.js index 467d8d9b2c..763b8ae56e 100644 --- a/lib/transports/wsver/8.js +++ b/lib/transports/wsver/8.js @@ -232,6 +232,11 @@ function Parser () { } else if (firstLength == 127) { self.expect('Length', 8, function(data) { + if (util.unpack(data.slice(0, 4)) != 0) { + self.error('packets with length spanning more than 32 bit is currently not supported'); + return; + } + var lengthBytes = data.slice(4); // cap to 32 bit length expectData(util.unpack(data)); }); } diff --git a/test/transports.websocket.hybi10-parser.test.js b/test/transports.websocket.hybi10-parser.test.js index 78f451f6cb..02d36f8852 100644 --- a/test/transports.websocket.hybi10-parser.test.js +++ b/test/transports.websocket.hybi10-parser.test.js @@ -90,7 +90,7 @@ module.exports = { p.add(makeBufferFromHexString(packet)); assert.ok(gotData); }, - 'can parse a masked text message longer of 300 bytes': function() { + 'can parse a masked text message longer than 125 bytes': function() { var p = new Parser(); var message = 'A'; for (var i = 0; i < 300; ++i) message += (i % 5).toString(); @@ -105,6 +105,21 @@ module.exports = { p.add(makeBufferFromHexString(packet)); assert.ok(gotData); }, + 'can parse a really long masked text message': function() { + var p = new Parser(); + var message = 'A'; + for (var i = 0; i < 64*1024; ++i) message += (i % 5).toString(); + var packet = '81 FF ' + pack(16, message.length) + ' 34 83 a8 68 ' + dump(mask(message, '34 83 a8 68')); + + var gotData = false; + p.on('data', function(data) { + gotData = true; + assert.equal(message, data); + }); + + p.add(makeBufferFromHexString(packet)); + assert.ok(gotData); + }, 'can parse a fragmented masked text message of 300 bytes': function() { var p = new Parser(); var message = 'A'; From 169ad8245f2a3d8c3725bd41651ac2240b5ef39f Mon Sep 17 00:00:00 2001 From: Guillermo Rauch Date: Sat, 27 Aug 2011 11:42:55 -0700 Subject: [PATCH 11/28] Release 0.7.10 --- History.md | 5 +++++ lib/socket.io.js | 2 +- package.json | 4 ++-- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/History.md b/History.md index f4d472c5c3..b0ec35ee9d 100644 --- a/History.md +++ b/History.md @@ -1,4 +1,9 @@ +0.7.10 / 2011-08-27 +=================== + + * Updated socket.io-client. + 0.7.9 / 2011-08-12 ================== diff --git a/lib/socket.io.js b/lib/socket.io.js index 55870b5d43..d3c5d4b66b 100644 --- a/lib/socket.io.js +++ b/lib/socket.io.js @@ -15,7 +15,7 @@ var client = require('socket.io-client'); * Version. */ -exports.version = '0.7.9'; +exports.version = '0.7.10'; /** * Supported protocol version. diff --git a/package.json b/package.json index 436af0d198..317f616672 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "socket.io" - , "version": "0.7.9" + , "version": "0.7.10" , "description": "Real-time apps made cross-browser & easy with a WebSocket-like API" , "homepage": "http://socket.io" , "keywords": ["websocket", "socket", "realtime", "socket.io", "comet", "ajax"] @@ -15,7 +15,7 @@ , "url": "https://github.com/LearnBoost/Socket.IO-node.git" } , "dependencies": { - "socket.io-client": "0.7.9" + "socket.io-client": "0.7.10" , "policyfile": "0.0.4" , "redis": "0.6.6" } From 51867889695a2b967c90b3542608eaeccf60b6ae Mon Sep 17 00:00:00 2001 From: einaros Date: Sat, 27 Aug 2011 23:43:16 +0200 Subject: [PATCH 12/28] cleanups and comments --- lib/transports/wsver/8.js | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/transports/wsver/8.js b/lib/transports/wsver/8.js index 763b8ae56e..38c6e0584e 100644 --- a/lib/transports/wsver/8.js +++ b/lib/transports/wsver/8.js @@ -74,8 +74,6 @@ WebSocket.prototype.name = 'websocket'; WebSocket.prototype.onSocketConnect = function () { var self = this; - this.socket.setNoDelay(true); - if (this.req.headers.upgrade !== 'websocket') { this.log.warn(this.name + ' connection invalid'); this.end(); @@ -133,6 +131,12 @@ WebSocket.prototype.write = function (data) { } }; +/** + * Frame server-to-client output as a text packet. + * + * @api private + */ + WebSocket.prototype.frame = function (opcode, data) { var startOffset = 2, secondByte = data.length, buf; if (data.length > 125) { @@ -236,11 +240,12 @@ function Parser () { self.error('packets with length spanning more than 32 bit is currently not supported'); return; } - var lengthBytes = data.slice(4); // cap to 32 bit length + var lengthBytes = data.slice(4); // note: cap to 32 bit length expectData(util.unpack(data)); }); } }, + // close '8': function(data) { self.emit('close'); self.reset(); From c4b23246b4ea120c3fff34871e73ddd992e49f1e Mon Sep 17 00:00:00 2001 From: Guillermo Rauch Date: Sat, 27 Aug 2011 15:29:33 -0700 Subject: [PATCH 13/28] Release 0.7.11 --- History.md | 5 +++++ lib/socket.io.js | 2 +- package.json | 4 ++-- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/History.md b/History.md index b0ec35ee9d..ef86c525af 100644 --- a/History.md +++ b/History.md @@ -1,4 +1,9 @@ +0.7.11 / 2011-08-27 +=================== + + * Updated socket.io-client. + 0.7.10 / 2011-08-27 =================== diff --git a/lib/socket.io.js b/lib/socket.io.js index d3c5d4b66b..aedc044c34 100644 --- a/lib/socket.io.js +++ b/lib/socket.io.js @@ -15,7 +15,7 @@ var client = require('socket.io-client'); * Version. */ -exports.version = '0.7.10'; +exports.version = '0.7.11'; /** * Supported protocol version. diff --git a/package.json b/package.json index 317f616672..7fd93d21cf 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "socket.io" - , "version": "0.7.10" + , "version": "0.7.11" , "description": "Real-time apps made cross-browser & easy with a WebSocket-like API" , "homepage": "http://socket.io" , "keywords": ["websocket", "socket", "realtime", "socket.io", "comet", "ajax"] @@ -15,7 +15,7 @@ , "url": "https://github.com/LearnBoost/Socket.IO-node.git" } , "dependencies": { - "socket.io-client": "0.7.10" + "socket.io-client": "0.7.11" , "policyfile": "0.0.4" , "redis": "0.6.6" } From 94fdbadaec49d46d696243386297cd40efcca9f0 Mon Sep 17 00:00:00 2001 From: einaros Date: Sun, 28 Aug 2011 10:44:07 +0200 Subject: [PATCH 14/28] added initial hybi07 protocol parser --- lib/transports/wsver/7.js | 461 ++++++++++++++++++++++++++++++++++ lib/transports/wsver/index.js | 5 +- 2 files changed, 464 insertions(+), 2 deletions(-) create mode 100644 lib/transports/wsver/7.js diff --git a/lib/transports/wsver/7.js b/lib/transports/wsver/7.js new file mode 100644 index 0000000000..38c6e0584e --- /dev/null +++ b/lib/transports/wsver/7.js @@ -0,0 +1,461 @@ + +/*! + * socket.io-node + * Copyright(c) 2011 LearnBoost + * MIT Licensed + */ + +/** + * Module requirements. + */ + +var Transport = require('../../transport') + , EventEmitter = process.EventEmitter + , crypto = require('crypto') + , parser = require('../../parser') + , util = require('../../util'); + +/** + * Export the constructor. + */ + +exports = module.exports = WebSocket; +exports.Parser = Parser; + +/** + * HTTP interface constructor. Interface compatible with all transports that + * depend on request-response cycles. + * + * @api public + */ + +function WebSocket (mng, data, req) { + // parser + var self = this; + + this.parser = new Parser(); + this.parser.on('data', function (packet) { + self.onMessage(parser.decodePacket(packet)); + }); + this.parser.on('ping', function () { + // version 8 ping => pong + this.socket.write('\u008a\u0000'); + }); + this.parser.on('close', function () { + self.end(); + }); + this.parser.on('error', function () { + self.end(); + }); + + Transport.call(this, mng, data, req); +}; + +/** + * Inherits from Transport. + */ + +WebSocket.prototype.__proto__ = Transport.prototype; + +/** + * Transport name + * + * @api public + */ + +WebSocket.prototype.name = 'websocket'; + +/** + * Called when the socket connects. + * + * @api private + */ + +WebSocket.prototype.onSocketConnect = function () { + var self = this; + + if (this.req.headers.upgrade !== 'websocket') { + this.log.warn(this.name + ' connection invalid'); + this.end(); + return; + } + + var origin = this.req.headers.origin + , location = (this.socket.encrypted ? 'wss' : 'ws') + + '://' + this.req.headers.host + this.req.url; + + if (!this.req.headers['sec-websocket-key']) { + this.log.warn(this.name + ' connection invalid: received no key'); + this.end(); + return; + } + + // calc key + var key = this.req.headers['sec-websocket-key']; + var shasum = crypto.createHash('sha1'); + shasum.update(key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"); + key = shasum.digest('base64'); + + var headers = [ + 'HTTP/1.1 101 Switching Protocols' + , 'Upgrade: websocket' + , 'Connection: Upgrade' + , 'Sec-WebSocket-Accept: ' + key + ]; + + try { + this.socket.write(headers.concat('', '').join('\r\n')); + this.socket.setTimeout(0); + this.socket.setNoDelay(true); + } catch (e) { + this.end(); + return; + } + + this.socket.on('data', function (data) { + self.parser.add(data); + }); +}; + +/** + * Writes to the socket. + * + * @api private + */ + +WebSocket.prototype.write = function (data) { + if (this.open) { + var buf = this.frame(0x81, data); + this.socket.write(buf, 'binary'); + this.log.debug(this.name + ' writing', data); + } +}; + +/** + * Frame server-to-client output as a text packet. + * + * @api private + */ + +WebSocket.prototype.frame = function (opcode, data) { + var startOffset = 2, secondByte = data.length, buf; + if (data.length > 125) { + // opcode = 0x81; + startOffset += 2; + secondByte = 126; + } + if (data.length > 65536) { + startOffset += 6; + secondByte = 127; + } + buf = new Buffer(data.length + startOffset, 'utf8'); + buf[0] = opcode; + buf[1] = secondByte; + switch (secondByte) { + case 126: + buf[2] = data.length >>> 8; + buf[3] = data.length % 256; + break; + case 127: + for (var i = 6; i > 2; i--) { + buf[i] = secondByte % 256; + secondByte >>>= 8; + } + } + buf.write(data, startOffset, 'utf8'); + return buf; +}; + +/** + * Closes the connection. + * + * @api private + */ + +WebSocket.prototype.doClose = function () { + this.socket.end(); +}; + +/** + * WebSocket parser + * + * @api public + */ + +function Parser () { + this.state = { + activeFragmentedOperation: null, + lastFragment: false, + masked: false, + opcode: 0 + }; + this.overflow = null; + this.expectOffset = 0; + this.expectBuffer = null; + this.expectHandler = null; + this.currentMessage = ''; + + var self = this; + this.opcodeHandlers = { + // text + '1': function(data) { + var finish = function(mask, data) { + self.currentMessage += self.unmask(mask, data); + if (self.state.lastFragment) { + self.emit('data', self.currentMessage); + self.currentMessage = ''; + } + self.endPacket(); + } + + var expectData = function(length) { + if (self.state.masked) { + self.expect('Mask', 4, function(data) { + var mask = data; + self.expect('Data', length, function(data) { + finish(mask, data); + }); + }); + } + else { + self.expect('Data', length, function(data) { + finish(null, data); + }); + } + } + + // decode length + var firstLength = data[1] & 0x7f; + if (firstLength < 126) { + expectData(firstLength); + } + else if (firstLength == 126) { + self.expect('Length', 2, function(data) { + expectData(util.unpack(data)); + }); + } + else if (firstLength == 127) { + self.expect('Length', 8, function(data) { + if (util.unpack(data.slice(0, 4)) != 0) { + self.error('packets with length spanning more than 32 bit is currently not supported'); + return; + } + var lengthBytes = data.slice(4); // note: cap to 32 bit length + expectData(util.unpack(data)); + }); + } + }, + // close + '8': function(data) { + self.emit('close'); + self.reset(); + }, + // ping + '9': function(data) { + if (self.state.lastFragment == false) { + self.error('fragmented ping is not supported'); + return; + } + + var finish = function(mask, data) { + self.emit('ping', self.unmask(mask, data)); + self.endPacket(); + } + + var expectData = function(length) { + if (self.state.masked) { + self.expect('Mask', 4, function(data) { + var mask = data; + self.expect('Data', length, function(data) { + finish(mask, data); + }); + }); + } + else { + self.expect('Data', length, function(data) { + finish(null, data); + }); + } + } + + // decode length + var firstLength = data[1] & 0x7f; + if (firstLength == 0) { + finish(null, null); + } + else if (firstLength < 126) { + expectData(firstLength); + } + else if (firstLength == 126) { + self.expect('Length', 2, function(data) { + expectData(util.unpack(data)); + }); + } + else if (firstLength == 127) { + self.expect('Length', 8, function(data) { + expectData(util.unpack(data)); + }); + } + } + } + + this.expect('Opcode', 2, this.processPacket); +}; + +/** + * Inherits from EventEmitter. + */ + +Parser.prototype.__proto__ = EventEmitter.prototype; + +/** + * Add new data to the parser. + * + * @api public + */ + +Parser.prototype.add = function(data) { + if (this.expectBuffer == null) { + this.addToOverflow(data); + return; + } + var toRead = Math.min(data.length, this.expectBuffer.length - this.expectOffset); + data.copy(this.expectBuffer, this.expectOffset, 0, toRead); + this.expectOffset += toRead; + if (toRead < data.length) { + // at this point the overflow buffer shouldn't at all exist + this.overflow = new Buffer(data.length - toRead); + data.copy(this.overflow, 0, toRead, toRead + this.overflow.length); + } + if (this.expectOffset == this.expectBuffer.length) { + var bufferForHandler = this.expectBuffer; + this.expectBuffer = null; + this.expectOffset = 0; + this.expectHandler.call(this, bufferForHandler); + } +} + +/** + * Adds a piece of data to the overflow. + * + * @api private + */ + +Parser.prototype.addToOverflow = function(data) { + if (this.overflow == null) this.overflow = data; + else { + var prevOverflow = this.overflow; + this.overflow = new Buffer(this.overflow.length + data.length); + prevOverflow.copy(this.overflow, 0); + data.copy(this.overflow, prevOverflow.length); + } +} + +/** + * Waits for a certain amount of bytes to be available, then fires a callback. + * + * @api private + */ + +Parser.prototype.expect = function(what, length, handler) { + this.expectBuffer = new Buffer(length); + this.expectOffset = 0; + this.expectHandler = handler; + if (this.overflow != null) { + var toOverflow = this.overflow; + this.overflow = null; + this.add(toOverflow); + } +} + +/** + * Start processing a new packet. + * + * @api private + */ + +Parser.prototype.processPacket = function (data) { + if ((data[0] & 0x70) != 0) this.error('reserved fields not empty'); + this.state.lastFragment = (data[0] & 0x80) == 0x80; + this.state.masked = (data[1] & 0x80) == 0x80; + var opcode = data[0] & 0xf; + if (opcode == 0) { + // continuation frame + if (this.state.opcode != 1 || this.state.opcode != 2) { + this.error('continuation frame cannot follow current opcode') + return; + } + } + else this.state.opcode = opcode; + this.state.opcode = data[0] & 0xf; + var handler = this.opcodeHandlers[this.state.opcode]; + if (typeof handler == 'undefined') this.error('no handler for opcode ' + this.state.opcode); + else handler(data); +} + +/** + * Endprocessing a packet. + * + * @api private + */ + +Parser.prototype.endPacket = function() { + this.expectOffset = 0; + this.expectBuffer = null; + this.expectHandler = null; + if (this.state.lastFragment && this.state.opcode == this.state.activeFragmentedOperation) { + // end current fragmented operation + this.state.activeFragmentedOperation = null; + } + this.state.lastFragment = false; + this.state.opcode = this.state.activeFragmentedOperation != null ? this.state.activeFragmentedOperation : 0; + this.state.masked = false; + this.expect('Opcode', 2, this.processPacket); +} + +/** + * Reset the parser state. + * + * @api private + */ + +Parser.prototype.reset = function() { + this.state = { + activeFragmentedOperation: null, + lastFragment: false, + masked: false, + opcode: 0 + }; + this.expectOffset = 0; + this.expectBuffer = null; + this.expectHandler = null; + this.overflow = null; + this.currentMessage = ''; +} + +/** + * Unmask received data. + * + * @api private + */ + +Parser.prototype.unmask = function (mask, buf) { + if (mask != null) { + for (var i = 0, ll = buf.length; i < ll; i++) { + buf[i] ^= mask[i % 4]; + } + } + return buf != null ? buf.toString('utf8') : ''; +} + +/** + * Handles an error + * + * @api private + */ + +Parser.prototype.error = function (reason) { + this.reset(); + this.emit('error', reason); + return this; +}; diff --git a/lib/transports/wsver/index.js b/lib/transports/wsver/index.js index 6f94f5975a..59dc5dd53c 100644 --- a/lib/transports/wsver/index.js +++ b/lib/transports/wsver/index.js @@ -4,6 +4,7 @@ */ module.exports = { - 8: require('./8') - , default: require('./default') + 7: require('./7'), + 8: require('./8'), + default: require('./default') }; From b69dac6f4d34222bed133d4b90505e47385441b5 Mon Sep 17 00:00:00 2001 From: einaros Date: Sun, 28 Aug 2011 11:19:22 +0200 Subject: [PATCH 15/28] added hybi07 tests --- ...transports.websocket.hybi07-parser.test.js | 235 ++++++++++++++++++ 1 file changed, 235 insertions(+) create mode 100644 test/transports.websocket.hybi07-parser.test.js diff --git a/test/transports.websocket.hybi07-parser.test.js b/test/transports.websocket.hybi07-parser.test.js new file mode 100644 index 0000000000..fe8749f292 --- /dev/null +++ b/test/transports.websocket.hybi07-parser.test.js @@ -0,0 +1,235 @@ +var assert = require('assert'); +var Parser = require('../lib/transports/wsver/7.js').Parser; + +function makeBufferFromHexString(byteStr) { + var bytes = byteStr.split(' '); + var buf = new Buffer(bytes.length); + for (var i = 0; i < bytes.length; ++i) { + buf[i] = parseInt(bytes[i], 16); + } + return buf; +} + +function splitBuffer(buffer) { + var b1 = new Buffer(Math.ceil(buffer.length / 2)); + var b2 = new Buffer(Math.floor(buffer.length / 2)); + buffer.copy(b1, 0, 0, b1.length); + buffer.copy(b2, 0, b1.length, b1.length + b2.length); + return [b1, b2]; +} + +function mask(str, maskString) { + var buf = new Buffer(str); + var mask = makeBufferFromHexString(maskString || '34 83 a8 68'); + for (var i = 0; i < buf.length; ++i) { + buf[i] ^= mask[i % 4]; + } + return buf; +} + +function unpack(buffer) { + var n = 0; + for (var i = 0; i < buffer.length; ++i) { + n = (i == 0) ? buffer[i] : (n * 256) + buffer[i]; + } + return n; +} + +function pack(length, number) { + return padl(number.toString(16), length, '0').replace(/(\d\d)/g, '$1 ').trim(); +} + +function padl(s,n,c) { + return new Array(1 + n - s.length).join(c) + s; +} + +function dump(data) { + var s = ''; + for (var i = 0; i < data.length; ++i) { + s += padl(data[i].toString(16), 2, '0') + ' '; + } + return s.trim(); +} + +module.exports = { + 'can parse unmasked text message': function() { + var p = new Parser(); + var packet = '81 05 48 65 6c 6c 6f'; + + var gotData = false; + p.on('data', function(data) { + gotData = true; + assert.equal('Hello', data); + }); + + p.add(makeBufferFromHexString(packet)); + assert.ok(gotData); + }, + 'can parse close message': function() { + var p = new Parser(); + var packet = '88 00'; + + var gotClose = false; + p.on('close', function(data) { + gotClose = true; + }); + + p.add(makeBufferFromHexString(packet)); + assert.ok(gotClose); + }, + 'can parse masked text message': function() { + var p = new Parser(); + var packet = '81 93 34 83 a8 68 01 b9 92 52 4f a1 c6 09 59 e6 8a 52 16 e6 cb 00 5b a1 d5'; + + var gotData = false; + p.on('data', function(data) { + gotData = true; + assert.equal('5:::{"name":"echo"}', data); + }); + + p.add(makeBufferFromHexString(packet)); + assert.ok(gotData); + }, + 'can parse a masked text message longer than 125 bytes': function() { + var p = new Parser(); + var message = 'A'; + for (var i = 0; i < 300; ++i) message += (i % 5).toString(); + var packet = '81 FE ' + pack(4, message.length) + ' 34 83 a8 68 ' + dump(mask(message, '34 83 a8 68')); + + var gotData = false; + p.on('data', function(data) { + gotData = true; + assert.equal(message, data); + }); + + p.add(makeBufferFromHexString(packet)); + assert.ok(gotData); + }, + 'can parse a really long masked text message': function() { + var p = new Parser(); + var message = 'A'; + for (var i = 0; i < 64*1024; ++i) message += (i % 5).toString(); + var packet = '81 FF ' + pack(16, message.length) + ' 34 83 a8 68 ' + dump(mask(message, '34 83 a8 68')); + + var gotData = false; + p.on('data', function(data) { + gotData = true; + assert.equal(message, data); + }); + + p.add(makeBufferFromHexString(packet)); + assert.ok(gotData); + }, + 'can parse a fragmented masked text message of 300 bytes': function() { + var p = new Parser(); + var message = 'A'; + for (var i = 0; i < 300; ++i) message += (i % 5).toString(); + var msgpiece1 = message.substr(0, 150); + var msgpiece2 = message.substr(150); + var packet1 = '01 FE ' + pack(4, msgpiece1.length) + ' 34 83 a8 68 ' + dump(mask(msgpiece1, '34 83 a8 68')); + var packet2 = '81 FE ' + pack(4, msgpiece2.length) + ' 34 83 a8 68 ' + dump(mask(msgpiece2, '34 83 a8 68')); + + var gotData = false; + p.on('data', function(data) { + gotData = true; + assert.equal(message, data); + }); + + p.add(makeBufferFromHexString(packet1)); + p.add(makeBufferFromHexString(packet2)); + assert.ok(gotData); + }, + 'can parse a ping message': function() { + var p = new Parser(); + var message = 'Hello'; + var packet = '89 FE ' + pack(4, message.length) + ' 34 83 a8 68 ' + dump(mask(message, '34 83 a8 68')); + + var gotPing = false; + p.on('ping', function(data) { + gotPing = true; + assert.equal(message, data); + }); + + p.add(makeBufferFromHexString(packet)); + assert.ok(gotPing); + }, + 'can parse a ping with no data': function() { + var p = new Parser(); + var packet = '89 00'; + + var gotPing = false; + p.on('ping', function(data) { + gotPing = true; + }); + + p.add(makeBufferFromHexString(packet)); + assert.ok(gotPing); + }, + 'can parse a fragmented masked text message of 300 bytes with a ping in the middle': function() { + var p = new Parser(); + var message = 'A'; + for (var i = 0; i < 300; ++i) message += (i % 5).toString(); + + var msgpiece1 = message.substr(0, 150); + var packet1 = '01 FE ' + pack(4, msgpiece1.length) + ' 34 83 a8 68 ' + dump(mask(msgpiece1, '34 83 a8 68')); + + var pingMessage = 'Hello'; + var pingPacket = '89 FE ' + pack(4, pingMessage.length) + ' 34 83 a8 68 ' + dump(mask(pingMessage, '34 83 a8 68')); + + var msgpiece2 = message.substr(150); + var packet2 = '81 FE ' + pack(4, msgpiece2.length) + ' 34 83 a8 68 ' + dump(mask(msgpiece2, '34 83 a8 68')); + + var gotData = false; + p.on('data', function(data) { + gotData = true; + assert.equal(message, data); + }); + var gotPing = false; + p.on('ping', function(data) { + gotPing = true; + assert.equal(pingMessage, data); + }); + + p.add(makeBufferFromHexString(packet1)); + p.add(makeBufferFromHexString(pingPacket)); + p.add(makeBufferFromHexString(packet2)); + assert.ok(gotData); + assert.ok(gotPing); + }, + 'can parse a fragmented masked text message of 300 bytes with a ping in the middle, which is delievered over sevaral tcp packets': function() { + var p = new Parser(); + var message = 'A'; + for (var i = 0; i < 300; ++i) message += (i % 5).toString(); + + var msgpiece1 = message.substr(0, 150); + var packet1 = '01 FE ' + pack(4, msgpiece1.length) + ' 34 83 a8 68 ' + dump(mask(msgpiece1, '34 83 a8 68')); + + var pingMessage = 'Hello'; + var pingPacket = '89 FE ' + pack(4, pingMessage.length) + ' 34 83 a8 68 ' + dump(mask(pingMessage, '34 83 a8 68')); + + var msgpiece2 = message.substr(150); + var packet2 = '81 FE ' + pack(4, msgpiece2.length) + ' 34 83 a8 68 ' + dump(mask(msgpiece2, '34 83 a8 68')); + + var gotData = false; + p.on('data', function(data) { + gotData = true; + assert.equal(message, data); + }); + var gotPing = false; + p.on('ping', function(data) { + gotPing = true; + assert.equal(pingMessage, data); + }); + + var buffers = []; + buffers = buffers.concat(splitBuffer(makeBufferFromHexString(packet1))); + buffers = buffers.concat(splitBuffer(makeBufferFromHexString(pingPacket))); + buffers = buffers.concat(splitBuffer(makeBufferFromHexString(packet2))); + for (var i = 0; i < buffers.length; ++i) { + p.add(buffers[i]); + } + assert.ok(gotData); + assert.ok(gotPing); + }, +}; + From 444229a9dcb889e9d95e34c68d088449c0151529 Mon Sep 17 00:00:00 2001 From: Guillermo Rauch Date: Sun, 28 Aug 2011 15:41:13 -0700 Subject: [PATCH 16/28] Changed protocols require path. --- lib/transports/websocket.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/transports/websocket.js b/lib/transports/websocket.js index 2082bd6fa1..84db53119e 100644 --- a/lib/transports/websocket.js +++ b/lib/transports/websocket.js @@ -9,7 +9,7 @@ * Module requirements. */ -var protocolVersions = require('./wsver'); +var protocolVersions = require('./websocket/'); /** * Export the constructor. From b7998e815aaad359be16cf68e2bebe5950252d12 Mon Sep 17 00:00:00 2001 From: Guillermo Rauch Date: Sun, 28 Aug 2011 15:41:55 -0700 Subject: [PATCH 17/28] Renamed wsver/ -> websocket/ --- lib/transports/{wsver => websocket}/7.js | 0 lib/transports/{wsver => websocket}/8.js | 0 lib/transports/{wsver => websocket}/default.js | 0 lib/transports/{wsver => websocket}/index.js | 0 4 files changed, 0 insertions(+), 0 deletions(-) rename lib/transports/{wsver => websocket}/7.js (100%) rename lib/transports/{wsver => websocket}/8.js (100%) rename lib/transports/{wsver => websocket}/default.js (100%) rename lib/transports/{wsver => websocket}/index.js (100%) diff --git a/lib/transports/wsver/7.js b/lib/transports/websocket/7.js similarity index 100% rename from lib/transports/wsver/7.js rename to lib/transports/websocket/7.js diff --git a/lib/transports/wsver/8.js b/lib/transports/websocket/8.js similarity index 100% rename from lib/transports/wsver/8.js rename to lib/transports/websocket/8.js diff --git a/lib/transports/wsver/default.js b/lib/transports/websocket/default.js similarity index 100% rename from lib/transports/wsver/default.js rename to lib/transports/websocket/default.js diff --git a/lib/transports/wsver/index.js b/lib/transports/websocket/index.js similarity index 100% rename from lib/transports/wsver/index.js rename to lib/transports/websocket/index.js From c8938a99b24eaa506e6db09cbe029b2fcfd205ef Mon Sep 17 00:00:00 2001 From: Guillermo Rauch Date: Sun, 28 Aug 2011 15:42:19 -0700 Subject: [PATCH 18/28] Release 0.8.0 --- History.md | 8 ++++++++ lib/socket.io.js | 2 +- package.json | 5 +++-- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/History.md b/History.md index ef86c525af..6955bfc738 100644 --- a/History.md +++ b/History.md @@ -1,4 +1,12 @@ +0.8.0 / 2011-08-28 +================== + + * Updated to work with two-level websocket versioning. [einaros] + * Added hybi07 support. [einaros] + * Added hybi10 support. [einaros] + * Added http referrer verification to manager.js verifyOrigin. [einaors] + 0.7.11 / 2011-08-27 =================== diff --git a/lib/socket.io.js b/lib/socket.io.js index aedc044c34..5d23c120ef 100644 --- a/lib/socket.io.js +++ b/lib/socket.io.js @@ -15,7 +15,7 @@ var client = require('socket.io-client'); * Version. */ -exports.version = '0.7.11'; +exports.version = '0.8.0'; /** * Supported protocol version. diff --git a/package.json b/package.json index 7fd93d21cf..e3837d2f53 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "socket.io" - , "version": "0.7.11" + , "version": "0.8.0" , "description": "Real-time apps made cross-browser & easy with a WebSocket-like API" , "homepage": "http://socket.io" , "keywords": ["websocket", "socket", "realtime", "socket.io", "comet", "ajax"] @@ -9,13 +9,14 @@ { "name": "Guillermo Rauch", "email": "rauchg@gmail.com" } , { "name": "Arnout Kazemier", "email": "info@3rd-eden.com" } , { "name": "Vladimir Dronnikov", "email": "dronnikov@gmail.com" } + , { "name": "Einar Otto Stangvik", "email": "einaros@gmail.com" } ] , "repository":{ "type": "git" , "url": "https://github.com/LearnBoost/Socket.IO-node.git" } , "dependencies": { - "socket.io-client": "0.7.11" + "socket.io-client": "0.8.0" , "policyfile": "0.0.4" , "redis": "0.6.6" } From ce4c46b37dbf1db8f8ca1067ffa9f9f65c700065 Mon Sep 17 00:00:00 2001 From: einaros Date: Mon, 29 Aug 2011 07:30:29 +0200 Subject: [PATCH 19/28] fixed Parser library path and did some code cleanup --- ...transports.websocket.hybi07-parser.test.js | 42 +++++++++++++++++-- ...transports.websocket.hybi10-parser.test.js | 42 +++++++++++++++++-- 2 files changed, 78 insertions(+), 6 deletions(-) diff --git a/test/transports.websocket.hybi07-parser.test.js b/test/transports.websocket.hybi07-parser.test.js index fe8749f292..8c53ea9a60 100644 --- a/test/transports.websocket.hybi07-parser.test.js +++ b/test/transports.websocket.hybi07-parser.test.js @@ -1,5 +1,13 @@ +/** + * Test dependencies. + */ + var assert = require('assert'); -var Parser = require('../lib/transports/wsver/7.js').Parser; +var Parser = require('../lib/transports/websocket/7.js').Parser; + +/** + * Returns a Buffer from a "ff 00 ff"-type hex string. + */ function makeBufferFromHexString(byteStr) { var bytes = byteStr.split(' '); @@ -10,14 +18,22 @@ function makeBufferFromHexString(byteStr) { return buf; } +/** + * Splits a buffer in two parts. + */ + function splitBuffer(buffer) { var b1 = new Buffer(Math.ceil(buffer.length / 2)); - var b2 = new Buffer(Math.floor(buffer.length / 2)); buffer.copy(b1, 0, 0, b1.length); + var b2 = new Buffer(Math.floor(buffer.length / 2)); buffer.copy(b2, 0, b1.length, b1.length + b2.length); return [b1, b2]; } +/** + * Performs hybi07+ type masking on a hex string. + */ + function mask(str, maskString) { var buf = new Buffer(str); var mask = makeBufferFromHexString(maskString || '34 83 a8 68'); @@ -27,6 +43,10 @@ function mask(str, maskString) { return buf; } +/** + * Unpacks a Buffer into a number. + */ + function unpack(buffer) { var n = 0; for (var i = 0; i < buffer.length; ++i) { @@ -35,14 +55,26 @@ function unpack(buffer) { return n; } +/** + * Returns a hex string, representing a specific byte count 'length', from a number. + */ + function pack(length, number) { return padl(number.toString(16), length, '0').replace(/(\d\d)/g, '$1 ').trim(); } -function padl(s,n,c) { +/** + * Left pads the string 's' to a total length of 'n' with char 'c'. + */ + +function padl(s, n, c) { return new Array(1 + n - s.length).join(c) + s; } +/** + * Returns a hex string from a Buffer. + */ + function dump(data) { var s = ''; for (var i = 0; i < data.length; ++i) { @@ -51,6 +83,10 @@ function dump(data) { return s.trim(); } +/** + * Tests. + */ + module.exports = { 'can parse unmasked text message': function() { var p = new Parser(); diff --git a/test/transports.websocket.hybi10-parser.test.js b/test/transports.websocket.hybi10-parser.test.js index 02d36f8852..e7bc4a9000 100644 --- a/test/transports.websocket.hybi10-parser.test.js +++ b/test/transports.websocket.hybi10-parser.test.js @@ -1,5 +1,13 @@ +/** + * Test dependencies. + */ + var assert = require('assert'); -var Parser = require('../lib/transports/wsver/8.js').Parser; +var Parser = require('../lib/transports/websocket/8.js').Parser; + +/** + * Returns a Buffer from a "ff 00 ff"-type hex string. + */ function makeBufferFromHexString(byteStr) { var bytes = byteStr.split(' '); @@ -10,14 +18,22 @@ function makeBufferFromHexString(byteStr) { return buf; } +/** + * Splits a buffer in two parts. + */ + function splitBuffer(buffer) { var b1 = new Buffer(Math.ceil(buffer.length / 2)); - var b2 = new Buffer(Math.floor(buffer.length / 2)); buffer.copy(b1, 0, 0, b1.length); + var b2 = new Buffer(Math.floor(buffer.length / 2)); buffer.copy(b2, 0, b1.length, b1.length + b2.length); return [b1, b2]; } +/** + * Performs hybi07+ type masking on a hex string. + */ + function mask(str, maskString) { var buf = new Buffer(str); var mask = makeBufferFromHexString(maskString || '34 83 a8 68'); @@ -27,6 +43,10 @@ function mask(str, maskString) { return buf; } +/** + * Unpacks a Buffer into a number. + */ + function unpack(buffer) { var n = 0; for (var i = 0; i < buffer.length; ++i) { @@ -35,14 +55,26 @@ function unpack(buffer) { return n; } +/** + * Returns a hex string, representing a specific byte count 'length', from a number. + */ + function pack(length, number) { return padl(number.toString(16), length, '0').replace(/(\d\d)/g, '$1 ').trim(); } -function padl(s,n,c) { +/** + * Left pads the string 's' to a total length of 'n' with char 'c'. + */ + +function padl(s, n, c) { return new Array(1 + n - s.length).join(c) + s; } +/** + * Returns a hex string from a Buffer. + */ + function dump(data) { var s = ''; for (var i = 0; i < data.length; ++i) { @@ -51,6 +83,10 @@ function dump(data) { return s.trim(); } +/** + * Tests. + */ + module.exports = { 'can parse unmasked text message': function() { var p = new Parser(); From 48dadd8e10e1015a87ef77e27f397a97c08a1b79 Mon Sep 17 00:00:00 2001 From: einaros Date: Mon, 29 Aug 2011 07:56:40 +0200 Subject: [PATCH 20/28] joined compatible hybi protocol handlers and updated test reference --- lib/transports/websocket/8.js | 461 ------------------ .../websocket/{7.js => hybi-07-12.js} | 0 lib/transports/websocket/index.js | 4 +- ...sports.websocket.hybi07-12.parser.test.js} | 2 +- ...transports.websocket.hybi10-parser.test.js | 271 ---------- 5 files changed, 3 insertions(+), 735 deletions(-) delete mode 100644 lib/transports/websocket/8.js rename lib/transports/websocket/{7.js => hybi-07-12.js} (100%) rename test/{transports.websocket.hybi07-parser.test.js => transports.websocket.hybi07-12.parser.test.js} (99%) delete mode 100644 test/transports.websocket.hybi10-parser.test.js diff --git a/lib/transports/websocket/8.js b/lib/transports/websocket/8.js deleted file mode 100644 index 38c6e0584e..0000000000 --- a/lib/transports/websocket/8.js +++ /dev/null @@ -1,461 +0,0 @@ - -/*! - * socket.io-node - * Copyright(c) 2011 LearnBoost - * MIT Licensed - */ - -/** - * Module requirements. - */ - -var Transport = require('../../transport') - , EventEmitter = process.EventEmitter - , crypto = require('crypto') - , parser = require('../../parser') - , util = require('../../util'); - -/** - * Export the constructor. - */ - -exports = module.exports = WebSocket; -exports.Parser = Parser; - -/** - * HTTP interface constructor. Interface compatible with all transports that - * depend on request-response cycles. - * - * @api public - */ - -function WebSocket (mng, data, req) { - // parser - var self = this; - - this.parser = new Parser(); - this.parser.on('data', function (packet) { - self.onMessage(parser.decodePacket(packet)); - }); - this.parser.on('ping', function () { - // version 8 ping => pong - this.socket.write('\u008a\u0000'); - }); - this.parser.on('close', function () { - self.end(); - }); - this.parser.on('error', function () { - self.end(); - }); - - Transport.call(this, mng, data, req); -}; - -/** - * Inherits from Transport. - */ - -WebSocket.prototype.__proto__ = Transport.prototype; - -/** - * Transport name - * - * @api public - */ - -WebSocket.prototype.name = 'websocket'; - -/** - * Called when the socket connects. - * - * @api private - */ - -WebSocket.prototype.onSocketConnect = function () { - var self = this; - - if (this.req.headers.upgrade !== 'websocket') { - this.log.warn(this.name + ' connection invalid'); - this.end(); - return; - } - - var origin = this.req.headers.origin - , location = (this.socket.encrypted ? 'wss' : 'ws') - + '://' + this.req.headers.host + this.req.url; - - if (!this.req.headers['sec-websocket-key']) { - this.log.warn(this.name + ' connection invalid: received no key'); - this.end(); - return; - } - - // calc key - var key = this.req.headers['sec-websocket-key']; - var shasum = crypto.createHash('sha1'); - shasum.update(key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"); - key = shasum.digest('base64'); - - var headers = [ - 'HTTP/1.1 101 Switching Protocols' - , 'Upgrade: websocket' - , 'Connection: Upgrade' - , 'Sec-WebSocket-Accept: ' + key - ]; - - try { - this.socket.write(headers.concat('', '').join('\r\n')); - this.socket.setTimeout(0); - this.socket.setNoDelay(true); - } catch (e) { - this.end(); - return; - } - - this.socket.on('data', function (data) { - self.parser.add(data); - }); -}; - -/** - * Writes to the socket. - * - * @api private - */ - -WebSocket.prototype.write = function (data) { - if (this.open) { - var buf = this.frame(0x81, data); - this.socket.write(buf, 'binary'); - this.log.debug(this.name + ' writing', data); - } -}; - -/** - * Frame server-to-client output as a text packet. - * - * @api private - */ - -WebSocket.prototype.frame = function (opcode, data) { - var startOffset = 2, secondByte = data.length, buf; - if (data.length > 125) { - // opcode = 0x81; - startOffset += 2; - secondByte = 126; - } - if (data.length > 65536) { - startOffset += 6; - secondByte = 127; - } - buf = new Buffer(data.length + startOffset, 'utf8'); - buf[0] = opcode; - buf[1] = secondByte; - switch (secondByte) { - case 126: - buf[2] = data.length >>> 8; - buf[3] = data.length % 256; - break; - case 127: - for (var i = 6; i > 2; i--) { - buf[i] = secondByte % 256; - secondByte >>>= 8; - } - } - buf.write(data, startOffset, 'utf8'); - return buf; -}; - -/** - * Closes the connection. - * - * @api private - */ - -WebSocket.prototype.doClose = function () { - this.socket.end(); -}; - -/** - * WebSocket parser - * - * @api public - */ - -function Parser () { - this.state = { - activeFragmentedOperation: null, - lastFragment: false, - masked: false, - opcode: 0 - }; - this.overflow = null; - this.expectOffset = 0; - this.expectBuffer = null; - this.expectHandler = null; - this.currentMessage = ''; - - var self = this; - this.opcodeHandlers = { - // text - '1': function(data) { - var finish = function(mask, data) { - self.currentMessage += self.unmask(mask, data); - if (self.state.lastFragment) { - self.emit('data', self.currentMessage); - self.currentMessage = ''; - } - self.endPacket(); - } - - var expectData = function(length) { - if (self.state.masked) { - self.expect('Mask', 4, function(data) { - var mask = data; - self.expect('Data', length, function(data) { - finish(mask, data); - }); - }); - } - else { - self.expect('Data', length, function(data) { - finish(null, data); - }); - } - } - - // decode length - var firstLength = data[1] & 0x7f; - if (firstLength < 126) { - expectData(firstLength); - } - else if (firstLength == 126) { - self.expect('Length', 2, function(data) { - expectData(util.unpack(data)); - }); - } - else if (firstLength == 127) { - self.expect('Length', 8, function(data) { - if (util.unpack(data.slice(0, 4)) != 0) { - self.error('packets with length spanning more than 32 bit is currently not supported'); - return; - } - var lengthBytes = data.slice(4); // note: cap to 32 bit length - expectData(util.unpack(data)); - }); - } - }, - // close - '8': function(data) { - self.emit('close'); - self.reset(); - }, - // ping - '9': function(data) { - if (self.state.lastFragment == false) { - self.error('fragmented ping is not supported'); - return; - } - - var finish = function(mask, data) { - self.emit('ping', self.unmask(mask, data)); - self.endPacket(); - } - - var expectData = function(length) { - if (self.state.masked) { - self.expect('Mask', 4, function(data) { - var mask = data; - self.expect('Data', length, function(data) { - finish(mask, data); - }); - }); - } - else { - self.expect('Data', length, function(data) { - finish(null, data); - }); - } - } - - // decode length - var firstLength = data[1] & 0x7f; - if (firstLength == 0) { - finish(null, null); - } - else if (firstLength < 126) { - expectData(firstLength); - } - else if (firstLength == 126) { - self.expect('Length', 2, function(data) { - expectData(util.unpack(data)); - }); - } - else if (firstLength == 127) { - self.expect('Length', 8, function(data) { - expectData(util.unpack(data)); - }); - } - } - } - - this.expect('Opcode', 2, this.processPacket); -}; - -/** - * Inherits from EventEmitter. - */ - -Parser.prototype.__proto__ = EventEmitter.prototype; - -/** - * Add new data to the parser. - * - * @api public - */ - -Parser.prototype.add = function(data) { - if (this.expectBuffer == null) { - this.addToOverflow(data); - return; - } - var toRead = Math.min(data.length, this.expectBuffer.length - this.expectOffset); - data.copy(this.expectBuffer, this.expectOffset, 0, toRead); - this.expectOffset += toRead; - if (toRead < data.length) { - // at this point the overflow buffer shouldn't at all exist - this.overflow = new Buffer(data.length - toRead); - data.copy(this.overflow, 0, toRead, toRead + this.overflow.length); - } - if (this.expectOffset == this.expectBuffer.length) { - var bufferForHandler = this.expectBuffer; - this.expectBuffer = null; - this.expectOffset = 0; - this.expectHandler.call(this, bufferForHandler); - } -} - -/** - * Adds a piece of data to the overflow. - * - * @api private - */ - -Parser.prototype.addToOverflow = function(data) { - if (this.overflow == null) this.overflow = data; - else { - var prevOverflow = this.overflow; - this.overflow = new Buffer(this.overflow.length + data.length); - prevOverflow.copy(this.overflow, 0); - data.copy(this.overflow, prevOverflow.length); - } -} - -/** - * Waits for a certain amount of bytes to be available, then fires a callback. - * - * @api private - */ - -Parser.prototype.expect = function(what, length, handler) { - this.expectBuffer = new Buffer(length); - this.expectOffset = 0; - this.expectHandler = handler; - if (this.overflow != null) { - var toOverflow = this.overflow; - this.overflow = null; - this.add(toOverflow); - } -} - -/** - * Start processing a new packet. - * - * @api private - */ - -Parser.prototype.processPacket = function (data) { - if ((data[0] & 0x70) != 0) this.error('reserved fields not empty'); - this.state.lastFragment = (data[0] & 0x80) == 0x80; - this.state.masked = (data[1] & 0x80) == 0x80; - var opcode = data[0] & 0xf; - if (opcode == 0) { - // continuation frame - if (this.state.opcode != 1 || this.state.opcode != 2) { - this.error('continuation frame cannot follow current opcode') - return; - } - } - else this.state.opcode = opcode; - this.state.opcode = data[0] & 0xf; - var handler = this.opcodeHandlers[this.state.opcode]; - if (typeof handler == 'undefined') this.error('no handler for opcode ' + this.state.opcode); - else handler(data); -} - -/** - * Endprocessing a packet. - * - * @api private - */ - -Parser.prototype.endPacket = function() { - this.expectOffset = 0; - this.expectBuffer = null; - this.expectHandler = null; - if (this.state.lastFragment && this.state.opcode == this.state.activeFragmentedOperation) { - // end current fragmented operation - this.state.activeFragmentedOperation = null; - } - this.state.lastFragment = false; - this.state.opcode = this.state.activeFragmentedOperation != null ? this.state.activeFragmentedOperation : 0; - this.state.masked = false; - this.expect('Opcode', 2, this.processPacket); -} - -/** - * Reset the parser state. - * - * @api private - */ - -Parser.prototype.reset = function() { - this.state = { - activeFragmentedOperation: null, - lastFragment: false, - masked: false, - opcode: 0 - }; - this.expectOffset = 0; - this.expectBuffer = null; - this.expectHandler = null; - this.overflow = null; - this.currentMessage = ''; -} - -/** - * Unmask received data. - * - * @api private - */ - -Parser.prototype.unmask = function (mask, buf) { - if (mask != null) { - for (var i = 0, ll = buf.length; i < ll; i++) { - buf[i] ^= mask[i % 4]; - } - } - return buf != null ? buf.toString('utf8') : ''; -} - -/** - * Handles an error - * - * @api private - */ - -Parser.prototype.error = function (reason) { - this.reset(); - this.emit('error', reason); - return this; -}; diff --git a/lib/transports/websocket/7.js b/lib/transports/websocket/hybi-07-12.js similarity index 100% rename from lib/transports/websocket/7.js rename to lib/transports/websocket/hybi-07-12.js diff --git a/lib/transports/websocket/index.js b/lib/transports/websocket/index.js index 59dc5dd53c..5c615b292a 100644 --- a/lib/transports/websocket/index.js +++ b/lib/transports/websocket/index.js @@ -4,7 +4,7 @@ */ module.exports = { - 7: require('./7'), - 8: require('./8'), + 7: require('./hybi-07-12'), + 8: require('./hybi-07-12'), default: require('./default') }; diff --git a/test/transports.websocket.hybi07-parser.test.js b/test/transports.websocket.hybi07-12.parser.test.js similarity index 99% rename from test/transports.websocket.hybi07-parser.test.js rename to test/transports.websocket.hybi07-12.parser.test.js index 8c53ea9a60..94efaa6691 100644 --- a/test/transports.websocket.hybi07-parser.test.js +++ b/test/transports.websocket.hybi07-12.parser.test.js @@ -3,7 +3,7 @@ */ var assert = require('assert'); -var Parser = require('../lib/transports/websocket/7.js').Parser; +var Parser = require('../lib/transports/websocket/hybi-07-12.js').Parser; /** * Returns a Buffer from a "ff 00 ff"-type hex string. diff --git a/test/transports.websocket.hybi10-parser.test.js b/test/transports.websocket.hybi10-parser.test.js deleted file mode 100644 index e7bc4a9000..0000000000 --- a/test/transports.websocket.hybi10-parser.test.js +++ /dev/null @@ -1,271 +0,0 @@ -/** - * Test dependencies. - */ - -var assert = require('assert'); -var Parser = require('../lib/transports/websocket/8.js').Parser; - -/** - * Returns a Buffer from a "ff 00 ff"-type hex string. - */ - -function makeBufferFromHexString(byteStr) { - var bytes = byteStr.split(' '); - var buf = new Buffer(bytes.length); - for (var i = 0; i < bytes.length; ++i) { - buf[i] = parseInt(bytes[i], 16); - } - return buf; -} - -/** - * Splits a buffer in two parts. - */ - -function splitBuffer(buffer) { - var b1 = new Buffer(Math.ceil(buffer.length / 2)); - buffer.copy(b1, 0, 0, b1.length); - var b2 = new Buffer(Math.floor(buffer.length / 2)); - buffer.copy(b2, 0, b1.length, b1.length + b2.length); - return [b1, b2]; -} - -/** - * Performs hybi07+ type masking on a hex string. - */ - -function mask(str, maskString) { - var buf = new Buffer(str); - var mask = makeBufferFromHexString(maskString || '34 83 a8 68'); - for (var i = 0; i < buf.length; ++i) { - buf[i] ^= mask[i % 4]; - } - return buf; -} - -/** - * Unpacks a Buffer into a number. - */ - -function unpack(buffer) { - var n = 0; - for (var i = 0; i < buffer.length; ++i) { - n = (i == 0) ? buffer[i] : (n * 256) + buffer[i]; - } - return n; -} - -/** - * Returns a hex string, representing a specific byte count 'length', from a number. - */ - -function pack(length, number) { - return padl(number.toString(16), length, '0').replace(/(\d\d)/g, '$1 ').trim(); -} - -/** - * Left pads the string 's' to a total length of 'n' with char 'c'. - */ - -function padl(s, n, c) { - return new Array(1 + n - s.length).join(c) + s; -} - -/** - * Returns a hex string from a Buffer. - */ - -function dump(data) { - var s = ''; - for (var i = 0; i < data.length; ++i) { - s += padl(data[i].toString(16), 2, '0') + ' '; - } - return s.trim(); -} - -/** - * Tests. - */ - -module.exports = { - 'can parse unmasked text message': function() { - var p = new Parser(); - var packet = '81 05 48 65 6c 6c 6f'; - - var gotData = false; - p.on('data', function(data) { - gotData = true; - assert.equal('Hello', data); - }); - - p.add(makeBufferFromHexString(packet)); - assert.ok(gotData); - }, - 'can parse close message': function() { - var p = new Parser(); - var packet = '88 00'; - - var gotClose = false; - p.on('close', function(data) { - gotClose = true; - }); - - p.add(makeBufferFromHexString(packet)); - assert.ok(gotClose); - }, - 'can parse masked text message': function() { - var p = new Parser(); - var packet = '81 93 34 83 a8 68 01 b9 92 52 4f a1 c6 09 59 e6 8a 52 16 e6 cb 00 5b a1 d5'; - - var gotData = false; - p.on('data', function(data) { - gotData = true; - assert.equal('5:::{"name":"echo"}', data); - }); - - p.add(makeBufferFromHexString(packet)); - assert.ok(gotData); - }, - 'can parse a masked text message longer than 125 bytes': function() { - var p = new Parser(); - var message = 'A'; - for (var i = 0; i < 300; ++i) message += (i % 5).toString(); - var packet = '81 FE ' + pack(4, message.length) + ' 34 83 a8 68 ' + dump(mask(message, '34 83 a8 68')); - - var gotData = false; - p.on('data', function(data) { - gotData = true; - assert.equal(message, data); - }); - - p.add(makeBufferFromHexString(packet)); - assert.ok(gotData); - }, - 'can parse a really long masked text message': function() { - var p = new Parser(); - var message = 'A'; - for (var i = 0; i < 64*1024; ++i) message += (i % 5).toString(); - var packet = '81 FF ' + pack(16, message.length) + ' 34 83 a8 68 ' + dump(mask(message, '34 83 a8 68')); - - var gotData = false; - p.on('data', function(data) { - gotData = true; - assert.equal(message, data); - }); - - p.add(makeBufferFromHexString(packet)); - assert.ok(gotData); - }, - 'can parse a fragmented masked text message of 300 bytes': function() { - var p = new Parser(); - var message = 'A'; - for (var i = 0; i < 300; ++i) message += (i % 5).toString(); - var msgpiece1 = message.substr(0, 150); - var msgpiece2 = message.substr(150); - var packet1 = '01 FE ' + pack(4, msgpiece1.length) + ' 34 83 a8 68 ' + dump(mask(msgpiece1, '34 83 a8 68')); - var packet2 = '81 FE ' + pack(4, msgpiece2.length) + ' 34 83 a8 68 ' + dump(mask(msgpiece2, '34 83 a8 68')); - - var gotData = false; - p.on('data', function(data) { - gotData = true; - assert.equal(message, data); - }); - - p.add(makeBufferFromHexString(packet1)); - p.add(makeBufferFromHexString(packet2)); - assert.ok(gotData); - }, - 'can parse a ping message': function() { - var p = new Parser(); - var message = 'Hello'; - var packet = '89 FE ' + pack(4, message.length) + ' 34 83 a8 68 ' + dump(mask(message, '34 83 a8 68')); - - var gotPing = false; - p.on('ping', function(data) { - gotPing = true; - assert.equal(message, data); - }); - - p.add(makeBufferFromHexString(packet)); - assert.ok(gotPing); - }, - 'can parse a ping with no data': function() { - var p = new Parser(); - var packet = '89 00'; - - var gotPing = false; - p.on('ping', function(data) { - gotPing = true; - }); - - p.add(makeBufferFromHexString(packet)); - assert.ok(gotPing); - }, - 'can parse a fragmented masked text message of 300 bytes with a ping in the middle': function() { - var p = new Parser(); - var message = 'A'; - for (var i = 0; i < 300; ++i) message += (i % 5).toString(); - - var msgpiece1 = message.substr(0, 150); - var packet1 = '01 FE ' + pack(4, msgpiece1.length) + ' 34 83 a8 68 ' + dump(mask(msgpiece1, '34 83 a8 68')); - - var pingMessage = 'Hello'; - var pingPacket = '89 FE ' + pack(4, pingMessage.length) + ' 34 83 a8 68 ' + dump(mask(pingMessage, '34 83 a8 68')); - - var msgpiece2 = message.substr(150); - var packet2 = '81 FE ' + pack(4, msgpiece2.length) + ' 34 83 a8 68 ' + dump(mask(msgpiece2, '34 83 a8 68')); - - var gotData = false; - p.on('data', function(data) { - gotData = true; - assert.equal(message, data); - }); - var gotPing = false; - p.on('ping', function(data) { - gotPing = true; - assert.equal(pingMessage, data); - }); - - p.add(makeBufferFromHexString(packet1)); - p.add(makeBufferFromHexString(pingPacket)); - p.add(makeBufferFromHexString(packet2)); - assert.ok(gotData); - assert.ok(gotPing); - }, - 'can parse a fragmented masked text message of 300 bytes with a ping in the middle, which is delievered over sevaral tcp packets': function() { - var p = new Parser(); - var message = 'A'; - for (var i = 0; i < 300; ++i) message += (i % 5).toString(); - - var msgpiece1 = message.substr(0, 150); - var packet1 = '01 FE ' + pack(4, msgpiece1.length) + ' 34 83 a8 68 ' + dump(mask(msgpiece1, '34 83 a8 68')); - - var pingMessage = 'Hello'; - var pingPacket = '89 FE ' + pack(4, pingMessage.length) + ' 34 83 a8 68 ' + dump(mask(pingMessage, '34 83 a8 68')); - - var msgpiece2 = message.substr(150); - var packet2 = '81 FE ' + pack(4, msgpiece2.length) + ' 34 83 a8 68 ' + dump(mask(msgpiece2, '34 83 a8 68')); - - var gotData = false; - p.on('data', function(data) { - gotData = true; - assert.equal(message, data); - }); - var gotPing = false; - p.on('ping', function(data) { - gotPing = true; - assert.equal(pingMessage, data); - }); - - var buffers = []; - buffers = buffers.concat(splitBuffer(makeBufferFromHexString(packet1))); - buffers = buffers.concat(splitBuffer(makeBufferFromHexString(pingPacket))); - buffers = buffers.concat(splitBuffer(makeBufferFromHexString(packet2))); - for (var i = 0; i < buffers.length; ++i) { - p.add(buffers[i]); - } - assert.ok(gotData); - assert.ok(gotPing); - }, -}; - From 2c3dc42ae8f20fb4ecfa501dbca03fa8d8d9fa56 Mon Sep 17 00:00:00 2001 From: einaros Date: Mon, 29 Aug 2011 08:00:03 +0200 Subject: [PATCH 21/28] corrected ping handling from websocket transport, and added warning output on parser error --- lib/transports/websocket/hybi-07-12.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/transports/websocket/hybi-07-12.js b/lib/transports/websocket/hybi-07-12.js index 38c6e0584e..bf685c7554 100644 --- a/lib/transports/websocket/hybi-07-12.js +++ b/lib/transports/websocket/hybi-07-12.js @@ -39,12 +39,13 @@ function WebSocket (mng, data, req) { }); this.parser.on('ping', function () { // version 8 ping => pong - this.socket.write('\u008a\u0000'); + self.socket.write('\u008a\u0000'); }); this.parser.on('close', function () { self.end(); }); - this.parser.on('error', function () { + this.parser.on('error', function (reason) { + self.log.warn(self.name + ' parser error: ' + reason); self.end(); }); From 12fc168516f45d7a3e042d4bac10c7f3f9bde28f Mon Sep 17 00:00:00 2001 From: einaros Date: Mon, 29 Aug 2011 08:33:30 +0200 Subject: [PATCH 22/28] fixed bug in send framing for over 64kB of data --- lib/transports/websocket/hybi-07-12.js | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/lib/transports/websocket/hybi-07-12.js b/lib/transports/websocket/hybi-07-12.js index bf685c7554..3c5318a2ac 100644 --- a/lib/transports/websocket/hybi-07-12.js +++ b/lib/transports/websocket/hybi-07-12.js @@ -139,16 +139,17 @@ WebSocket.prototype.write = function (data) { */ WebSocket.prototype.frame = function (opcode, data) { - var startOffset = 2, secondByte = data.length, buf; - if (data.length > 125) { - // opcode = 0x81; - startOffset += 2; - secondByte = 126; - } + var startOffset = 2 + , secondByte = data.length + , buf; if (data.length > 65536) { - startOffset += 6; + startOffset = 10; secondByte = 127; } + else if (data.length > 125) { + startOffset = 4; + secondByte = 126; + } buf = new Buffer(data.length + startOffset, 'utf8'); buf[0] = opcode; buf[1] = secondByte; @@ -158,9 +159,10 @@ WebSocket.prototype.frame = function (opcode, data) { buf[3] = data.length % 256; break; case 127: - for (var i = 6; i > 2; i--) { - buf[i] = secondByte % 256; - secondByte >>>= 8; + var l = data.length; + for (var i = 1; i <= 8; ++i) { + buf[startOffset - i] = l & 0xff; + l >>>= 8; } } buf.write(data, startOffset, 'utf8'); From 140ed41907c70078c63ad85636f7069218d33a36 Mon Sep 17 00:00:00 2001 From: Markus Hedlund Date: Mon, 29 Aug 2011 20:23:38 +0300 Subject: [PATCH 23/28] Fixed typo. --- Readme.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Readme.md b/Readme.md index 875e5dfd89..86c023333e 100644 --- a/Readme.md +++ b/Readme.md @@ -60,7 +60,7 @@ Besides `connect`, `message` and `disconnect`, you can emit custom events: var io = require('socket.io').listen(80); io.sockets.on('connection', function (socket) { - io.sockets.emit('this', { will: 'be received by everyone'); + io.sockets.emit('this', { will: 'be received by everyone' }); socket.on('private message', function (from, msg) { console.log('I received a private message by ', from, ' saying ', msg); From 72a79e5cec8ef424e32daf3b81fdae8b6e4d888e Mon Sep 17 00:00:00 2001 From: einaros Date: Mon, 29 Aug 2011 09:43:12 +0200 Subject: [PATCH 24/28] fixed utf8 bug in send framing --- lib/transports/websocket/hybi-07-12.js | 29 +++++++++++++------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/lib/transports/websocket/hybi-07-12.js b/lib/transports/websocket/hybi-07-12.js index 3c5318a2ac..f09ebb2464 100644 --- a/lib/transports/websocket/hybi-07-12.js +++ b/lib/transports/websocket/hybi-07-12.js @@ -138,35 +138,36 @@ WebSocket.prototype.write = function (data) { * @api private */ -WebSocket.prototype.frame = function (opcode, data) { +WebSocket.prototype.frame = function (opcode, str) { + var dataBuffer = new Buffer(str); + var dataLength = dataBuffer.length; var startOffset = 2 - , secondByte = data.length - , buf; - if (data.length > 65536) { + , secondByte = dataLength; + if (dataLength > 65536) { startOffset = 10; secondByte = 127; } - else if (data.length > 125) { + else if (dataLength > 125) { startOffset = 4; secondByte = 126; } - buf = new Buffer(data.length + startOffset, 'utf8'); - buf[0] = opcode; - buf[1] = secondByte; + var outputBuffer = new Buffer(dataLength + startOffset); + outputBuffer[0] = opcode; + outputBuffer[1] = secondByte; + dataBuffer.copy(outputBuffer, startOffset); switch (secondByte) { case 126: - buf[2] = data.length >>> 8; - buf[3] = data.length % 256; + outputBuffer[2] = dataLength >>> 8; + outputBuffer[3] = dataLength % 256; break; case 127: - var l = data.length; + var l = dataLength; for (var i = 1; i <= 8; ++i) { - buf[startOffset - i] = l & 0xff; + outputBuffer[startOffset - i] = l & 0xff; l >>>= 8; } } - buf.write(data, startOffset, 'utf8'); - return buf; + return outputBuffer; }; /** From 93c963e30f7ff8895afbba7c42676648604ed7eb Mon Sep 17 00:00:00 2001 From: Guillermo Rauch Date: Mon, 29 Aug 2011 09:42:16 -0700 Subject: [PATCH 25/28] Release 0.8.1 --- History.md | 8 ++++++++ lib/socket.io.js | 2 +- package.json | 4 ++-- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/History.md b/History.md index 6955bfc738..b24903dd4e 100644 --- a/History.md +++ b/History.md @@ -1,4 +1,12 @@ +0.8.1 / 2011-08-29 +================== + + * Fixed utf8 bug in send framing in websocket [einaros] + * Fixed typo in docs [Znarkus] + * Fixed bug in send framing for over 64kB of data in websocket [einaros] + * Corrected ping handling in websocket transport [einaros] + 0.8.0 / 2011-08-28 ================== diff --git a/lib/socket.io.js b/lib/socket.io.js index 5d23c120ef..e12b73b5cc 100644 --- a/lib/socket.io.js +++ b/lib/socket.io.js @@ -15,7 +15,7 @@ var client = require('socket.io-client'); * Version. */ -exports.version = '0.8.0'; +exports.version = '0.8.1'; /** * Supported protocol version. diff --git a/package.json b/package.json index e3837d2f53..4ae0537bd6 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "socket.io" - , "version": "0.8.0" + , "version": "0.8.1" , "description": "Real-time apps made cross-browser & easy with a WebSocket-like API" , "homepage": "http://socket.io" , "keywords": ["websocket", "socket", "realtime", "socket.io", "comet", "ajax"] @@ -16,7 +16,7 @@ , "url": "https://github.com/LearnBoost/Socket.IO-node.git" } , "dependencies": { - "socket.io-client": "0.8.0" + "socket.io-client": "0.8.1" , "policyfile": "0.0.4" , "redis": "0.6.6" } From d88575dadf4203e5bc2f64b0ec2f9adbb887933f Mon Sep 17 00:00:00 2001 From: Guillermo Rauch Date: Mon, 29 Aug 2011 10:36:23 -0700 Subject: [PATCH 26/28] Release 0.8.2 --- History.md | 5 +++++ lib/socket.io.js | 2 +- package.json | 4 ++-- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/History.md b/History.md index b24903dd4e..d4dd0f2854 100644 --- a/History.md +++ b/History.md @@ -1,4 +1,9 @@ +0.8.2 / 2011-08-29 +================== + + * Updated client. + 0.8.1 / 2011-08-29 ================== diff --git a/lib/socket.io.js b/lib/socket.io.js index e12b73b5cc..e243d33c7a 100644 --- a/lib/socket.io.js +++ b/lib/socket.io.js @@ -15,7 +15,7 @@ var client = require('socket.io-client'); * Version. */ -exports.version = '0.8.1'; +exports.version = '0.8.2'; /** * Supported protocol version. diff --git a/package.json b/package.json index 4ae0537bd6..9436de48b2 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "socket.io" - , "version": "0.8.1" + , "version": "0.8.2" , "description": "Real-time apps made cross-browser & easy with a WebSocket-like API" , "homepage": "http://socket.io" , "keywords": ["websocket", "socket", "realtime", "socket.io", "comet", "ajax"] @@ -16,7 +16,7 @@ , "url": "https://github.com/LearnBoost/Socket.IO-node.git" } , "dependencies": { - "socket.io-client": "0.8.1" + "socket.io-client": "0.8.2" , "policyfile": "0.0.4" , "redis": "0.6.6" } From cc0a96a8d9baf57c5eede6844c819e4a7a920e87 Mon Sep 17 00:00:00 2001 From: Daniel Shaw Date: Thu, 1 Sep 2011 14:22:53 -0700 Subject: [PATCH 27/28] Wow, really. --- lib/transports/websocket/default.js | 1865 +++++---------------------- 1 file changed, 345 insertions(+), 1520 deletions(-) diff --git a/lib/transports/websocket/default.js b/lib/transports/websocket/default.js index 527eedaf64..a05c9203da 100644 --- a/lib/transports/websocket/default.js +++ b/lib/transports/websocket/default.js @@ -1,1525 +1,350 @@ - +/*! + * socket.io-node + * Copyright(c) 2011 LearnBoost + * MIT Licensed + */ + +/** + * Module requirements. + */ + +var Transport = require('../../transport') + , EventEmitter = process.EventEmitter + , crypto = require('crypto') + , parser = require('../../parser'); + +/** + * Export the constructor. + */ + +exports = module.exports = WebSocket; + +/** + * HTTP interface constructor. Interface compatible with all transports that + * depend on request-response cycles. + * + * @api public + */ + +function WebSocket (mng, data, req) { + // parser + var self = this; + + this.parser = new Parser(); + this.parser.on('data', function (packet) { + self.log.debug(self.name + ' received data packet', packet); + self.onMessage(parser.decodePacket(packet)); + }); + this.parser.on('close', function () { + self.end(); + }); + this.parser.on('error', function () { + self.end(); + }); + + Transport.call(this, mng, data, req); +}; + +/** + * Inherits from Transport. + */ + +WebSocket.prototype.__proto__ = Transport.prototype; + +/** + * Transport name + * + * @api public + */ + +WebSocket.prototype.name = 'websocket'; + +/** + * Called when the socket connects. + * + * @api private + */ + +WebSocket.prototype.onSocketConnect = function () { + var self = this; + + this.socket.setNoDelay(true); + + this.buffer = true; + this.buffered = []; + + if (this.req.headers.upgrade !== 'WebSocket') { + this.log.warn(this.name + ' connection invalid'); + this.end(); + return; + } + + var origin = this.req.headers.origin + , location = (this.socket.encrypted ? 'wss' : 'ws') + + '://' + this.req.headers.host + this.req.url + , waitingForNonce = false; + + if (this.req.headers['sec-websocket-key1']) { + // If we don't have the nonce yet, wait for it (HAProxy compatibility). + if (! (this.req.head && this.req.head.length >= 8)) { + waitingForNonce = true; + } + + var headers = [ + 'HTTP/1.1 101 WebSocket Protocol Handshake' + , 'Upgrade: WebSocket' + , 'Connection: Upgrade' + , 'Sec-WebSocket-Origin: ' + origin + , 'Sec-WebSocket-Location: ' + location + ]; + + if (this.req.headers['sec-websocket-protocol']){ + headers.push('Sec-WebSocket-Protocol: ' + + this.req.headers['sec-websocket-protocol']); + } + } else { + var headers = [ + 'HTTP/1.1 101 Web Socket Protocol Handshake' + , 'Upgrade: WebSocket' + , 'Connection: Upgrade' + , 'WebSocket-Origin: ' + origin + , 'WebSocket-Location: ' + location + ]; + } + + try { + this.socket.write(headers.concat('', '').join('\r\n')); + this.socket.setTimeout(0); + this.socket.setNoDelay(true); + this.socket.setEncoding('utf8'); + } catch (e) { + this.end(); + return; + } + + if (waitingForNonce) { + this.socket.setEncoding('binary'); + } else if (this.proveReception(headers)) { + self.flush(); + } + + var headBuffer = ''; + + this.socket.on('data', function (data) { + if (waitingForNonce) { + headBuffer += data; + + if (headBuffer.length < 8) { + return; + } + + // Restore the connection to utf8 encoding after receiving the nonce + self.socket.setEncoding('utf8'); + waitingForNonce = false; + + // Stuff the nonce into the location where it's expected to be + self.req.head = headBuffer.substr(0, 8); + headBuffer = ''; + + if (self.proveReception(headers)) { + self.flush(); + } + + return; + } + + self.parser.add(data); + }); +}; - +/** + * Writes to the socket. + * + * @api private + */ - - - - - - - lib/transports/websocket/default.js at master from LearnBoost/socket.io - GitHub - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-

Markdown Cheat Sheet

- -
- -
-
-

Format Text

-

Headers

-
-# This is an <h1> tag
-## This is an <h2> tag
-###### This is an <h6> tag
-

Text styles

-
-*This text will be italic*
-_This will also be italic_
-**This text will be bold**
-__This will also be bold__
-
-*You **can** combine them*
-
-
-
-

Lists

-

Unordered

-
-* Item 1
-* Item 2
-  * Item 2a
-  * Item 2b
-

Ordered

-
-1. Item 1
-2. Item 2
-3. Item 3
-   * Item 3a
-   * Item 3b
-
-
-

Miscellaneous

-

Images

-
-![GitHub Logo](/images/logo.png)
-Format: ![Alt Text](url)
-
-

Links

-
-http://github.com - automatic!
-[GitHub](http://github.com)
-

Blockquotes

-
-As Kanye West said:
-> We're living the future so
-> the present is our past.
-
-
-
-
- -

Code Examples in Markdown

-
-

Syntax highlighting with GFM

-
-```javascript
-function fancyAlert(arg) {
-  if(arg) {
-    $.facebox({div:'#foo'})
+WebSocket.prototype.write = function (data) {
+  if (this.open) {
+    this.drained = false;
+
+    if (this.buffer) {
+      this.buffered.push(data);
+      return this;
+    }
+
+    var length = Buffer.byteLength(data)
+      , buffer = new Buffer(2 + length);
+
+    buffer.write('\x00', 'binary');
+    buffer.write(data, 1, 'utf8');
+    buffer.write('\xff', 1 + length, 'binary');
+
+    try {
+      if (this.socket.write(buffer)) {
+        this.drained = true;
+      }
+    } catch (e) {
+      this.end();
+    }
+
+    this.log.debug(this.name + ' writing', data);
   }
-}
-```
-
-
-

Or, indent your code 4 spaces

-
-Here is a Python code example
-without syntax highlighting:
-
-    def foo:
-      if not bar:
-        return true
-
-
-

Inline code for comments

-
-I think you should use an
-`<addr>` element here instead.
-
-
- -
- - - - - - - - - +}; + +/** + * Flushes the internal buffer + * + * @api private + */ +WebSocket.prototype.flush = function () { + this.buffer = false; + + for (var i = 0, l = this.buffered.length; i < l; i++) { + this.write(this.buffered.splice(0, 1)[0]); + } +}; + +/** + * Finishes the handshake. + * + * @api private + */ + +WebSocket.prototype.proveReception = function (headers) { + var self = this + , k1 = this.req.headers['sec-websocket-key1'] + , k2 = this.req.headers['sec-websocket-key2']; + + if (k1 && k2){ + var md5 = crypto.createHash('md5'); + + [k1, k2].forEach(function (k) { + var n = parseInt(k.replace(/[^\d]/g, '')) + , spaces = k.replace(/[^ ]/g, '').length; + + if (spaces === 0 || n % spaces !== 0){ + self.log.warn('Invalid ' + self.name + ' key: "' + k + '".'); + self.end(); + return false; + } + + n /= spaces; + + md5.update(String.fromCharCode( + n >> 24 & 0xFF, + n >> 16 & 0xFF, + n >> 8 & 0xFF, + n & 0xFF)); + }); + + md5.update(this.req.head.toString('binary')); + + try { + this.socket.write(md5.digest('binary'), 'binary'); + } catch (e) { + this.end(); + } + } + + return true; +}; + +/** + * Writes a payload. + * + * @api private + */ + +WebSocket.prototype.payload = function (msgs) { + for (var i = 0, l = msgs.length; i < l; i++) { + this.write(msgs[i]); + } + + return this; +}; + +/** + * Closes the connection. + * + * @api private + */ + +WebSocket.prototype.doClose = function () { + this.socket.end(); +}; + +/** + * WebSocket parser + * + * @api public + */ + +function Parser () { + this.buffer = ''; + this.i = 0; +}; + +/** + * Inherits from EventEmitter. + */ + +Parser.prototype.__proto__ = EventEmitter.prototype; + +/** + * Adds data to the buffer. + * + * @api public + */ + +Parser.prototype.add = function (data) { + this.buffer += data; + this.parse(); +}; + +/** + * Parses the buffer. + * + * @api private + */ + +Parser.prototype.parse = function () { + for (var i = this.i, chr, l = this.buffer.length; i < l; i++){ + chr = this.buffer[i]; + + if (this.buffer.length == 2 && this.buffer[1] == '\u0000') { + this.emit('close'); + this.buffer = ''; + this.i = 0; + return; + } + + if (i === 0){ + if (chr != '\u0000') + this.error('Bad framing. Expected null byte as first frame'); + else + continue; + } + + if (chr == '\ufffd'){ + this.emit('data', this.buffer.substr(1, i - 1)); + this.buffer = this.buffer.substr(i + 1); + this.i = 0; + return this.parse(); + } + } +}; + +/** + * Handles an error + * + * @api private + */ + +Parser.prototype.error = function (reason) { + this.buffer = ''; + this.i = 0; + this.emit('error', reason); + return this; +}; From c6b3549b619961743ab81d002f90ec877e4c63be Mon Sep 17 00:00:00 2001 From: Daniel Shaw Date: Mon, 12 Sep 2011 00:20:40 -0700 Subject: [PATCH 28/28] Minimal RedisClient configs. --- lib/stores/redis.js | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/lib/stores/redis.js b/lib/stores/redis.js index 6d6445c907..d4818950fc 100644 --- a/lib/stores/redis.js +++ b/lib/stores/redis.js @@ -60,12 +60,28 @@ function Redis (opts) { } } - var redis = opts.redis || require('redis'); + var redis = opts.redis || require('redis') + , RedisClient = redis.RedisClient; // initialize a pubsub client and a regular client - this.pub = redis.createClient(opts.redisPub); - this.sub = redis.createClient(opts.redisSub); - this.cmd = redis.createClient(opts.redisClient); + if (opts.redisPub instanceof RedisClient) { + this.pub = opts.redisPub; + } else { + opts.redisPub || (opts.redisPub = {}); + this.pub = redis.createClient(opts.redisPub.port, opts.redisPub.host, opts.redisPub); + } + if (opts.redisSub instanceof RedisClient) { + this.sub = opts.redisSub; + } else { + opts.redisSub || (opts.redisSub = {}); + this.sub = redis.createClient(opts.redisSub.port, opts.redisSub.host, opts.redisSub); + } + if (opts.redisClient instanceof RedisClient) { + this.cmd = opts.redisClient; + } else { + opts.redisClient || (opts.redisClient = {}); + this.cmd = redis.createClient(opts.redisClient.port, opts.redisClient.host, opts.redisClient); + } Store.call(this, opts); };