diff --git a/.babelrc.js b/.babelrc.js index 402fb2b7d..0d36eacc0 100644 --- a/.babelrc.js +++ b/.babelrc.js @@ -21,7 +21,13 @@ module.exports = { '@babel/plugin-proposal-class-properties', '@babel/plugin-proposal-object-rest-spread', '@babel/plugin-transform-runtime', - 'babel-plugin-transform-inline-environment-variables' + [ + "transform-inline-environment-variables", { + "include": [ + "npm_package_version" + ] + } + ] ], presets: [ ['@babel/preset-env', { diff --git a/.vscode/settings.json b/.vscode/settings.json index 6a6f21c48..b2433c70b 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -5,5 +5,6 @@ "files.trimTrailingWhitespace": true, "search.exclude": { "lib": true - } + }, + "debug.node.autoAttach": "on" } diff --git a/src/directLine.mock.ts b/src/directLine.mock.ts new file mode 100644 index 000000000..152fba1af --- /dev/null +++ b/src/directLine.mock.ts @@ -0,0 +1,287 @@ +import * as DirectLineExport from "./directLine"; +import { TestScheduler, Observable } from "rxjs"; +import { AjaxCreationMethod, AjaxRequest, AjaxResponse } from "rxjs/observable/dom/AjaxObservable"; +import { URL, URLSearchParams } from 'url'; + +// MOCK helpers + +const notImplemented = (): never => { throw new Error('not implemented') }; + +// MOCK Activity + +export const mockActivity = (text: string): DirectLineExport.Activity => ({ type: 'message', from: { id: 'sender' }, text }); + +// MOCK DirectLine Server (shared state used by Observable.ajax and WebSocket mocks) + +interface ActivitySocket { + play: (start: number, after: number) => void; +} + +export type Socket = WebSocket & ActivitySocket; + +export interface Conversation { + sockets: Set; + conversationId: string; + history: Array; + token: string; +} + +export interface Server { + scheduler: TestScheduler; + conversation: Conversation; +} + +const tokenPrefix = 'token'; + +export const mockServer = (scheduler: TestScheduler): Server => ({ + scheduler, + conversation: { + sockets: new Set(), + conversationId: 'OneConversation', + history: [], + token: tokenPrefix + '1', + } +}); + +const tokenResponse = (server: Server, request: AjaxRequest): AjaxResponse | null => { + const { headers } = request; + const authorization = headers['Authorization']; + if (authorization === `Bearer ${server.conversation.token}`) { + return null; + } + + const response: Partial = { + status: 403, + } + + return response as AjaxResponse; +} + +export const injectClose = (server: Server): void => + server.conversation.sockets.forEach(s => s.onclose(new CloseEvent('close'))); + +export const injectNewToken = (server: Server): void => { + const { conversation } = server; + const suffix = Number.parseInt(conversation.token.substring(tokenPrefix.length), 10) + 1 + conversation.token = tokenPrefix + suffix; +} + +const keyWatermark = 'watermark'; + +type ajaxType = (urlOrRequest: string | AjaxRequest) => AjaxResponse; + +// MOCK Observable.ajax (uses shared state in Server) + +export const mockAjax = (server: Server, customAjax?: ajaxType): AjaxCreationMethod => { + + const uriBase = new URL('https://directline.botframework.com/v3/directline/'); + const createStreamUrl = (watermark: number): string => { + const uri = new URL('conversations/stream', uriBase); + if (watermark > 0) { + const params = new URLSearchParams(); + params.append(keyWatermark, watermark.toString(10)); + uri.search = params.toString(); + } + + return uri.toString(); + } + + const jax = customAjax || ((urlOrRequest: string | AjaxRequest): AjaxResponse => { + if (typeof urlOrRequest === 'string') { + throw new Error(); + } + + const uri = new URL(urlOrRequest.url); + + const { pathname, searchParams } = uri; + + const parts = pathname.split('/'); + + if (parts[3] === 'tokens' && parts[4] === 'refresh') { + + const response: Partial = { + response: { token: server.conversation.token } + }; + + return response as AjaxResponse; + } + + if (parts[3] !== 'conversations') { + throw new Error(); + } + + if (parts.length === 4) { + const conversation: DirectLineExport.Conversation = { + conversationId: server.conversation.conversationId, + token: server.conversation.token, + streamUrl: createStreamUrl(0), + }; + + const response: Partial = { + response: conversation, + } + + return response as AjaxResponse; + } + + if (parts[4] !== server.conversation.conversationId) { + throw new Error(); + } + + if (parts[5] === 'activities') { + const responseToken = tokenResponse(server, urlOrRequest); + if (responseToken !== null) { + return responseToken; + } + + const activity: DirectLineExport.Activity = urlOrRequest.body; + + const after = server.conversation.history.push(activity); + const start = after - 1; + + for (const socket of server.conversation.sockets) { + socket.play(start, after); + } + + const response: Partial = { + response: { id: 'messageId' }, + } + + return response as AjaxResponse; + } + else if (parts.length === 5) { + const responseToken = tokenResponse(server, urlOrRequest); + if (responseToken !== null) { + return responseToken; + } + + const watermark = searchParams.get('watermark'); + const start = Number.parseInt(watermark, 10); + + const conversation: DirectLineExport.Conversation = { + conversationId: server.conversation.conversationId, + token: server.conversation.token, + streamUrl: createStreamUrl(start), + }; + + const response: Partial = { + response: conversation, + } + + return response as AjaxResponse; + } + + throw new Error(); + }); + + const method = (urlOrRequest: string | AjaxRequest): Observable => + new Observable(subscriber => { + try { + subscriber.next(jax(urlOrRequest)); + subscriber.complete(); + } + catch (error) { + subscriber.error(error); + } + }); + + type ValueType = { + [K in keyof T]: T[K] extends V ? T[K] : never; + } + + type Properties = ValueType; + + const properties: Properties = { + get: (url: string, headers?: Object): Observable => notImplemented(), + post: (url: string, body?: any, headers?: Object): Observable => notImplemented(), + put: (url: string, body?: any, headers?: Object): Observable => notImplemented(), + patch: (url: string, body?: any, headers?: Object): Observable => notImplemented(), + delete: (url: string, headers?: Object): Observable => notImplemented(), + getJSON: (url: string, headers?: Object) => notImplemented(), + }; + + return Object.assign(method, properties); +} + +// MOCK WebSocket (uses shared state in Server) + +type WebSocketConstructor = typeof WebSocket; +type EventHandler = (this: WebSocket, ev: E) => any; + +export const mockWebSocket = (server: Server): WebSocketConstructor => + class MockWebSocket implements WebSocket, ActivitySocket { + constructor(url: string, protocols?: string | string[]) { + + server.scheduler.schedule(() => { + this.readyState = WebSocket.CONNECTING; + server.conversation.sockets.add(this); + this.onopen(new Event('open')); + this.readyState = WebSocket.OPEN; + const uri = new URL(url); + const watermark = uri.searchParams.get(keyWatermark) + if (watermark !== null) { + const start = Number.parseInt(watermark, 10); + this.play(start, server.conversation.history.length); + } + }); + } + + play(start: number, after: number) { + + const { conversation: { history } } = server; + const activities = history.slice(start, after); + const watermark = history.length.toString(); + const activityGroup: DirectLineExport.ActivityGroup = { + activities, + watermark, + } + + const message = new MessageEvent('type', { data: JSON.stringify(activityGroup) }); + + this.onmessage(message); + } + + binaryType: BinaryType = 'arraybuffer'; + readonly bufferedAmount: number = 0; + readonly extensions: string = ''; + readonly protocol: string = 'https'; + readyState: number = WebSocket.CLOSED; + readonly url: string = ''; + readonly CLOSED: number = WebSocket.CLOSED; + readonly CLOSING: number = WebSocket.CLOSING; + readonly CONNECTING: number = WebSocket.CONNECTING; + readonly OPEN: number = WebSocket.OPEN; + + onclose: EventHandler; + onerror: EventHandler; + onmessage: EventHandler; + onopen: EventHandler; + + close(code?: number, reason?: string): void { + this.readyState = WebSocket.CLOSING; + this.onclose(new CloseEvent('close')) + server.conversation.sockets.delete(this); + this.readyState = WebSocket.CLOSED; + } + + send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void { + } + + addEventListener() { throw new Error(); } + removeEventListener() { throw new Error(); } + dispatchEvent(): boolean { throw new Error(); } + + static CLOSED = WebSocket.CLOSED; + static CLOSING = WebSocket.CLOSING; + static CONNECTING = WebSocket.CONNECTING; + static OPEN = WebSocket.OPEN; + }; + +// MOCK services (top-level aggregation of all mocks) + +export const mockServices = (server: Server, scheduler: TestScheduler): DirectLineExport.Services => ({ + scheduler, + WebSocket: mockWebSocket(server), + ajax: mockAjax(server), + random: () => 0, +}); diff --git a/src/directLine.test.ts b/src/directLine.test.ts index 98486c7a4..a2c33a730 100644 --- a/src/directLine.test.ts +++ b/src/directLine.test.ts @@ -1,4 +1,8 @@ import * as DirectLineExport from "./directLine"; +import * as DirectLineMock from './directLine.mock'; +import { TestScheduler, Observable, Subscription, AjaxResponse } from "rxjs"; +// @ts-ignore +import { version } from "../package.json"; declare var process: { arch: string; @@ -17,7 +21,7 @@ test("#setConnectionStatusFallback", () => { const testFallback = setConnectionStatusFallback(0, 1); let idx = 4; while (idx--) { - expect(testFallback(0)).toBe(0); + expect(testFallback(0)).toBe(0); } // fallback will be triggered expect(testFallback(0)).toBe(1); @@ -29,7 +33,7 @@ test("#setConnectionStatusFallback", () => { }); describe("#commonHeaders", () => { - const botAgent = "DirectLine/3.0 (directlinejs; custom-bot-agent)"; + const botAgent = `DirectLine/3.0 (directlinejs; custom-bot-agent ${version})`; let botConnection; beforeEach(() => { @@ -61,3 +65,224 @@ describe("#commonHeaders", () => { }); }) }); + +describe('MockSuite', () => { + + const lazyConcat = (items: Iterable>): Observable => + new Observable(subscriber => { + const iterator = items[Symbol.iterator](); + let inner: Subscription | undefined; + + const pump = () => { + try { + const result = iterator.next(); + if (result.done === true) { + subscriber.complete(); + } + else { + inner = result.value.subscribe( + value => subscriber.next(value), + error => subscriber.error(error), + pump + ); + } + } + catch (error) { + subscriber.error(error); + } + }; + + pump(); + + return () => { + if (typeof inner !== 'undefined') { + inner.unsubscribe(); + } + }; + }); + + let scheduler: TestScheduler; + let server: DirectLineMock.Server; + let services: DirectLineExport.Services; + let subscriptions: Array; + let directline: DirectLineExport.DirectLine; + + beforeEach(() => { + scheduler = new TestScheduler((actual, expected) => expect(expected).toBe(actual)); + scheduler.maxFrames = 60 * 1000; + server = DirectLineMock.mockServer(scheduler); + services = DirectLineMock.mockServices(server, scheduler); + directline = new DirectLineExport.DirectLine(services); + subscriptions = []; + }); + + afterEach(() => { + for (const subscription of subscriptions) { + subscription.unsubscribe(); + } + }) + + const expected = { + x: DirectLineMock.mockActivity('x'), + y: DirectLineMock.mockActivity('y'), + z: DirectLineMock.mockActivity('z'), + }; + + test('HappyPath', () => { + // arrange + + const scenario = function* (): IterableIterator> { + yield Observable.timer(200, scheduler); + yield directline.postActivity(expected.x); + yield Observable.timer(200, scheduler); + yield directline.postActivity(expected.y); + yield Observable.timer(200, scheduler); + }; + + subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe()); + + const actual: Array = []; + subscriptions.push(directline.activity$.subscribe(a => actual.push(a))); + + // act + + scheduler.flush(); + + // assert + + expect(actual).toStrictEqual([expected.x, expected.y]); + }); + + test('ReconnectOnClose', () => { + // arrange + + const scenario = function* (): IterableIterator> { + yield Observable.timer(200, scheduler); + yield directline.postActivity(expected.x); + DirectLineMock.injectClose(server); + yield Observable.timer(200, scheduler); + yield directline.postActivity(expected.y); + yield Observable.timer(200, scheduler); + }; + + subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe()); + + const actual: Array = []; + subscriptions.push(directline.activity$.subscribe(a => actual.push(a))); + + // act + + scheduler.flush(); + + // assert + + expect(actual).toStrictEqual([expected.x, expected.y]); + }); + + test('BotAgentWithMocks', () => { + const expected: string = `DirectLine/3.0 (directlinejs ${version})`; + + //@ts-ignore + const actual: string = directline.commonHeaders()["x-ms-bot-agent"]; + expect(actual).toStrictEqual(expected) + }); + + test('RetryAfterHeader', () => { + services.ajax = DirectLineMock.mockAjax(server, (urlOrRequest) => { + + if(typeof urlOrRequest === 'string'){ + throw new Error(); + } + + if(urlOrRequest.url && urlOrRequest.url.indexOf(server.conversation.conversationId)>0){ + const response: Partial = { + status: 429, + xhr:{ + getResponseHeader: (name) => "10" + } as XMLHttpRequest + }; + const error = new Error('Ajax Error'); + throw Object.assign(error, response); + } + else if(urlOrRequest.url && urlOrRequest.url.indexOf('/conversations') > 0){ + // start conversation + const response: Partial = { + response: server.conversation, + status: 201, + xhr: { + getResponseHeader: (name) => 'n/a' + } as XMLHttpRequest + }; + return response as AjaxResponse; + } + throw new Error(); + }); + directline = new DirectLineExport.DirectLine(services); + + let startTime: number; + let endTime: number; + const scenario = function* (): IterableIterator> { + yield Observable.timer(200, scheduler); + startTime = scheduler.now(); + yield directline.postActivity(expected.x); + }; + + let actualError: Error; + try{ + subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe()); + scheduler.flush(); + } + catch(error){ + actualError = error; + endTime = scheduler.now(); + } + expect(actualError.message).toStrictEqual('Ajax Error'); + // @ts-ignore + expect(actualError.status).toStrictEqual(429); + expect(endTime - startTime).toStrictEqual(10); + }); + + test('SendDebugHeader', () => { + let expectedBotId:string; + const actual = 'botid'; + services.ajax = DirectLineMock.mockAjax(server, (urlOrRequest) => { + if(typeof urlOrRequest === 'string'){ + throw new Error(); + } + + if(urlOrRequest.url && urlOrRequest.url.indexOf(server.conversation.conversationId)>0){ + const response: Partial = { + response: {id:'blah'}, + status: 200 + }; + expectedBotId = urlOrRequest.headers['x-ms-botid']; + return response as AjaxResponse; + } + else if(urlOrRequest.url && urlOrRequest.url.indexOf('/conversations') > 0){ + // start conversation + const response: Partial = { + response: server.conversation, + status: 201, + xhr: { + getResponseHeader: (name) => actual + } as XMLHttpRequest + }; + return response as AjaxResponse; + } + throw new Error(); + }); + directline = new DirectLineExport.DirectLine(services); + const scenario = function* (): IterableIterator> { + yield Observable.timer(200, scheduler); + yield directline.postActivity(expected.x).catch(() => Observable.empty(scheduler)); + }; + + // lack of subscribe arguments means that the empty subscriber is used + // the empty subscriber will propagate observable errors on the JS call stack + // within the scheduler notification action handling loop because of the observeOn + subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe()); + scheduler.flush(); + + expect(expectedBotId).toBe(actual); + }); +}); diff --git a/src/directLine.ts b/src/directLine.ts index 54c426c64..0b9e174a5 100644 --- a/src/directLine.ts +++ b/src/directLine.ts @@ -1,10 +1,12 @@ // In order to keep file size down, only import the parts of rxjs that we use -import { AjaxResponse, AjaxRequest } from 'rxjs/observable/dom/AjaxObservable'; +import { AjaxResponse, AjaxCreationMethod, AjaxRequest, AjaxError } from 'rxjs/observable/dom/AjaxObservable'; import { BehaviorSubject } from 'rxjs/BehaviorSubject'; import { Observable } from 'rxjs/Observable'; +import { IScheduler } from 'rxjs/Scheduler'; import { Subscriber } from 'rxjs/Subscriber'; import { Subscription } from 'rxjs/Subscription'; +import { async as AsyncScheduler } from 'rxjs/scheduler/async'; import 'rxjs/add/operator/catch'; import 'rxjs/add/operator/combineLatest'; @@ -14,6 +16,7 @@ import 'rxjs/add/operator/do'; import 'rxjs/add/operator/filter'; import 'rxjs/add/operator/map'; import 'rxjs/add/operator/mergeMap'; +import 'rxjs/add/operator/concatMap'; import 'rxjs/add/operator/retryWhen'; import 'rxjs/add/operator/share'; import 'rxjs/add/operator/take'; @@ -26,6 +29,7 @@ import 'rxjs/add/observable/of'; import 'rxjs/add/observable/throw'; import dedupeFilenames from './dedupeFilenames'; +import { objectExpression } from '@babel/types'; const DIRECT_LINE_VERSION = 'DirectLine/3.0'; @@ -33,6 +37,7 @@ declare var process: { arch: string; env: { VERSION: string; + npm_package_version: string; }; platform: string; release: string; @@ -333,7 +338,7 @@ export interface EventActivity extends IActivity { export type Activity = Message | Typing | EventActivity; -interface ActivityGroup { +export interface ActivityGroup { activities: Activity[], watermark: string } @@ -362,6 +367,57 @@ export interface DirectLineOptions { botAgent?: string } +export interface Services { + scheduler: IScheduler; + WebSocket: typeof WebSocket; + ajax: AjaxCreationMethod; + random: () => number; +} + +const wrapAjaxWithRetry = (source: AjaxCreationMethod, scheduler: IScheduler): AjaxCreationMethod =>{ + + const notImplemented = (): never => { throw new Error('not implemented'); }; + + const inner = (response$ : Observable) => { + return response$ + .catch((err) => { + if(err.status === 429){ + const retryAfterValue = err.xhr.getResponseHeader('Retry-After'); + const retryAfter = Number(retryAfterValue); + if(!isNaN(retryAfter)){ + return Observable.timer(retryAfter, scheduler) + .flatMap(_ => Observable.throw(err, scheduler)); + } + } + + return Observable.throw(err, scheduler); + }); + }; + + const outer = (urlOrRequest: string| AjaxRequest) => { + return inner(source(urlOrRequest)); + }; + + return Object.assign(outer, { + get: (url: string, headers?: Object): Observable => notImplemented(), + post: (url: string, body?: any, headers?: Object): Observable => notImplemented(), + put: (url: string, body?: any, headers?: Object): Observable => notImplemented(), + patch: (url: string, body?: any, headers?: Object): Observable => notImplemented(), + delete: (url: string, headers?: Object): Observable => notImplemented(), + getJSON: (url: string, headers?: Object): Observable => notImplemented() + }); +} + +const makeServices = (services: Partial): Services => { + const scheduler = services.scheduler || AsyncScheduler; + return { + scheduler, + ajax: wrapAjaxWithRetry(services.ajax || Observable.ajax, scheduler), + WebSocket: services.WebSocket || WebSocket, + random: services.random || Math.random, + } +} + const lifetimeRefreshToken = 30 * 60 * 1000; const intervalRefreshToken = lifetimeRefreshToken / 2; const timeout = 20 * 1000; @@ -403,14 +459,16 @@ export class DirectLine implements IBotConnection { private watermark = ''; private streamUrl: string; private _botAgent = ''; + private services: Services; private _userAgent: string; public referenceGrammarId: string; + private botIdHeader: string; private pollingInterval: number = 1000; //ms private tokenRefreshSubscription: Subscription; - constructor(options: DirectLineOptions) { + constructor(options: DirectLineOptions & Partial) { this.secret = options.secret; this.token = options.secret || options.token; this.webSocket = (options.webSocket === undefined ? true : options.webSocket) && typeof WebSocket !== 'undefined' && WebSocket !== undefined; @@ -437,6 +495,8 @@ export class DirectLine implements IBotConnection { this._botAgent = this.getBotAgent(options.botAgent); + this.services = makeServices(options); + const parsedPollingInterval = ~~options.pollingInterval; if (parsedPollingInterval < POLLING_INTERVAL_LOWER_BOUND) { @@ -470,7 +530,7 @@ export class DirectLine implements IBotConnection { //if token and streamUrl are defined it means reconnect has already been done. Skipping it. if (this.token && this.streamUrl) { this.connectionStatus$.next(ConnectionStatus.Online); - return Observable.of(connectionStatus); + return Observable.of(connectionStatus, this.services.scheduler); } else { return this.startConversation().do(conversation => { this.conversationId = conversation.conversationId; @@ -488,23 +548,23 @@ export class DirectLine implements IBotConnection { } } else { - return Observable.of(connectionStatus); + return Observable.of(connectionStatus, this.services.scheduler); } }) .filter(connectionStatus => connectionStatus != ConnectionStatus.Uninitialized && connectionStatus != ConnectionStatus.Connecting) .flatMap(connectionStatus => { switch (connectionStatus) { case ConnectionStatus.Ended: - return Observable.throw(errorConversationEnded); + return Observable.throw(errorConversationEnded, this.services.scheduler); case ConnectionStatus.FailedToConnect: - return Observable.throw(errorFailedToConnect); + return Observable.throw(errorFailedToConnect, this.services.scheduler); case ConnectionStatus.ExpiredToken: - return Observable.of(connectionStatus); + return Observable.of(connectionStatus, this.services.scheduler); default: - return Observable.of(connectionStatus); + return Observable.of(connectionStatus, this.services.scheduler); } }) @@ -545,8 +605,7 @@ export class DirectLine implements IBotConnection { ? `${this.domain}/conversations/${this.conversationId}?watermark=${this.watermark}` : `${this.domain}/conversations`; const method = this.conversationId ? "GET" : "POST"; - - return Observable.ajax({ + return this.services.ajax({ method, url, timeout, @@ -556,21 +615,30 @@ export class DirectLine implements IBotConnection { } }) // .do(ajaxResponse => konsole.log("conversation ajaxResponse", ajaxResponse.response)) - .map(ajaxResponse => ajaxResponse.response as Conversation) + .map(ajaxResponse => { + try{ + if(!this.botIdHeader ){ + this.botIdHeader = ajaxResponse.xhr.getResponseHeader('x-ms-bot-id'); + } + } + catch{/*don't care if the above throws for any reason*/} + return ajaxResponse.response as Conversation; + }) .retryWhen(error$ => // for now we deem 4xx and 5xx errors as unrecoverable // for everything else (timeouts), retry for a while - error$.mergeMap(error => error.status >= 400 && error.status < 600 - ? Observable.throw(error) - : Observable.of(error) - ) - .delay(timeout) + error$.mergeMap((error) => { + return error.status >= 400 && error.status < 600 + ? Observable.throw(error, this.services.scheduler) + : Observable.of(error, this.services.scheduler) + }) + .delay(timeout, this.services.scheduler) .take(retries) ) } private refreshTokenLoop() { - this.tokenRefreshSubscription = Observable.interval(intervalRefreshToken) + this.tokenRefreshSubscription = Observable.interval(intervalRefreshToken, this.services.scheduler) .flatMap(_ => this.refreshToken()) .subscribe(token => { konsole.log("refreshing token", token, "at", new Date()); @@ -581,7 +649,7 @@ export class DirectLine implements IBotConnection { private refreshToken() { return this.checkConnection(true) .flatMap(_ => - Observable.ajax({ + this.services.ajax({ method: "POST", url: `${this.domain}/tokens/refresh`, timeout, @@ -595,15 +663,15 @@ export class DirectLine implements IBotConnection { if (error.status === 403) { // if the token is expired there's no reason to keep trying this.expiredToken(); - return Observable.throw(error); + return Observable.throw(error, this.services.scheduler); } else if (error.status === 404) { // If the bot is gone, we should stop retrying - return Observable.throw(error); + return Observable.throw(error, this.services.scheduler); } - return Observable.of(error); + return Observable.of(error, this.services.scheduler); }) - .delay(timeout) + .delay(timeout, this.services.scheduler) .take(retries) ) ) @@ -634,7 +702,7 @@ export class DirectLine implements IBotConnection { konsole.log("getSessionId"); return this.checkConnection(true) .flatMap(_ => - Observable.ajax({ + this.services.ajax({ method: "GET", url: `${this.domain}/session/getsessionid`, withCredentials: true, @@ -653,7 +721,7 @@ export class DirectLine implements IBotConnection { }) .catch(error => { konsole.log("getSessionId error: " + error.status); - return Observable.of(''); + return Observable.of('', this.services.scheduler); }) ) .catch(error => this.catchExpiredToken(error)); @@ -671,7 +739,7 @@ export class DirectLine implements IBotConnection { konsole.log("postActivity", activity); return this.checkConnection(true) .flatMap(_ => - Observable.ajax({ + this.services.ajax({ method: "POST", url: `${this.domain}/conversations/${this.conversationId}/activities`, body: activity, @@ -711,9 +779,9 @@ export class DirectLine implements IBotConnection { attachments: cleansedAttachments.map(({ contentUrl: string, ...others }) => ({ ...others })) })], { type: 'application/vnd.microsoft.activity' })); - return Observable.from(cleansedAttachments) + return Observable.from(cleansedAttachments, this.services.scheduler) .flatMap((media: Media) => - Observable.ajax({ + this.services.ajax({ method: "GET", url: media.contentUrl, responseType: 'arraybuffer' @@ -725,7 +793,7 @@ export class DirectLine implements IBotConnection { .count() }) .flatMap(_ => - Observable.ajax({ + this.services.ajax({ method: "POST", url: `${this.domain}/conversations/${this.conversationId}/upload?userId=${message.from.id}`, body: formData, @@ -746,14 +814,14 @@ export class DirectLine implements IBotConnection { this.expiredToken(); else if (error.status >= 400 && error.status < 500) // more unrecoverable errors - return Observable.throw(error); - return Observable.of("retry"); + return Observable.throw(error, this.services.scheduler); + return Observable.of("retry", this.services.scheduler); } private catchExpiredToken(error: any) { return error === errorExpiredToken - ? Observable.of("retry") - : Observable.throw(error); + ? Observable.of("retry", this.services.scheduler) + : Observable.throw(error, this.services.scheduler); } private pollingGetActivity$() { @@ -762,11 +830,13 @@ export class DirectLine implements IBotConnection { // the first event is produced immediately. const trigger$ = new BehaviorSubject({}); + // TODO: remove Date.now, use reactive interval to space out every request + trigger$.subscribe(() => { if (this.connectionStatus$.getValue() === ConnectionStatus.Online) { const startTimestamp = Date.now(); - Observable.ajax({ + this.services.ajax({ headers: { Accept: 'application/json', ...this.commonHeaders() @@ -811,7 +881,7 @@ export class DirectLine implements IBotConnection { private observableFromActivityGroup(activityGroup: ActivityGroup) { if (activityGroup.watermark) this.watermark = activityGroup.watermark; - return Observable.from(activityGroup.activities); + return Observable.from(activityGroup.activities, this.services.scheduler); } private webSocketActivity$(): Observable { @@ -821,23 +891,23 @@ export class DirectLine implements IBotConnection { // WebSockets can be closed by the server or the browser. In the former case we need to // retrieve a new streamUrl. In the latter case we could first retry with the current streamUrl, // but it's simpler just to always fetch a new one. - .retryWhen(error$ => error$.delay(this.getRetryDelay()).mergeMap(error => this.reconnectToConversation())) + .retryWhen(error$ => error$.delay(this.getRetryDelay(), this.services.scheduler).mergeMap(error => this.reconnectToConversation())) ) .flatMap(activityGroup => this.observableFromActivityGroup(activityGroup)) } // Returns the delay duration in milliseconds private getRetryDelay() { - return Math.floor(3000 + Math.random() * 12000); + return Math.floor(3000 + this.services.random() * 12000); } - // Originally we used Observable.webSocket, but it's fairly opionated and I ended up writing + // Originally we used Observable.webSocket, but it's fairly opinionated and I ended up writing // a lot of code to work around their implemention details. Since WebChat is meant to be a reference // implementation, I decided roll the below, where the logic is more purposeful. - @billba private observableWebSocket() { return Observable.create((subscriber: Subscriber) => { konsole.log("creating WebSocket", this.streamUrl); - const ws = new WebSocket(this.streamUrl); + const ws = new this.services.WebSocket(this.streamUrl); let sub: Subscription; ws.onopen = open => { @@ -846,7 +916,7 @@ export class DirectLine implements IBotConnection { // If we periodically ping the server with empty messages, it helps Chrome // realize when connection breaks, and close the socket. We then throw an // error, and that give us the opportunity to attempt to reconnect. - sub = Observable.interval(timeout).subscribe(_ => { + sub = Observable.interval(timeout, this.services.scheduler).subscribe(_ => { try { ws.send("") } catch(e) { @@ -876,7 +946,7 @@ export class DirectLine implements IBotConnection { private reconnectToConversation() { return this.checkConnection(true) .flatMap(_ => - Observable.ajax({ + this.services.ajax({ method: "GET", url: `${this.domain}/conversations/${this.conversationId}?watermark=${this.watermark}`, timeout, @@ -898,22 +968,22 @@ export class DirectLine implements IBotConnection { // website might eventually call reconnect() with a new token and streamUrl. this.expiredToken(); } else if (error.status === 404) { - return Observable.throw(errorConversationEnded); + return Observable.throw(errorConversationEnded, this.services.scheduler); } - return Observable.of(error); + return Observable.of(error, this.services.scheduler); }) - .delay(timeout) + .delay(timeout, this.services.scheduler) .take(retries) ) ) } private commonHeaders() { - return { - "Authorization": `Bearer ${this.token}`, - "x-ms-bot-agent": this._botAgent - }; + return Object.assign({ + "Authorization": `Bearer ${this.token}`, + "x-ms-bot-agent": this._botAgent + }, this.botIdHeader ? {'x-ms-bot-id': this.botIdHeader}: null); } private getBotAgent(customAgent: string = ''): string { @@ -923,6 +993,8 @@ export class DirectLine implements IBotConnection { clientAgent += `; ${customAgent}` } - return `${DIRECT_LINE_VERSION} (${clientAgent})`; + return `${DIRECT_LINE_VERSION} (${clientAgent} ${process.env.npm_package_version})`; } + + }