Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 23 additions & 17 deletions lib/adapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/

var Emitter = require('events').EventEmitter;
var parser = require('socket.io-parser');

/**
* Module exports.
Expand All @@ -22,6 +23,7 @@ function Adapter(nsp){
this.nsp = nsp;
this.rooms = {};
this.sids = {};
this.encoder = new parser.Encoder();
}

/**
Expand Down Expand Up @@ -97,26 +99,30 @@ Adapter.prototype.broadcast = function(packet, opts){
var rooms = opts.rooms || [];
var except = opts.except || [];
var ids = {};
var self = this;
var socket;

if (rooms.length) {
for (var i = 0; i < rooms.length; i++) {
var room = this.rooms[rooms[i]];
if (!room) continue;
for (var id in room) {
if (ids[id] || ~except.indexOf(id)) continue;
socket = this.nsp.connected[id];
if (socket) {
socket.packet(packet);
ids[id] = true;
packet.nsp = this.nsp.name;
this.encoder.encode(packet, function(encodedPackets) {
if (rooms.length) {
for (var i = 0; i < rooms.length; i++) {
var room = self.rooms[rooms[i]];
if (!room) continue;
for (var id in room) {
if (ids[id] || ~except.indexOf(id)) continue;
socket = self.nsp.connected[id];
if (socket) {
socket.packet(encodedPackets, true);
ids[id] = true;
}
}
}
} else {
for (var id in self.sids) {
if (~except.indexOf(id)) continue;
socket = self.nsp.connected[id];
if (socket) socket.packet(encodedPackets, true);
}
}
} else {
for (var id in this.sids) {
if (~except.indexOf(id)) continue;
socket = this.nsp.connected[id];
if (socket) socket.packet(packet);
}
}
});
};
25 changes: 17 additions & 8 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,25 @@ Client.prototype.close = function(){
* @api private
*/

Client.prototype.packet = function(packet){
Client.prototype.packet = function(packet, preEncoded){
var self = this;

// this writes to the actual connection
function writeToEngine(encodedPackets) {
for (var i = 0; i < encodedPackets.length; i++) {
self.conn.write(encodedPackets[i]);
}
}

if ('open' == this.conn.readyState) {
debug('writing packet %j', packet);
var self = this;

this.encoder.encode(packet, function (encodedPackets) { // encode, then write results to engine
for (var i = 0; i < encodedPackets.length; i++) {
self.conn.write(encodedPackets[i]);
}
});
if(!preEncoded) { // not broadcasting, need to encode
this.encoder.encode(packet, function (encodedPackets) { // encode, then write results to engine
writeToEngine(encodedPackets);
});
} else { // a broadcast pre-encodes a packet
writeToEngine(packet);
}
} else {
debug('ignoring packet write %j', packet);
}
Expand Down
6 changes: 5 additions & 1 deletion lib/namespace.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ var Socket = require('./socket');
var Emitter = require('events').EventEmitter;
var parser = require('socket.io-parser');
var debug = require('debug')('socket.io:namespace');
var hasBin = require('has-binary-data');

/**
* Module exports.
Expand Down Expand Up @@ -193,7 +194,10 @@ Namespace.prototype.emit = function(ev){
} else {
// set up packet object
var args = Array.prototype.slice.call(arguments);
var packet = { type: parser.EVENT, data: args };
var parserType = parser.EVENT; // default
if (hasBin(args)) { parserType = parser.BINARY_EVENT; } // binary

var packet = { type: parserType, data: args };

if ('function' == typeof args[args.length - 1]) {
throw new Error('Callbacks are not supported when broadcasting');
Expand Down
4 changes: 2 additions & 2 deletions lib/socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ Socket.prototype.write = function(){
* @api private
*/

Socket.prototype.packet = function(packet){
Socket.prototype.packet = function(packet, preEncoded){
packet.nsp = this.nsp.name;
this.client.packet(packet);
this.client.packet(packet, preEncoded);
};

/**
Expand Down
80 changes: 80 additions & 0 deletions test/socket.io.js
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,39 @@ describe('socket.io', function(){
});
});

it('emits binary data to a namespace', function(done){
var srv = http();
var sio = io(srv);
var total = 2;

srv.listen(function(){
var socket1 = client(srv, { multiplex: false });
var socket2 = client(srv, { multiplex: false });
var socket3 = client(srv, '/test');
socket1.on('bin', function(a){
expect(Buffer.isBuffer(a)).to.be(true);
--total || done();
});
socket2.on('bin', function(a){
expect(Buffer.isBuffer(a)).to.be(true);
--total || done();
});
socket3.on('bin', function(){ done(new Error('not')); });

var sockets = 3;
sio.on('connection', function(socket){
--sockets || emit();
});
sio.of('/test', function(socket){
--sockets || emit();
});

function emit(){
sio.emit('bin', new Buffer(10));
}
});
});

it('emits to the rest', function(done){
var srv = http();
var sio = io(srv);
Expand Down Expand Up @@ -677,6 +710,53 @@ describe('socket.io', function(){
});
});

it('broadcasts binary data to rooms', function(done){
var srv = http();
var sio = io(srv);
var total = 2;

srv.listen(function(){
var socket1 = client(srv, { multiplex: false });
var socket2 = client(srv, { multiplex: false });
var socket3 = client(srv, { multiplex: false });

socket1.emit('join', 'woot');
socket2.emit('join', 'test');
socket3.emit('join', 'test', function(){
socket3.emit('broadcast');
});

socket1.on('bin', function(data){
throw new Error('got bin in socket1');
});
socket2.on('bin', function(data){
expect(Buffer.isBuffer(data)).to.be(true);
--total || done();
});
socket2.on('bin2', function(data) {
throw new Error('socket2 got bin2');
});
socket3.on('bin', function(data) {
throw new Error('socket3 got bin');
});
socket3.on('bin2', function(data) {
expect(Buffer.isBuffer(data)).to.be(true);
--total || done();
});

sio.on('connection', function(socket){
socket.on('join', function(room, fn){
socket.join(room, fn);
});
socket.on('broadcast', function(){
socket.broadcast.to('test').emit('bin', new Buffer(5));
socket.emit('bin2', new Buffer(5));
});
});
});
});


it('keeps track of rooms', function(done){
var srv = http();
var sio = io(srv);
Expand Down