From 28137d436d2bc815c47e01f70605f75cedfea424 Mon Sep 17 00:00:00 2001 From: Orgad Shaneh Date: Thu, 12 Feb 2026 23:55:09 +0200 Subject: [PATCH] Fix resource leaks in DirectLineStreaming after end() is called - Cancel pending timers on end() via AbortController-aware sleep() - Stop refreshToken() loop early on abort and after fatal 403 - Unsubscribe from connectionStatus$ in waitUntilOnline() on resolve/reject - Use cancellable sleep() for retry delays in connectWithRetryAsync() - Add unit tests covering all leak fixes (sync and async paths) Fixes #433 --- src/directLineStreaming.leaks.test.ts | 392 ++++++++++++++++++++++++++ src/directLineStreaming.ts | 42 ++- 2 files changed, 429 insertions(+), 5 deletions(-) create mode 100644 src/directLineStreaming.leaks.test.ts diff --git a/src/directLineStreaming.leaks.test.ts b/src/directLineStreaming.leaks.test.ts new file mode 100644 index 00000000..738dbbe2 --- /dev/null +++ b/src/directLineStreaming.leaks.test.ts @@ -0,0 +1,392 @@ +/** + * Tests for leak fixes in DirectLineStreaming after end() is called. + * + * Covers: + * 1. sleep() resolves immediately when end() is called (abort signal) + * 2. refreshToken() exits early when end() is called (before and after sleep) + * 3. refreshToken() returns after fatal 403 instead of continuing the loop + * 4. connectWithRetryAsync() retry delay is cancellable via end() + * 5. waitUntilOnline() unsubscribes from connectionStatus$ on resolve/reject + */ + +import { ConnectionStatus } from './directLine'; + +// We need to capture the WebSocketClient instances to control them in tests. +let mockWebSocketClientInstances: MockWebSocketClient[] = []; + +class MockWebSocketClient { + #disconnectionHandler: ((reason?: string) => void) | undefined; + #connectResolve: (() => void) | undefined; + #connectReject: ((err: Error) => void) | undefined; + + connect: jest.Mock; + disconnect: jest.Mock; + send: jest.Mock; + + constructor(init: any) { + this.#disconnectionHandler = init.disconnectionHandler; + + this.connect = jest.fn( + () => + new Promise((resolve, reject) => { + this.#connectResolve = resolve; + this.#connectReject = reject; + }) + ); + + this.disconnect = jest.fn(() => { + this.#disconnectionHandler?.('disconnect() called'); + }); + + this.send = jest.fn(async () => ({ + statusCode: 200, + streams: [ + { + readAsString: async () => + JSON.stringify({ conversationId: 'conv-123' }) + } + ] + })); + + mockWebSocketClientInstances.push(this); + } + + // Test helpers to simulate connection lifecycle. + __test__resolveConnect() { + this.#connectResolve?.(); + } + + __test__rejectConnect(err: Error) { + this.#connectReject?.(err); + } + + __test__simulateDisconnect(reason?: string) { + this.#disconnectionHandler?.(reason); + } +} + +jest.mock('./streaming/WebSocketClientWithNetworkInformation', () => ({ + __esModule: true, + default: function (...args: any[]) { + return new MockWebSocketClient(args[0]); + } +})); + +// Mock cross-fetch to prevent real network calls. +// jest.mock is hoisted, so we use jest.fn() inline and retrieve it later. +jest.mock('cross-fetch', () => ({ + __esModule: true, + default: jest.fn() +})); + +// Import after mocks. +import { DirectLineStreaming } from './directLineStreaming'; +import _mockFetchImport from 'cross-fetch'; + +const mockFetch = _mockFetchImport as unknown as jest.Mock; + +beforeEach(() => { + jest.useFakeTimers({ now: 0 }); + mockWebSocketClientInstances = []; + mockFetch.mockReset(); + + // Default: token refresh returns 200. + mockFetch.mockResolvedValue({ + ok: true, + json: async () => ({ token: 'new-token' }) + }); +}); + +afterEach(() => { + jest.useRealTimers(); +}); + +/** + * Helper: creates a DirectLineStreaming instance, subscribes to activity$, + * and drives the connection through to Online state. + */ +async function createAndConnect(): Promise<{ + directLine: DirectLineStreaming; + client: MockWebSocketClient; +}> { + const directLine = new DirectLineStreaming({ + domain: 'https://test.bot', + token: 'test-token' + }); + + // Subscribe to activity$ to kick off the connection. + directLine.activity$.subscribe({ next() {}, error() {}, complete() {} }); + + // Let microtasks flush so connectWithRetryAsync starts. + await jest.advanceTimersByTimeAsync(0); + + const client = mockWebSocketClientInstances[mockWebSocketClientInstances.length - 1]; + + // Simulate successful WebSocket connection. + client.__test__resolveConnect(); + await jest.advanceTimersByTimeAsync(0); + + return { directLine, client }; +} + +// --------------------------------------------------------------------------- +// 1. sleep() resolves immediately when _endAbortController is aborted +// --------------------------------------------------------------------------- +describe('sleep() abort on end()', () => { + test('calling end() during refreshToken sleep should stop the token refresh loop (no dangling timer)', async () => { + const { directLine, client } = await createAndConnect(); + + // At this point, refreshToken() is waiting for waitUntilOnline() which has resolved, + // and then it sleeps for refreshTokenInterval (15 minutes). + // Advance only partway through the 15min sleep. + await jest.advanceTimersByTimeAsync(5 * 60 * 1000); + + // Now call end(). This should abort the sleep immediately. + directLine.end(); + + // Let microtasks complete. + await jest.advanceTimersByTimeAsync(0); + + // The refresh loop should have exited. No further fetch calls should be made. + const fetchCallCountAfterEnd = mockFetch.mock.calls.length; + + // Advance time way past when the next refresh would have happened. + await jest.advanceTimersByTimeAsync(60 * 60 * 1000); + + // No new fetch calls should have been made. + expect(mockFetch.mock.calls.length).toBe(fetchCallCountAfterEnd); + }); +}); + +// --------------------------------------------------------------------------- +// 2. refreshToken() exits early when end() is called before/after sleep +// --------------------------------------------------------------------------- +describe('refreshToken() abort checks', () => { + test('refreshToken should not make fetch calls after end() is called', async () => { + const { directLine, client } = await createAndConnect(); + + // No refresh fetch yet (haven't advanced 15 minutes). + const fetchCalls = mockFetch.mock.calls.filter( + ([url]: [string]) => typeof url === 'string' && url.includes('/tokens/refresh') + ); + expect(fetchCalls).toHaveLength(0); + + // Call end(). + directLine.end(); + await jest.advanceTimersByTimeAsync(0); + + // Advance past the refresh interval. + await jest.advanceTimersByTimeAsync(30 * 60 * 1000); + + // There should be no refresh calls. + const refreshCallsAfterEnd = mockFetch.mock.calls.filter( + ([url]: [string]) => typeof url === 'string' && url.includes('/tokens/refresh') + ); + expect(refreshCallsAfterEnd).toHaveLength(0); + }); + + test('refreshToken should stop after abort even between sleep and fetch', async () => { + const { directLine, client } = await createAndConnect(); + + // Advance to just before the refresh sleep would end. + await jest.advanceTimersByTimeAsync(15 * 60 * 1000 - 100); + + // Call end() right before the sleep resolves. + directLine.end(); + await jest.advanceTimersByTimeAsync(0); + + // The sleep resolves immediately due to abort, then the abort check returns. + // Advance more time. + await jest.advanceTimersByTimeAsync(30 * 60 * 1000); + + // No token refresh should have been attempted. + const refreshCalls = mockFetch.mock.calls.filter( + ([url]: [string]) => typeof url === 'string' && url.includes('/tokens/refresh') + ); + expect(refreshCalls).toHaveLength(0); + }); +}); + +// --------------------------------------------------------------------------- +// 3. refreshToken() returns after fatal 403 +// --------------------------------------------------------------------------- +describe('refreshToken() on fatal 403', () => { + test('should stop refresh loop and not continue retrying after 403', async () => { + const { directLine, client } = await createAndConnect(); + + // Make the refresh endpoint return 403. + mockFetch.mockResolvedValue({ + ok: false, + status: 403, + statusText: 'Forbidden', + json: async () => ({}) + }); + + // Advance through the first refresh interval. + await jest.advanceTimersByTimeAsync(15 * 60 * 1000); + await jest.advanceTimersByTimeAsync(0); + + // The first refresh attempt should have been made and returned 403. + const refreshCalls = mockFetch.mock.calls.filter( + ([url]: [string]) => typeof url === 'string' && url.includes('/tokens/refresh') + ); + expect(refreshCalls.length).toBeGreaterThanOrEqual(1); + + const callCountAfter403 = mockFetch.mock.calls.length; + + // Advance far past further refresh intervals. + await jest.advanceTimersByTimeAsync(60 * 60 * 1000); + + // No more refresh calls should be made (the loop returned after 403). + expect(mockFetch.mock.calls.length).toBe(callCountAfter403); + }); +}); + +// --------------------------------------------------------------------------- +// 4. connectWithRetryAsync() retry delay is cancellable via end() +// --------------------------------------------------------------------------- +describe('connectWithRetryAsync() retry sleep cancellation', () => { + test('calling end() during retry delay should stop reconnection attempts promptly', async () => { + const { directLine, client } = await createAndConnect(); + + // Simulate a disconnect to trigger reconnection retries. + client.__test__simulateDisconnect('test disconnect'); + await jest.advanceTimersByTimeAsync(0); + + // The first retry should begin after a delay of 3-15 seconds. + // Advance partway into the retry delay. + await jest.advanceTimersByTimeAsync(1000); + + // Call end() while waiting for the retry delay. + directLine.end(); + await jest.advanceTimersByTimeAsync(0); + + // Record the current state after end() has been processed. + const clientCountAfterEnd = mockWebSocketClientInstances.length; + + // Advance time far past any retry delays. + await jest.advanceTimersByTimeAsync(60 * 1000); + + // No additional WebSocket clients should have been created after end() settled. + expect(mockWebSocketClientInstances.length).toBe(clientCountAfterEnd); + }); + + test('calling end() should prevent further reconnection attempts', async () => { + const { directLine, client } = await createAndConnect(); + + // Tick for 1 minute to make the connection "stable" (resets retry count). + await jest.advanceTimersByTimeAsync(60_000); + + const statusValues: ConnectionStatus[] = []; + directLine.connectionStatus$.subscribe(s => statusValues.push(s)); + + // Simulate disconnect. + client.__test__simulateDisconnect('test disconnect'); + await jest.advanceTimersByTimeAsync(0); + + // Call end() immediately. + directLine.end(); + await jest.advanceTimersByTimeAsync(0); + + // Should observe Ended status. + expect(statusValues).toContain(ConnectionStatus.Ended); + + // Advance time past all possible retries. + await jest.advanceTimersByTimeAsync(120_000); + + const clientsAfterEnd = mockWebSocketClientInstances.length; + + // No further connection attempts. + await jest.advanceTimersByTimeAsync(120_000); + expect(mockWebSocketClientInstances.length).toBe(clientsAfterEnd); + }); +}); + +// --------------------------------------------------------------------------- +// 5. waitUntilOnline() cleans up subscription +// --------------------------------------------------------------------------- +describe('waitUntilOnline() subscription cleanup', () => { + test('should unsubscribe from connectionStatus$ after going online (async case)', async () => { + const directLine = new DirectLineStreaming({ + domain: 'https://test.bot', + token: 'test-token' + }); + + // Subscribe to activity$ to kick off the connection. + directLine.activity$.subscribe({ next() {}, error() {}, complete() {} }); + await jest.advanceTimersByTimeAsync(0); + + const client = mockWebSocketClientInstances[mockWebSocketClientInstances.length - 1]; + + const observerCountBeforeOnline = (directLine.connectionStatus$ as any).observers.length; + + // Simulate successful connection — this triggers Online and refreshToken's waitUntilOnline resolves. + client.__test__resolveConnect(); + await jest.advanceTimersByTimeAsync(0); + + // After going Online, the waitUntilOnline() subscription from refreshToken should be cleaned up. + // Observer count should not have grown (the waitUntilOnline subscription was added then removed). + const observerCountAfterOnline = (directLine.connectionStatus$ as any).observers.length; + expect(observerCountAfterOnline).toBeLessThanOrEqual(observerCountBeforeOnline); + + // Clean up. + directLine.end(); + await jest.advanceTimersByTimeAsync(0); + + // After end(), no observers should remain subscribed (connectionStatus$ completed). + expect((directLine.connectionStatus$ as any).observers.length).toBe(0); + }); + + test('should unsubscribe from connectionStatus$ when already online (synchronous case)', async () => { + const { directLine } = await createAndConnect(); + + // Status is already Online. Calling waitUntilOnline() will subscribe to a BehaviorSubject + // that synchronously emits Online. The subscription must still be cleaned up. + const observerCountBefore = (directLine.connectionStatus$ as any).observers.length; + + // Call waitUntilOnline() via refreshToken indirectly by triggering another refresh cycle. + // Instead, we can test it more directly: force a second waitUntilOnline by disconnecting + // and reconnecting, then checking observer count is stable. + + // Disconnect and reconnect to trigger another waitUntilOnline() call in refreshToken. + const client = mockWebSocketClientInstances[mockWebSocketClientInstances.length - 1]; + client.__test__simulateDisconnect('test'); + await jest.advanceTimersByTimeAsync(0); + + // A new connection attempt is made. Simulate success. + const newClient = mockWebSocketClientInstances[mockWebSocketClientInstances.length - 1]; + newClient.__test__resolveConnect(); + await jest.advanceTimersByTimeAsync(0); + + // Now we're back Online. A new refreshToken waitUntilOnline() resolved synchronously + // (status was briefly Connecting, then Online). Observer count should not have grown. + const observerCountAfterReconnect = (directLine.connectionStatus$ as any).observers.length; + expect(observerCountAfterReconnect).toBeLessThanOrEqual(observerCountBefore); + + // Clean up. + directLine.end(); + await jest.advanceTimersByTimeAsync(0); + expect((directLine.connectionStatus$ as any).observers.length).toBe(0); + }); +}); + +// --------------------------------------------------------------------------- +// Integration: end() should not leave any pending timers +// --------------------------------------------------------------------------- +describe('end() cleanup integration', () => { + test('after end(), advancing time should not trigger any activity', async () => { + const { directLine, client } = await createAndConnect(); + + directLine.end(); + await jest.advanceTimersByTimeAsync(0); + + const fetchCountAfterEnd = mockFetch.mock.calls.length; + const clientCountAfterEnd = mockWebSocketClientInstances.length; + + // Advance time by 2 hours - nothing should happen. + await jest.advanceTimersByTimeAsync(2 * 60 * 60 * 1000); + + expect(mockFetch.mock.calls.length).toBe(fetchCountAfterEnd); + expect(mockWebSocketClientInstances.length).toBe(clientCountAfterEnd); + }); +}); diff --git a/src/directLineStreaming.ts b/src/directLineStreaming.ts index fabc3114..bea6492d 100644 --- a/src/directLineStreaming.ts +++ b/src/directLineStreaming.ts @@ -86,7 +86,6 @@ class StreamHandler implements BFSE.RequestHandler { } public flush() { - this.connectionStatus$.subscribe(() => {}); this.activityQueue.forEach(a => this.subscriber.next(a)); this.activityQueue = []; } @@ -112,6 +111,7 @@ export class DirectLineStreaming implements IBotConnection { private queueActivities: boolean; private _botAgent = ''; + private _endAbortController = new AbortController(); #networkInformation: NetworkInformation | undefined; @@ -172,6 +172,8 @@ export class DirectLineStreaming implements IBotConnection { end() { // Once end() is called, no reconnection can be made. + this._endAbortController.abort(); + this.activitySubscriber.complete(); this.connectionStatus$.next(ConnectionStatus.Ended); @@ -202,8 +204,17 @@ export class DirectLineStreaming implements IBotConnection { let numberOfAttempts = 0; while (numberOfAttempts < MAX_RETRY_COUNT) { + if (this._endAbortController.signal.aborted) { + return; + } + numberOfAttempts++; - await new Promise(r => setTimeout(r, refreshTokenInterval)); + await this.sleep(refreshTokenInterval); + + if (this._endAbortController.signal.aborted) { + return; + } + try { const res = await fetch(`${this.domain}/tokens/refresh`, { method: 'POST', headers: this.commonHeaders() }); if (res.ok) { @@ -214,6 +225,7 @@ export class DirectLineStreaming implements IBotConnection { if (res.status === 403 || res.status === 403) { console.error(`Fatal error while refreshing the token: ${res.status} ${res.statusText}`); this.streamConnection.disconnect(); + return; } else { console.warn(`Refresh attempt #${numberOfAttempts} failed: ${res.status} ${res.statusText}`); } @@ -227,6 +239,16 @@ export class DirectLineStreaming implements IBotConnection { this.streamConnection.disconnect(); } + private sleep(ms: number): Promise { + return new Promise(resolve => { + const timer = setTimeout(resolve, ms); + this._endAbortController.signal.addEventListener('abort', () => { + clearTimeout(timer); + resolve(); + }, { once: true }); + }); + } + postActivity(activity: Activity) { if ( this.connectionStatus$.value === ConnectionStatus.Ended || @@ -323,14 +345,24 @@ export class DirectLineStreaming implements IBotConnection { private async waitUntilOnline() { return new Promise((resolve, reject) => { - this.connectionStatus$.subscribe( + let done = false; + const subscription = this.connectionStatus$.subscribe( cs => { if (cs === ConnectionStatus.Online) { + done = true; + subscription?.unsubscribe(); return resolve(); } }, - e => reject(e) + e => { + done = true; + subscription?.unsubscribe(); + reject(e); + } ); + if (done) { + subscription.unsubscribe(); + } }); } @@ -448,7 +480,7 @@ export class DirectLineStreaming implements IBotConnection { numRetries = MAX_RETRY_COUNT; } else if (numRetries > 0) { // Sleep only if we are doing retry. Otherwise, we are going to break the loop and signal FailedToConnect. - await new Promise(r => setTimeout(r, this.getRetryDelay())); + await this.sleep(this.getRetryDelay()); } }