From 9674d71d9a5e2f3a23f46dbfe4e6390f4bd0541a Mon Sep 17 00:00:00 2001 From: Ethan Waite Date: Wed, 13 Dec 2017 22:02:13 +0000 Subject: [PATCH 1/2] feat(socket): use round robin for endpoints when the connection fails --- package-lock.json | 18 +++++++++--------- src/Client.spec.ts | 4 ++-- src/GameClient.ts | 2 +- src/ParticipantClient.ts | 2 +- src/wire/Socket.spec.ts | 30 ++++++++++++++++++++++++++---- src/wire/Socket.ts | 19 ++++++++++++++----- 6 files changed, 53 insertions(+), 22 deletions(-) diff --git a/package-lock.json b/package-lock.json index 4a8c5b3..b98c891 100644 --- a/package-lock.json +++ b/package-lock.json @@ -4195,15 +4195,6 @@ "integrity": "sha1-1cdSgl5TZ+eG944Y5EXqIjoVWVI=", "dev": true }, - "string_decoder": { - "version": "1.0.3", - "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.0.3.tgz", - "integrity": "sha512-4AH6Z5fzNNBcH+6XDMfA/BTt87skxqJlO0lAh3Dker5zThcAxG6mKz+iGu308UKoPPQ8Dcqx/4JhujzltRa+hQ==", - "dev": true, - "requires": { - "safe-buffer": "5.1.1" - } - }, "string-length": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/string-length/-/string-length-1.0.1.tgz", @@ -4240,6 +4231,15 @@ } } }, + "string_decoder": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.0.3.tgz", + "integrity": "sha512-4AH6Z5fzNNBcH+6XDMfA/BTt87skxqJlO0lAh3Dker5zThcAxG6mKz+iGu308UKoPPQ8Dcqx/4JhujzltRa+hQ==", + "dev": true, + "requires": { + "safe-buffer": "5.1.1" + } + }, "strip-ansi": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/strip-ansi/-/strip-ansi-3.0.1.tgz", diff --git a/src/Client.spec.ts b/src/Client.spec.ts index 80e2788..678da18 100644 --- a/src/Client.spec.ts +++ b/src/Client.spec.ts @@ -11,7 +11,7 @@ setWebSocket(WebSocket); const port = process.env.SERVER_PORT || 1339; describe('client', () => { - const url = `ws://127.0.0.1:${port}/`; + const urls = [`ws://127.0.0.1:${port}/`]; let client: Client; let server: WebSocket.Server; let ws: WebSocket; @@ -24,7 +24,7 @@ describe('client', () => { } const socketOptions = { - url, + urls, }; function createClient(): Client { return new Client(ClientType.GameClient); diff --git a/src/GameClient.ts b/src/GameClient.ts index f7bf3bc..52454c5 100644 --- a/src/GameClient.ts +++ b/src/GameClient.ts @@ -58,7 +58,7 @@ export class GameClient extends Client { .then(endpoints => { return super.open({ authToken: options.authToken, - url: endpoints[0].address, + urls: endpoints.map(({ address }) => address), extraHeaders: extraHeaders, }); }); diff --git a/src/ParticipantClient.ts b/src/ParticipantClient.ts index c674632..ad3742e 100644 --- a/src/ParticipantClient.ts +++ b/src/ParticipantClient.ts @@ -31,7 +31,7 @@ export class ParticipantClient extends Client { public open(options: IParticipantOptions): Promise { return super.open({ - url: options.url, + urls: [options.url], reconnectChecker: options.reconnectChecker, queryParams: { 'x-protocol-version': '2.0', diff --git a/src/wire/Socket.spec.ts b/src/wire/Socket.spec.ts index c26f364..0959be2 100644 --- a/src/wire/Socket.spec.ts +++ b/src/wire/Socket.spec.ts @@ -28,7 +28,7 @@ describe('socket', () => { let server: WebSocketModule.Server; let socket: InteractiveSocket; - const url = `ws://127.0.0.1:${port}/`; + const urls = [`ws://127.0.0.1:${port}/`]; beforeEach(ready => { server = new WebSocketModule.Server({ port }, ready); @@ -45,7 +45,7 @@ describe('socket', () => { describe('connecting', () => { it('connects with no auth', done => { - socket = new InteractiveSocket({ url }).connect(); + socket = new InteractiveSocket({ urls }).connect(); server.on('connection', (ws: WebSocketModule) => { expect(ws.upgradeReq.url).to.equal('/'); expect(ws.upgradeReq.headers.authorization).to.equal( @@ -59,7 +59,7 @@ describe('socket', () => { it('connects with an OAuth token', done => { socket = new InteractiveSocket({ - url, + urls, authToken: 'asdf!', }).connect(); server.on('connection', (ws: WebSocketModule) => { @@ -126,7 +126,7 @@ describe('socket', () => { checker = sinon.stub(); checker.resolves(); socket = new InteractiveSocket({ - url, + urls, pingInterval: 100, replyTimeout: 50, }).connect(); @@ -176,6 +176,28 @@ describe('socket', () => { }); }); + it('reconnects to the next server on disconnection', done => { + socket.setOptions({ urls: [...urls, `ws://127.0.0.1:${port + 1}/`] }); + + // Connect to the first server. + socket.once('open', () => { + const fallbackServer = new WebSocketModule.Server({ port: port + 1 }, () => { + closeNormal(ws); + + // Connect to the second server. + fallbackServer.once('connection', (ws2: WebSocketModule) => { + closeNormal(ws2); + + // Connect to the first server again. + awaitConnect((ws3: WebSocketModule) => { + closeNormal(ws3); + fallbackServer.close(done); + }); + }); + }); + }); + }); + it('respects closing the socket during a reconnection', done => { greet(); resolveOn(socket, 'method') diff --git a/src/wire/Socket.ts b/src/wire/Socket.ts index 32ce52c..6775ae7 100644 --- a/src/wire/Socket.ts +++ b/src/wire/Socket.ts @@ -13,7 +13,7 @@ import { /** * Close codes that are deemed to be recoverable by the reconnection policy */ -export const recoverableCloseCodes = [1000, 1011]; +export const recoverableCloseCodes = [1000, 1001, 1006, 1011, 1012]; //We don't support lz4 due to time constraints right now export type CompressionScheme = 'none' | 'gzip'; @@ -27,8 +27,8 @@ export interface ISocketOptions { reconnectionPolicy?: IReconnectionPolicy; autoReconnect?: boolean; - // Websocket URL to connect to, defaults to - url?: string; + // Array of possible websocket URLs to connect to. + urls?: string[]; //compression scheme, defaults to none, Will remain none until pako typings are updated compressionScheme?: CompressionScheme; @@ -92,7 +92,7 @@ export enum SocketState { function getDefaults(): ISocketOptions { return { - url: '', + urls: [], replyTimeout: 10000, compressionScheme: 'none', autoReconnect: true, @@ -119,6 +119,7 @@ export class InteractiveSocket extends EventEmitter { private socket: any; private queue: Set = new Set(); private lastSequenceNumber = 0; + private endpointIndex = 0; constructor(options: ISocketOptions = {}) { super(); @@ -213,7 +214,7 @@ export class InteractiveSocket extends EventEmitter { headers, }; - const url = Url.parse(this.options.url, true); + const url = Url.parse(this.getURL(), true); // Clear out search so it populates query using the query // https://nodejs.org/api/url.html#url_url_format_urlobject url.search = null; @@ -369,6 +370,14 @@ export class InteractiveSocket extends EventEmitter { this.socket.send(payload); } + private getURL(): string { + const addresses = this.options.urls; + if (this.endpointIndex >= addresses.length) { + this.endpointIndex = 0; + } + return addresses[this.endpointIndex++]; + } + private extractMessage(packet: string | Buffer) { let messageString: string; messageString = packet; From 4d52f6b5f7b9742f771d5e3246f7e2ffdd8050e4 Mon Sep 17 00:00:00 2001 From: Ethan Waite Date: Tue, 9 Jan 2018 22:06:32 +0000 Subject: [PATCH 2/2] address feedback & reconnect on 1xxx codes --- src/wire/Socket.ts | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/src/wire/Socket.ts b/src/wire/Socket.ts index 6775ae7..eab9d88 100644 --- a/src/wire/Socket.ts +++ b/src/wire/Socket.ts @@ -10,11 +10,6 @@ import { IReconnectionPolicy, } from './reconnection'; -/** - * Close codes that are deemed to be recoverable by the reconnection policy - */ -export const recoverableCloseCodes = [1000, 1001, 1006, 1011, 1012]; - //We don't support lz4 due to time constraints right now export type CompressionScheme = 'none' | 'gzip'; @@ -144,9 +139,10 @@ export class InteractiveSocket extends EventEmitter { }); this.on('close', (evt: ICloseEvent) => { - // If this close event's code is not within our recoverable code array - // We raise it as an error and refuse to connect. - if (recoverableCloseCodes.indexOf(evt.code) === -1) { + // If this close event's code is an application error (e.g. bad authentication) + // or invalid status code (for Edge), we raise it as an error and refuse to + // reconnect. + if (evt.code < 1000 || evt.code > 1999 || evt.code === 1005) { const err = InteractiveError.fromSocketMessage({ code: evt.code, message: evt.reason, @@ -372,10 +368,7 @@ export class InteractiveSocket extends EventEmitter { private getURL(): string { const addresses = this.options.urls; - if (this.endpointIndex >= addresses.length) { - this.endpointIndex = 0; - } - return addresses[this.endpointIndex++]; + return this.options.urls[this.endpointIndex++ % addresses.length]; } private extractMessage(packet: string | Buffer) {