From 8b8a08d20bfe0f392ff24aa7189689cf8af475b7 Mon Sep 17 00:00:00 2001 From: William Wong Date: Fri, 30 Oct 2020 11:43:58 -0700 Subject: [PATCH 1/5] Initial commit --- __tests__/rxjsToAsyncIterable.js | 43 ++++++++++++++++++++ __tests__/setup/rxjsToAsyncIterable.js | 36 +++++++++++++++++ __tests__/unhappy.brokenWebSocket.js | 45 +++++++++++++++++++++ package-lock.json | 55 ++++++++++++++++++++++++-- package.json | 2 + 5 files changed, 178 insertions(+), 3 deletions(-) create mode 100644 __tests__/rxjsToAsyncIterable.js create mode 100644 __tests__/setup/rxjsToAsyncIterable.js create mode 100644 __tests__/unhappy.brokenWebSocket.js diff --git a/__tests__/rxjsToAsyncIterable.js b/__tests__/rxjsToAsyncIterable.js new file mode 100644 index 000000000..4067a9e89 --- /dev/null +++ b/__tests__/rxjsToAsyncIterable.js @@ -0,0 +1,43 @@ +import { Observable } from 'rxjs/Observable'; + +import rxjsToAsyncIterable from './setup/rxjsToAsyncIterable'; + +test('should iterate 3 items', async () => { + const actual = []; + const observable = new Observable(observer => { + observer.next(1); + observer.next(2); + observer.next(3); + observer.complete(); + }); + + const iterable = rxjsToAsyncIterable(observable); + + for await (let value of iterable) { + actual.push(value); + } + + expect(actual).toEqual([1, 2, 3]); +}); + +test('should iterate 3 items then reject', async () => { + const actual = []; + const observable = new Observable(observer => { + observer.next(1); + observer.next(2); + observer.next(3); + observer.error(new Error('artificial')); + }); + + const iterable = rxjsToAsyncIterable(observable); + + await expect( + (async () => { + for await (let value of iterable) { + actual.push(value); + } + })() + ).rejects.toThrow('artificial'); + + expect(actual).toEqual([1, 2, 3]); +}); diff --git a/__tests__/setup/rxjsToAsyncIterable.js b/__tests__/setup/rxjsToAsyncIterable.js new file mode 100644 index 000000000..d6d33c812 --- /dev/null +++ b/__tests__/setup/rxjsToAsyncIterable.js @@ -0,0 +1,36 @@ +import createDeferred from 'p-defer'; + +export default function rxjsToAsyncIterable(observable) { + return { + [Symbol.asyncIterator]() { + const queue = []; + let deferred = createDeferred(); + + observable.subscribe( + value => { + queue.push({ value }); + deferred.resolve(); + deferred = createDeferred(); + }, + error => queue.push({ error }), + () => queue.push({ complete: 1 }) + ); + + return { + async next() { + queue.length || (await deferred.promise); + + const { complete, error, value } = queue.shift(); + + if (complete) { + return { done: true }; + } else if (error) { + return Promise.reject(error); + } else { + return { done: false, value }; + } + } + }; + } + }; +} diff --git a/__tests__/unhappy.brokenWebSocket.js b/__tests__/unhappy.brokenWebSocket.js new file mode 100644 index 000000000..4510d420b --- /dev/null +++ b/__tests__/unhappy.brokenWebSocket.js @@ -0,0 +1,45 @@ +import 'dotenv/config'; +import 'global-agent/bootstrap'; + +import nock from 'nock'; + +import onErrorResumeNext from 'on-error-resume-next'; + +import { timeouts } from './constants.json'; +// import * as createDirectLine from './setup/createDirectLine'; +import { DirectLine } from '../src/directLine'; +import rxjsToAsyncIterable from './setup/rxjsToAsyncIterable'; + +describe('Unhappy path', () => { + let unsubscribes; + + beforeEach(() => (unsubscribes = [])); + afterEach(() => unsubscribes.forEach(fn => onErrorResumeNext(fn))); + + describe('broken Web Socket', () => { + beforeEach(async () => { + nock('https://directline.botframework.com') + .post('/v3/directline/conversations') + .reply( + 200, + JSON.stringify({ + conversationId: '123', + token: '456', + streamUrl: 'wss://localhost/' + }) + ); + }); + + test('should not through uncaught exception', async () => { + const directLine = new DirectLine({ + token: '123' + }); + + unsubscribes.push(directLine.end.bind(directLine)); + + for await (let connectionStatus of rxjsToAsyncIterable(directLine.connectionStatus$)) { + console.log(connectionStatus); + } + }); + }); +}); diff --git a/package-lock.json b/package-lock.json index 57b9797cf..ea986ceca 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7177,6 +7177,12 @@ "integrity": "sha512-8xOcRHvCjnocdS5cpwXQXVzmmh5e5+saE2QGoeQmbKmRS6J3VQppPOIt0MnmE+4xlZoumy0GPG0D0MVIQbNA1A==", "dev": true }, + "lodash.set": { + "version": "4.3.2", + "resolved": "https://registry.npmjs.org/lodash.set/-/lodash.set-4.3.2.tgz", + "integrity": "sha1-2HV7HagH3eJIFrDWqEvqGnYjCyM=", + "dev": true + }, "lodash.sortby": { "version": "4.7.0", "resolved": "https://registry.npmjs.org/lodash.sortby/-/lodash.sortby-4.7.0.tgz", @@ -7247,6 +7253,14 @@ "dev": true, "requires": { "p-defer": "^1.0.0" + }, + "dependencies": { + "p-defer": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/p-defer/-/p-defer-1.0.0.tgz", + "integrity": "sha1-n26xgvbJqozXQwBKfU+WsZaw+ww=", + "dev": true + } } }, "map-cache": { @@ -7599,6 +7613,35 @@ "integrity": "sha512-1nh45deeb5olNY7eX82BkPO7SSxR5SSYJiPTrTdFUVYwAl8CKMA5N9PjTYkHiRjisVcxcQ1HXdLhx2qxxJzLNQ==", "dev": true }, + "nock": { + "version": "13.0.4", + "resolved": "https://registry.npmjs.org/nock/-/nock-13.0.4.tgz", + "integrity": "sha512-alqTV8Qt7TUbc74x1pKRLSENzfjp4nywovcJgi/1aXDiUxXdt7TkruSTF5MDWPP7UoPVgea4F9ghVdmX0xxnSA==", + "dev": true, + "requires": { + "debug": "^4.1.0", + "json-stringify-safe": "^5.0.1", + "lodash.set": "^4.3.2", + "propagate": "^2.0.0" + }, + "dependencies": { + "debug": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.2.0.tgz", + "integrity": "sha512-IX2ncY78vDTjZMFUdmsvIRFY2Cf4FnD0wRs+nQwJU8Lu99/tPFdb0VybiiMTPe3I6rQmwsqQqRBvxU+bZ/I8sg==", + "dev": true, + "requires": { + "ms": "2.1.2" + } + }, + "ms": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", + "dev": true + } + } + }, "node-fetch": { "version": "2.6.0", "resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-2.6.0.tgz", @@ -7889,9 +7932,9 @@ } }, "p-defer": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/p-defer/-/p-defer-1.0.0.tgz", - "integrity": "sha1-n26xgvbJqozXQwBKfU+WsZaw+ww=", + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/p-defer/-/p-defer-3.0.0.tgz", + "integrity": "sha512-ugZxsxmtTln604yeYd29EGrNhazN2lywetzpKhfmQjW/VJmhpDmWbiX+h0zL8V91R0UXkhb3KtPmyq9PZw3aYw==", "dev": true }, "p-each-series": { @@ -8189,6 +8232,12 @@ "sisteransi": "^1.0.3" } }, + "propagate": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/propagate/-/propagate-2.0.1.tgz", + "integrity": "sha512-vGrhOavPSTz4QVNuBNdcNXePNdNMaO1xj9yBeH1ScQPjk/rhg9sSlCXPhMkFuaNNW/syTvYqsnbIJxMBfRbbag==", + "dev": true + }, "prr": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/prr/-/prr-1.0.1.tgz", diff --git a/package.json b/package.json index 316176b33..2c9e14cc5 100644 --- a/package.json +++ b/package.json @@ -56,8 +56,10 @@ "jest": "^24.9.0", "jest-environment-jsdom-fourteen": "^0.1.0", "jsdom": "^14.1.0", + "nock": "^13.0.4", "node-fetch": "^2.6.0", "on-error-resume-next": "^1.1.0", + "p-defer": "^3.0.0", "restify": "^8.4.0", "rimraf": "^3.0.0", "simple-update-in": "^2.1.1", From 82f4d15833c00d0e2109e9051a756221c44874dd Mon Sep 17 00:00:00 2001 From: William Wong Date: Thu, 26 Nov 2020 02:04:47 -0800 Subject: [PATCH 2/5] Handle onError and add tests for repeated retries --- __tests__/unhappy.brokenWebSocket.js | 96 ++++++++++++++++++++++++---- package-lock.json | 6 ++ package.json | 1 + src/directLine.ts | 17 ++++- 4 files changed, 107 insertions(+), 13 deletions(-) diff --git a/__tests__/unhappy.brokenWebSocket.js b/__tests__/unhappy.brokenWebSocket.js index 4510d420b..7f1180619 100644 --- a/__tests__/unhappy.brokenWebSocket.js +++ b/__tests__/unhappy.brokenWebSocket.js @@ -1,14 +1,27 @@ import 'dotenv/config'; import 'global-agent/bootstrap'; +import { defineEventAttribute, EventTarget } from 'event-target-shim'; import nock from 'nock'; - import onErrorResumeNext from 'on-error-resume-next'; -import { timeouts } from './constants.json'; -// import * as createDirectLine from './setup/createDirectLine'; import { DirectLine } from '../src/directLine'; -import rxjsToAsyncIterable from './setup/rxjsToAsyncIterable'; + +function corsReply(nockRequest) { + nockRequest.reply(function () { + const { headers } = this.req; + + return [ + 200, + null, + { + 'Access-Control-Allow-Headers': headers['access-control-request-headers'], + 'Access-Control-Allow-Methods': headers['access-control-request-method'], + 'Access-Control-Allow-Origin': headers.origin + } + ]; + }); +} describe('Unhappy path', () => { let unsubscribes; @@ -17,29 +30,88 @@ describe('Unhappy path', () => { afterEach(() => unsubscribes.forEach(fn => onErrorResumeNext(fn))); describe('broken Web Socket', () => { + let numErrors; + let numReconnections; + beforeEach(async () => { + numErrors = 0; + numReconnections = 0; + nock('https://directline.botframework.com') - .post('/v3/directline/conversations') + .persist() + .post(uri => uri.startsWith('/v3/directline/conversations')) + .reply( + 200, + JSON.stringify({ + conversationId: '123', + token: '456', + streamUrl: 'wss://not-exist-domain' + }) + ) + .get(uri => uri.startsWith('/v3/directline/conversations')) .reply( 200, JSON.stringify({ conversationId: '123', token: '456', - streamUrl: 'wss://localhost/' + streamUrl: 'wss://not-exist-domain' }) ); + + corsReply( + nock('https://directline.botframework.com') + .persist() + .options(uri => uri.startsWith('/v3/directline/conversations')) + ); + + window.WebSocket = class extends ( + EventTarget + ) { + constructor() { + super(); + + numReconnections++; + + setTimeout(() => { + numErrors++; + + this.dispatchEvent(new ErrorEvent('error', { error: new Error('artificial') })); + this.dispatchEvent(new CustomEvent('close')); + }, 10); + } + }; + + defineEventAttribute(window.WebSocket.prototype, 'close'); + defineEventAttribute(window.WebSocket.prototype, 'error'); }); - test('should not through uncaught exception', async () => { + test('should reconnect only once for every error', async () => { const directLine = new DirectLine({ - token: '123' + token: '123', + webSocket: true }); - unsubscribes.push(directLine.end.bind(directLine)); + // Remove retry delay + directLine.getRetryDelay = () => 0; - for await (let connectionStatus of rxjsToAsyncIterable(directLine.connectionStatus$)) { - console.log(connectionStatus); - } + unsubscribes.push(() => directLine.end()); + + await new Promise(resolve => { + const subscription = directLine.activity$.subscribe(() => {}); + + setTimeout(() => { + subscription.unsubscribe(); + resolve(); + }, 2000); + }); + + // Because we abruptly stopped reconnection after 2 seconds, there is a + // 10ms window that the number of reconnections is 1 more than number of errors. + expect(Math.abs(numReconnections - numErrors)).toBeLessThanOrEqual(1); + + // As we loop reconnections for 2000 ms, and we inject errors every 10 ms. + // We should only see at most 200 errors and reconnections. + expect(numReconnections).toBeLessThanOrEqual(200); }); }); }); diff --git a/package-lock.json b/package-lock.json index ea986ceca..fb8f1d05f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -4251,6 +4251,12 @@ "integrity": "sha1-Qa4u62XvpiJorr/qg6x9eSmbCIc=", "dev": true }, + "event-target-shim": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/event-target-shim/-/event-target-shim-5.0.1.tgz", + "integrity": "sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ==", + "dev": true + }, "eventemitter3": { "version": "4.0.4", "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.4.tgz", diff --git a/package.json b/package.json index 2c9e14cc5..95785bb17 100644 --- a/package.json +++ b/package.json @@ -49,6 +49,7 @@ "babel-plugin-transform-inline-environment-variables": "^0.4.3", "concurrently": "^4.1.2", "dotenv": "^8.1.0", + "event-target-shim": "^5.0.1", "get-port": "^5.0.0", "global-agent": "^2.0.2", "has-resolved": "^1.1.0", diff --git a/src/directLine.ts b/src/directLine.ts index 5545ab1c1..1df36bd64 100644 --- a/src/directLine.ts +++ b/src/directLine.ts @@ -930,6 +930,7 @@ export class DirectLine implements IBotConnection { konsole.log("creating WebSocket", this.streamUrl); const ws = new this.services.WebSocket(this.streamUrl); let sub: Subscription; + let closed: boolean; ws.onopen = open => { konsole.log("WebSocket open", open); @@ -949,7 +950,21 @@ export class DirectLine implements IBotConnection { ws.onclose = close => { konsole.log("WebSocket close", close); if (sub) sub.unsubscribe(); - subscriber.error(close); + + // RxJS.retryWhen has a bug that would cause "error" signal to be sent after the observable is completed/errored. + // We need to guard against extraneous "error" signal to workaround the bug. + !closed && subscriber.error(close); + closed = true; + } + + ws.onerror = error => { + konsole.log("WebSocket error", error); + if (sub) sub.unsubscribe(); + + // RxJS.retryWhen has a bug that would cause "error" signal to be sent after the observable is completed/errored. + // We need to guard against extraneous "error" signal to workaround the bug. + !closed && subscriber.error(error); + closed = true; } ws.onmessage = message => message.data && subscriber.next(JSON.parse(message.data)); From 3fe1a758569088115250b504ca4a4c325c849f95 Mon Sep 17 00:00:00 2001 From: William Wong Date: Thu, 26 Nov 2020 02:22:24 -0800 Subject: [PATCH 3/5] Clean up --- __tests__/rxjsToAsyncIterable.js | 43 -------------------------- __tests__/setup/rxjsToAsyncIterable.js | 36 --------------------- package.json | 1 - 3 files changed, 80 deletions(-) delete mode 100644 __tests__/rxjsToAsyncIterable.js delete mode 100644 __tests__/setup/rxjsToAsyncIterable.js diff --git a/__tests__/rxjsToAsyncIterable.js b/__tests__/rxjsToAsyncIterable.js deleted file mode 100644 index 4067a9e89..000000000 --- a/__tests__/rxjsToAsyncIterable.js +++ /dev/null @@ -1,43 +0,0 @@ -import { Observable } from 'rxjs/Observable'; - -import rxjsToAsyncIterable from './setup/rxjsToAsyncIterable'; - -test('should iterate 3 items', async () => { - const actual = []; - const observable = new Observable(observer => { - observer.next(1); - observer.next(2); - observer.next(3); - observer.complete(); - }); - - const iterable = rxjsToAsyncIterable(observable); - - for await (let value of iterable) { - actual.push(value); - } - - expect(actual).toEqual([1, 2, 3]); -}); - -test('should iterate 3 items then reject', async () => { - const actual = []; - const observable = new Observable(observer => { - observer.next(1); - observer.next(2); - observer.next(3); - observer.error(new Error('artificial')); - }); - - const iterable = rxjsToAsyncIterable(observable); - - await expect( - (async () => { - for await (let value of iterable) { - actual.push(value); - } - })() - ).rejects.toThrow('artificial'); - - expect(actual).toEqual([1, 2, 3]); -}); diff --git a/__tests__/setup/rxjsToAsyncIterable.js b/__tests__/setup/rxjsToAsyncIterable.js deleted file mode 100644 index d6d33c812..000000000 --- a/__tests__/setup/rxjsToAsyncIterable.js +++ /dev/null @@ -1,36 +0,0 @@ -import createDeferred from 'p-defer'; - -export default function rxjsToAsyncIterable(observable) { - return { - [Symbol.asyncIterator]() { - const queue = []; - let deferred = createDeferred(); - - observable.subscribe( - value => { - queue.push({ value }); - deferred.resolve(); - deferred = createDeferred(); - }, - error => queue.push({ error }), - () => queue.push({ complete: 1 }) - ); - - return { - async next() { - queue.length || (await deferred.promise); - - const { complete, error, value } = queue.shift(); - - if (complete) { - return { done: true }; - } else if (error) { - return Promise.reject(error); - } else { - return { done: false, value }; - } - } - }; - } - }; -} diff --git a/package.json b/package.json index 95785bb17..631981163 100644 --- a/package.json +++ b/package.json @@ -60,7 +60,6 @@ "nock": "^13.0.4", "node-fetch": "^2.6.0", "on-error-resume-next": "^1.1.0", - "p-defer": "^3.0.0", "restify": "^8.4.0", "rimraf": "^3.0.0", "simple-update-in": "^2.1.1", From 026a75915de18393d9a98878c035a53af36a8047 Mon Sep 17 00:00:00 2001 From: William Wong Date: Thu, 26 Nov 2020 02:30:42 -0800 Subject: [PATCH 4/5] Clean up deps --- package-lock.json | 26 +++++++++----------------- package.json | 2 +- 2 files changed, 10 insertions(+), 18 deletions(-) diff --git a/package-lock.json b/package-lock.json index fb8f1d05f..8ad6d0e56 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7259,14 +7259,6 @@ "dev": true, "requires": { "p-defer": "^1.0.0" - }, - "dependencies": { - "p-defer": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/p-defer/-/p-defer-1.0.0.tgz", - "integrity": "sha1-n26xgvbJqozXQwBKfU+WsZaw+ww=", - "dev": true - } } }, "map-cache": { @@ -7620,9 +7612,9 @@ "dev": true }, "nock": { - "version": "13.0.4", - "resolved": "https://registry.npmjs.org/nock/-/nock-13.0.4.tgz", - "integrity": "sha512-alqTV8Qt7TUbc74x1pKRLSENzfjp4nywovcJgi/1aXDiUxXdt7TkruSTF5MDWPP7UoPVgea4F9ghVdmX0xxnSA==", + "version": "13.0.5", + "resolved": "https://registry.npmjs.org/nock/-/nock-13.0.5.tgz", + "integrity": "sha512-1ILZl0zfFm2G4TIeJFW0iHknxr2NyA+aGCMTjDVUsBY4CkMRispF1pfIYkTRdAR/3Bg+UzdEuK0B6HczMQZcCg==", "dev": true, "requires": { "debug": "^4.1.0", @@ -7632,9 +7624,9 @@ }, "dependencies": { "debug": { - "version": "4.2.0", - "resolved": "https://registry.npmjs.org/debug/-/debug-4.2.0.tgz", - "integrity": "sha512-IX2ncY78vDTjZMFUdmsvIRFY2Cf4FnD0wRs+nQwJU8Lu99/tPFdb0VybiiMTPe3I6rQmwsqQqRBvxU+bZ/I8sg==", + "version": "4.3.1", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.1.tgz", + "integrity": "sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ==", "dev": true, "requires": { "ms": "2.1.2" @@ -7938,9 +7930,9 @@ } }, "p-defer": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/p-defer/-/p-defer-3.0.0.tgz", - "integrity": "sha512-ugZxsxmtTln604yeYd29EGrNhazN2lywetzpKhfmQjW/VJmhpDmWbiX+h0zL8V91R0UXkhb3KtPmyq9PZw3aYw==", + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/p-defer/-/p-defer-1.0.0.tgz", + "integrity": "sha1-n26xgvbJqozXQwBKfU+WsZaw+ww=", "dev": true }, "p-each-series": { diff --git a/package.json b/package.json index 631981163..536e47d9e 100644 --- a/package.json +++ b/package.json @@ -57,7 +57,7 @@ "jest": "^24.9.0", "jest-environment-jsdom-fourteen": "^0.1.0", "jsdom": "^14.1.0", - "nock": "^13.0.4", + "nock": "^13.0.5", "node-fetch": "^2.6.0", "on-error-resume-next": "^1.1.0", "restify": "^8.4.0", From 6fc932dd1e3c4ad11fb84db02bd0d47ade987dd3 Mon Sep 17 00:00:00 2001 From: William Wong Date: Thu, 26 Nov 2020 02:31:40 -0800 Subject: [PATCH 5/5] Clean up --- src/directLine.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/directLine.ts b/src/directLine.ts index 1df36bd64..07f5d6ea1 100644 --- a/src/directLine.ts +++ b/src/directLine.ts @@ -953,7 +953,7 @@ export class DirectLine implements IBotConnection { // RxJS.retryWhen has a bug that would cause "error" signal to be sent after the observable is completed/errored. // We need to guard against extraneous "error" signal to workaround the bug. - !closed && subscriber.error(close); + closed || subscriber.error(close); closed = true; } @@ -963,7 +963,7 @@ export class DirectLine implements IBotConnection { // RxJS.retryWhen has a bug that would cause "error" signal to be sent after the observable is completed/errored. // We need to guard against extraneous "error" signal to workaround the bug. - !closed && subscriber.error(error); + closed || subscriber.error(error); closed = true; }