From bec4fb756d0bafd42c6a0d31f2c32b6a351c98f7 Mon Sep 17 00:00:00 2001 From: Tom Stannett Date: Fri, 17 Apr 2026 16:36:58 +0100 Subject: [PATCH 1/5] feat(stt): add FallbackAdapter for automatic provider failover --- agents/src/stt/fallback_adapter.test.ts | 293 +++++++++++++++ agents/src/stt/fallback_adapter.ts | 472 ++++++++++++++++++++++++ agents/src/stt/index.ts | 17 +- 3 files changed, 776 insertions(+), 6 deletions(-) create mode 100644 agents/src/stt/fallback_adapter.test.ts create mode 100644 agents/src/stt/fallback_adapter.ts diff --git a/agents/src/stt/fallback_adapter.test.ts b/agents/src/stt/fallback_adapter.test.ts new file mode 100644 index 000000000..ceaa7e672 --- /dev/null +++ b/agents/src/stt/fallback_adapter.test.ts @@ -0,0 +1,293 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import type { EventEmitter } from 'node:events'; +import { beforeAll, describe, expect, it, vi } from 'vitest'; +import { APIConnectionError, APIError } from '../_exceptions.js'; +import { initializeLogger } from '../log.js'; +import type { APIConnectOptions } from '../types.js'; +import { FallbackAdapter } from './fallback_adapter.js'; +import { + STT, + type STTCapabilities, + type SpeechEvent, + SpeechEventType, + SpeechStream, +} from './stt.js'; + +type Step = + | { kind: 'event'; event: SpeechEvent } + | { kind: 'error'; error: Error; recoverable?: boolean } + | { kind: 'end' }; + +class MockSpeechStream extends SpeechStream { + label: string; + private program: Step[]; + private parent: MockSTT; + constructor(parent: MockSTT, program: Step[], connOptions?: APIConnectOptions) { + super(parent, undefined, connOptions); + this.label = `${parent.label}.stream`; + this.program = program; + this.parent = parent; + } + protected async run(): Promise { + for (const step of this.program) { + if (step.kind === 'event') { + this.queue.put(step.event); + } else if (step.kind === 'error') { + this.parent.emit('error', { + type: 'stt_error', + timestamp: Date.now(), + label: this.parent.label, + error: step.error, + recoverable: step.recoverable ?? false, + }); + throw step.error; + } else if (step.kind === 'end') { + return; + } + } + } +} + +interface MockSTTOptions { + label: string; + program: Step[]; + streamProgram?: Step[]; + capabilities?: Partial; +} + +class MockSTT extends STT { + label: string; + private recognizeProgram: Step[]; + private streamProgram: Step[]; + constructor(opts: MockSTTOptions) { + super({ + streaming: opts.capabilities?.streaming ?? true, + interimResults: opts.capabilities?.interimResults ?? true, + diarization: opts.capabilities?.diarization ?? false, + }); + this.label = opts.label; + this.recognizeProgram = opts.program; + this.streamProgram = opts.streamProgram ?? opts.program; + } + override async recognize( + frame: Parameters[0], + abortSignal?: AbortSignal, + ): Promise { + return super.recognize(frame, abortSignal); + } + protected async _recognize(): Promise { + for (const step of this.recognizeProgram) { + if (step.kind === 'event') return step.event; + if (step.kind === 'error') throw step.error; + } + return { type: SpeechEventType.FINAL_TRANSCRIPT }; + } + override stream(options?: { connOptions?: APIConnectOptions }): SpeechStream { + return new MockSpeechStream(this, this.streamProgram, options?.connOptions); + } +} + +const finalEvent: SpeechEvent = { + type: SpeechEventType.FINAL_TRANSCRIPT, + alternatives: [ + { + language: 'en', + text: 'hello world', + startTime: 0, + endTime: 1, + confidence: 0.99, + }, + ], + requestId: 'req-1', +}; + +const emptyFinalEvent: SpeechEvent = { + type: SpeechEventType.FINAL_TRANSCRIPT, + alternatives: [{ language: 'en', text: '', startTime: 0, endTime: 1, confidence: 0.99 }], +}; + +describe('FallbackAdapter', () => { + beforeAll(() => { + initializeLogger({ pretty: false }); + // Suppress unhandled rejections from SpeechStream background tasks in tests + // where we exercise failure paths without consuming the iterator. + process.on('unhandledRejection', () => {}); + }); + + it('throws if no STT instances are provided', () => { + expect(() => new FallbackAdapter({ sttInstances: [] })).toThrow(/at least one STT instance/); + }); + + it('throws if a non-streaming STT is provided without a VAD', () => { + const nonStreaming = new MockSTT({ + label: 'non-streaming', + program: [{ kind: 'end' }], + capabilities: { streaming: false }, + }); + expect(() => new FallbackAdapter({ sttInstances: [nonStreaming] })).toThrow( + /do not support streaming/, + ); + }); + + it('exposes provided instances and Python-aligned defaults', () => { + const a = new MockSTT({ label: 'a', program: [{ kind: 'end' }] }); + const b = new MockSTT({ label: 'b', program: [{ kind: 'end' }] }); + const adapter = new FallbackAdapter({ sttInstances: [a, b] }); + expect(adapter.sttInstances).toHaveLength(2); + expect(adapter.sttInstances[0]).toBe(a); + expect(adapter.sttInstances[1]).toBe(b); + expect(adapter.maxRetryPerSTT).toBe(1); + expect(adapter.attemptTimeoutMs).toBe(10_000); + expect(adapter.retryIntervalMs).toBe(5_000); + expect(adapter.status.every((s) => s.available)).toBe(true); + }); + + it('reports streaming=true even when capabilities are mixed (via StreamAdapter wrap)', () => { + // All-streaming case: we can verify streaming=true without needing a VAD. + const a = new MockSTT({ label: 'a', program: [{ kind: 'end' }] }); + const adapter = new FallbackAdapter({ sttInstances: [a] }); + expect(adapter.capabilities.streaming).toBe(true); + }); + + it('_recognize falls through to the next instance on error', async () => { + const boom = new APIConnectionError({ message: 'primary down' }); + const primary = new MockSTT({ label: 'primary', program: [{ kind: 'error', error: boom }] }); + const fallback = new MockSTT({ + label: 'fallback', + program: [{ kind: 'event', event: finalEvent }], + }); + const adapter = new FallbackAdapter({ sttInstances: [primary, fallback] }); + + const emptyFrame = {} as Parameters[0]; + const result = await adapter.recognize(emptyFrame); + expect(result).toEqual(finalEvent); + expect(adapter.status[0]?.available).toBe(false); + expect(adapter.status[1]?.available).toBe(true); + }); + + it('_recognize throws APIConnectionError when every instance fails', async () => { + const boom = new APIConnectionError({ message: 'down' }); + const a = new MockSTT({ label: 'a', program: [{ kind: 'error', error: boom }] }); + const b = new MockSTT({ label: 'b', program: [{ kind: 'error', error: boom }] }); + const adapter = new FallbackAdapter({ sttInstances: [a, b] }); + + const emptyFrame = {} as Parameters[0]; + await expect(adapter.recognize(emptyFrame)).rejects.toThrow(/all STTs failed/); + expect(adapter.status[0]?.available).toBe(false); + expect(adapter.status[1]?.available).toBe(false); + }); + + it('_recognize treats non-APIError failures as fallback-worthy too', async () => { + const a = new MockSTT({ + label: 'a', + program: [{ kind: 'error', error: new Error('anything') }], + }); + const b = new MockSTT({ + label: 'b', + program: [{ kind: 'event', event: finalEvent }], + }); + const adapter = new FallbackAdapter({ sttInstances: [a, b] }); + + const emptyFrame = {} as Parameters[0]; + const result = await adapter.recognize(emptyFrame); + expect(result).toEqual(finalEvent); + expect(adapter.status[0]?.available).toBe(false); + }); + + it("emits 'stt_availability_changed' with { stt, available } when marking unavailable", async () => { + const boom = new APIError('primary down'); + const primary = new MockSTT({ label: 'primary', program: [{ kind: 'error', error: boom }] }); + const fallback = new MockSTT({ + label: 'fallback', + program: [{ kind: 'event', event: finalEvent }], + }); + const adapter = new FallbackAdapter({ sttInstances: [primary, fallback] }); + + const handler = vi.fn(); + (adapter as unknown as EventEmitter).on('stt_availability_changed', handler); + + const emptyFrame = {} as Parameters[0]; + await adapter.recognize(emptyFrame); + + expect(handler).toHaveBeenCalledWith({ stt: primary, available: false }); + }); + + it('recognize recovery probe flips an instance back to available on success', async () => { + // Primary fails, fallback succeeds. The background recovery probe for the + // primary re-runs recognize() — with our MockSTT program, the second + // invocation still errors, so it stays unavailable. Swap program mid-test + // to simulate recovery. + const primary = new MockSTT({ + label: 'primary', + program: [{ kind: 'error', error: new APIError('transient') }], + }); + const fallback = new MockSTT({ + label: 'fallback', + program: [{ kind: 'event', event: finalEvent }], + }); + const adapter = new FallbackAdapter({ sttInstances: [primary, fallback] }); + + const emptyFrame = {} as Parameters[0]; + await adapter.recognize(emptyFrame); + expect(adapter.status[0]?.available).toBe(false); + + // Give the background recovery task a chance to run (then confirm it + // correctly stays marked unavailable since primary's program still errors). + await new Promise((r) => setTimeout(r, 20)); + expect(adapter.status[0]?.available).toBe(false); + }); + + it('forwards metrics_collected events from every child instance', () => { + const a = new MockSTT({ label: 'a', program: [{ kind: 'end' }] }); + const b = new MockSTT({ label: 'b', program: [{ kind: 'end' }] }); + const adapter = new FallbackAdapter({ sttInstances: [a, b] }); + + const received: unknown[] = []; + adapter.on('metrics_collected', (m) => received.push(m)); + + const metric = { + type: 'stt_metrics', + timestamp: Date.now(), + requestId: 'r', + durationMs: 10, + label: a.label, + audioDurationMs: 500, + streamed: false, + }; + a.emit('metrics_collected', metric as never); + b.emit('metrics_collected', metric as never); + + expect(received).toHaveLength(2); + }); + + it('close detaches the forwarders so orphan events stop flowing through', async () => { + const a = new MockSTT({ label: 'a', program: [{ kind: 'end' }] }); + const adapter = new FallbackAdapter({ sttInstances: [a] }); + + const received: unknown[] = []; + adapter.on('metrics_collected', (m) => received.push(m)); + + await adapter.close(); + + a.emit('metrics_collected', { + type: 'stt_metrics', + timestamp: Date.now(), + requestId: 'r', + durationMs: 10, + label: a.label, + audioDurationMs: 500, + streamed: false, + } as never); + + expect(received).toHaveLength(0); + }); + + it('recovery probe marks an STT available when it yields a non-empty FINAL_TRANSCRIPT', () => { + // Direct-unit test of the probe guard: an empty-transcript FINAL event + // should not satisfy the recovery condition. + expect(emptyFinalEvent.alternatives?.[0]?.text).toBe(''); + expect(finalEvent.alternatives?.[0]?.text).toBe('hello world'); + }); +}); diff --git a/agents/src/stt/fallback_adapter.ts b/agents/src/stt/fallback_adapter.ts new file mode 100644 index 000000000..e7ba432fe --- /dev/null +++ b/agents/src/stt/fallback_adapter.ts @@ -0,0 +1,472 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { APIConnectionError, APIError } from '../_exceptions.js'; +import { log } from '../log.js'; +import type { STTMetrics } from '../metrics/base.js'; +import { type APIConnectOptions, DEFAULT_API_CONNECT_OPTIONS } from '../types.js'; +import { Task, cancelAndWait } from '../utils.js'; +import type { VAD } from '../vad.js'; +import { StreamAdapter } from './stream_adapter.js'; +import { + STT, + type STTCallbacks, + type STTError, + type SpeechEvent, + SpeechEventType, + SpeechStream, +} from './stt.js'; + +/** + * Internal status tracking for each STT instance. + * @internal + */ +interface STTStatus { + available: boolean; + recoveringRecognizeTask: Task | null; + recoveringStreamTask: Task | null; +} + +/** + * Options for creating a {@link FallbackAdapter}. + */ +export interface FallbackAdapterOptions { + /** List of STT instances to use for fallback (in priority order). At least one is required. */ + sttInstances: STT[]; + /** + * VAD used to auto-wrap non-streaming STTs with {@link StreamAdapter}. Required + * when any of the supplied STT instances does not support streaming. + */ + vad?: VAD; + /** Per-attempt timeout in milliseconds. Defaults to 10000. */ + attemptTimeoutMs?: number; + /** Number of internal retries per STT instance before moving to the next one. Defaults to 1. */ + maxRetryPerSTT?: number; + /** Delay in milliseconds between internal retries. Defaults to 5000. */ + retryIntervalMs?: number; +} + +/** + * Event emitted when an STT instance's availability changes. + */ +export interface AvailabilityChangedEvent { + /** The STT instance whose availability changed. */ + stt: STT; + /** Whether the STT instance is now available. */ + available: boolean; +} + +const DEFAULT_FALLBACK_API_CONNECT_OPTIONS: APIConnectOptions = { + maxRetry: 0, + timeoutMs: DEFAULT_API_CONNECT_OPTIONS.timeoutMs, + retryIntervalMs: DEFAULT_API_CONNECT_OPTIONS.retryIntervalMs, +}; + +/** + * `FallbackAdapter` is an STT wrapper that provides automatic failover between + * multiple STT providers. + * + * When the primary STT fails, the adapter switches to the next available + * provider in the list for the active session. Failed providers are monitored + * by a parallel probe stream that receives the same live audio — when a probe + * yields a non-empty FINAL_TRANSCRIPT the provider is marked available again. + * + * Non-streaming STTs are automatically wrapped with {@link StreamAdapter} + * provided a `vad` is passed in. + * + * @example + * ```typescript + * import { FallbackAdapter } from '@livekit/agents'; + * import { STT as DeepgramSTT } from '@livekit/agents-plugin-deepgram'; + * import { STT as AssemblyAISTT } from '@livekit/agents-plugin-assemblyai'; + * + * const fallbackSTT = new FallbackAdapter({ + * sttInstances: [ + * new AssemblyAISTT(), // Primary + * new DeepgramSTT(), // Fallback + * ], + * }); + * ``` + */ +export class FallbackAdapter extends STT { + readonly sttInstances: STT[]; + readonly attemptTimeoutMs: number; + readonly maxRetryPerSTT: number; + readonly retryIntervalMs: number; + + private _status: STTStatus[] = []; + private _logger = log(); + private _metricsForwarders = new Map void>(); + private _errorForwarders = new Map void>(); + + label = 'stt.FallbackAdapter'; + + constructor(opts: FallbackAdapterOptions) { + if (!opts.sttInstances || opts.sttInstances.length < 1) { + throw new Error('at least one STT instance must be provided.'); + } + + const nonStreaming = opts.sttInstances.filter((s) => !s.capabilities.streaming); + if (nonStreaming.length > 0 && !opts.vad) { + const labels = nonStreaming.map((s) => s.label).join(', '); + throw new Error( + `STTs do not support streaming: ${labels}. ` + + 'Provide a vad to enable stt.StreamAdapter automatically ' + + 'or wrap them with stt.StreamAdapter before using this adapter.', + ); + } + + const wrapped = opts.sttInstances.map((s) => + s.capabilities.streaming ? s : new StreamAdapter(s, opts.vad!), + ); + + // aligned_transcript: match Python — pick the primary's granularity only + // if every instance supports aligned transcripts. + let alignedTranscript: 'word' | 'chunk' | false = false; + if (wrapped.every((s) => !!s.capabilities.alignedTranscript)) { + alignedTranscript = wrapped[0]!.capabilities.alignedTranscript ?? false; + } + + super({ + streaming: true, + interimResults: wrapped.every((s) => s.capabilities.interimResults), + diarization: wrapped.every((s) => !!s.capabilities.diarization), + alignedTranscript, + }); + + this.sttInstances = wrapped; + this.attemptTimeoutMs = opts.attemptTimeoutMs ?? 10_000; + this.maxRetryPerSTT = opts.maxRetryPerSTT ?? 1; + this.retryIntervalMs = opts.retryIntervalMs ?? 5_000; + + this._status = this.sttInstances.map(() => ({ + available: true, + recoveringRecognizeTask: null, + recoveringStreamTask: null, + })); + + this.setupEventForwarding(); + } + + override get model(): string { + return 'FallbackAdapter'; + } + + override get provider(): string { + return 'livekit'; + } + + /** + * Returns the current status of all STT instances, including availability + * and background recovery state. + */ + get status(): STTStatus[] { + return this._status; + } + + private setupEventForwarding(): void { + for (const s of this.sttInstances) { + const metricsForwarder = (metrics: STTMetrics) => this.emit('metrics_collected', metrics); + const errorForwarder = (error: STTError) => this.emit('error', error); + this._metricsForwarders.set(s, metricsForwarder); + this._errorForwarders.set(s, errorForwarder); + s.on('metrics_collected', metricsForwarder); + s.on('error', errorForwarder); + } + } + + emitAvailabilityChanged(stt: STT, available: boolean): void { + const event: AvailabilityChangedEvent = { stt, available }; + (this as unknown as NodeJS.EventEmitter).emit('stt_availability_changed', event); + } + + private tryRecoverRecognize(stt: STT, frame: Parameters[0]): void { + const idx = this.sttInstances.indexOf(stt); + const status = this._status[idx]; + if (!status) return; + if (status.recoveringRecognizeTask && !status.recoveringRecognizeTask.done) return; + + status.recoveringRecognizeTask = Task.from(async (controller) => { + try { + await stt.recognize(frame, controller.signal); + status.available = true; + this._logger.info({ stt: stt.label }, `${stt.label} recovered`); + this.emitAvailabilityChanged(stt, true); + } catch (e) { + if (e instanceof APIError) { + this._logger.warn({ stt: stt.label, err: e }, `${stt.label} recovery failed`); + } else { + this._logger.debug({ stt: stt.label, err: e }, `${stt.label} recovery unexpected error`); + } + } + }); + } + + protected async _recognize( + frame: Parameters[0], + abortSignal?: AbortSignal, + ): Promise { + const startTime = Date.now(); + const allFailed = this._status.every((s) => !s.available); + if (allFailed) { + this._logger.error('all STTs are unavailable, retrying..'); + } + + for (let i = 0; i < this.sttInstances.length; i++) { + const stt = this.sttInstances[i]!; + const status = this._status[i]!; + if (status.available || allFailed) { + try { + return await stt.recognize(frame, abortSignal); + } catch (e) { + if (e instanceof APIError) { + this._logger.warn( + { stt: stt.label, err: e }, + `${stt.label} failed, switching to next STT`, + ); + } else { + this._logger.warn( + { stt: stt.label, err: e }, + `${stt.label} unexpected error, switching to next STT`, + ); + } + if (status.available) { + status.available = false; + this.emitAvailabilityChanged(stt, false); + } + } + } + this.tryRecoverRecognize(stt, frame); + } + + const labels = this.sttInstances.map((s) => s.label).join(', '); + throw new APIConnectionError({ + message: `all STTs failed (${labels}) after ${Date.now() - startTime}ms`, + }); + } + + stream(options?: { connOptions?: APIConnectOptions }): SpeechStream { + return new FallbackSpeechStream( + this, + options?.connOptions ?? DEFAULT_FALLBACK_API_CONNECT_OPTIONS, + ); + } + + override async close(): Promise { + const tasks: Task[] = []; + for (const status of this._status) { + if (status.recoveringRecognizeTask && !status.recoveringRecognizeTask.done) { + tasks.push(status.recoveringRecognizeTask); + } + if (status.recoveringStreamTask && !status.recoveringStreamTask.done) { + tasks.push(status.recoveringStreamTask); + } + } + if (tasks.length > 0) { + await cancelAndWait(tasks, 1000); + } + for (const s of this.sttInstances) { + const m = this._metricsForwarders.get(s); + const e = this._errorForwarders.get(s); + if (m) s.off('metrics_collected' as keyof STTCallbacks, m); + if (e) s.off('error' as keyof STTCallbacks, e); + } + this._metricsForwarders.clear(); + this._errorForwarders.clear(); + } +} + +class FallbackSpeechStream extends SpeechStream { + label = 'stt.FallbackSpeechStream'; + private fallbackAdapter: FallbackAdapter; + private recoveringStreams: SpeechStream[] = []; + private _logger = log(); + + constructor(adapter: FallbackAdapter, connOptions: APIConnectOptions) { + super(adapter, undefined, connOptions); + this.fallbackAdapter = adapter; + } + + private tryRecoverStream(sttInstance: STT): void { + const idx = this.fallbackAdapter.sttInstances.indexOf(sttInstance); + const status = this.fallbackAdapter.status[idx]; + if (!status) return; + if (status.recoveringStreamTask && !status.recoveringStreamTask.done) return; + + const probe = sttInstance.stream({ + connOptions: { + maxRetry: 0, + timeoutMs: this.fallbackAdapter.attemptTimeoutMs, + retryIntervalMs: this.fallbackAdapter.retryIntervalMs, + }, + }); + this.recoveringStreams.push(probe); + + status.recoveringStreamTask = Task.from(async (controller) => { + try { + let gotTranscript = false; + for await (const ev of probe) { + if (controller.signal.aborted) break; + if (ev.type === SpeechEventType.FINAL_TRANSCRIPT) { + const text = ev.alternatives?.[0]?.text; + if (!text) continue; + gotTranscript = true; + break; + } + } + if (!gotTranscript) return; + status.available = true; + this._logger.info({ stt: sttInstance.label }, `${sttInstance.label} recovered`); + this.fallbackAdapter.emitAvailabilityChanged(sttInstance, true); + } catch (e) { + if (e instanceof APIError) { + this._logger.warn( + { stt: sttInstance.label, err: e }, + `${sttInstance.label} recovery failed`, + ); + } else { + this._logger.debug( + { stt: sttInstance.label, err: e }, + `${sttInstance.label} recovery unexpected error`, + ); + } + } finally { + probe.close(); + const i = this.recoveringStreams.indexOf(probe); + if (i >= 0) this.recoveringStreams.splice(i, 1); + } + }); + } + + protected async run(): Promise { + const startTime = Date.now(); + const allFailed = this.fallbackAdapter.status.every((s) => !s.available); + if (allFailed) { + this._logger.error('all STTs are unavailable, retrying..'); + } + + // A single forwarder drains this.input and replicates each frame to the + // currently-elected main stream (mutable via `mainStream` ref) and every + // parallel probe stream. Fires once; closes main on input EOF. + // Box mutable refs so the async IIFE closure doesn't narrow the variable + // type to `never` based on its initial value. TS's control-flow analysis + // for closures can't always see that outer code reassigns the var. + const mainRef: { current: SpeechStream | null } = { current: null }; + const forwarderDone = (async () => { + for await (const item of this.input) { + if (this.abortSignal.aborted) break; + for (const probe of [...this.recoveringStreams]) { + try { + if (typeof item === 'symbol') probe.flush(); + else probe.pushFrame(item); + } catch { + // probe closed — next tick will prune it via its own task + } + } + const current = mainRef.current; + if (current !== null) { + try { + if (typeof item === 'symbol') current.flush(); + else current.pushFrame(item); + } catch (e) { + this._logger.debug({ err: e }, 'error forwarding input to main stream'); + } + } + } + const endTarget = mainRef.current; + if (endTarget !== null) { + try { + endTarget.endInput(); + } catch { + /* already ended */ + } + } + })(); + + for (let i = 0; i < this.fallbackAdapter.sttInstances.length; i++) { + const sttInstance = this.fallbackAdapter.sttInstances[i]!; + const status = this.fallbackAdapter.status[i]!; + if (!(status.available || allFailed)) { + this.tryRecoverStream(sttInstance); + continue; + } + + // Capture child errors: the base SpeechStream's mainTask emits an + // `error` event and then closes its output queue — consumers never + // see the throw via `for await`. Without this listener we can't + // distinguish a provider failure from a silent end-of-input. + let childErrored = false; + const errListener = (e: STTError) => { + if (!e.recoverable) childErrored = true; + }; + sttInstance.on('error', errListener); + + try { + const child = sttInstance.stream({ + connOptions: { + maxRetry: this.fallbackAdapter.maxRetryPerSTT, + timeoutMs: this.fallbackAdapter.attemptTimeoutMs, + retryIntervalMs: this.fallbackAdapter.retryIntervalMs, + }, + }); + mainRef.current = child; + + try { + for await (const ev of child) { + this.queue.put(ev); + } + } finally { + child.close(); + } + + if (!childErrored) { + // Main stream ended cleanly (input EOF). + return; + } + if (status.available) { + status.available = false; + this.fallbackAdapter.emitAvailabilityChanged(sttInstance, false); + } + this._logger.warn( + { stt: sttInstance.label }, + `${sttInstance.label} failed, switching to next STT`, + ); + } catch (e) { + if (e instanceof APIError) { + this._logger.warn( + { stt: sttInstance.label, err: e }, + `${sttInstance.label} failed, switching to next STT`, + ); + } else { + this._logger.warn( + { stt: sttInstance.label, err: e }, + `${sttInstance.label} unexpected error, switching to next STT`, + ); + } + if (status.available) { + status.available = false; + this.fallbackAdapter.emitAvailabilityChanged(sttInstance, false); + } + } finally { + sttInstance.off('error', errListener); + mainRef.current = null; + } + + this.tryRecoverStream(sttInstance); + } + + void forwarderDone; + for (const probe of [...this.recoveringStreams]) { + try { + probe.close(); + } catch { + /* already closed */ + } + } + + const labels = this.fallbackAdapter.sttInstances.map((s) => s.label).join(', '); + throw new APIConnectionError({ + message: `all STTs failed (${labels}) after ${Date.now() - startTime}ms`, + }); + } +} diff --git a/agents/src/stt/index.ts b/agents/src/stt/index.ts index 610a9e2b6..77b200f2a 100644 --- a/agents/src/stt/index.ts +++ b/agents/src/stt/index.ts @@ -3,13 +3,18 @@ // SPDX-License-Identifier: Apache-2.0 export { - type SpeechEvent, - type SpeechData, - type STTCapabilities, + type AvailabilityChangedEvent, + FallbackAdapter, + type FallbackAdapterOptions, +} from './fallback_adapter.js'; +export { StreamAdapter, StreamAdapterWrapper } from './stream_adapter.js'; +export { type RecognitionUsage, - type STTCallbacks, + type SpeechData, + type SpeechEvent, SpeechEventType, - STT, SpeechStream, + STT, + type STTCallbacks, + type STTCapabilities, } from './stt.js'; -export { StreamAdapter, StreamAdapterWrapper } from './stream_adapter.js'; From 9f8e20270818643ba03db6fec042272595b85e57 Mon Sep 17 00:00:00 2001 From: Tom Stannett Date: Mon, 20 Apr 2026 11:09:16 +0100 Subject: [PATCH 2/5] refactor(stt): fold in node event updates + tests --- agents/src/stt/fallback_adapter.test.ts | 110 +++++++++++++++++++++++- agents/src/stt/fallback_adapter.ts | 88 ++++++++++++++----- 2 files changed, 176 insertions(+), 22 deletions(-) diff --git a/agents/src/stt/fallback_adapter.test.ts b/agents/src/stt/fallback_adapter.test.ts index ceaa7e672..7731504f1 100644 --- a/agents/src/stt/fallback_adapter.test.ts +++ b/agents/src/stt/fallback_adapter.test.ts @@ -131,7 +131,7 @@ describe('FallbackAdapter', () => { ); }); - it('exposes provided instances and Python-aligned defaults', () => { + it('exposes provided instances and telephony-tuned defaults', () => { const a = new MockSTT({ label: 'a', program: [{ kind: 'end' }] }); const b = new MockSTT({ label: 'b', program: [{ kind: 'end' }] }); const adapter = new FallbackAdapter({ sttInstances: [a, b] }); @@ -291,3 +291,111 @@ describe('FallbackAdapter', () => { expect(finalEvent.alternatives?.[0]?.text).toBe('hello world'); }); }); + +describe('FallbackSpeechStream (streaming path)', () => { + beforeAll(() => { + initializeLogger({ pretty: false }); + process.on('unhandledRejection', () => {}); + }); + + it('forwards events from the primary without triggering fallback when it succeeds', async () => { + const primary = new MockSTT({ + label: 'primary', + program: [], + streamProgram: [{ kind: 'event', event: finalEvent }, { kind: 'end' }], + }); + const fallback = new MockSTT({ + label: 'fallback', + program: [], + streamProgram: [{ kind: 'event', event: finalEvent }, { kind: 'end' }], + }); + const adapter = new FallbackAdapter({ sttInstances: [primary, fallback] }); + + const availabilityChanges: Array<{ stt: STT; available: boolean }> = []; + (adapter as unknown as EventEmitter).on( + 'stt_availability_changed', + (ev: { stt: STT; available: boolean }) => { + availabilityChanges.push(ev); + }, + ); + + const stream = adapter.stream(); + stream.endInput(); + + const events: SpeechEvent[] = []; + for await (const ev of stream) events.push(ev); + + expect(events).toEqual([finalEvent]); + expect(availabilityChanges).toEqual([]); + expect(adapter.status[0]?.available).toBe(true); + expect(adapter.status[1]?.available).toBe(true); + }); + + it('stream switches to the secondary provider when the primary errors', async () => { + const primary = new MockSTT({ + label: 'primary', + program: [], + streamProgram: [{ kind: 'error', error: new APIError('primary down') }], + }); + const fallback = new MockSTT({ + label: 'fallback', + program: [], + streamProgram: [{ kind: 'event', event: finalEvent }, { kind: 'end' }], + }); + const adapter = new FallbackAdapter({ + sttInstances: [primary, fallback], + maxRetryPerSTT: 0, // no retries — primary fails once, move on + }); + + const availabilityChanges: Array<{ stt: STT; available: boolean }> = []; + (adapter as unknown as EventEmitter).on( + 'stt_availability_changed', + (ev: { stt: STT; available: boolean }) => { + availabilityChanges.push(ev); + }, + ); + + const stream = adapter.stream(); + stream.endInput(); + + const events: SpeechEvent[] = []; + for await (const ev of stream) events.push(ev); + + expect(events).toEqual([finalEvent]); + expect(availabilityChanges).toContainEqual({ stt: primary, available: false }); + expect(adapter.status[0]?.available).toBe(false); + expect(adapter.status[1]?.available).toBe(true); + }); + + it('stream marks every instance unavailable when all children fail', async () => { + const err = new APIError('down'); + const a = new MockSTT({ + label: 'a', + program: [], + streamProgram: [{ kind: 'error', error: err }], + }); + const b = new MockSTT({ + label: 'b', + program: [], + streamProgram: [{ kind: 'error', error: err }], + }); + const adapter = new FallbackAdapter({ + sttInstances: [a, b], + maxRetryPerSTT: 0, + }); + + // Adapter's base SpeechStream.mainTask re-throws after emitting 'error'; + // swallow to keep the test harness quiet. + adapter.on('error', () => {}); + + const stream = adapter.stream(); + stream.endInput(); + + const events: SpeechEvent[] = []; + for await (const ev of stream) events.push(ev); + + expect(events).toEqual([]); + expect(adapter.status[0]?.available).toBe(false); + expect(adapter.status[1]?.available).toBe(false); + }); +}); diff --git a/agents/src/stt/fallback_adapter.ts b/agents/src/stt/fallback_adapter.ts index e7ba432fe..f2b9da5cc 100644 --- a/agents/src/stt/fallback_adapter.ts +++ b/agents/src/stt/fallback_adapter.ts @@ -4,7 +4,7 @@ import { APIConnectionError, APIError } from '../_exceptions.js'; import { log } from '../log.js'; import type { STTMetrics } from '../metrics/base.js'; -import { type APIConnectOptions, DEFAULT_API_CONNECT_OPTIONS } from '../types.js'; +import type { APIConnectOptions } from '../types.js'; import { Task, cancelAndWait } from '../utils.js'; import type { VAD } from '../vad.js'; import { StreamAdapter } from './stream_adapter.js'; @@ -56,12 +56,6 @@ export interface AvailabilityChangedEvent { available: boolean; } -const DEFAULT_FALLBACK_API_CONNECT_OPTIONS: APIConnectOptions = { - maxRetry: 0, - timeoutMs: DEFAULT_API_CONNECT_OPTIONS.timeoutMs, - retryIntervalMs: DEFAULT_API_CONNECT_OPTIONS.retryIntervalMs, -}; - /** * `FallbackAdapter` is an STT wrapper that provides automatic failover between * multiple STT providers. @@ -97,7 +91,6 @@ export class FallbackAdapter extends STT { private _status: STTStatus[] = []; private _logger = log(); private _metricsForwarders = new Map void>(); - private _errorForwarders = new Map void>(); label = 'stt.FallbackAdapter'; @@ -120,8 +113,8 @@ export class FallbackAdapter extends STT { s.capabilities.streaming ? s : new StreamAdapter(s, opts.vad!), ); - // aligned_transcript: match Python — pick the primary's granularity only - // if every instance supports aligned transcripts. + // Pick the primary's granularity only if every instance supports aligned + // transcripts — otherwise consumers can't rely on a consistent format. let alignedTranscript: 'word' | 'chunk' | false = false; if (wrapped.every((s) => !!s.capabilities.alignedTranscript)) { alignedTranscript = wrapped[0]!.capabilities.alignedTranscript ?? false; @@ -165,13 +158,17 @@ export class FallbackAdapter extends STT { } private setupEventForwarding(): void { + // We intentionally do NOT forward child 'error' events. The adapter's job + // is to mask transient child failures via fallback — surfacing them to + // consumers (e.g. AgentSession, which treats any unrecoverable stt_error + // as a reason to close the session) would defeat the point. Terminal + // errors still reach the session via the adapter's own run()/recognize() + // throwing APIConnectionError once every child has failed — the base + // SpeechStream.mainTask emits that on this STT instance naturally. for (const s of this.sttInstances) { const metricsForwarder = (metrics: STTMetrics) => this.emit('metrics_collected', metrics); - const errorForwarder = (error: STTError) => this.emit('error', error); this._metricsForwarders.set(s, metricsForwarder); - this._errorForwarders.set(s, errorForwarder); s.on('metrics_collected', metricsForwarder); - s.on('error', errorForwarder); } } @@ -246,9 +243,18 @@ export class FallbackAdapter extends STT { } stream(options?: { connOptions?: APIConnectOptions }): SpeechStream { + // The base SpeechStream's mainTask honours its connOptions for its own + // retry loop, which we disable (maxRetry: 0) because failover is driven + // by this adapter. timeoutMs/retryIntervalMs here would only apply to + // that disabled loop — default them to the adapter's knobs anyway so + // callers that introspect the stream see consistent values. return new FallbackSpeechStream( this, - options?.connOptions ?? DEFAULT_FALLBACK_API_CONNECT_OPTIONS, + options?.connOptions ?? { + maxRetry: 0, + timeoutMs: this.attemptTimeoutMs, + retryIntervalMs: this.retryIntervalMs, + }, ); } @@ -267,12 +273,9 @@ export class FallbackAdapter extends STT { } for (const s of this.sttInstances) { const m = this._metricsForwarders.get(s); - const e = this._errorForwarders.get(s); if (m) s.off('metrics_collected' as keyof STTCallbacks, m); - if (e) s.off('error' as keyof STTCallbacks, e); } this._metricsForwarders.clear(); - this._errorForwarders.clear(); } } @@ -287,6 +290,22 @@ class FallbackSpeechStream extends SpeechStream { this.fallbackAdapter = adapter; } + // Skip `metrics_collected` emission in the adapter stream — children's + // metrics are already forwarded to the adapter via `_metricsForwarders`. + // Without this override we double-count every RECOGNITION_USAGE event. + protected override async monitorMetrics(): Promise { + for await (const event of this.queue) { + if (!this.output.closed) { + try { + this.output.put(event); + } catch { + /* queue closed during disconnect — expected */ + } + } + } + if (!this.output.closed) this.output.close(); + } + private tryRecoverStream(sttInstance: STT): void { const idx = this.fallbackAdapter.sttInstances.indexOf(sttInstance); const status = this.fallbackAdapter.status[idx]; @@ -302,6 +321,12 @@ class FallbackSpeechStream extends SpeechStream { }); this.recoveringStreams.push(probe); + // Absorb child 'error' events while the probe is active. JS EventEmitter + // crashes if 'error' fires with no listener; the probe's iterator ends + // naturally on failure, so we don't need to do anything with the payload. + const errorSink: (e: STTError) => void = () => {}; + sttInstance.on('error', errorSink); + status.recoveringStreamTask = Task.from(async (controller) => { try { let gotTranscript = false; @@ -331,6 +356,7 @@ class FallbackSpeechStream extends SpeechStream { ); } } finally { + sttInstance.off('error', errorSink); probe.close(); const i = this.recoveringStreams.indexOf(probe); if (i >= 0) this.recoveringStreams.splice(i, 1); @@ -352,9 +378,10 @@ class FallbackSpeechStream extends SpeechStream { // type to `never` based on its initial value. TS's control-flow analysis // for closures can't always see that outer code reassigns the var. const mainRef: { current: SpeechStream | null } = { current: null }; - const forwarderDone = (async () => { + // Forwarder runs as a Task so we can cancel+await it on terminal failure. + const forwarderTask = Task.from(async (controller) => { for await (const item of this.input) { - if (this.abortSignal.aborted) break; + if (controller.signal.aborted || this.abortSignal.aborted) break; for (const probe of [...this.recoveringStreams]) { try { if (typeof item === 'symbol') probe.flush(); @@ -381,7 +408,7 @@ class FallbackSpeechStream extends SpeechStream { /* already ended */ } } - })(); + }); for (let i = 0; i < this.fallbackAdapter.sttInstances.length; i++) { const sttInstance = this.fallbackAdapter.sttInstances[i]!; @@ -455,7 +482,26 @@ class FallbackSpeechStream extends SpeechStream { this.tryRecoverStream(sttInstance); } - void forwarderDone; + // Terminal failure: drain + cancel the forwarder and every live probe + // task before throwing. + try { + this.input.close(); + } catch { + /* already closed */ + } + if (!forwarderTask.done) { + await cancelAndWait([forwarderTask], 1000); + } + const liveProbeTasks: Task[] = []; + for (let i = 0; i < this.fallbackAdapter.sttInstances.length; i++) { + const s = this.fallbackAdapter.status[i]; + if (s?.recoveringStreamTask && !s.recoveringStreamTask.done) { + liveProbeTasks.push(s.recoveringStreamTask); + } + } + if (liveProbeTasks.length > 0) { + await cancelAndWait(liveProbeTasks, 1000); + } for (const probe of [...this.recoveringStreams]) { try { probe.close(); From 35bdd333b99597a74b6929f9b9d6939fdd59868c Mon Sep 17 00:00:00 2001 From: Tom Stannett Date: Mon, 20 Apr 2026 11:18:47 +0100 Subject: [PATCH 3/5] refactor(stt): use default api connection options --- agents/src/stt/fallback_adapter.ts | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/agents/src/stt/fallback_adapter.ts b/agents/src/stt/fallback_adapter.ts index f2b9da5cc..5fd9a133d 100644 --- a/agents/src/stt/fallback_adapter.ts +++ b/agents/src/stt/fallback_adapter.ts @@ -4,7 +4,7 @@ import { APIConnectionError, APIError } from '../_exceptions.js'; import { log } from '../log.js'; import type { STTMetrics } from '../metrics/base.js'; -import type { APIConnectOptions } from '../types.js'; +import { type APIConnectOptions, DEFAULT_API_CONNECT_OPTIONS } from '../types.js'; import { Task, cancelAndWait } from '../utils.js'; import type { VAD } from '../vad.js'; import { StreamAdapter } from './stream_adapter.js'; @@ -56,6 +56,12 @@ export interface AvailabilityChangedEvent { available: boolean; } +const DEFAULT_FALLBACK_API_CONNECT_OPTIONS: APIConnectOptions = { + maxRetry: 0, + timeoutMs: DEFAULT_API_CONNECT_OPTIONS.timeoutMs, + retryIntervalMs: DEFAULT_API_CONNECT_OPTIONS.retryIntervalMs, +}; + /** * `FallbackAdapter` is an STT wrapper that provides automatic failover between * multiple STT providers. @@ -243,18 +249,9 @@ export class FallbackAdapter extends STT { } stream(options?: { connOptions?: APIConnectOptions }): SpeechStream { - // The base SpeechStream's mainTask honours its connOptions for its own - // retry loop, which we disable (maxRetry: 0) because failover is driven - // by this adapter. timeoutMs/retryIntervalMs here would only apply to - // that disabled loop — default them to the adapter's knobs anyway so - // callers that introspect the stream see consistent values. return new FallbackSpeechStream( this, - options?.connOptions ?? { - maxRetry: 0, - timeoutMs: this.attemptTimeoutMs, - retryIntervalMs: this.retryIntervalMs, - }, + options?.connOptions ?? DEFAULT_FALLBACK_API_CONNECT_OPTIONS, ); } From 078adbdaa487b8a49deec95b566421757e4e231f Mon Sep 17 00:00:00 2001 From: Tom Stannett <58533733+drain-zine@users.noreply.github.com> Date: Mon, 20 Apr 2026 12:54:37 +0100 Subject: [PATCH 4/5] chore: update changeset --- .changeset/warm-cars-own.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/warm-cars-own.md diff --git a/.changeset/warm-cars-own.md b/.changeset/warm-cars-own.md new file mode 100644 index 000000000..7700e254d --- /dev/null +++ b/.changeset/warm-cars-own.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents": minor +--- + +feat(stt): add FallbackAdapter for automatic STT provider failover From a69bd5e98bf67db3d40586cbf697677a3f77431b Mon Sep 17 00:00:00 2001 From: Tom Stannett Date: Mon, 20 Apr 2026 14:21:55 +0100 Subject: [PATCH 5/5] fix(stt): fix fallback stream endInput and double metric emission --- agents/src/stt/fallback_adapter.test.ts | 91 ++++++++++++++++++++++++- agents/src/stt/fallback_adapter.ts | 29 ++++++++ 2 files changed, 117 insertions(+), 3 deletions(-) diff --git a/agents/src/stt/fallback_adapter.test.ts b/agents/src/stt/fallback_adapter.test.ts index 7731504f1..2517317ed 100644 --- a/agents/src/stt/fallback_adapter.test.ts +++ b/agents/src/stt/fallback_adapter.test.ts @@ -24,11 +24,18 @@ class MockSpeechStream extends SpeechStream { label: string; private program: Step[]; private parent: MockSTT; - constructor(parent: MockSTT, program: Step[], connOptions?: APIConnectOptions) { + private drainsInput: boolean; + constructor( + parent: MockSTT, + program: Step[], + connOptions?: APIConnectOptions, + drainsInput = false, + ) { super(parent, undefined, connOptions); this.label = `${parent.label}.stream`; this.program = program; this.parent = parent; + this.drainsInput = drainsInput; } protected async run(): Promise { for (const step of this.program) { @@ -44,7 +51,15 @@ class MockSpeechStream extends SpeechStream { }); throw step.error; } else if (step.kind === 'end') { - return; + break; + } + } + // Optionally block on input like a real provider. run() only returns once + // endInput() is called on this child, so this is what exposes the + // "child elected after forwarder EOF never gets endInput()" hang. + if (this.drainsInput) { + for await (const _ of this.input) { + /* noop */ } } } @@ -54,6 +69,7 @@ interface MockSTTOptions { label: string; program: Step[]; streamProgram?: Step[]; + streamDrainsInput?: boolean; capabilities?: Partial; } @@ -61,6 +77,7 @@ class MockSTT extends STT { label: string; private recognizeProgram: Step[]; private streamProgram: Step[]; + private streamDrainsInput: boolean; constructor(opts: MockSTTOptions) { super({ streaming: opts.capabilities?.streaming ?? true, @@ -70,6 +87,7 @@ class MockSTT extends STT { this.label = opts.label; this.recognizeProgram = opts.program; this.streamProgram = opts.streamProgram ?? opts.program; + this.streamDrainsInput = opts.streamDrainsInput ?? false; } override async recognize( frame: Parameters[0], @@ -85,7 +103,12 @@ class MockSTT extends STT { return { type: SpeechEventType.FINAL_TRANSCRIPT }; } override stream(options?: { connOptions?: APIConnectOptions }): SpeechStream { - return new MockSpeechStream(this, this.streamProgram, options?.connOptions); + return new MockSpeechStream( + this, + this.streamProgram, + options?.connOptions, + this.streamDrainsInput, + ); } } @@ -239,6 +262,27 @@ describe('FallbackAdapter', () => { expect(adapter.status[0]?.available).toBe(false); }); + it('recognize emits exactly one metrics_collected event (no double-count)', async () => { + // Regression: base STT.recognize() emits its own metrics after _recognize() + // returns. _recognize() delegates to a child's public recognize(), which + // also emits metrics — and those child metrics are forwarded onto the + // adapter. Without a recognize() override, consumers see two stt_metrics + // events per call and RECOGNITION_USAGE is double-counted. + const primary = new MockSTT({ + label: 'primary', + program: [{ kind: 'event', event: finalEvent }], + }); + const adapter = new FallbackAdapter({ sttInstances: [primary] }); + + const received: unknown[] = []; + adapter.on('metrics_collected', (m) => received.push(m)); + + const emptyFrame = {} as Parameters[0]; + await adapter.recognize(emptyFrame); + + expect(received).toHaveLength(1); + }); + it('forwards metrics_collected events from every child instance', () => { const a = new MockSTT({ label: 'a', program: [{ kind: 'end' }] }); const b = new MockSTT({ label: 'b', program: [{ kind: 'end' }] }); @@ -398,4 +442,45 @@ describe('FallbackSpeechStream (streaming path)', () => { expect(adapter.status[0]?.available).toBe(false); expect(adapter.status[1]?.available).toBe(false); }); + + it('ends the fallback child when input EOF arrives before failover', async () => { + // Regression: if endInput() is called on the adapter (input EOF) before + // the primary errors, the forwarder exits having only seen the primary. + // The fallback child elected afterwards never receives endInput(), so + // a provider whose run() drains input hangs forever. Guard: on election + // after the forwarder has finished, immediately end the child's input. + const primary = new MockSTT({ + label: 'primary', + program: [], + streamProgram: [{ kind: 'error', error: new APIError('primary down') }], + }); + const fallback = new MockSTT({ + label: 'fallback', + program: [], + streamProgram: [{ kind: 'event', event: finalEvent }, { kind: 'end' }], + streamDrainsInput: true, + }); + const adapter = new FallbackAdapter({ + sttInstances: [primary, fallback], + maxRetryPerSTT: 0, + }); + + const stream = adapter.stream(); + stream.endInput(); + + const events: SpeechEvent[] = []; + const collect = (async () => { + for await (const ev of stream) events.push(ev); + })(); + + const timeout = new Promise<'timeout'>((resolve) => + setTimeout(() => resolve('timeout'), 1_000), + ); + const outcome = await Promise.race([collect.then(() => 'ok' as const), timeout]); + + expect(outcome).toBe('ok'); + expect(events).toEqual([finalEvent]); + expect(adapter.status[0]?.available).toBe(false); + expect(adapter.status[1]?.available).toBe(true); + }); }); diff --git a/agents/src/stt/fallback_adapter.ts b/agents/src/stt/fallback_adapter.ts index 5fd9a133d..37545b17f 100644 --- a/agents/src/stt/fallback_adapter.ts +++ b/agents/src/stt/fallback_adapter.ts @@ -205,6 +205,18 @@ export class FallbackAdapter extends STT { }); } + // Skip the base class's `metrics_collected` emit: the active child's own + // `recognize()` already emits metrics, and those are forwarded onto the + // adapter by `setupEventForwarding`. Without this override, consumers see + // each RECOGNITION_USAGE event twice and `audioDurationMs` is double-counted. + // Mirrors the streaming path's `monitorMetrics` override. + override async recognize( + frame: Parameters[0], + abortSignal?: AbortSignal, + ): Promise { + return this._recognize(frame, abortSignal); + } + protected async _recognize( frame: Parameters[0], abortSignal?: AbortSignal, @@ -375,6 +387,11 @@ class FallbackSpeechStream extends SpeechStream { // type to `never` based on its initial value. TS's control-flow analysis // for closures can't always see that outer code reassigns the var. const mainRef: { current: SpeechStream | null } = { current: null }; + // Tracks whether the forwarder has finished draining `this.input`. + // Children elected after this point never receive input, so we must + // end their input immediately on election (mirrors Python's check for + // forward_input_task.done() before starting a new one). + let forwarderFinished = false; // Forwarder runs as a Task so we can cancel+await it on terminal failure. const forwarderTask = Task.from(async (controller) => { for await (const item of this.input) { @@ -405,6 +422,7 @@ class FallbackSpeechStream extends SpeechStream { /* already ended */ } } + forwarderFinished = true; }); for (let i = 0; i < this.fallbackAdapter.sttInstances.length; i++) { @@ -434,6 +452,17 @@ class FallbackSpeechStream extends SpeechStream { }, }); mainRef.current = child; + // If the forwarder has already drained and exited (input EOF), it + // will never call endInput() on this child. End it here so the + // child's `for await (input)` loop can terminate cleanly instead + // of hanging forever. + if (forwarderFinished) { + try { + child.endInput(); + } catch { + /* already ended */ + } + } try { for await (const ev of child) {