diff --git a/.changeset/warm-papayas-smash.md b/.changeset/warm-papayas-smash.md new file mode 100644 index 000000000..3e4fdd61d --- /dev/null +++ b/.changeset/warm-papayas-smash.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents": patch +--- + +Use ThrowsPromise helper across agent package diff --git a/agents/package.json b/agents/package.json index 225baf55e..b66cea054 100644 --- a/agents/package.json +++ b/agents/package.json @@ -52,7 +52,7 @@ "@livekit/mutex": "^1.1.1", "@livekit/protocol": "^1.45.3", "@livekit/typed-emitter": "^3.0.0", - "@livekit/throws-transformer": "0.0.0-20260320165515", + "@livekit/throws-transformer": "0.1.8", "@opentelemetry/api": "^1.9.0", "@opentelemetry/api-logs": "^0.54.0", "@opentelemetry/core": "^2.2.0", diff --git a/agents/src/beta/workflows/task_group.ts b/agents/src/beta/workflows/task_group.ts index c5ead6e03..8147c814b 100644 --- a/agents/src/beta/workflows/task_group.ts +++ b/agents/src/beta/workflows/task_group.ts @@ -4,6 +4,7 @@ import { z } from 'zod'; import type { ChatContext } from '../../llm/chat_context.js'; import { LLM, ToolError, ToolFlag, tool } from '../../llm/index.js'; +import { asError } from '../../utils.js'; import { AgentTask } from '../../voice/agent.js'; interface FactoryInfo { @@ -114,7 +115,7 @@ export class TaskGroup extends AgentTask { taskResults[taskId] = e; continue; } else { - this.complete(e instanceof Error ? e : new Error(String(e))); + this.complete(asError(e)); return; } } diff --git a/agents/src/connection_pool.ts b/agents/src/connection_pool.ts index b2f13c798..206c569fc 100644 --- a/agents/src/connection_pool.ts +++ b/agents/src/connection_pool.ts @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import { Mutex } from '@livekit/mutex'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { waitForAbort } from './utils.js'; /** @@ -273,7 +274,7 @@ export class ConnectionPool { try { const fnPromise = fn(conn); const result = signal - ? await Promise.race([ + ? await ThrowsPromise.race([ fnPromise.then((value) => ({ type: 'result' as const, value })), waitForAbort(signal).then(() => ({ type: 'abort' as const })), ]).then((r) => { diff --git a/agents/src/http_server.ts b/agents/src/http_server.ts index ebf72a52b..54ebaf0a5 100644 --- a/agents/src/http_server.ts +++ b/agents/src/http_server.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { type IncomingMessage, type Server, type ServerResponse, createServer } from 'node:http'; import { log } from './log.js'; @@ -53,7 +54,7 @@ export class HTTPServer { } async run(): Promise { - return new Promise((resolve, reject) => { + return new ThrowsPromise((resolve, reject) => { this.app.listen(this.port, this.host, (err?: Error) => { if (err) reject(err); const address = this.app.address(); @@ -66,7 +67,7 @@ export class HTTPServer { } async close(): Promise { - return new Promise((resolve, reject) => { + return new ThrowsPromise((resolve, reject) => { this.app.close((err?: Error) => { if (err) reject(err); resolve(); diff --git a/agents/src/inference/interruption/interruption_detector.ts b/agents/src/inference/interruption/interruption_detector.ts index d115918f6..99778181c 100644 --- a/agents/src/inference/interruption/interruption_detector.ts +++ b/agents/src/inference/interruption/interruption_detector.ts @@ -1,10 +1,12 @@ // SPDX-FileCopyrightText: 2026 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { TypedEventEmitter } from '@livekit/typed-emitter'; import EventEmitter from 'events'; import { log } from '../../log.js'; import type { InterruptionMetrics } from '../../metrics/base.js'; +import { asError } from '../../utils.js'; import { DEFAULT_INFERENCE_URL, STAGING_INFERENCE_URL, getDefaultInferenceUrl } from '../utils.js'; import { FRAMES_PER_SECOND, SAMPLE_RATE, interruptionOptionDefaults } from './defaults.js'; import { InterruptionDetectionError } from './errors.js'; @@ -165,7 +167,7 @@ export class AdaptiveInterruptionDetector extends (EventEmitter as new () => Typ this.streams.add(streamBase); return streamBase; } catch (e) { - const cause = e instanceof Error ? e : new Error(String(e)); + const cause = asError(e); this.emitError(new InterruptionDetectionError(cause.message, Date.now(), this._label, false)); throw e; } @@ -199,6 +201,6 @@ export class AdaptiveInterruptionDetector extends (EventEmitter as new () => Typ for (const stream of this.streams) { updatePromises.push(stream.updateOptions(options)); } - await Promise.all(updatePromises); + await ThrowsPromise.all(updatePromises); } } diff --git a/agents/src/inference/interruption/ws_transport.ts b/agents/src/inference/interruption/ws_transport.ts index 13865ca72..2e99187d3 100644 --- a/agents/src/inference/interruption/ws_transport.ts +++ b/agents/src/inference/interruption/ws_transport.ts @@ -1,13 +1,12 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import type { Throws } from '@livekit/throws-transformer/throws'; +import { type Throws, ThrowsPromise } from '@livekit/throws-transformer/throws'; import { TransformStream } from 'stream/web'; import WebSocket from 'ws'; import { z } from 'zod'; import { APIConnectionError, APIStatusError, APITimeoutError } from '../../_exceptions.js'; import { log } from '../../log.js'; -import TypedPromise from '../../typed_promise.js'; import { buildMetadataHeaders, createAccessToken } from '../utils.js'; import { InterruptionCacheEntry } from './interruption_cache_entry.js'; import type { OverlappingSpeechEvent } from './types.js'; @@ -83,7 +82,7 @@ async function connectWebSocket( headers: { ...buildMetadataHeaders(), Authorization: `Bearer ${token}` }, }); - await new TypedPromise( + await new ThrowsPromise( (resolve, reject) => { const timeout = setTimeout(() => { ws.terminate(); diff --git a/agents/src/inference/stt.ts b/agents/src/inference/stt.ts index 8c3a49da0..03da41f3e 100644 --- a/agents/src/inference/stt.ts +++ b/agents/src/inference/stt.ts @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import { type AudioFrame } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { WebSocket } from 'ws'; import { APIError, APIStatusError } from '../_exceptions.js'; import { AudioByteStream } from '../audio.js'; @@ -403,7 +404,7 @@ export class SpeechStream extends BaseSpeechStream { }; const createWsListener = async (ws: WebSocket, signal: AbortSignal) => { - return new Promise((resolve, reject) => { + return new ThrowsPromise((resolve, reject) => { const onAbort = () => { resourceCleanup(); reject(new Error('WebSocket connection aborted')); @@ -446,7 +447,7 @@ export class SpeechStream extends BaseSpeechStream { ); // Create abort promise once to avoid memory leak - const abortPromise = new Promise((_, reject) => { + const abortPromise = new ThrowsPromise((_, reject) => { if (signal.aborted) { return reject(new Error('Send aborted')); } @@ -458,7 +459,7 @@ export class SpeechStream extends BaseSpeechStream { const iterator = this.input[Symbol.asyncIterator](); try { while (true) { - const result = await Promise.race([iterator.next(), abortPromise]); + const result = await ThrowsPromise.race([iterator.next(), abortPromise]); if (result.done) break; const ev = result.value; @@ -560,13 +561,13 @@ export class SpeechStream extends BaseSpeechStream { ); const recvTask = Task.from(({ signal }) => recv(signal), connController); const waitReconnectTask = Task.from( - ({ signal }) => Promise.race([this.reconnectEvent.wait(), waitForAbort(signal)]), + ({ signal }) => ThrowsPromise.race([this.reconnectEvent.wait(), waitForAbort(signal)]), connController, ); try { - await Promise.race([ - Promise.all([sendTask.result, wsListenerTask.result, recvTask.result]), + await ThrowsPromise.race([ + ThrowsPromise.all([sendTask.result, wsListenerTask.result, recvTask.result]), waitReconnectTask.result, ]); diff --git a/agents/src/inference/tts.ts b/agents/src/inference/tts.ts index 3b91041f1..b2c213af9 100644 --- a/agents/src/inference/tts.ts +++ b/agents/src/inference/tts.ts @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { WebSocket } from 'ws'; import { APIError, APIStatusError, APITimeoutError } from '../_exceptions.js'; import { AudioByteStream } from '../audio.js'; @@ -710,7 +711,7 @@ export class SynthesizeStream extends BaseSynthesizeSt ]; try { - await Promise.all(tasks.map((t) => t.result)); + await ThrowsPromise.all(tasks.map((t) => t.result)); } finally { // Mirror python finally: unblock recv and cancel all tasks. inputSentEvent.set(); diff --git a/agents/src/inference/utils.ts b/agents/src/inference/utils.ts index 5bd85dd16..ad0093b94 100644 --- a/agents/src/inference/utils.ts +++ b/agents/src/inference/utils.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { AccessToken } from 'livekit-server-sdk'; import { WebSocket } from 'ws'; import { APIConnectionError, APIStatusError } from '../_exceptions.js'; @@ -78,7 +79,7 @@ export async function connectWs( headers: Record, timeoutMs: number, ): Promise { - return new Promise((resolve, reject) => { + return new ThrowsPromise((resolve, reject) => { const socket = new WebSocket(url, { headers: { ...buildMetadataHeaders(), ...headers } }); const timeout = setTimeout(() => { diff --git a/agents/src/ipc/inference_proc_executor.ts b/agents/src/ipc/inference_proc_executor.ts index f83a8f6f7..b3eefe1e7 100644 --- a/agents/src/ipc/inference_proc_executor.ts +++ b/agents/src/ipc/inference_proc_executor.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { ChildProcess } from 'node:child_process'; import { fork } from 'node:child_process'; import { extname } from 'node:path'; @@ -11,9 +12,11 @@ import type { IPCMessage } from './message.js'; import { SupervisedProc } from './supervised_proc.js'; class PendingInference { - promise = new Promise<{ requestId: string; data: unknown; error?: Error }>((resolve) => { - this.resolve = resolve; - }); + promise = new ThrowsPromise<{ requestId: string; data: unknown; error?: Error }, never>( + (resolve) => { + this.resolve = resolve; + }, + ); resolve(arg: { requestId: string; data: unknown; error?: Error }) { arg; } diff --git a/agents/src/ipc/inference_proc_lazy_main.ts b/agents/src/ipc/inference_proc_lazy_main.ts index 08f7a5c7f..dd5a2d1a1 100644 --- a/agents/src/ipc/inference_proc_lazy_main.ts +++ b/agents/src/ipc/inference_proc_lazy_main.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { once } from 'node:events'; import type { InferenceRunner } from '../inference_runner.js'; import { initializeLogger, log } from '../log.js'; @@ -34,7 +35,7 @@ const ORPHANED_TIMEOUT = 15 * 1000; }); const logger = log().child({ pid: process.pid }); - const runners: { [id: string]: InferenceRunner } = await Promise.all( + const runners: { [id: string]: InferenceRunner } = await ThrowsPromise.all( Object.entries(JSON.parse(process.argv[2]!)).map(async ([k, v]) => { return [ k, @@ -52,7 +53,7 @@ const ORPHANED_TIMEOUT = 15 * 1000; }), ).then(Object.fromEntries); - await Promise.all( + await ThrowsPromise.all( Object.entries(runners).map(async ([runner, v]) => { logger.child({ runner }).debug('initializing inference runner'); await v.initialize(); @@ -101,7 +102,7 @@ const ORPHANED_TIMEOUT = 15 * 1000; clearTimeout(orphanedTimeout); // Remove our message handler to stop processing new messages process.off('message', messageHandler); - Promise.all(Object.values(runners).map((r) => r.close())) + ThrowsPromise.all(Object.values(runners).map((r) => r.close())) .then(() => { logger.debug('Inference runners closed'); process.send!({ case: 'done' }); diff --git a/agents/src/ipc/job_proc_lazy_main.ts b/agents/src/ipc/job_proc_lazy_main.ts index d608123f5..dba451ada 100644 --- a/agents/src/ipc/job_proc_lazy_main.ts +++ b/agents/src/ipc/job_proc_lazy_main.ts @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import { Room, RoomEvent, dispose } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { EventEmitter, once } from 'node:events'; import { pathToFileURL } from 'node:url'; import type { Logger } from 'pino'; @@ -40,9 +41,11 @@ type JobTask = { }; class PendingInference { - promise = new Promise<{ requestId: string; data: unknown; error?: Error }>((resolve) => { - this.resolve = resolve; // this is how JavaScript lets you resolve promises externally - }); + promise = new ThrowsPromise<{ requestId: string; data: unknown; error?: Error }, never>( + (resolve) => { + this.resolve = resolve; // this is how JavaScript lets you resolve promises externally + }, + ); resolve(arg: { requestId: string; data: unknown; error?: Error }) { arg; // useless call to counteract TypeScript E6133 } @@ -177,7 +180,7 @@ const startJob = ( for (const callback of ctx.shutdownCallbacks) { shutdownTasks.push(callback()); } - await Promise.all(shutdownTasks).catch((error) => + await ThrowsPromise.all(shutdownTasks).catch((error) => logger.error({ error }, 'error while shutting down the job'), ); diff --git a/agents/src/ipc/proc_pool.ts b/agents/src/ipc/proc_pool.ts index 8499f30e1..3dffacbe5 100644 --- a/agents/src/ipc/proc_pool.ts +++ b/agents/src/ipc/proc_pool.ts @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import { MultiMutex, Mutex } from '@livekit/mutex'; -import type { Throws } from '@livekit/throws-transformer/throws'; +import { type Throws, ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { RunningJobInfo } from '../job.js'; import { Queue } from '../utils.js'; import type { InferenceExecutor } from './inference_executor.js'; @@ -174,6 +174,6 @@ export class ProcPool { e.proc.close(); }); this.executors.forEach((e) => e.close()); - await Promise.allSettled(this.tasks); + await ThrowsPromise.allSettled(this.tasks); } } diff --git a/agents/src/job.ts b/agents/src/job.ts index 99e16e90b..264913bd3 100644 --- a/agents/src/job.ts +++ b/agents/src/job.ts @@ -10,6 +10,7 @@ import type { RtcConfiguration, } from '@livekit/rtc-node'; import { ParticipantKind, RoomEvent, TrackKind } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { AsyncLocalStorage } from 'node:async_hooks'; import * as os from 'node:os'; import * as path from 'node:path'; @@ -188,7 +189,7 @@ export class JobContext { } } - return new Promise((resolve, reject) => { + return new ThrowsPromise((resolve, reject) => { const onParticipantConnected = (participant: RemoteParticipant) => { if ( (!identity || participant.identity === identity) && diff --git a/agents/src/stream/merge_readable_streams.ts b/agents/src/stream/merge_readable_streams.ts index 9ee455fe3..dbce90c67 100644 --- a/agents/src/stream/merge_readable_streams.ts +++ b/agents/src/stream/merge_readable_streams.ts @@ -1,8 +1,9 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { ReadableStream } from 'node:stream/web'; -import { withResolvers } from '../utils.js'; +import { asError, withResolvers } from '../utils.js'; // Adapted from https://github.com/denoland/std/blob/main/streams/merge_readable_streams.ts // we manually adapted to make ReadableStream typing compatible with our current node @@ -12,7 +13,7 @@ export function mergeReadableStreams(...streams: ReadableStream[]): Readab return new ReadableStream({ start(controller) { let mustClose = false; - Promise.all(resolvePromises.map(({ promise }) => promise)) + ThrowsPromise.all(resolvePromises.map(({ promise }) => promise)) .then(() => { controller.close(); }) @@ -31,7 +32,7 @@ export function mergeReadableStreams(...streams: ReadableStream[]): Readab } resolvePromises[index]!.resolve(); } catch (error) { - resolvePromises[index]!.reject(error); + resolvePromises[index]!.reject(asError(error)); } })(); } diff --git a/agents/src/stream/multi_input_stream.ts b/agents/src/stream/multi_input_stream.ts index 99f03dfc4..c5a92c980 100644 --- a/agents/src/stream/multi_input_stream.ts +++ b/agents/src/stream/multi_input_stream.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { ReadableStream, ReadableStreamDefaultReader, @@ -115,7 +116,7 @@ export class MultiInputStream { this.inputs.clear(); // Wait for every pump loop to finish before touching the writer. - await Promise.allSettled([...this.pumpPromises.values()]); + await ThrowsPromise.allSettled([...this.pumpPromises.values()]); this.pumpPromises.clear(); // Close the output writer + writable side of the transform. diff --git a/agents/src/stt/stream_adapter.ts b/agents/src/stt/stream_adapter.ts index d2092c2aa..d8651ea5b 100644 --- a/agents/src/stt/stream_adapter.ts +++ b/agents/src/stt/stream_adapter.ts @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { log } from '../log.js'; import type { APIConnectOptions } from '../types.js'; import { isStreamClosedError } from '../utils.js'; @@ -113,6 +114,6 @@ export class StreamAdapterWrapper extends SpeechStream { } }; - await Promise.all([forwardInput(), recognize()]); + await ThrowsPromise.all([forwardInput(), recognize()]); } } diff --git a/agents/src/telemetry/logging.ts b/agents/src/telemetry/logging.ts index eefb05c98..4f589875f 100644 --- a/agents/src/telemetry/logging.ts +++ b/agents/src/telemetry/logging.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { Attributes } from '@opentelemetry/api'; import type { LogRecord, LogRecordProcessor } from '@opentelemetry/sdk-logs'; @@ -24,11 +25,11 @@ export class MetadataLogProcessor implements LogRecordProcessor { } shutdown(): Promise { - return Promise.resolve(); + return ThrowsPromise.resolve(); } forceFlush(): Promise { - return Promise.resolve(); + return ThrowsPromise.resolve(); } } @@ -46,10 +47,10 @@ export class ExtraDetailsProcessor implements LogRecordProcessor { } shutdown(): Promise { - return Promise.resolve(); + return ThrowsPromise.resolve(); } forceFlush(): Promise { - return Promise.resolve(); + return ThrowsPromise.resolve(); } } diff --git a/agents/src/telemetry/traces.ts b/agents/src/telemetry/traces.ts index 8ee52e586..ea996ada7 100644 --- a/agents/src/telemetry/traces.ts +++ b/agents/src/telemetry/traces.ts @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import { MetricsRecordingHeader } from '@livekit/protocol'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { type Attributes, type Context, @@ -164,11 +165,11 @@ class MetadataSpanProcessor implements SpanProcessor { onEnd(_span: ReadableSpan): void {} shutdown(): Promise { - return Promise.resolve(); + return ThrowsPromise.resolve(); } forceFlush(): Promise { - return Promise.resolve(); + return ThrowsPromise.resolve(); } } @@ -624,7 +625,7 @@ export async function uploadSessionReport(options: { // Upload to LiveKit Cloud using form-data's submit method // This properly streams the multipart form with all headers including Content-Length - return new Promise((resolve, reject) => { + return new ThrowsPromise((resolve, reject) => { formData.submit( { protocol: 'https:', diff --git a/agents/src/transcription.ts b/agents/src/transcription.ts index 63d301413..f46ff952e 100644 --- a/agents/src/transcription.ts +++ b/agents/src/transcription.ts @@ -3,6 +3,8 @@ // SPDX-License-Identifier: Apache-2.0 import { TranscriptionSegment } from '@livekit/protocol'; import { AudioFrame } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/promise'; +import type { Throws } from '@livekit/throws-transformer/throws'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import { EventEmitter } from 'node:events'; import { basic } from './tokenize/index.js'; @@ -59,7 +61,7 @@ export class TextAudioSynchronizer extends (EventEmitter as new () => TypedEmitt #closed = false; #interrupted = false; - #closeFut = new Future(); + #closeFut = new Future(); #playingSegIndex = -1; #finishedSegIndex = -1; @@ -288,10 +290,10 @@ export class TextAudioSynchronizer extends (EventEmitter as new () => TypedEmitt textData.forwardedSentences++; } - async #sleepIfNotClosed(delay: number) { - await Promise.race([ + async #sleepIfNotClosed(delay: number): Promise> { + await ThrowsPromise.race([ this.#closeFut.await, - new Promise((resolve) => setTimeout(resolve, delay)), + new ThrowsPromise((resolve) => setTimeout(resolve, delay)), ]); } diff --git a/agents/src/tts/fallback_adapter.ts b/agents/src/tts/fallback_adapter.ts index 121062260..e5ec5e0ca 100644 --- a/agents/src/tts/fallback_adapter.ts +++ b/agents/src/tts/fallback_adapter.ts @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import { AudioResampler } from '@livekit/rtc-node'; -import type { Throws } from '@livekit/throws-transformer/throws'; +import { type Throws, ThrowsPromise } from '@livekit/throws-transformer/throws'; import { APIConnectionError, APIError } from '../_exceptions.js'; import { log } from '../log.js'; import { basic } from '../tokenize/index.js'; @@ -285,7 +285,7 @@ export class FallbackAdapter extends TTS { } // Close all TTS instances - await Promise.all(this.ttsInstances.map((tts) => tts.close())); + await ThrowsPromise.all(this.ttsInstances.map((tts) => tts.close())); } } @@ -476,7 +476,7 @@ class FallbackSynthesizeStream extends SynthesizeStream { stream.pushText(token); } } - await new Promise((resolve) => setTimeout(resolve, FORWARD_POLL_MS)); + await new ThrowsPromise((resolve) => setTimeout(resolve, FORWARD_POLL_MS)); if (this.abortController.signal.aborted || streamOutputCompleted) { stream.endInput(); return; @@ -546,7 +546,7 @@ class FallbackSynthesizeStream extends SynthesizeStream { streamOutputCompleted = true; } }; - const [outputResult, forwardBufferResult] = await Promise.allSettled([ + const [outputResult, forwardBufferResult] = await ThrowsPromise.allSettled([ processOutput(), forwardBufferToTTS().catch((err) => { stream.close(); // Close stream so processOutput can exit diff --git a/agents/src/tts/stream_adapter.ts b/agents/src/tts/stream_adapter.ts index 058c947cd..c3933badd 100644 --- a/agents/src/tts/stream_adapter.ts +++ b/agents/src/tts/stream_adapter.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { SentenceStream, SentenceTokenizer } from '../tokenize/index.js'; import type { APIConnectOptions } from '../types.js'; import { USERDATA_TIMED_TRANSCRIPT } from '../types.js'; @@ -87,7 +88,7 @@ export class StreamAdapterWrapper extends SynthesizeStream { tokenCompletionTasks.push(task); } - await Promise.all(tokenCompletionTasks.map((t) => t.result)); + await ThrowsPromise.all(tokenCompletionTasks.map((t) => t.result)); this.queue.put(SynthesizeStream.END_OF_STREAM); }; @@ -127,6 +128,6 @@ export class StreamAdapterWrapper extends SynthesizeStream { } }; - await Promise.all([forwardInput(), synthesizeSentenceStream()]); + await ThrowsPromise.all([forwardInput(), synthesizeSentenceStream()]); } } diff --git a/agents/src/tts/tts.ts b/agents/src/tts/tts.ts index def4833d9..70ea5a582 100644 --- a/agents/src/tts/tts.ts +++ b/agents/src/tts/tts.ts @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import type { Span } from '@opentelemetry/api'; import { EventEmitter } from 'node:events'; @@ -524,7 +525,7 @@ export abstract class ChunkedStream implements AsyncIterableIterator this.mainTask().finally(() => this.queue.close())); + ThrowsPromise.resolve().then(() => this.mainTask().finally(() => this.queue.close())); } private _mainTaskImpl = async (span: Span) => { diff --git a/agents/src/typed_promise.ts b/agents/src/typed_promise.ts deleted file mode 100644 index 711d1938c..000000000 --- a/agents/src/typed_promise.ts +++ /dev/null @@ -1,67 +0,0 @@ -// SPDX-FileCopyrightText: 2026 LiveKit, Inc. -// -// SPDX-License-Identifier: Apache-2.0 -type InferErrors = T extends TypedPromise ? E : never; - -interface PromiseRejectedResult { - status: 'rejected'; - reason: E; -} - -type SettledResult = - T extends TypedPromise - ? PromiseFulfilledResult | PromiseRejectedResult - : T extends PromiseLike - ? PromiseFulfilledResult | PromiseRejectedResult - : PromiseFulfilledResult | PromiseRejectedResult; - -export default class TypedPromise extends Promise { - // eslint-disable-next-line @typescript-eslint/no-useless-constructor - constructor( - executor: (resolve: (value: T | PromiseLike) => void, reject: (reason: E) => void) => void, - ) { - super(executor); - } - - catch( - onrejected?: ((reason: E) => TResult | PromiseLike) | null | undefined, - ): TypedPromise { - return super.catch(onrejected); - } - - static resolve: { - (): TypedPromise; - (value: V): TypedPromise, never>; - } = (value?: V): TypedPromise, never> => { - return super.resolve(value) as TypedPromise, never>; - }; - - static reject(reason: E): TypedPromise { - return super.reject(reason); - } - - static all( - values: T, - ): TypedPromise<{ -readonly [P in keyof T]: Awaited }, InferErrors> { - return super.all(values) as any; - } - - static allSettled( - values: T, - ): TypedPromise<{ -readonly [P in keyof T]: SettledResult }, never> { - return super.allSettled(values) as any; - } - - static race | any)[]>( - values: T, - ): TypedPromise< - T[number] extends TypedPromise - ? U - : T[number] extends PromiseLike - ? U - : Awaited, - InferErrors - > { - return super.race(values); - } -} diff --git a/agents/src/utils.ts b/agents/src/utils.ts index 248749b5b..50f559377 100644 --- a/agents/src/utils.ts +++ b/agents/src/utils.ts @@ -9,7 +9,7 @@ import type { TrackKind, } from '@livekit/rtc-node'; import { AudioFrame, AudioResampler, RoomEvent } from '@livekit/rtc-node'; -import type { Throws } from '@livekit/throws-transformer/throws'; +import { type Throws, ThrowsPromise } from '@livekit/throws-transformer/throws'; import { AsyncLocalStorage } from 'node:async_hooks'; import { EventEmitter, once } from 'node:events'; import type { ReadableStream } from 'node:stream/web'; @@ -39,9 +39,12 @@ export type AudioBuffer = AudioFrame[] | AudioFrame; export const noop = () => {}; -export const isPending = async (promise: Promise): Promise => { +export const isPending = async (promise: Promise): Promise> => { const sentinel = Symbol('sentinel'); - const result = await Promise.race([promise, Promise.resolve(sentinel)]); + const result = await Promise.race([ + ThrowsPromise.fromPromise(promise), + ThrowsPromise.resolve(sentinel), + ]); return result === sentinel; }; @@ -122,17 +125,17 @@ export class Queue { } /** @internal */ -export class Future { - #await: Promise; +export class Future { + #await: ThrowsPromise; #resolvePromise!: (value: T) => void; - #rejectPromise!: (error: Error) => void; + #rejectPromise!: (error: E) => void; #done: boolean = false; #rejected: boolean = false; #result: T | undefined = undefined; #error: Error | undefined = undefined; constructor() { - this.#await = new Promise((resolve, reject) => { + this.#await = new ThrowsPromise((resolve, reject) => { this.#resolvePromise = resolve; this.#rejectPromise = reject; }); @@ -169,7 +172,7 @@ export class Future { this.#resolvePromise(value); } - reject(error: Error) { + reject(error: E) { this.#done = true; this.#rejected = true; this.#error = error; @@ -191,7 +194,7 @@ export class Event { if (this.#isSet) return true; let resolve: () => void = noop; - const waiter = new Promise((r) => { + const waiter = new ThrowsPromise((r) => { resolve = r; this.#waiters.push(resolve); }); @@ -225,26 +228,26 @@ export class Event { } /** @internal */ -export class CancellablePromise { - #promise: Promise; +export class CancellablePromise { + #promise: ThrowsPromise; #cancelFn: () => void; #isCancelled: boolean = false; - #error: Error | null = null; + #error: E | null = null; constructor( executor: ( resolve: (value: T | PromiseLike) => void, - reject: (reason?: unknown) => void, + reject: (reason: E) => void, onCancel: (cancelFn: () => void) => void, ) => void, ) { let cancel: () => void; - this.#promise = new Promise((resolve, reject) => { + this.#promise = new ThrowsPromise((resolve, reject) => { executor( resolve, (reason) => { - this.#error = reason instanceof Error ? reason : new Error(String(reason)); + this.#error = reason; reject(reason); }, (cancelFn) => { @@ -269,18 +272,18 @@ export class CancellablePromise { then( onfulfilled?: ((value: T) => TResult1 | Promise) | null, - onrejected?: ((reason: unknown) => TResult2 | Promise) | null, + onrejected?: ((reason: E) => TResult2 | Promise) | null, ): Promise { return this.#promise.then(onfulfilled, onrejected); } catch( - onrejected?: ((reason: unknown) => TResult | Promise) | null, - ): Promise { + onrejected?: ((reason: E) => TResult | Promise) | null, + ): Promise> { return this.#promise.catch(onrejected); } - finally(onfinally?: (() => void) | null): Promise { + finally(onfinally?: (() => void) | null): Promise> { return this.#promise.finally(onfinally); } @@ -288,6 +291,8 @@ export class CancellablePromise { this.#cancelFn(); } + static from(promise: Promise>): CancellablePromise; + static from(promise: Promise): CancellablePromise; static from(promise: Promise): CancellablePromise { return new CancellablePromise((resolve, reject) => { promise.then(resolve).catch(reject); @@ -550,7 +555,7 @@ export class Task { promises.push(delay(timeout).then(() => TaskResult.Timeout)); } - const result = await Promise.race(promises); + const result = await ThrowsPromise.race(promises); // Check what happened if (result === TaskResult.Timeout) { @@ -588,19 +593,19 @@ export class Task { } export async function waitFor(tasks: Task[]): Promise { - await Promise.allSettled(tasks.map((task) => task.result)); + await ThrowsPromise.allSettled(tasks.map((task) => task.result)); } // eslint-disable-next-line @typescript-eslint/no-explicit-any export async function cancelAndWait(tasks: Task[], timeout?: number): Promise { - await Promise.allSettled(tasks.map((task) => task.cancelAndWait(timeout))); + await ThrowsPromise.allSettled(tasks.map((task) => task.cancelAndWait(timeout))); } export function withResolvers() { let resolve!: (value: T | PromiseLike) => void; - let reject!: (reason?: unknown) => void; + let reject!: (reason: Error) => void; - const promise = new Promise((res, rej) => { + const promise = new ThrowsPromise((res, rej) => { resolve = res; reject = rej; }); @@ -806,8 +811,8 @@ export type DelayOptions = { */ export function delay(ms: number, options: DelayOptions = {}): Promise { const { signal } = options; - if (signal?.aborted) return Promise.reject(signal.reason ?? new Error('delay aborted')); - return new Promise((resolve, reject) => { + if (signal?.aborted) return ThrowsPromise.reject(signal.reason ?? new Error('delay aborted')); + return new ThrowsPromise((resolve, reject) => { const abort = () => { clearTimeout(i); reject(signal?.reason ?? new Error('delay aborted')); @@ -840,9 +845,9 @@ export function waitUntilTimeout( throwError?: () => E, ): Promise> { let timer: ReturnType | undefined; - return Promise.race([ + return ThrowsPromise.race([ promise, - new Promise((_, reject) => { + new ThrowsPromise((_, reject) => { timer = setTimeout(() => reject(throwError?.() ?? new IdleTimeoutError()), timeoutMs); }), ]).finally(() => clearTimeout(timer)) as Promise>; @@ -977,7 +982,7 @@ export async function* readStream( if (signal) { const abortPromise = waitForAbort(signal); while (true) { - const result = await Promise.race([reader.read(), abortPromise]); + const result = await ThrowsPromise.race([reader.read(), abortPromise]); if (!result) break; const { done, value } = result; if (done) break; @@ -1066,3 +1071,10 @@ export const isDevMode = (): boolean => { export const isHosted = (): boolean => { return process.env.LIVEKIT_REMOTE_EOT_URL !== undefined; }; + +export function asError(maybeError: unknown): Error { + if (maybeError instanceof Error) { + return maybeError; + } + return new Error(String(maybeError)); +} diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index 34a9f2309..fa1889df3 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 import { Mutex } from '@livekit/mutex'; import type { AudioFrame } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { Span } from '@opentelemetry/api'; import { ROOT_CONTEXT, context as otelContext, trace } from '@opentelemetry/api'; import { Heap } from 'heap-js'; @@ -128,7 +129,7 @@ export async function cleanupReusableResources( } if (tasks.length > 0) { - const outputs = await Promise.allSettled(tasks); + const outputs = await ThrowsPromise.allSettled(tasks); for (const output of outputs) { if (output.status === 'rejected') { if (logger) { @@ -168,7 +169,7 @@ export class AgentActivity implements RecognitionHooks { private _drainBlockedTasks: Task[] = []; private _currentSpeech?: SpeechHandle; private speechQueue: Heap<[number, number, SpeechHandle]>; // [priority, timestamp, speechHandle] - private q_updated: Future; + private q_updated: Future; private speechTasks: Set> = new Set(); private lock = new Mutex(); private audioStream = new MultiInputStream(); @@ -1315,7 +1316,7 @@ export class AgentActivity implements RecognitionHooks { } private async mainTask(signal: AbortSignal): Promise { - const abortFuture = new Future(); + const abortFuture = new Future(); const abortHandler = () => { abortFuture.resolve(); signal.removeEventListener('abort', abortHandler); @@ -1323,7 +1324,7 @@ export class AgentActivity implements RecognitionHooks { signal.addEventListener('abort', abortHandler); while (true) { - await Promise.race([this.q_updated.await, abortFuture.await]); + await ThrowsPromise.race([this.q_updated.await, abortFuture.await]); if (signal.aborted) break; while (this.speechQueue.size() > 0) { @@ -2010,11 +2011,11 @@ export class AgentActivity implements RecognitionHooks { // Check if we should use TTS aligned transcripts if (this.useTtsAlignedTranscript && this.tts?.capabilities.alignedTranscript && ttsGenData) { // Race timedTextsFut with ttsTask to avoid hanging if TTS fails before resolving the future - const timedTextsStream = await Promise.race([ + const timedTextsStream = await ThrowsPromise.race([ ttsGenData.timedTextsFut.await, ttsTask?.result.catch(() => this.logger.warn('TTS task failed before resolving timedTextsFut'), - ) ?? Promise.resolve(), + ) ?? ThrowsPromise.resolve(), ]); if (timedTextsStream) { this.logger.debug('Using TTS aligned transcripts for transcription node input'); @@ -2755,7 +2756,7 @@ export class AgentActivity implements RecognitionHooks { await this.currentSpeech.waitForPlayout(); } else { // Don't block the event loop - await new Promise((resolve) => setImmediate(resolve)); + await new ThrowsPromise((resolve) => setImmediate(resolve)); } } const chatCtx = this.realtimeSession.chatCtx.copy(); diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index 3f4e813d0..2b5ba75ad 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 import { Mutex } from '@livekit/mutex'; import type { AudioFrame, Room } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import type { Context, Span } from '@opentelemetry/api'; import { ROOT_CONTEXT, context as otelContext, trace } from '@opentelemetry/api'; @@ -36,7 +37,7 @@ import { type ResolvedSessionConnectOptions, type SessionConnectOptions, } from '../types.js'; -import { Task } from '../utils.js'; +import { Task, asError } from '../utils.js'; import type { VAD } from '../vad.js'; import type { Agent } from './agent.js'; import { @@ -484,7 +485,7 @@ export class AgentSession< // Initial start does not wait on onEnter tasks.push(this._updateActivity(this.agent, { waitOnEnter: false })); - await Promise.allSettled(tasks); + await ThrowsPromise.allSettled(tasks); if (this.sessionHost) { await this.sessionHost.start(); @@ -767,7 +768,7 @@ export class AgentSession< unlock(); this.generateReply({ userInput }); } catch (e) { - runState._reject(e instanceof Error ? e : new Error(String(e))); + runState._reject(asError(e)); } })(); diff --git a/agents/src/voice/audio_recognition.ts b/agents/src/voice/audio_recognition.ts index 5390fa984..cb2defef4 100644 --- a/agents/src/voice/audio_recognition.ts +++ b/agents/src/voice/audio_recognition.ts @@ -4,6 +4,7 @@ import { Mutex } from '@livekit/mutex'; import type { ParticipantKind } from '@livekit/rtc-node'; import { AudioFrame } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { type Context, ROOT_CONTEXT, @@ -1108,7 +1109,7 @@ export class AudioRecognition { try { while (!signal.aborted) { - const res = await Promise.race([inputReader.read(), abortPromise]); + const res = await ThrowsPromise.race([inputReader.read(), abortPromise]); if (!res) break; const { value, done } = res; @@ -1131,7 +1132,7 @@ export class AudioRecognition { const abortPromise = waitForAbort(signal); while (!signal.aborted) { - const res = await Promise.race([eventReader.read(), abortPromise]); + const res = await ThrowsPromise.race([eventReader.read(), abortPromise]); if (!res) break; const { done, value: ev } = res; if (done) break; diff --git a/agents/src/voice/generation.ts b/agents/src/voice/generation.ts index 8cc1b04b1..8c52410de 100644 --- a/agents/src/voice/generation.ts +++ b/agents/src/voice/generation.ts @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame } from '@livekit/rtc-node'; import { AudioResampler } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { Span } from '@opentelemetry/api'; import { context as otelContext } from '@opentelemetry/api'; import type { ReadableStream, ReadableStreamDefaultReader } from 'stream/web'; @@ -82,7 +83,7 @@ export interface _TTSGenerationData { /** * Future that resolves to a stream of timed transcripts, or null if TTS doesn't support it. */ - timedTextsFut: Future | null>; + timedTextsFut: Future | null, never>; /** Time to first byte (set when first audio frame is received) */ ttfb?: number; } @@ -471,7 +472,7 @@ export function performLLMInference( while (true) { if (signal.aborted) break; - const result = await Promise.race([llmStreamReader.read(), abortPromise]); + const result = await ThrowsPromise.race([llmStreamReader.read(), abortPromise]); if (result === undefined) break; const { done, value: chunk } = result; @@ -565,7 +566,7 @@ export function performTTSInference( const outputWriter = audioStream.writable.getWriter(); const audioOutputStream = audioStream.readable; - const timedTextsFut = new Future | null>(); + const timedTextsFut = new Future | null, never>(); const timedTextsStream = new IdentityTransform(); const timedTextsWriter = timedTextsStream.writable.getWriter(); @@ -1097,7 +1098,7 @@ export function performToolExecutions({ tasks.push(toolTask); } - await Promise.allSettled(tasks.map((task) => task.result)); + await ThrowsPromise.allSettled(tasks.map((task) => task.result)); if (toolOutput.output.length > 0) { logger.debug( { diff --git a/agents/src/voice/remote_session.ts b/agents/src/voice/remote_session.ts index 2f764fd1b..d333cc40d 100644 --- a/agents/src/voice/remote_session.ts +++ b/agents/src/voice/remote_session.ts @@ -4,6 +4,7 @@ import { Timestamp } from '@bufbuild/protobuf'; import { AgentSession as pb } from '@livekit/protocol'; import type { ByteStreamReader, Room, TextStreamInfo } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { TypedEventEmitter } from '@livekit/typed-emitter'; import EventEmitter from 'events'; import { TOPIC_SESSION_MESSAGES } from '../constants.js'; @@ -198,7 +199,7 @@ export class RoomSessionTransport extends SessionTransport { return { next: (): Promise> => { if (this.closed && this.pendingMessages.length === 0) { - return Promise.resolve({ + return ThrowsPromise.resolve({ value: undefined as unknown as pb.AgentSessionMessage, done: true, }); @@ -206,16 +207,16 @@ export class RoomSessionTransport extends SessionTransport { const pending = this.pendingMessages.shift(); if (pending) { - return Promise.resolve({ value: pending, done: false }); + return ThrowsPromise.resolve({ value: pending, done: false }); } - return new Promise>((resolve) => { + return new ThrowsPromise, never>((resolve) => { this.waitingResolve = resolve; }); }, return: (): Promise> => { this.close(); - return Promise.resolve({ + return ThrowsPromise.resolve({ value: undefined as unknown as pb.AgentSessionMessage, done: true, }); @@ -519,7 +520,7 @@ export class SessionHost { this.recvTask.cancel(); } - await Promise.allSettled([...this.tasks].map((task) => task.cancelAndWait())); + await ThrowsPromise.allSettled([...this.tasks].map((task) => task.cancelAndWait())); this.tasks.clear(); await this.transport.close(); diff --git a/agents/src/voice/speech_handle.ts b/agents/src/voice/speech_handle.ts index a3cde5aa6..c369c807f 100644 --- a/agents/src/voice/speech_handle.ts +++ b/agents/src/voice/speech_handle.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { Context } from '@opentelemetry/api'; import type { ChatItem } from '../llm/index.js'; import type { Task } from '../utils.js'; @@ -164,9 +165,9 @@ export class SpeechHandle { } async waitIfNotInterrupted(aw: Promise[]): Promise { - const allTasksPromise = Promise.all(aw); + const allTasksPromise = ThrowsPromise.all(aw); const fs: Promise[] = [allTasksPromise, this.interruptFut.await]; - await Promise.race(fs); + await ThrowsPromise.race(fs); } addDoneCallback(callback: (sh: SpeechHandle) => void) { diff --git a/agents/src/worker.ts b/agents/src/worker.ts index e500a6a86..3904a0d6f 100644 --- a/agents/src/worker.ts +++ b/agents/src/worker.ts @@ -10,7 +10,7 @@ import { WorkerMessage, WorkerStatus, } from '@livekit/protocol'; -import type { Throws } from '@livekit/throws-transformer/throws'; +import { type Throws, ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { ParticipantInfo } from 'livekit-server-sdk'; import { AccessToken, RoomServiceClient } from 'livekit-server-sdk'; import { EventEmitter } from 'node:events'; @@ -225,7 +225,7 @@ export class ServerOptions { } class PendingAssignment { - promise = new Promise((resolve) => { + promise = new ThrowsPromise((resolve) => { this.resolve = resolve; // this is how JavaScript lets you resolve promises externally }); resolve(arg: JobAssignment) { @@ -384,10 +384,10 @@ export class AgentServer { }); try { - await new Promise((resolve, reject) => { + await new ThrowsPromise((resolve, reject) => { this.#session!.on('open', resolve); this.#session!.on('error', (error) => reject(error)); - this.#session!.on('close', (code) => reject(`WebSocket returned ${code}`)); + this.#session!.on('close', (code) => reject(new Error(`WebSocket returned ${code}`))); }); retries = 0; @@ -409,12 +409,12 @@ export class AgentServer { `failed to connect to LiveKit server (${this.#opts.wsURL}), retrying in ${delay} seconds: (${retries}/${this.#opts.maxRetry})`, ); - await new Promise((resolve) => setTimeout(resolve, delay * 1000)); + await new ThrowsPromise((resolve) => setTimeout(resolve, delay * 1000)); } } }; - await Promise.all([workerWS(), this.#httpServer.run()]); + await ThrowsPromise.all([workerWS(), this.#httpServer.run()]); this.#close.resolve(); } @@ -429,7 +429,7 @@ export class AgentServer { } /** @throws {@link WorkerError} if worker did not drain in time */ - async drain(timeout?: number): Promise> { + async drain(timeout?: number): Promise> { if (this.#draining) { return; } @@ -449,8 +449,8 @@ export class AgentServer { }), ); - const joinJobs = async () => { - return Promise.all( + const joinJobs = async (): Promise> => { + return ThrowsPromise.all( this.#procPool.processes.map((proc): Promise> => { if (!proc.runningJob) { proc.close(); @@ -469,7 +469,7 @@ export class AgentServer { }), ); } - await Promise.race(promises); + await ThrowsPromise.race(promises); } async simulateJob(roomName: string, participantIdentity?: string) { @@ -518,7 +518,7 @@ export class AgentServer { }; this.event.on('worker_msg', send); - const close = new Promise((resolve) => { + const close = new ThrowsPromise((resolve) => { ws.addEventListener('close', () => { closingWS = true; if (!this.#closed) { @@ -818,7 +818,7 @@ export class AgentServer { await this.#inferenceExecutor?.close(); await this.#procPool.close(); await this.#httpServer.close(); - await Promise.allSettled(this.#tasks); + await ThrowsPromise.allSettled(this.#tasks); this.#session?.close(); await this.#close.await; diff --git a/examples/src/testing/agent_task.test.ts b/examples/src/testing/agent_task.test.ts index 927faf68b..163d232a1 100644 --- a/examples/src/testing/agent_task.test.ts +++ b/examples/src/testing/agent_task.test.ts @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2026 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { Future, initializeLogger, llm, voice } from '@livekit/agents'; +import { Future, asError, initializeLogger, llm, voice } from '@livekit/agents'; import * as openai from '@livekit/agents-plugin-openai'; import { afterEach, describe, expect, it } from 'vitest'; import { z } from 'zod'; @@ -27,10 +27,6 @@ initializeLogger({ pretty: true, level: 'warn' }); * NOT COVERED in this suite due to known deadlock limitation. */ -function asError(error: unknown): Error { - return error instanceof Error ? error : new Error(String(error)); -} - async function withFutureResolution(done: Future, fn: () => Promise): Promise { try { done.resolve(await fn()); diff --git a/examples/src/testing/basic_task_group.test.ts b/examples/src/testing/basic_task_group.test.ts index 7cbc9580c..afbc7400e 100644 --- a/examples/src/testing/basic_task_group.test.ts +++ b/examples/src/testing/basic_task_group.test.ts @@ -26,7 +26,7 @@ * - onEnter() signals readiness instead of awaiting generateReply() * - Tasks tested in isolation and as a group */ -import { Future, beta, initializeLogger, llm, voice } from '@livekit/agents'; +import { Future, asError, beta, initializeLogger, llm, voice } from '@livekit/agents'; import { afterEach, describe, expect, it } from 'vitest'; import { z } from 'zod'; @@ -40,10 +40,6 @@ initializeLogger({ pretty: true, level: 'warn' }); // Helpers // --------------------------------------------------------------------------- -function asError(error: unknown): Error { - return error instanceof Error ? error : new Error(String(error)); -} - /** Run `fn` and forward its result or error to `done`. */ async function withFutureResolution(done: Future, fn: () => Promise): Promise { try { diff --git a/examples/src/testing/task_group.test.ts b/examples/src/testing/task_group.test.ts index ed2687e87..3d5afff06 100644 --- a/examples/src/testing/task_group.test.ts +++ b/examples/src/testing/task_group.test.ts @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2026 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { Future, beta, initializeLogger, llm, voice } from '@livekit/agents'; +import { Future, asError, beta, initializeLogger, llm, voice } from '@livekit/agents'; import { afterEach, describe, expect, it } from 'vitest'; import { z } from 'zod'; @@ -11,10 +11,6 @@ type TaskCompletedEvent = beta.TaskCompletedEvent; initializeLogger({ pretty: true, level: 'warn' }); -function asError(error: unknown): Error { - return error instanceof Error ? error : new Error(String(error)); -} - async function withFutureResolution(done: Future, fn: () => Promise): Promise { try { done.resolve(await fn()); diff --git a/plugins/cartesia/src/tts.ts b/plugins/cartesia/src/tts.ts index 867f730c9..966c57f2d 100644 --- a/plugins/cartesia/src/tts.ts +++ b/plugins/cartesia/src/tts.ts @@ -8,6 +8,7 @@ import { AudioByteStream, Future, type TimedString, + asError, createTimedString, getBaseLanguage, log, @@ -554,8 +555,6 @@ export class SynthesizeStream extends tts.SynthesizeStream { } } -const asError = (e: unknown): Error => (e instanceof Error ? e : new Error(String(e))); - const transientNetworkCodes = new Set([ 'ETIMEDOUT', 'ECONNRESET', diff --git a/plugins/elevenlabs/src/tts.ts b/plugins/elevenlabs/src/tts.ts index 1f5bbf71b..c1e4ad592 100644 --- a/plugins/elevenlabs/src/tts.ts +++ b/plugins/elevenlabs/src/tts.ts @@ -10,6 +10,7 @@ import { AudioByteStream, Future, type TimedString, + asError, createTimedString, getBaseLanguage, log, @@ -591,7 +592,7 @@ class Connection { } catch (e) { this.#logger.warn({ error: e }, 'recv loop error'); for (const ctx of this.#contextData.values()) { - ctx.waiter.reject(e instanceof Error ? e : new Error(String(e))); + ctx.waiter.reject(asError(e)); } this.#contextData.clear(); } finally { diff --git a/plugins/inworld/src/tts.ts b/plugins/inworld/src/tts.ts index dd2272e69..28848b95b 100644 --- a/plugins/inworld/src/tts.ts +++ b/plugins/inworld/src/tts.ts @@ -5,6 +5,7 @@ import { type APIConnectOptions, AudioByteStream, type TimedString, + asError, createTimedString, log, shortuuid, @@ -193,7 +194,7 @@ class WSConnectionPool { try { return await this.#attemptConnection(); } catch (err) { - lastError = err instanceof Error ? err : new Error(String(err)); + lastError = asError(err); this.#connecting = undefined; if (attempt < MAX_RETRIES) { diff --git a/plugins/phonic/src/realtime/realtime_model.ts b/plugins/phonic/src/realtime/realtime_model.ts index 51e9fea47..a966e06f6 100644 --- a/plugins/phonic/src/realtime/realtime_model.ts +++ b/plugins/phonic/src/realtime/realtime_model.ts @@ -6,6 +6,7 @@ import { AudioByteStream, DEFAULT_API_CONNECT_OPTIONS, Future, + asError, llm, log, shortuuid, @@ -250,7 +251,7 @@ export class RealtimeSession extends llm.RealtimeSession { private configSent = false; private instructionsReady = new Future(); private toolsReady = new Future(); - private closedFuture = new Future(); + private closedFuture = new Future(); private connectTask: Promise; private toolDefinitions: Record[] = []; private pendingToolCallIds = new Set(); @@ -273,7 +274,7 @@ export class RealtimeSession extends llm.RealtimeSession { (PHONIC_INPUT_SAMPLE_RATE * PHONIC_INPUT_FRAME_MS) / 1000, ); this.connectTask = this.connect().catch((error: unknown) => { - const normalizedError = error instanceof Error ? error : new Error(String(error)); + const normalizedError = asError(error); this.emitError(normalizedError, false); }); } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index aeefc7e53..1d4236cc9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -119,8 +119,8 @@ importers: specifier: ^1.45.3 version: 1.45.3 '@livekit/throws-transformer': - specifier: 0.0.0-20260320165515 - version: 0.0.0-20260320165515(typescript@5.4.5) + specifier: 0.1.8 + version: 0.1.8(typescript@5.4.5) '@livekit/typed-emitter': specifier: ^3.0.0 version: 3.0.0 @@ -2169,8 +2169,8 @@ packages: resolution: {integrity: sha512-4tL58O2DdTDP+g1ajyP5mgEOzjymD/u06IxWWVKBee1goEwDSQlMqEog/DJW34FoNNqXp1yRMCsphI4V/T1ILg==} engines: {node: '>= 18'} - '@livekit/throws-transformer@0.0.0-20260320165515': - resolution: {integrity: sha512-3L4UKOov1VXuX6sHIBuonJTaPzsSkpqZT3htvamgUYR0pL/aJ+0piiWzTPoCx9WSfmmUUAQqjd42IPgHXQVdvQ==} + '@livekit/throws-transformer@0.1.8': + resolution: {integrity: sha512-AaSwQfIaG6YArKOCO+5/DgI8HfL19oHdrkyI0LJJVCqGeZXzPsZIO/Nm/SKUHbT1ERGhF5c34X8CcVWeOePpIQ==} hasBin: true peerDependencies: typescript: '>=4.7.0' @@ -6574,7 +6574,7 @@ snapshots: pino: 9.6.0 pino-pretty: 13.0.0 - '@livekit/throws-transformer@0.0.0-20260320165515(typescript@5.4.5)': + '@livekit/throws-transformer@0.1.8(typescript@5.4.5)': dependencies: glob: 13.0.6 typescript: 5.4.5 @@ -9998,7 +9998,7 @@ snapshots: dependencies: '@isaacs/fs-minipass': 4.0.1 chownr: 3.0.0 - minipass: 7.1.2 + minipass: 7.1.3 minizlib: 3.0.1 mkdirp: 3.0.1 yallist: 5.0.0