Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions __tests__/unhappy.brokenWebSocket.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import 'dotenv/config';
import 'global-agent/bootstrap';

import { defineEventAttribute, EventTarget } from 'event-target-shim';
import nock from 'nock';
import onErrorResumeNext from 'on-error-resume-next';

import { DirectLine } from '../src/directLine';

function corsReply(nockRequest) {
nockRequest.reply(function () {
const { headers } = this.req;

return [
200,
null,
{
'Access-Control-Allow-Headers': headers['access-control-request-headers'],
'Access-Control-Allow-Methods': headers['access-control-request-method'],
'Access-Control-Allow-Origin': headers.origin
}
];
});
}

describe('Unhappy path', () => {
let unsubscribes;

beforeEach(() => (unsubscribes = []));
afterEach(() => unsubscribes.forEach(fn => onErrorResumeNext(fn)));

describe('broken Web Socket', () => {
let numErrors;
let numReconnections;

beforeEach(async () => {
numErrors = 0;
numReconnections = 0;

nock('https://directline.botframework.com')
.persist()
.post(uri => uri.startsWith('/v3/directline/conversations'))
.reply(
200,
JSON.stringify({
conversationId: '123',
token: '456',
streamUrl: 'wss://not-exist-domain'
})
)
.get(uri => uri.startsWith('/v3/directline/conversations'))
.reply(
200,
JSON.stringify({
conversationId: '123',
token: '456',
streamUrl: 'wss://not-exist-domain'
})
);

corsReply(
nock('https://directline.botframework.com')
.persist()
.options(uri => uri.startsWith('/v3/directline/conversations'))
);

window.WebSocket = class extends (
EventTarget
) {
constructor() {
super();

numReconnections++;

setTimeout(() => {
numErrors++;

this.dispatchEvent(new ErrorEvent('error', { error: new Error('artificial') }));
this.dispatchEvent(new CustomEvent('close'));
}, 10);
}
};

defineEventAttribute(window.WebSocket.prototype, 'close');
defineEventAttribute(window.WebSocket.prototype, 'error');
});

test('should reconnect only once for every error', async () => {
const directLine = new DirectLine({
token: '123',
webSocket: true
});

// Remove retry delay
directLine.getRetryDelay = () => 0;

unsubscribes.push(() => directLine.end());

await new Promise(resolve => {
const subscription = directLine.activity$.subscribe(() => {});

setTimeout(() => {
subscription.unsubscribe();
resolve();
}, 2000);
});

// Because we abruptly stopped reconnection after 2 seconds, there is a
// 10ms window that the number of reconnections is 1 more than number of errors.
expect(Math.abs(numReconnections - numErrors)).toBeLessThanOrEqual(1);

// As we loop reconnections for 2000 ms, and we inject errors every 10 ms.
// We should only see at most 200 errors and reconnections.
expect(numReconnections).toBeLessThanOrEqual(200);
});
});
});
47 changes: 47 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@
"babel-plugin-transform-inline-environment-variables": "^0.4.3",
"concurrently": "^4.1.2",
"dotenv": "^8.1.0",
"event-target-shim": "^5.0.1",
"get-port": "^5.0.0",
"global-agent": "^2.0.2",
"has-resolved": "^1.1.0",
"http-proxy": "^1.18.1",
"jest": "^24.9.0",
"jest-environment-jsdom-fourteen": "^0.1.0",
"jsdom": "^14.1.0",
"nock": "^13.0.5",
"node-fetch": "^2.6.0",
"on-error-resume-next": "^1.1.0",
"restify": "^8.4.0",
Expand Down
17 changes: 16 additions & 1 deletion src/directLine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,7 @@ export class DirectLine implements IBotConnection {
konsole.log("creating WebSocket", this.streamUrl);
const ws = new this.services.WebSocket(this.streamUrl);
let sub: Subscription;
let closed: boolean;

ws.onopen = open => {
konsole.log("WebSocket open", open);
Expand All @@ -949,7 +950,21 @@ export class DirectLine implements IBotConnection {
ws.onclose = close => {
konsole.log("WebSocket close", close);
if (sub) sub.unsubscribe();
subscriber.error(close);

// RxJS.retryWhen has a bug that would cause "error" signal to be sent after the observable is completed/errored.
// We need to guard against extraneous "error" signal to workaround the bug.
closed || subscriber.error(close);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error [](start = 37, length = 5)

normal web socket close should lead to non-error RX stream completion, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am keeping the original logic as-is, i.e. onclose will throw an error. And I think it's intentional.

If you look at line 910,

// WebSockets can be closed by the server or the browser. In the former case we need to
, the comment said:

this.observableWebSocket<ActivityGroup>()
  // WebSockets can be closed by the server or the browser. In the former case we need to
  // retrieve a new streamUrl. In the latter case we could first retry with the current streamUrl,
  // but it's simpler just to always fetch a new one.
  .retryWhen(error$ => error$.delay(this.getRetryDelay(), this.services.scheduler).mergeMap(error => this.reconnectToConversation()))

Looks like the author of the logic intentionally want to retry whenever the server or browser close the connection. And to do that, the code need to throw error on graceful close.

What do you think?

closed = true;
}

ws.onerror = error => {
konsole.log("WebSocket error", error);
if (sub) sub.unsubscribe();

// RxJS.retryWhen has a bug that would cause "error" signal to be sent after the observable is completed/errored.
// We need to guard against extraneous "error" signal to workaround the bug.
closed || subscriber.error(error);
closed = true;
}

ws.onmessage = message => message.data && subscriber.next(JSON.parse(message.data));
Expand Down