From 2fa5d598ccc532f69a8d8e3422179fbec271e1ec Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 14 Apr 2026 14:06:01 +0200 Subject: [PATCH 01/13] Use ThrowsPromise helper across agent package --- agents/package.json | 2 +- agents/src/connection_pool.ts | 3 +- agents/src/http_server.ts | 5 +- .../interruption/interruption_detector.ts | 3 +- .../inference/interruption/ws_transport.ts | 5 +- agents/src/inference/stt.ts | 13 ++-- agents/src/inference/tts.ts | 3 +- agents/src/inference/utils.ts | 3 +- agents/src/ipc/inference_proc_executor.ts | 9 ++- agents/src/ipc/inference_proc_lazy_main.ts | 7 +- agents/src/ipc/job_proc_lazy_main.ts | 11 +-- agents/src/ipc/proc_pool.ts | 4 +- agents/src/job.ts | 3 +- agents/src/stream/merge_readable_streams.ts | 7 +- agents/src/stream/multi_input_stream.ts | 3 +- agents/src/stt/stream_adapter.ts | 3 +- agents/src/telemetry/logging.ts | 9 +-- agents/src/telemetry/traces.ts | 7 +- agents/src/transcription.ts | 1 + agents/src/tts/fallback_adapter.ts | 8 +-- agents/src/tts/stream_adapter.ts | 5 +- agents/src/tts/tts.ts | 3 +- agents/src/typed_promise.ts | 67 ------------------- agents/src/utils.ts | 37 +++++----- agents/src/voice/agent_activity.ts | 11 +-- agents/src/voice/agent_session.ts | 3 +- agents/src/voice/audio_recognition.ts | 5 +- agents/src/voice/generation.ts | 5 +- agents/src/voice/remote_session.ts | 11 +-- agents/src/voice/room_io/_output.ts | 1 + agents/src/voice/speech_handle.ts | 5 +- agents/src/worker.ts | 24 +++---- pnpm-lock.yaml | 12 ++-- 33 files changed, 133 insertions(+), 165 deletions(-) delete mode 100644 agents/src/typed_promise.ts diff --git a/agents/package.json b/agents/package.json index 225baf55e..b66cea054 100644 --- a/agents/package.json +++ b/agents/package.json @@ -52,7 +52,7 @@ "@livekit/mutex": "^1.1.1", "@livekit/protocol": "^1.45.3", "@livekit/typed-emitter": "^3.0.0", - "@livekit/throws-transformer": "0.0.0-20260320165515", + "@livekit/throws-transformer": "0.1.8", "@opentelemetry/api": "^1.9.0", "@opentelemetry/api-logs": "^0.54.0", "@opentelemetry/core": "^2.2.0", diff --git a/agents/src/connection_pool.ts b/agents/src/connection_pool.ts index b2f13c798..206c569fc 100644 --- a/agents/src/connection_pool.ts +++ b/agents/src/connection_pool.ts @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import { Mutex } from '@livekit/mutex'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { waitForAbort } from './utils.js'; /** @@ -273,7 +274,7 @@ export class ConnectionPool { try { const fnPromise = fn(conn); const result = signal - ? await Promise.race([ + ? await ThrowsPromise.race([ fnPromise.then((value) => ({ type: 'result' as const, value })), waitForAbort(signal).then(() => ({ type: 'abort' as const })), ]).then((r) => { diff --git a/agents/src/http_server.ts b/agents/src/http_server.ts index ebf72a52b..54ebaf0a5 100644 --- a/agents/src/http_server.ts +++ b/agents/src/http_server.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { type IncomingMessage, type Server, type ServerResponse, createServer } from 'node:http'; import { log } from './log.js'; @@ -53,7 +54,7 @@ export class HTTPServer { } async run(): Promise { - return new Promise((resolve, reject) => { + return new ThrowsPromise((resolve, reject) => { this.app.listen(this.port, this.host, (err?: Error) => { if (err) reject(err); const address = this.app.address(); @@ -66,7 +67,7 @@ export class HTTPServer { } async close(): Promise { - return new Promise((resolve, reject) => { + return new ThrowsPromise((resolve, reject) => { this.app.close((err?: Error) => { if (err) reject(err); resolve(); diff --git a/agents/src/inference/interruption/interruption_detector.ts b/agents/src/inference/interruption/interruption_detector.ts index d115918f6..585fbad2b 100644 --- a/agents/src/inference/interruption/interruption_detector.ts +++ b/agents/src/inference/interruption/interruption_detector.ts @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import type { TypedEventEmitter } from '@livekit/typed-emitter'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import EventEmitter from 'events'; import { log } from '../../log.js'; import type { InterruptionMetrics } from '../../metrics/base.js'; @@ -199,6 +200,6 @@ export class AdaptiveInterruptionDetector extends (EventEmitter as new () => Typ for (const stream of this.streams) { updatePromises.push(stream.updateOptions(options)); } - await Promise.all(updatePromises); + await ThrowsPromise.all(updatePromises); } } diff --git a/agents/src/inference/interruption/ws_transport.ts b/agents/src/inference/interruption/ws_transport.ts index 13865ca72..2e99187d3 100644 --- a/agents/src/inference/interruption/ws_transport.ts +++ b/agents/src/inference/interruption/ws_transport.ts @@ -1,13 +1,12 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import type { Throws } from '@livekit/throws-transformer/throws'; +import { type Throws, ThrowsPromise } from '@livekit/throws-transformer/throws'; import { TransformStream } from 'stream/web'; import WebSocket from 'ws'; import { z } from 'zod'; import { APIConnectionError, APIStatusError, APITimeoutError } from '../../_exceptions.js'; import { log } from '../../log.js'; -import TypedPromise from '../../typed_promise.js'; import { buildMetadataHeaders, createAccessToken } from '../utils.js'; import { InterruptionCacheEntry } from './interruption_cache_entry.js'; import type { OverlappingSpeechEvent } from './types.js'; @@ -83,7 +82,7 @@ async function connectWebSocket( headers: { ...buildMetadataHeaders(), Authorization: `Bearer ${token}` }, }); - await new TypedPromise( + await new ThrowsPromise( (resolve, reject) => { const timeout = setTimeout(() => { ws.terminate(); diff --git a/agents/src/inference/stt.ts b/agents/src/inference/stt.ts index 8c3a49da0..03da41f3e 100644 --- a/agents/src/inference/stt.ts +++ b/agents/src/inference/stt.ts @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import { type AudioFrame } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { WebSocket } from 'ws'; import { APIError, APIStatusError } from '../_exceptions.js'; import { AudioByteStream } from '../audio.js'; @@ -403,7 +404,7 @@ export class SpeechStream extends BaseSpeechStream { }; const createWsListener = async (ws: WebSocket, signal: AbortSignal) => { - return new Promise((resolve, reject) => { + return new ThrowsPromise((resolve, reject) => { const onAbort = () => { resourceCleanup(); reject(new Error('WebSocket connection aborted')); @@ -446,7 +447,7 @@ export class SpeechStream extends BaseSpeechStream { ); // Create abort promise once to avoid memory leak - const abortPromise = new Promise((_, reject) => { + const abortPromise = new ThrowsPromise((_, reject) => { if (signal.aborted) { return reject(new Error('Send aborted')); } @@ -458,7 +459,7 @@ export class SpeechStream extends BaseSpeechStream { const iterator = this.input[Symbol.asyncIterator](); try { while (true) { - const result = await Promise.race([iterator.next(), abortPromise]); + const result = await ThrowsPromise.race([iterator.next(), abortPromise]); if (result.done) break; const ev = result.value; @@ -560,13 +561,13 @@ export class SpeechStream extends BaseSpeechStream { ); const recvTask = Task.from(({ signal }) => recv(signal), connController); const waitReconnectTask = Task.from( - ({ signal }) => Promise.race([this.reconnectEvent.wait(), waitForAbort(signal)]), + ({ signal }) => ThrowsPromise.race([this.reconnectEvent.wait(), waitForAbort(signal)]), connController, ); try { - await Promise.race([ - Promise.all([sendTask.result, wsListenerTask.result, recvTask.result]), + await ThrowsPromise.race([ + ThrowsPromise.all([sendTask.result, wsListenerTask.result, recvTask.result]), waitReconnectTask.result, ]); diff --git a/agents/src/inference/tts.ts b/agents/src/inference/tts.ts index 3b91041f1..b2c213af9 100644 --- a/agents/src/inference/tts.ts +++ b/agents/src/inference/tts.ts @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { WebSocket } from 'ws'; import { APIError, APIStatusError, APITimeoutError } from '../_exceptions.js'; import { AudioByteStream } from '../audio.js'; @@ -710,7 +711,7 @@ export class SynthesizeStream extends BaseSynthesizeSt ]; try { - await Promise.all(tasks.map((t) => t.result)); + await ThrowsPromise.all(tasks.map((t) => t.result)); } finally { // Mirror python finally: unblock recv and cancel all tasks. inputSentEvent.set(); diff --git a/agents/src/inference/utils.ts b/agents/src/inference/utils.ts index 5bd85dd16..ad0093b94 100644 --- a/agents/src/inference/utils.ts +++ b/agents/src/inference/utils.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { AccessToken } from 'livekit-server-sdk'; import { WebSocket } from 'ws'; import { APIConnectionError, APIStatusError } from '../_exceptions.js'; @@ -78,7 +79,7 @@ export async function connectWs( headers: Record, timeoutMs: number, ): Promise { - return new Promise((resolve, reject) => { + return new ThrowsPromise((resolve, reject) => { const socket = new WebSocket(url, { headers: { ...buildMetadataHeaders(), ...headers } }); const timeout = setTimeout(() => { diff --git a/agents/src/ipc/inference_proc_executor.ts b/agents/src/ipc/inference_proc_executor.ts index f83a8f6f7..b3eefe1e7 100644 --- a/agents/src/ipc/inference_proc_executor.ts +++ b/agents/src/ipc/inference_proc_executor.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { ChildProcess } from 'node:child_process'; import { fork } from 'node:child_process'; import { extname } from 'node:path'; @@ -11,9 +12,11 @@ import type { IPCMessage } from './message.js'; import { SupervisedProc } from './supervised_proc.js'; class PendingInference { - promise = new Promise<{ requestId: string; data: unknown; error?: Error }>((resolve) => { - this.resolve = resolve; - }); + promise = new ThrowsPromise<{ requestId: string; data: unknown; error?: Error }, never>( + (resolve) => { + this.resolve = resolve; + }, + ); resolve(arg: { requestId: string; data: unknown; error?: Error }) { arg; } diff --git a/agents/src/ipc/inference_proc_lazy_main.ts b/agents/src/ipc/inference_proc_lazy_main.ts index 08f7a5c7f..dd5a2d1a1 100644 --- a/agents/src/ipc/inference_proc_lazy_main.ts +++ b/agents/src/ipc/inference_proc_lazy_main.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { once } from 'node:events'; import type { InferenceRunner } from '../inference_runner.js'; import { initializeLogger, log } from '../log.js'; @@ -34,7 +35,7 @@ const ORPHANED_TIMEOUT = 15 * 1000; }); const logger = log().child({ pid: process.pid }); - const runners: { [id: string]: InferenceRunner } = await Promise.all( + const runners: { [id: string]: InferenceRunner } = await ThrowsPromise.all( Object.entries(JSON.parse(process.argv[2]!)).map(async ([k, v]) => { return [ k, @@ -52,7 +53,7 @@ const ORPHANED_TIMEOUT = 15 * 1000; }), ).then(Object.fromEntries); - await Promise.all( + await ThrowsPromise.all( Object.entries(runners).map(async ([runner, v]) => { logger.child({ runner }).debug('initializing inference runner'); await v.initialize(); @@ -101,7 +102,7 @@ const ORPHANED_TIMEOUT = 15 * 1000; clearTimeout(orphanedTimeout); // Remove our message handler to stop processing new messages process.off('message', messageHandler); - Promise.all(Object.values(runners).map((r) => r.close())) + ThrowsPromise.all(Object.values(runners).map((r) => r.close())) .then(() => { logger.debug('Inference runners closed'); process.send!({ case: 'done' }); diff --git a/agents/src/ipc/job_proc_lazy_main.ts b/agents/src/ipc/job_proc_lazy_main.ts index d608123f5..a24ae54f0 100644 --- a/agents/src/ipc/job_proc_lazy_main.ts +++ b/agents/src/ipc/job_proc_lazy_main.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { Room, RoomEvent, dispose } from '@livekit/rtc-node'; import { EventEmitter, once } from 'node:events'; import { pathToFileURL } from 'node:url'; @@ -40,9 +41,11 @@ type JobTask = { }; class PendingInference { - promise = new Promise<{ requestId: string; data: unknown; error?: Error }>((resolve) => { - this.resolve = resolve; // this is how JavaScript lets you resolve promises externally - }); + promise = new ThrowsPromise<{ requestId: string; data: unknown; error?: Error }, never>( + (resolve) => { + this.resolve = resolve; // this is how JavaScript lets you resolve promises externally + }, + ); resolve(arg: { requestId: string; data: unknown; error?: Error }) { arg; // useless call to counteract TypeScript E6133 } @@ -177,7 +180,7 @@ const startJob = ( for (const callback of ctx.shutdownCallbacks) { shutdownTasks.push(callback()); } - await Promise.all(shutdownTasks).catch((error) => + await ThrowsPromise.all(shutdownTasks).catch((error) => logger.error({ error }, 'error while shutting down the job'), ); diff --git a/agents/src/ipc/proc_pool.ts b/agents/src/ipc/proc_pool.ts index 8499f30e1..3dffacbe5 100644 --- a/agents/src/ipc/proc_pool.ts +++ b/agents/src/ipc/proc_pool.ts @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import { MultiMutex, Mutex } from '@livekit/mutex'; -import type { Throws } from '@livekit/throws-transformer/throws'; +import { type Throws, ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { RunningJobInfo } from '../job.js'; import { Queue } from '../utils.js'; import type { InferenceExecutor } from './inference_executor.js'; @@ -174,6 +174,6 @@ export class ProcPool { e.proc.close(); }); this.executors.forEach((e) => e.close()); - await Promise.allSettled(this.tasks); + await ThrowsPromise.allSettled(this.tasks); } } diff --git a/agents/src/job.ts b/agents/src/job.ts index 99e16e90b..264913bd3 100644 --- a/agents/src/job.ts +++ b/agents/src/job.ts @@ -10,6 +10,7 @@ import type { RtcConfiguration, } from '@livekit/rtc-node'; import { ParticipantKind, RoomEvent, TrackKind } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { AsyncLocalStorage } from 'node:async_hooks'; import * as os from 'node:os'; import * as path from 'node:path'; @@ -188,7 +189,7 @@ export class JobContext { } } - return new Promise((resolve, reject) => { + return new ThrowsPromise((resolve, reject) => { const onParticipantConnected = (participant: RemoteParticipant) => { if ( (!identity || participant.identity === identity) && diff --git a/agents/src/stream/merge_readable_streams.ts b/agents/src/stream/merge_readable_streams.ts index 9ee455fe3..1ebb9d032 100644 --- a/agents/src/stream/merge_readable_streams.ts +++ b/agents/src/stream/merge_readable_streams.ts @@ -1,8 +1,9 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { ReadableStream } from 'node:stream/web'; -import { withResolvers } from '../utils.js'; +import { unknownToError, withResolvers } from '../utils.js'; // Adapted from https://github.com/denoland/std/blob/main/streams/merge_readable_streams.ts // we manually adapted to make ReadableStream typing compatible with our current node @@ -12,7 +13,7 @@ export function mergeReadableStreams(...streams: ReadableStream[]): Readab return new ReadableStream({ start(controller) { let mustClose = false; - Promise.all(resolvePromises.map(({ promise }) => promise)) + ThrowsPromise.all(resolvePromises.map(({ promise }) => promise)) .then(() => { controller.close(); }) @@ -31,7 +32,7 @@ export function mergeReadableStreams(...streams: ReadableStream[]): Readab } resolvePromises[index]!.resolve(); } catch (error) { - resolvePromises[index]!.reject(error); + resolvePromises[index]!.reject(unknownToError(error)); } })(); } diff --git a/agents/src/stream/multi_input_stream.ts b/agents/src/stream/multi_input_stream.ts index 99f03dfc4..c5a92c980 100644 --- a/agents/src/stream/multi_input_stream.ts +++ b/agents/src/stream/multi_input_stream.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { ReadableStream, ReadableStreamDefaultReader, @@ -115,7 +116,7 @@ export class MultiInputStream { this.inputs.clear(); // Wait for every pump loop to finish before touching the writer. - await Promise.allSettled([...this.pumpPromises.values()]); + await ThrowsPromise.allSettled([...this.pumpPromises.values()]); this.pumpPromises.clear(); // Close the output writer + writable side of the transform. diff --git a/agents/src/stt/stream_adapter.ts b/agents/src/stt/stream_adapter.ts index d2092c2aa..d8651ea5b 100644 --- a/agents/src/stt/stream_adapter.ts +++ b/agents/src/stt/stream_adapter.ts @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { log } from '../log.js'; import type { APIConnectOptions } from '../types.js'; import { isStreamClosedError } from '../utils.js'; @@ -113,6 +114,6 @@ export class StreamAdapterWrapper extends SpeechStream { } }; - await Promise.all([forwardInput(), recognize()]); + await ThrowsPromise.all([forwardInput(), recognize()]); } } diff --git a/agents/src/telemetry/logging.ts b/agents/src/telemetry/logging.ts index eefb05c98..5f3130560 100644 --- a/agents/src/telemetry/logging.ts +++ b/agents/src/telemetry/logging.ts @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 import type { Attributes } from '@opentelemetry/api'; import type { LogRecord, LogRecordProcessor } from '@opentelemetry/sdk-logs'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; /** * Metadata log processor that injects metadata (room_id, job_id) into all log records. @@ -24,11 +25,11 @@ export class MetadataLogProcessor implements LogRecordProcessor { } shutdown(): Promise { - return Promise.resolve(); + return ThrowsPromise.resolve(); } forceFlush(): Promise { - return Promise.resolve(); + return ThrowsPromise.resolve(); } } @@ -46,10 +47,10 @@ export class ExtraDetailsProcessor implements LogRecordProcessor { } shutdown(): Promise { - return Promise.resolve(); + return ThrowsPromise.resolve(); } forceFlush(): Promise { - return Promise.resolve(); + return ThrowsPromise.resolve(); } } diff --git a/agents/src/telemetry/traces.ts b/agents/src/telemetry/traces.ts index 8ee52e586..ea996ada7 100644 --- a/agents/src/telemetry/traces.ts +++ b/agents/src/telemetry/traces.ts @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import { MetricsRecordingHeader } from '@livekit/protocol'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { type Attributes, type Context, @@ -164,11 +165,11 @@ class MetadataSpanProcessor implements SpanProcessor { onEnd(_span: ReadableSpan): void {} shutdown(): Promise { - return Promise.resolve(); + return ThrowsPromise.resolve(); } forceFlush(): Promise { - return Promise.resolve(); + return ThrowsPromise.resolve(); } } @@ -624,7 +625,7 @@ export async function uploadSessionReport(options: { // Upload to LiveKit Cloud using form-data's submit method // This properly streams the multipart form with all headers including Content-Length - return new Promise((resolve, reject) => { + return new ThrowsPromise((resolve, reject) => { formData.submit( { protocol: 'https:', diff --git a/agents/src/transcription.ts b/agents/src/transcription.ts index 63d301413..2c50cdf5b 100644 --- a/agents/src/transcription.ts +++ b/agents/src/transcription.ts @@ -5,6 +5,7 @@ import { TranscriptionSegment } from '@livekit/protocol'; import { AudioFrame } from '@livekit/rtc-node'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import { EventEmitter } from 'node:events'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { basic } from './tokenize/index.js'; import type { SentenceStream, SentenceTokenizer } from './tokenize/tokenizer.js'; import { AsyncIterableQueue, Future, shortuuid } from './utils.js'; diff --git a/agents/src/tts/fallback_adapter.ts b/agents/src/tts/fallback_adapter.ts index 121062260..e5ec5e0ca 100644 --- a/agents/src/tts/fallback_adapter.ts +++ b/agents/src/tts/fallback_adapter.ts @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import { AudioResampler } from '@livekit/rtc-node'; -import type { Throws } from '@livekit/throws-transformer/throws'; +import { type Throws, ThrowsPromise } from '@livekit/throws-transformer/throws'; import { APIConnectionError, APIError } from '../_exceptions.js'; import { log } from '../log.js'; import { basic } from '../tokenize/index.js'; @@ -285,7 +285,7 @@ export class FallbackAdapter extends TTS { } // Close all TTS instances - await Promise.all(this.ttsInstances.map((tts) => tts.close())); + await ThrowsPromise.all(this.ttsInstances.map((tts) => tts.close())); } } @@ -476,7 +476,7 @@ class FallbackSynthesizeStream extends SynthesizeStream { stream.pushText(token); } } - await new Promise((resolve) => setTimeout(resolve, FORWARD_POLL_MS)); + await new ThrowsPromise((resolve) => setTimeout(resolve, FORWARD_POLL_MS)); if (this.abortController.signal.aborted || streamOutputCompleted) { stream.endInput(); return; @@ -546,7 +546,7 @@ class FallbackSynthesizeStream extends SynthesizeStream { streamOutputCompleted = true; } }; - const [outputResult, forwardBufferResult] = await Promise.allSettled([ + const [outputResult, forwardBufferResult] = await ThrowsPromise.allSettled([ processOutput(), forwardBufferToTTS().catch((err) => { stream.close(); // Close stream so processOutput can exit diff --git a/agents/src/tts/stream_adapter.ts b/agents/src/tts/stream_adapter.ts index 058c947cd..c3933badd 100644 --- a/agents/src/tts/stream_adapter.ts +++ b/agents/src/tts/stream_adapter.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { SentenceStream, SentenceTokenizer } from '../tokenize/index.js'; import type { APIConnectOptions } from '../types.js'; import { USERDATA_TIMED_TRANSCRIPT } from '../types.js'; @@ -87,7 +88,7 @@ export class StreamAdapterWrapper extends SynthesizeStream { tokenCompletionTasks.push(task); } - await Promise.all(tokenCompletionTasks.map((t) => t.result)); + await ThrowsPromise.all(tokenCompletionTasks.map((t) => t.result)); this.queue.put(SynthesizeStream.END_OF_STREAM); }; @@ -127,6 +128,6 @@ export class StreamAdapterWrapper extends SynthesizeStream { } }; - await Promise.all([forwardInput(), synthesizeSentenceStream()]); + await ThrowsPromise.all([forwardInput(), synthesizeSentenceStream()]); } } diff --git a/agents/src/tts/tts.ts b/agents/src/tts/tts.ts index def4833d9..70ea5a582 100644 --- a/agents/src/tts/tts.ts +++ b/agents/src/tts/tts.ts @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import type { Span } from '@opentelemetry/api'; import { EventEmitter } from 'node:events'; @@ -524,7 +525,7 @@ export abstract class ChunkedStream implements AsyncIterableIterator this.mainTask().finally(() => this.queue.close())); + ThrowsPromise.resolve().then(() => this.mainTask().finally(() => this.queue.close())); } private _mainTaskImpl = async (span: Span) => { diff --git a/agents/src/typed_promise.ts b/agents/src/typed_promise.ts deleted file mode 100644 index 711d1938c..000000000 --- a/agents/src/typed_promise.ts +++ /dev/null @@ -1,67 +0,0 @@ -// SPDX-FileCopyrightText: 2026 LiveKit, Inc. -// -// SPDX-License-Identifier: Apache-2.0 -type InferErrors = T extends TypedPromise ? E : never; - -interface PromiseRejectedResult { - status: 'rejected'; - reason: E; -} - -type SettledResult = - T extends TypedPromise - ? PromiseFulfilledResult | PromiseRejectedResult - : T extends PromiseLike - ? PromiseFulfilledResult | PromiseRejectedResult - : PromiseFulfilledResult | PromiseRejectedResult; - -export default class TypedPromise extends Promise { - // eslint-disable-next-line @typescript-eslint/no-useless-constructor - constructor( - executor: (resolve: (value: T | PromiseLike) => void, reject: (reason: E) => void) => void, - ) { - super(executor); - } - - catch( - onrejected?: ((reason: E) => TResult | PromiseLike) | null | undefined, - ): TypedPromise { - return super.catch(onrejected); - } - - static resolve: { - (): TypedPromise; - (value: V): TypedPromise, never>; - } = (value?: V): TypedPromise, never> => { - return super.resolve(value) as TypedPromise, never>; - }; - - static reject(reason: E): TypedPromise { - return super.reject(reason); - } - - static all( - values: T, - ): TypedPromise<{ -readonly [P in keyof T]: Awaited }, InferErrors> { - return super.all(values) as any; - } - - static allSettled( - values: T, - ): TypedPromise<{ -readonly [P in keyof T]: SettledResult }, never> { - return super.allSettled(values) as any; - } - - static race | any)[]>( - values: T, - ): TypedPromise< - T[number] extends TypedPromise - ? U - : T[number] extends PromiseLike - ? U - : Awaited, - InferErrors - > { - return super.race(values); - } -} diff --git a/agents/src/utils.ts b/agents/src/utils.ts index 248749b5b..e18b8ddb6 100644 --- a/agents/src/utils.ts +++ b/agents/src/utils.ts @@ -9,7 +9,7 @@ import type { TrackKind, } from '@livekit/rtc-node'; import { AudioFrame, AudioResampler, RoomEvent } from '@livekit/rtc-node'; -import type { Throws } from '@livekit/throws-transformer/throws'; +import { type Throws, ThrowsPromise } from '@livekit/throws-transformer/throws'; import { AsyncLocalStorage } from 'node:async_hooks'; import { EventEmitter, once } from 'node:events'; import type { ReadableStream } from 'node:stream/web'; @@ -132,7 +132,7 @@ export class Future { #error: Error | undefined = undefined; constructor() { - this.#await = new Promise((resolve, reject) => { + this.#await = new ThrowsPromise((resolve, reject) => { this.#resolvePromise = resolve; this.#rejectPromise = reject; }); @@ -191,7 +191,7 @@ export class Event { if (this.#isSet) return true; let resolve: () => void = noop; - const waiter = new Promise((r) => { + const waiter = new ThrowsPromise((r) => { resolve = r; this.#waiters.push(resolve); }); @@ -240,12 +240,12 @@ export class CancellablePromise { ) { let cancel: () => void; - this.#promise = new Promise((resolve, reject) => { + this.#promise = new ThrowsPromise((resolve, reject) => { executor( resolve, (reason) => { this.#error = reason instanceof Error ? reason : new Error(String(reason)); - reject(reason); + reject(this.#error); }, (cancelFn) => { cancel = () => { @@ -550,7 +550,7 @@ export class Task { promises.push(delay(timeout).then(() => TaskResult.Timeout)); } - const result = await Promise.race(promises); + const result = await ThrowsPromise.race(promises); // Check what happened if (result === TaskResult.Timeout) { @@ -588,19 +588,19 @@ export class Task { } export async function waitFor(tasks: Task[]): Promise { - await Promise.allSettled(tasks.map((task) => task.result)); + await ThrowsPromise.allSettled(tasks.map((task) => task.result)); } // eslint-disable-next-line @typescript-eslint/no-explicit-any export async function cancelAndWait(tasks: Task[], timeout?: number): Promise { - await Promise.allSettled(tasks.map((task) => task.cancelAndWait(timeout))); + await ThrowsPromise.allSettled(tasks.map((task) => task.cancelAndWait(timeout))); } export function withResolvers() { let resolve!: (value: T | PromiseLike) => void; - let reject!: (reason?: unknown) => void; + let reject!: (reason: Error) => void; - const promise = new Promise((res, rej) => { + const promise = new ThrowsPromise((res, rej) => { resolve = res; reject = rej; }); @@ -806,8 +806,8 @@ export type DelayOptions = { */ export function delay(ms: number, options: DelayOptions = {}): Promise { const { signal } = options; - if (signal?.aborted) return Promise.reject(signal.reason ?? new Error('delay aborted')); - return new Promise((resolve, reject) => { + if (signal?.aborted) return ThrowsPromise.reject(signal.reason ?? new Error('delay aborted')); + return new ThrowsPromise((resolve, reject) => { const abort = () => { clearTimeout(i); reject(signal?.reason ?? new Error('delay aborted')); @@ -840,9 +840,9 @@ export function waitUntilTimeout( throwError?: () => E, ): Promise> { let timer: ReturnType | undefined; - return Promise.race([ + return ThrowsPromise.race([ promise, - new Promise((_, reject) => { + new ThrowsPromise((_, reject) => { timer = setTimeout(() => reject(throwError?.() ?? new IdleTimeoutError()), timeoutMs); }), ]).finally(() => clearTimeout(timer)) as Promise>; @@ -977,7 +977,7 @@ export async function* readStream( if (signal) { const abortPromise = waitForAbort(signal); while (true) { - const result = await Promise.race([reader.read(), abortPromise]); + const result = await ThrowsPromise.race([reader.read(), abortPromise]); if (!result) break; const { done, value } = result; if (done) break; @@ -1066,3 +1066,10 @@ export const isDevMode = (): boolean => { export const isHosted = (): boolean => { return process.env.LIVEKIT_REMOTE_EOT_URL !== undefined; }; + +export function unknownToError(maybeError: unknown): Error { + if (maybeError instanceof Error) { + return maybeError; + } + return new Error(String(maybeError)); +} diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index 34a9f2309..dafd6ca47 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 import { Mutex } from '@livekit/mutex'; import type { AudioFrame } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { Span } from '@opentelemetry/api'; import { ROOT_CONTEXT, context as otelContext, trace } from '@opentelemetry/api'; import { Heap } from 'heap-js'; @@ -128,7 +129,7 @@ export async function cleanupReusableResources( } if (tasks.length > 0) { - const outputs = await Promise.allSettled(tasks); + const outputs = await ThrowsPromise.allSettled(tasks); for (const output of outputs) { if (output.status === 'rejected') { if (logger) { @@ -1323,7 +1324,7 @@ export class AgentActivity implements RecognitionHooks { signal.addEventListener('abort', abortHandler); while (true) { - await Promise.race([this.q_updated.await, abortFuture.await]); + await ThrowsPromise.race([this.q_updated.await, abortFuture.await]); if (signal.aborted) break; while (this.speechQueue.size() > 0) { @@ -2010,11 +2011,11 @@ export class AgentActivity implements RecognitionHooks { // Check if we should use TTS aligned transcripts if (this.useTtsAlignedTranscript && this.tts?.capabilities.alignedTranscript && ttsGenData) { // Race timedTextsFut with ttsTask to avoid hanging if TTS fails before resolving the future - const timedTextsStream = await Promise.race([ + const timedTextsStream = await ThrowsPromise.race([ ttsGenData.timedTextsFut.await, ttsTask?.result.catch(() => this.logger.warn('TTS task failed before resolving timedTextsFut'), - ) ?? Promise.resolve(), + ) ?? ThrowsPromise.resolve(), ]); if (timedTextsStream) { this.logger.debug('Using TTS aligned transcripts for transcription node input'); @@ -2755,7 +2756,7 @@ export class AgentActivity implements RecognitionHooks { await this.currentSpeech.waitForPlayout(); } else { // Don't block the event loop - await new Promise((resolve) => setImmediate(resolve)); + await new ThrowsPromise((resolve) => setImmediate(resolve)); } } const chatCtx = this.realtimeSession.chatCtx.copy(); diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index 3f4e813d0..76fc3d339 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 import { Mutex } from '@livekit/mutex'; import type { AudioFrame, Room } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import type { Context, Span } from '@opentelemetry/api'; import { ROOT_CONTEXT, context as otelContext, trace } from '@opentelemetry/api'; @@ -484,7 +485,7 @@ export class AgentSession< // Initial start does not wait on onEnter tasks.push(this._updateActivity(this.agent, { waitOnEnter: false })); - await Promise.allSettled(tasks); + await ThrowsPromise.allSettled(tasks); if (this.sessionHost) { await this.sessionHost.start(); diff --git a/agents/src/voice/audio_recognition.ts b/agents/src/voice/audio_recognition.ts index 5390fa984..cb2defef4 100644 --- a/agents/src/voice/audio_recognition.ts +++ b/agents/src/voice/audio_recognition.ts @@ -4,6 +4,7 @@ import { Mutex } from '@livekit/mutex'; import type { ParticipantKind } from '@livekit/rtc-node'; import { AudioFrame } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { type Context, ROOT_CONTEXT, @@ -1108,7 +1109,7 @@ export class AudioRecognition { try { while (!signal.aborted) { - const res = await Promise.race([inputReader.read(), abortPromise]); + const res = await ThrowsPromise.race([inputReader.read(), abortPromise]); if (!res) break; const { value, done } = res; @@ -1131,7 +1132,7 @@ export class AudioRecognition { const abortPromise = waitForAbort(signal); while (!signal.aborted) { - const res = await Promise.race([eventReader.read(), abortPromise]); + const res = await ThrowsPromise.race([eventReader.read(), abortPromise]); if (!res) break; const { done, value: ev } = res; if (done) break; diff --git a/agents/src/voice/generation.ts b/agents/src/voice/generation.ts index 8cc1b04b1..871ee477b 100644 --- a/agents/src/voice/generation.ts +++ b/agents/src/voice/generation.ts @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame } from '@livekit/rtc-node'; import { AudioResampler } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { Span } from '@opentelemetry/api'; import { context as otelContext } from '@opentelemetry/api'; import type { ReadableStream, ReadableStreamDefaultReader } from 'stream/web'; @@ -471,7 +472,7 @@ export function performLLMInference( while (true) { if (signal.aborted) break; - const result = await Promise.race([llmStreamReader.read(), abortPromise]); + const result = await ThrowsPromise.race([llmStreamReader.read(), abortPromise]); if (result === undefined) break; const { done, value: chunk } = result; @@ -1097,7 +1098,7 @@ export function performToolExecutions({ tasks.push(toolTask); } - await Promise.allSettled(tasks.map((task) => task.result)); + await ThrowsPromise.allSettled(tasks.map((task) => task.result)); if (toolOutput.output.length > 0) { logger.debug( { diff --git a/agents/src/voice/remote_session.ts b/agents/src/voice/remote_session.ts index 2f764fd1b..d333cc40d 100644 --- a/agents/src/voice/remote_session.ts +++ b/agents/src/voice/remote_session.ts @@ -4,6 +4,7 @@ import { Timestamp } from '@bufbuild/protobuf'; import { AgentSession as pb } from '@livekit/protocol'; import type { ByteStreamReader, Room, TextStreamInfo } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { TypedEventEmitter } from '@livekit/typed-emitter'; import EventEmitter from 'events'; import { TOPIC_SESSION_MESSAGES } from '../constants.js'; @@ -198,7 +199,7 @@ export class RoomSessionTransport extends SessionTransport { return { next: (): Promise> => { if (this.closed && this.pendingMessages.length === 0) { - return Promise.resolve({ + return ThrowsPromise.resolve({ value: undefined as unknown as pb.AgentSessionMessage, done: true, }); @@ -206,16 +207,16 @@ export class RoomSessionTransport extends SessionTransport { const pending = this.pendingMessages.shift(); if (pending) { - return Promise.resolve({ value: pending, done: false }); + return ThrowsPromise.resolve({ value: pending, done: false }); } - return new Promise>((resolve) => { + return new ThrowsPromise, never>((resolve) => { this.waitingResolve = resolve; }); }, return: (): Promise> => { this.close(); - return Promise.resolve({ + return ThrowsPromise.resolve({ value: undefined as unknown as pb.AgentSessionMessage, done: true, }); @@ -519,7 +520,7 @@ export class SessionHost { this.recvTask.cancel(); } - await Promise.allSettled([...this.tasks].map((task) => task.cancelAndWait())); + await ThrowsPromise.allSettled([...this.tasks].map((task) => task.cancelAndWait())); this.tasks.clear(); await this.transport.close(); diff --git a/agents/src/voice/room_io/_output.ts b/agents/src/voice/room_io/_output.ts index 01dd31625..9f92a70d9 100644 --- a/agents/src/voice/room_io/_output.ts +++ b/agents/src/voice/room_io/_output.ts @@ -15,6 +15,7 @@ import { TrackPublishOptions, TrackSource, } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { ATTRIBUTE_TRANSCRIPTION_FINAL, ATTRIBUTE_TRANSCRIPTION_SEGMENT_ID, diff --git a/agents/src/voice/speech_handle.ts b/agents/src/voice/speech_handle.ts index a3cde5aa6..c369c807f 100644 --- a/agents/src/voice/speech_handle.ts +++ b/agents/src/voice/speech_handle.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { Context } from '@opentelemetry/api'; import type { ChatItem } from '../llm/index.js'; import type { Task } from '../utils.js'; @@ -164,9 +165,9 @@ export class SpeechHandle { } async waitIfNotInterrupted(aw: Promise[]): Promise { - const allTasksPromise = Promise.all(aw); + const allTasksPromise = ThrowsPromise.all(aw); const fs: Promise[] = [allTasksPromise, this.interruptFut.await]; - await Promise.race(fs); + await ThrowsPromise.race(fs); } addDoneCallback(callback: (sh: SpeechHandle) => void) { diff --git a/agents/src/worker.ts b/agents/src/worker.ts index e500a6a86..3904a0d6f 100644 --- a/agents/src/worker.ts +++ b/agents/src/worker.ts @@ -10,7 +10,7 @@ import { WorkerMessage, WorkerStatus, } from '@livekit/protocol'; -import type { Throws } from '@livekit/throws-transformer/throws'; +import { type Throws, ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { ParticipantInfo } from 'livekit-server-sdk'; import { AccessToken, RoomServiceClient } from 'livekit-server-sdk'; import { EventEmitter } from 'node:events'; @@ -225,7 +225,7 @@ export class ServerOptions { } class PendingAssignment { - promise = new Promise((resolve) => { + promise = new ThrowsPromise((resolve) => { this.resolve = resolve; // this is how JavaScript lets you resolve promises externally }); resolve(arg: JobAssignment) { @@ -384,10 +384,10 @@ export class AgentServer { }); try { - await new Promise((resolve, reject) => { + await new ThrowsPromise((resolve, reject) => { this.#session!.on('open', resolve); this.#session!.on('error', (error) => reject(error)); - this.#session!.on('close', (code) => reject(`WebSocket returned ${code}`)); + this.#session!.on('close', (code) => reject(new Error(`WebSocket returned ${code}`))); }); retries = 0; @@ -409,12 +409,12 @@ export class AgentServer { `failed to connect to LiveKit server (${this.#opts.wsURL}), retrying in ${delay} seconds: (${retries}/${this.#opts.maxRetry})`, ); - await new Promise((resolve) => setTimeout(resolve, delay * 1000)); + await new ThrowsPromise((resolve) => setTimeout(resolve, delay * 1000)); } } }; - await Promise.all([workerWS(), this.#httpServer.run()]); + await ThrowsPromise.all([workerWS(), this.#httpServer.run()]); this.#close.resolve(); } @@ -429,7 +429,7 @@ export class AgentServer { } /** @throws {@link WorkerError} if worker did not drain in time */ - async drain(timeout?: number): Promise> { + async drain(timeout?: number): Promise> { if (this.#draining) { return; } @@ -449,8 +449,8 @@ export class AgentServer { }), ); - const joinJobs = async () => { - return Promise.all( + const joinJobs = async (): Promise> => { + return ThrowsPromise.all( this.#procPool.processes.map((proc): Promise> => { if (!proc.runningJob) { proc.close(); @@ -469,7 +469,7 @@ export class AgentServer { }), ); } - await Promise.race(promises); + await ThrowsPromise.race(promises); } async simulateJob(roomName: string, participantIdentity?: string) { @@ -518,7 +518,7 @@ export class AgentServer { }; this.event.on('worker_msg', send); - const close = new Promise((resolve) => { + const close = new ThrowsPromise((resolve) => { ws.addEventListener('close', () => { closingWS = true; if (!this.#closed) { @@ -818,7 +818,7 @@ export class AgentServer { await this.#inferenceExecutor?.close(); await this.#procPool.close(); await this.#httpServer.close(); - await Promise.allSettled(this.#tasks); + await ThrowsPromise.allSettled(this.#tasks); this.#session?.close(); await this.#close.await; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index aeefc7e53..1d4236cc9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -119,8 +119,8 @@ importers: specifier: ^1.45.3 version: 1.45.3 '@livekit/throws-transformer': - specifier: 0.0.0-20260320165515 - version: 0.0.0-20260320165515(typescript@5.4.5) + specifier: 0.1.8 + version: 0.1.8(typescript@5.4.5) '@livekit/typed-emitter': specifier: ^3.0.0 version: 3.0.0 @@ -2169,8 +2169,8 @@ packages: resolution: {integrity: sha512-4tL58O2DdTDP+g1ajyP5mgEOzjymD/u06IxWWVKBee1goEwDSQlMqEog/DJW34FoNNqXp1yRMCsphI4V/T1ILg==} engines: {node: '>= 18'} - '@livekit/throws-transformer@0.0.0-20260320165515': - resolution: {integrity: sha512-3L4UKOov1VXuX6sHIBuonJTaPzsSkpqZT3htvamgUYR0pL/aJ+0piiWzTPoCx9WSfmmUUAQqjd42IPgHXQVdvQ==} + '@livekit/throws-transformer@0.1.8': + resolution: {integrity: sha512-AaSwQfIaG6YArKOCO+5/DgI8HfL19oHdrkyI0LJJVCqGeZXzPsZIO/Nm/SKUHbT1ERGhF5c34X8CcVWeOePpIQ==} hasBin: true peerDependencies: typescript: '>=4.7.0' @@ -6574,7 +6574,7 @@ snapshots: pino: 9.6.0 pino-pretty: 13.0.0 - '@livekit/throws-transformer@0.0.0-20260320165515(typescript@5.4.5)': + '@livekit/throws-transformer@0.1.8(typescript@5.4.5)': dependencies: glob: 13.0.6 typescript: 5.4.5 @@ -9998,7 +9998,7 @@ snapshots: dependencies: '@isaacs/fs-minipass': 4.0.1 chownr: 3.0.0 - minipass: 7.1.2 + minipass: 7.1.3 minizlib: 3.0.1 mkdirp: 3.0.1 yallist: 5.0.0 From 7c60a787fd51ddd5f0f380a7645ed31129388894 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 14 Apr 2026 14:13:25 +0200 Subject: [PATCH 02/13] base future on throwspromise --- agents/src/utils.ts | 10 +++++----- agents/src/voice/agent_activity.ts | 4 ++-- agents/src/voice/generation.ts | 4 ++-- agents/src/voice/room_io/_output.ts | 1 - 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/agents/src/utils.ts b/agents/src/utils.ts index e18b8ddb6..b2ac0bd69 100644 --- a/agents/src/utils.ts +++ b/agents/src/utils.ts @@ -122,17 +122,17 @@ export class Queue { } /** @internal */ -export class Future { - #await: Promise; +export class Future { + #await: ThrowsPromise; #resolvePromise!: (value: T) => void; - #rejectPromise!: (error: Error) => void; + #rejectPromise!: (error: E) => void; #done: boolean = false; #rejected: boolean = false; #result: T | undefined = undefined; #error: Error | undefined = undefined; constructor() { - this.#await = new ThrowsPromise((resolve, reject) => { + this.#await = new ThrowsPromise((resolve, reject) => { this.#resolvePromise = resolve; this.#rejectPromise = reject; }); @@ -169,7 +169,7 @@ export class Future { this.#resolvePromise(value); } - reject(error: Error) { + reject(error: E) { this.#done = true; this.#rejected = true; this.#error = error; diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index dafd6ca47..fa1889df3 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -169,7 +169,7 @@ export class AgentActivity implements RecognitionHooks { private _drainBlockedTasks: Task[] = []; private _currentSpeech?: SpeechHandle; private speechQueue: Heap<[number, number, SpeechHandle]>; // [priority, timestamp, speechHandle] - private q_updated: Future; + private q_updated: Future; private speechTasks: Set> = new Set(); private lock = new Mutex(); private audioStream = new MultiInputStream(); @@ -1316,7 +1316,7 @@ export class AgentActivity implements RecognitionHooks { } private async mainTask(signal: AbortSignal): Promise { - const abortFuture = new Future(); + const abortFuture = new Future(); const abortHandler = () => { abortFuture.resolve(); signal.removeEventListener('abort', abortHandler); diff --git a/agents/src/voice/generation.ts b/agents/src/voice/generation.ts index 871ee477b..8c52410de 100644 --- a/agents/src/voice/generation.ts +++ b/agents/src/voice/generation.ts @@ -83,7 +83,7 @@ export interface _TTSGenerationData { /** * Future that resolves to a stream of timed transcripts, or null if TTS doesn't support it. */ - timedTextsFut: Future | null>; + timedTextsFut: Future | null, never>; /** Time to first byte (set when first audio frame is received) */ ttfb?: number; } @@ -566,7 +566,7 @@ export function performTTSInference( const outputWriter = audioStream.writable.getWriter(); const audioOutputStream = audioStream.readable; - const timedTextsFut = new Future | null>(); + const timedTextsFut = new Future | null, never>(); const timedTextsStream = new IdentityTransform(); const timedTextsWriter = timedTextsStream.writable.getWriter(); diff --git a/agents/src/voice/room_io/_output.ts b/agents/src/voice/room_io/_output.ts index 9f92a70d9..01dd31625 100644 --- a/agents/src/voice/room_io/_output.ts +++ b/agents/src/voice/room_io/_output.ts @@ -15,7 +15,6 @@ import { TrackPublishOptions, TrackSource, } from '@livekit/rtc-node'; -import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { ATTRIBUTE_TRANSCRIPTION_FINAL, ATTRIBUTE_TRANSCRIPTION_SEGMENT_ID, From dab73623594b2c05c7fe31687e813adba12a35db Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 14 Apr 2026 14:21:33 +0200 Subject: [PATCH 03/13] unknownToError helper --- agents/src/beta/workflows/task_group.ts | 3 ++- agents/src/inference/interruption/interruption_detector.ts | 5 +++-- agents/src/utils.ts | 2 +- agents/src/voice/agent_session.ts | 4 ++-- plugins/cartesia/src/tts.ts | 3 ++- plugins/elevenlabs/src/tts.ts | 3 ++- plugins/inworld/src/tts.ts | 3 ++- plugins/phonic/src/realtime/realtime_model.ts | 5 +++-- 8 files changed, 17 insertions(+), 11 deletions(-) diff --git a/agents/src/beta/workflows/task_group.ts b/agents/src/beta/workflows/task_group.ts index c5ead6e03..247390886 100644 --- a/agents/src/beta/workflows/task_group.ts +++ b/agents/src/beta/workflows/task_group.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2026 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { unknownToError } from 'agents/src/utils.js'; import { z } from 'zod'; import type { ChatContext } from '../../llm/chat_context.js'; import { LLM, ToolError, ToolFlag, tool } from '../../llm/index.js'; @@ -114,7 +115,7 @@ export class TaskGroup extends AgentTask { taskResults[taskId] = e; continue; } else { - this.complete(e instanceof Error ? e : new Error(String(e))); + this.complete(unknownToError(e)); return; } } diff --git a/agents/src/inference/interruption/interruption_detector.ts b/agents/src/inference/interruption/interruption_detector.ts index 585fbad2b..d47a3fc0b 100644 --- a/agents/src/inference/interruption/interruption_detector.ts +++ b/agents/src/inference/interruption/interruption_detector.ts @@ -1,8 +1,9 @@ // SPDX-FileCopyrightText: 2026 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import type { TypedEventEmitter } from '@livekit/typed-emitter'; import { ThrowsPromise } from '@livekit/throws-transformer/throws'; +import type { TypedEventEmitter } from '@livekit/typed-emitter'; +import { unknownToError } from 'agents/src/utils.js'; import EventEmitter from 'events'; import { log } from '../../log.js'; import type { InterruptionMetrics } from '../../metrics/base.js'; @@ -166,7 +167,7 @@ export class AdaptiveInterruptionDetector extends (EventEmitter as new () => Typ this.streams.add(streamBase); return streamBase; } catch (e) { - const cause = e instanceof Error ? e : new Error(String(e)); + const cause = unknownToError(e); this.emitError(new InterruptionDetectionError(cause.message, Date.now(), this._label, false)); throw e; } diff --git a/agents/src/utils.ts b/agents/src/utils.ts index b2ac0bd69..a601798a8 100644 --- a/agents/src/utils.ts +++ b/agents/src/utils.ts @@ -244,7 +244,7 @@ export class CancellablePromise { executor( resolve, (reason) => { - this.#error = reason instanceof Error ? reason : new Error(String(reason)); + this.#error = unknownToError(reason); reject(this.#error); }, (cancelFn) => { diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index 76fc3d339..40c337468 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -37,7 +37,7 @@ import { type ResolvedSessionConnectOptions, type SessionConnectOptions, } from '../types.js'; -import { Task } from '../utils.js'; +import { Task, unknownToError } from '../utils.js'; import type { VAD } from '../vad.js'; import type { Agent } from './agent.js'; import { @@ -768,7 +768,7 @@ export class AgentSession< unlock(); this.generateReply({ userInput }); } catch (e) { - runState._reject(e instanceof Error ? e : new Error(String(e))); + runState._reject(unknownToError(e)); } })(); diff --git a/plugins/cartesia/src/tts.ts b/plugins/cartesia/src/tts.ts index 867f730c9..d4d55be4c 100644 --- a/plugins/cartesia/src/tts.ts +++ b/plugins/cartesia/src/tts.ts @@ -16,6 +16,7 @@ import { stream, tokenize, tts, + unknownToError, } from '@livekit/agents'; import type { AudioFrame } from '@livekit/rtc-node'; import { request } from 'node:https'; @@ -554,7 +555,7 @@ export class SynthesizeStream extends tts.SynthesizeStream { } } -const asError = (e: unknown): Error => (e instanceof Error ? e : new Error(String(e))); +const asError = (e: unknown): Error => unknownToError(e); const transientNetworkCodes = new Set([ 'ETIMEDOUT', diff --git a/plugins/elevenlabs/src/tts.ts b/plugins/elevenlabs/src/tts.ts index 1f5bbf71b..141ae6c00 100644 --- a/plugins/elevenlabs/src/tts.ts +++ b/plugins/elevenlabs/src/tts.ts @@ -18,6 +18,7 @@ import { stream, tokenize, tts, + unknownToError, } from '@livekit/agents'; import { Mutex } from '@livekit/mutex'; import type { AudioFrame } from '@livekit/rtc-node'; @@ -591,7 +592,7 @@ class Connection { } catch (e) { this.#logger.warn({ error: e }, 'recv loop error'); for (const ctx of this.#contextData.values()) { - ctx.waiter.reject(e instanceof Error ? e : new Error(String(e))); + ctx.waiter.reject(unknownToError(e)); } this.#contextData.clear(); } finally { diff --git a/plugins/inworld/src/tts.ts b/plugins/inworld/src/tts.ts index dd2272e69..acf0a144c 100644 --- a/plugins/inworld/src/tts.ts +++ b/plugins/inworld/src/tts.ts @@ -10,6 +10,7 @@ import { shortuuid, tokenize, tts, + unknownToError, } from '@livekit/agents'; import type { AudioFrame } from '@livekit/rtc-node'; import { randomUUID } from 'node:crypto'; @@ -193,7 +194,7 @@ class WSConnectionPool { try { return await this.#attemptConnection(); } catch (err) { - lastError = err instanceof Error ? err : new Error(String(err)); + lastError = unknownToError(err); this.#connecting = undefined; if (attempt < MAX_RETRIES) { diff --git a/plugins/phonic/src/realtime/realtime_model.ts b/plugins/phonic/src/realtime/realtime_model.ts index 51e9fea47..1dd54aa48 100644 --- a/plugins/phonic/src/realtime/realtime_model.ts +++ b/plugins/phonic/src/realtime/realtime_model.ts @@ -10,6 +10,7 @@ import { log, shortuuid, stream, + unknownToError, } from '@livekit/agents'; import { AudioFrame, AudioResampler } from '@livekit/rtc-node'; import type { Phonic } from 'phonic'; @@ -250,7 +251,7 @@ export class RealtimeSession extends llm.RealtimeSession { private configSent = false; private instructionsReady = new Future(); private toolsReady = new Future(); - private closedFuture = new Future(); + private closedFuture = new Future(); private connectTask: Promise; private toolDefinitions: Record[] = []; private pendingToolCallIds = new Set(); @@ -273,7 +274,7 @@ export class RealtimeSession extends llm.RealtimeSession { (PHONIC_INPUT_SAMPLE_RATE * PHONIC_INPUT_FRAME_MS) / 1000, ); this.connectTask = this.connect().catch((error: unknown) => { - const normalizedError = error instanceof Error ? error : new Error(String(error)); + const normalizedError = unknownToError(error); this.emitError(normalizedError, false); }); } From 1d851fc67bc59054e36b5e5798a9ee84ac9fe8db Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 14 Apr 2026 14:23:06 +0200 Subject: [PATCH 04/13] update usage everywhere --- examples/src/sms_agent.ts | 103 ++++++++++++++++++ examples/src/testing/agent_task.test.ts | 8 +- examples/src/testing/basic_task_group.test.ts | 8 +- examples/src/testing/task_group.test.ts | 8 +- plugins/cartesia/src/tts.ts | 6 +- 5 files changed, 111 insertions(+), 22 deletions(-) create mode 100644 examples/src/sms_agent.ts diff --git a/examples/src/sms_agent.ts b/examples/src/sms_agent.ts new file mode 100644 index 000000000..21c160982 --- /dev/null +++ b/examples/src/sms_agent.ts @@ -0,0 +1,103 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { + AgentServer, + type JobContext, + type JobProcess, + ServerOptions, + cli, + defineAgent, + inference, + llm, + log, + metrics, + voice, +} from '@livekit/agents'; +import * as livekit from '@livekit/agents-plugin-livekit'; +import * as silero from '@livekit/agents-plugin-silero'; +import type { AgentSession } from 'agents/dist/voice/agent_session.js'; +import { fetchWeatherForecast } from 'my-weather-api'; +import { fileURLToPath } from 'node:url'; +import { z } from 'zod'; + +const server: AgentServer = createServer({ + port: process.env.PORT, + worker: fileURLToPath(import.meta.url), +}); + +const myAgent = createAgent(); + +const weatherTool = createTool('getWeather', (toolCtx) => { + return { + description: 'Get the weather for a given location.', + parameters: z.object({ + location: z.string().describe('The location to get the weather for'), + }), + execute: async ({ location }) => { + const weatherPromise = fetchWeatherForecast({ location }); + await toolCtx.session.say('Fetching the weather for you'); + return weatherPromise; + }, + }; +}); + +const flightBooking = createTask<{ + start: string; + desiredDestinations: string[]; + startDate: Date; + endDate: Date; +}>('sunSeekerFlightBooking', async function* (taskCtx, state) { + taskCtx.generateReply('help the user to book a flight to the sunniest destination'); + const availableFlights = await taskCtx.step(async () => { + return searchForFlights(state.start, state.desiredDestinations, state.startDate, state.endDate); + }); + + const sunniestDestinations = await taskCtx.step(async () => { + const sunshineMap = new Map(); + for (const flight of availableFlights) { + const forecast: Array<{ hoursOfSunshine: number; maxTemperature: number }> = + await fetchWeatherForecast({ + location: flight.destination, + startDate: state.startDate, + endDate: state.endDate, + }); + sunshineMap.set( + flight.destination, + forecast.reduce((accHoursOfSun, day) => { + accHoursOfSun += day.hoursOfSunshine; + return accHoursOfSun; + }, 0), + ); + } + return Array.from(sunshineMap.entries()).sort((a, b) => a[1] - b[1]); + }); + + yield `The sunniest destination on your travel dates is ${sunniestDestinations[0]}, do you want to go there?`; + await taskCtx.waitForApproval(); + yield `The following flights are available for ${sunniestDestinations[0]}`; +}); + +server.on('rtc', (jobContext: JobContext) => {}); + +// generic event registering for different endpoints, e.g. text mode +// .on(event, endpoint, handler) +server.on('text', 'weather', async (textMessageContext) => { + const session: AgentSession = createSession({ llm: 'openai/gpt-4.1-mini' }); + + // make tools available to the agent on demand, e.g. depending on the endpoint + myAgent.updateTools(weatherTool); + const startResult = session.start({ agent: myAgent }); + + for await (const ev of startResult) { + await textMessageContext.sendResponse(ev); + } + + for await (const ev of session.run({ userInput: textMessageContext.text })) { + await textMessageContext.sendResponse(ev); + } +}); + +export default server; + +cli.runApp(server); diff --git a/examples/src/testing/agent_task.test.ts b/examples/src/testing/agent_task.test.ts index 927faf68b..2c0fd0385 100644 --- a/examples/src/testing/agent_task.test.ts +++ b/examples/src/testing/agent_task.test.ts @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2026 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { Future, initializeLogger, llm, voice } from '@livekit/agents'; +import { Future, initializeLogger, llm, unknownToError, voice } from '@livekit/agents'; import * as openai from '@livekit/agents-plugin-openai'; import { afterEach, describe, expect, it } from 'vitest'; import { z } from 'zod'; @@ -27,15 +27,11 @@ initializeLogger({ pretty: true, level: 'warn' }); * NOT COVERED in this suite due to known deadlock limitation. */ -function asError(error: unknown): Error { - return error instanceof Error ? error : new Error(String(error)); -} - async function withFutureResolution(done: Future, fn: () => Promise): Promise { try { done.resolve(await fn()); } catch (error) { - done.reject(asError(error)); + done.reject(unknownToError(error)); } } diff --git a/examples/src/testing/basic_task_group.test.ts b/examples/src/testing/basic_task_group.test.ts index 7cbc9580c..9aaf3d794 100644 --- a/examples/src/testing/basic_task_group.test.ts +++ b/examples/src/testing/basic_task_group.test.ts @@ -26,7 +26,7 @@ * - onEnter() signals readiness instead of awaiting generateReply() * - Tasks tested in isolation and as a group */ -import { Future, beta, initializeLogger, llm, voice } from '@livekit/agents'; +import { Future, beta, initializeLogger, llm, unknownToError, voice } from '@livekit/agents'; import { afterEach, describe, expect, it } from 'vitest'; import { z } from 'zod'; @@ -40,16 +40,12 @@ initializeLogger({ pretty: true, level: 'warn' }); // Helpers // --------------------------------------------------------------------------- -function asError(error: unknown): Error { - return error instanceof Error ? error : new Error(String(error)); -} - /** Run `fn` and forward its result or error to `done`. */ async function withFutureResolution(done: Future, fn: () => Promise): Promise { try { done.resolve(await fn()); } catch (error) { - done.reject(asError(error)); + done.reject(unknownToError(error)); } } diff --git a/examples/src/testing/task_group.test.ts b/examples/src/testing/task_group.test.ts index ed2687e87..2e523548d 100644 --- a/examples/src/testing/task_group.test.ts +++ b/examples/src/testing/task_group.test.ts @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2026 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { Future, beta, initializeLogger, llm, voice } from '@livekit/agents'; +import { Future, beta, initializeLogger, llm, unknownToError, voice } from '@livekit/agents'; import { afterEach, describe, expect, it } from 'vitest'; import { z } from 'zod'; @@ -11,15 +11,11 @@ type TaskCompletedEvent = beta.TaskCompletedEvent; initializeLogger({ pretty: true, level: 'warn' }); -function asError(error: unknown): Error { - return error instanceof Error ? error : new Error(String(error)); -} - async function withFutureResolution(done: Future, fn: () => Promise): Promise { try { done.resolve(await fn()); } catch (error) { - done.reject(asError(error)); + done.reject(unknownToError(error)); } } diff --git a/plugins/cartesia/src/tts.ts b/plugins/cartesia/src/tts.ts index d4d55be4c..cf12d15f2 100644 --- a/plugins/cartesia/src/tts.ts +++ b/plugins/cartesia/src/tts.ts @@ -555,8 +555,6 @@ export class SynthesizeStream extends tts.SynthesizeStream { } } -const asError = (e: unknown): Error => unknownToError(e); - const transientNetworkCodes = new Set([ 'ETIMEDOUT', 'ECONNRESET', @@ -594,7 +592,7 @@ const hasAnyTransientCode = (e: unknown): boolean => { }; const toRetryableConnectionError = (e: unknown): APIConnectionError => { - const err = asError(e); + const err = unknownToError(e); const isTimeout = hasErrorCode(e, 'ETIMEDOUT') || (typeof err.message === 'string' && err.message.includes('ETIMEDOUT')); @@ -629,7 +627,7 @@ const waitForWsOpen = async ({ }; const onOpen = () => fut.resolve(); - const onError = (err: Error) => fut.reject(asError(err)); + const onError = (err: Error) => fut.reject(unknownToError(err)); const onClose = (code: number, reason: Buffer) => fut.reject( new Error(`WebSocket closed before open (code=${code}, reason=${reason.toString()})`), From af4db56c5f76fc8e8b5275edd5812968bc193def Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 14 Apr 2026 14:25:18 +0200 Subject: [PATCH 05/13] asError --- agents/src/beta/workflows/task_group.ts | 4 ++-- agents/src/inference/interruption/interruption_detector.ts | 4 ++-- agents/src/stream/merge_readable_streams.ts | 4 ++-- agents/src/utils.ts | 4 ++-- agents/src/voice/agent_session.ts | 4 ++-- examples/src/testing/agent_task.test.ts | 4 ++-- examples/src/testing/basic_task_group.test.ts | 4 ++-- examples/src/testing/task_group.test.ts | 4 ++-- plugins/cartesia/src/tts.ts | 6 +++--- plugins/elevenlabs/src/tts.ts | 4 ++-- plugins/inworld/src/tts.ts | 4 ++-- plugins/phonic/src/realtime/realtime_model.ts | 4 ++-- 12 files changed, 25 insertions(+), 25 deletions(-) diff --git a/agents/src/beta/workflows/task_group.ts b/agents/src/beta/workflows/task_group.ts index 247390886..73a336f7b 100644 --- a/agents/src/beta/workflows/task_group.ts +++ b/agents/src/beta/workflows/task_group.ts @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2026 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { unknownToError } from 'agents/src/utils.js'; +import { asError } from 'agents/src/utils.js'; import { z } from 'zod'; import type { ChatContext } from '../../llm/chat_context.js'; import { LLM, ToolError, ToolFlag, tool } from '../../llm/index.js'; @@ -115,7 +115,7 @@ export class TaskGroup extends AgentTask { taskResults[taskId] = e; continue; } else { - this.complete(unknownToError(e)); + this.complete(asError(e)); return; } } diff --git a/agents/src/inference/interruption/interruption_detector.ts b/agents/src/inference/interruption/interruption_detector.ts index d47a3fc0b..9bb39318d 100644 --- a/agents/src/inference/interruption/interruption_detector.ts +++ b/agents/src/inference/interruption/interruption_detector.ts @@ -3,7 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { TypedEventEmitter } from '@livekit/typed-emitter'; -import { unknownToError } from 'agents/src/utils.js'; +import { asError } from 'agents/src/utils.js'; import EventEmitter from 'events'; import { log } from '../../log.js'; import type { InterruptionMetrics } from '../../metrics/base.js'; @@ -167,7 +167,7 @@ export class AdaptiveInterruptionDetector extends (EventEmitter as new () => Typ this.streams.add(streamBase); return streamBase; } catch (e) { - const cause = unknownToError(e); + const cause = asError(e); this.emitError(new InterruptionDetectionError(cause.message, Date.now(), this._label, false)); throw e; } diff --git a/agents/src/stream/merge_readable_streams.ts b/agents/src/stream/merge_readable_streams.ts index 1ebb9d032..dbce90c67 100644 --- a/agents/src/stream/merge_readable_streams.ts +++ b/agents/src/stream/merge_readable_streams.ts @@ -3,7 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { ReadableStream } from 'node:stream/web'; -import { unknownToError, withResolvers } from '../utils.js'; +import { asError, withResolvers } from '../utils.js'; // Adapted from https://github.com/denoland/std/blob/main/streams/merge_readable_streams.ts // we manually adapted to make ReadableStream typing compatible with our current node @@ -32,7 +32,7 @@ export function mergeReadableStreams(...streams: ReadableStream[]): Readab } resolvePromises[index]!.resolve(); } catch (error) { - resolvePromises[index]!.reject(unknownToError(error)); + resolvePromises[index]!.reject(asError(error)); } })(); } diff --git a/agents/src/utils.ts b/agents/src/utils.ts index a601798a8..f8ac11342 100644 --- a/agents/src/utils.ts +++ b/agents/src/utils.ts @@ -244,7 +244,7 @@ export class CancellablePromise { executor( resolve, (reason) => { - this.#error = unknownToError(reason); + this.#error = asError(reason); reject(this.#error); }, (cancelFn) => { @@ -1067,7 +1067,7 @@ export const isHosted = (): boolean => { return process.env.LIVEKIT_REMOTE_EOT_URL !== undefined; }; -export function unknownToError(maybeError: unknown): Error { +export function asError(maybeError: unknown): Error { if (maybeError instanceof Error) { return maybeError; } diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index 40c337468..2b5ba75ad 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -37,7 +37,7 @@ import { type ResolvedSessionConnectOptions, type SessionConnectOptions, } from '../types.js'; -import { Task, unknownToError } from '../utils.js'; +import { Task, asError } from '../utils.js'; import type { VAD } from '../vad.js'; import type { Agent } from './agent.js'; import { @@ -768,7 +768,7 @@ export class AgentSession< unlock(); this.generateReply({ userInput }); } catch (e) { - runState._reject(unknownToError(e)); + runState._reject(asError(e)); } })(); diff --git a/examples/src/testing/agent_task.test.ts b/examples/src/testing/agent_task.test.ts index 2c0fd0385..163d232a1 100644 --- a/examples/src/testing/agent_task.test.ts +++ b/examples/src/testing/agent_task.test.ts @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2026 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { Future, initializeLogger, llm, unknownToError, voice } from '@livekit/agents'; +import { Future, asError, initializeLogger, llm, voice } from '@livekit/agents'; import * as openai from '@livekit/agents-plugin-openai'; import { afterEach, describe, expect, it } from 'vitest'; import { z } from 'zod'; @@ -31,7 +31,7 @@ async function withFutureResolution(done: Future, fn: () => Promise): P try { done.resolve(await fn()); } catch (error) { - done.reject(unknownToError(error)); + done.reject(asError(error)); } } diff --git a/examples/src/testing/basic_task_group.test.ts b/examples/src/testing/basic_task_group.test.ts index 9aaf3d794..afbc7400e 100644 --- a/examples/src/testing/basic_task_group.test.ts +++ b/examples/src/testing/basic_task_group.test.ts @@ -26,7 +26,7 @@ * - onEnter() signals readiness instead of awaiting generateReply() * - Tasks tested in isolation and as a group */ -import { Future, beta, initializeLogger, llm, unknownToError, voice } from '@livekit/agents'; +import { Future, asError, beta, initializeLogger, llm, voice } from '@livekit/agents'; import { afterEach, describe, expect, it } from 'vitest'; import { z } from 'zod'; @@ -45,7 +45,7 @@ async function withFutureResolution(done: Future, fn: () => Promise): P try { done.resolve(await fn()); } catch (error) { - done.reject(unknownToError(error)); + done.reject(asError(error)); } } diff --git a/examples/src/testing/task_group.test.ts b/examples/src/testing/task_group.test.ts index 2e523548d..3d5afff06 100644 --- a/examples/src/testing/task_group.test.ts +++ b/examples/src/testing/task_group.test.ts @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2026 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { Future, beta, initializeLogger, llm, unknownToError, voice } from '@livekit/agents'; +import { Future, asError, beta, initializeLogger, llm, voice } from '@livekit/agents'; import { afterEach, describe, expect, it } from 'vitest'; import { z } from 'zod'; @@ -15,7 +15,7 @@ async function withFutureResolution(done: Future, fn: () => Promise): P try { done.resolve(await fn()); } catch (error) { - done.reject(unknownToError(error)); + done.reject(asError(error)); } } diff --git a/plugins/cartesia/src/tts.ts b/plugins/cartesia/src/tts.ts index cf12d15f2..966c57f2d 100644 --- a/plugins/cartesia/src/tts.ts +++ b/plugins/cartesia/src/tts.ts @@ -8,6 +8,7 @@ import { AudioByteStream, Future, type TimedString, + asError, createTimedString, getBaseLanguage, log, @@ -16,7 +17,6 @@ import { stream, tokenize, tts, - unknownToError, } from '@livekit/agents'; import type { AudioFrame } from '@livekit/rtc-node'; import { request } from 'node:https'; @@ -592,7 +592,7 @@ const hasAnyTransientCode = (e: unknown): boolean => { }; const toRetryableConnectionError = (e: unknown): APIConnectionError => { - const err = unknownToError(e); + const err = asError(e); const isTimeout = hasErrorCode(e, 'ETIMEDOUT') || (typeof err.message === 'string' && err.message.includes('ETIMEDOUT')); @@ -627,7 +627,7 @@ const waitForWsOpen = async ({ }; const onOpen = () => fut.resolve(); - const onError = (err: Error) => fut.reject(unknownToError(err)); + const onError = (err: Error) => fut.reject(asError(err)); const onClose = (code: number, reason: Buffer) => fut.reject( new Error(`WebSocket closed before open (code=${code}, reason=${reason.toString()})`), diff --git a/plugins/elevenlabs/src/tts.ts b/plugins/elevenlabs/src/tts.ts index 141ae6c00..c1e4ad592 100644 --- a/plugins/elevenlabs/src/tts.ts +++ b/plugins/elevenlabs/src/tts.ts @@ -10,6 +10,7 @@ import { AudioByteStream, Future, type TimedString, + asError, createTimedString, getBaseLanguage, log, @@ -18,7 +19,6 @@ import { stream, tokenize, tts, - unknownToError, } from '@livekit/agents'; import { Mutex } from '@livekit/mutex'; import type { AudioFrame } from '@livekit/rtc-node'; @@ -592,7 +592,7 @@ class Connection { } catch (e) { this.#logger.warn({ error: e }, 'recv loop error'); for (const ctx of this.#contextData.values()) { - ctx.waiter.reject(unknownToError(e)); + ctx.waiter.reject(asError(e)); } this.#contextData.clear(); } finally { diff --git a/plugins/inworld/src/tts.ts b/plugins/inworld/src/tts.ts index acf0a144c..28848b95b 100644 --- a/plugins/inworld/src/tts.ts +++ b/plugins/inworld/src/tts.ts @@ -5,12 +5,12 @@ import { type APIConnectOptions, AudioByteStream, type TimedString, + asError, createTimedString, log, shortuuid, tokenize, tts, - unknownToError, } from '@livekit/agents'; import type { AudioFrame } from '@livekit/rtc-node'; import { randomUUID } from 'node:crypto'; @@ -194,7 +194,7 @@ class WSConnectionPool { try { return await this.#attemptConnection(); } catch (err) { - lastError = unknownToError(err); + lastError = asError(err); this.#connecting = undefined; if (attempt < MAX_RETRIES) { diff --git a/plugins/phonic/src/realtime/realtime_model.ts b/plugins/phonic/src/realtime/realtime_model.ts index 1dd54aa48..a966e06f6 100644 --- a/plugins/phonic/src/realtime/realtime_model.ts +++ b/plugins/phonic/src/realtime/realtime_model.ts @@ -6,11 +6,11 @@ import { AudioByteStream, DEFAULT_API_CONNECT_OPTIONS, Future, + asError, llm, log, shortuuid, stream, - unknownToError, } from '@livekit/agents'; import { AudioFrame, AudioResampler } from '@livekit/rtc-node'; import type { Phonic } from 'phonic'; @@ -274,7 +274,7 @@ export class RealtimeSession extends llm.RealtimeSession { (PHONIC_INPUT_SAMPLE_RATE * PHONIC_INPUT_FRAME_MS) / 1000, ); this.connectTask = this.connect().catch((error: unknown) => { - const normalizedError = unknownToError(error); + const normalizedError = asError(error); this.emitError(normalizedError, false); }); } From df40d6b35b8d4addaccc5a8f7ede0b7b15519749 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 14 Apr 2026 14:26:55 +0200 Subject: [PATCH 06/13] remove test file --- examples/src/sms_agent.ts | 103 -------------------------------------- 1 file changed, 103 deletions(-) delete mode 100644 examples/src/sms_agent.ts diff --git a/examples/src/sms_agent.ts b/examples/src/sms_agent.ts deleted file mode 100644 index 21c160982..000000000 --- a/examples/src/sms_agent.ts +++ /dev/null @@ -1,103 +0,0 @@ -// SPDX-FileCopyrightText: 2025 LiveKit, Inc. -// -// SPDX-License-Identifier: Apache-2.0 -import { - AgentServer, - type JobContext, - type JobProcess, - ServerOptions, - cli, - defineAgent, - inference, - llm, - log, - metrics, - voice, -} from '@livekit/agents'; -import * as livekit from '@livekit/agents-plugin-livekit'; -import * as silero from '@livekit/agents-plugin-silero'; -import type { AgentSession } from 'agents/dist/voice/agent_session.js'; -import { fetchWeatherForecast } from 'my-weather-api'; -import { fileURLToPath } from 'node:url'; -import { z } from 'zod'; - -const server: AgentServer = createServer({ - port: process.env.PORT, - worker: fileURLToPath(import.meta.url), -}); - -const myAgent = createAgent(); - -const weatherTool = createTool('getWeather', (toolCtx) => { - return { - description: 'Get the weather for a given location.', - parameters: z.object({ - location: z.string().describe('The location to get the weather for'), - }), - execute: async ({ location }) => { - const weatherPromise = fetchWeatherForecast({ location }); - await toolCtx.session.say('Fetching the weather for you'); - return weatherPromise; - }, - }; -}); - -const flightBooking = createTask<{ - start: string; - desiredDestinations: string[]; - startDate: Date; - endDate: Date; -}>('sunSeekerFlightBooking', async function* (taskCtx, state) { - taskCtx.generateReply('help the user to book a flight to the sunniest destination'); - const availableFlights = await taskCtx.step(async () => { - return searchForFlights(state.start, state.desiredDestinations, state.startDate, state.endDate); - }); - - const sunniestDestinations = await taskCtx.step(async () => { - const sunshineMap = new Map(); - for (const flight of availableFlights) { - const forecast: Array<{ hoursOfSunshine: number; maxTemperature: number }> = - await fetchWeatherForecast({ - location: flight.destination, - startDate: state.startDate, - endDate: state.endDate, - }); - sunshineMap.set( - flight.destination, - forecast.reduce((accHoursOfSun, day) => { - accHoursOfSun += day.hoursOfSunshine; - return accHoursOfSun; - }, 0), - ); - } - return Array.from(sunshineMap.entries()).sort((a, b) => a[1] - b[1]); - }); - - yield `The sunniest destination on your travel dates is ${sunniestDestinations[0]}, do you want to go there?`; - await taskCtx.waitForApproval(); - yield `The following flights are available for ${sunniestDestinations[0]}`; -}); - -server.on('rtc', (jobContext: JobContext) => {}); - -// generic event registering for different endpoints, e.g. text mode -// .on(event, endpoint, handler) -server.on('text', 'weather', async (textMessageContext) => { - const session: AgentSession = createSession({ llm: 'openai/gpt-4.1-mini' }); - - // make tools available to the agent on demand, e.g. depending on the endpoint - myAgent.updateTools(weatherTool); - const startResult = session.start({ agent: myAgent }); - - for await (const ev of startResult) { - await textMessageContext.sendResponse(ev); - } - - for await (const ev of session.run({ userInput: textMessageContext.text })) { - await textMessageContext.sendResponse(ev); - } -}); - -export default server; - -cli.runApp(server); From 31ce9d0a7147acf5bd968e39641e5692b06e48a2 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 14 Apr 2026 14:28:32 +0200 Subject: [PATCH 07/13] format --- agents/src/ipc/job_proc_lazy_main.ts | 2 +- agents/src/telemetry/logging.ts | 2 +- agents/src/transcription.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/agents/src/ipc/job_proc_lazy_main.ts b/agents/src/ipc/job_proc_lazy_main.ts index a24ae54f0..dba451ada 100644 --- a/agents/src/ipc/job_proc_lazy_main.ts +++ b/agents/src/ipc/job_proc_lazy_main.ts @@ -1,8 +1,8 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { Room, RoomEvent, dispose } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { EventEmitter, once } from 'node:events'; import { pathToFileURL } from 'node:url'; import type { Logger } from 'pino'; diff --git a/agents/src/telemetry/logging.ts b/agents/src/telemetry/logging.ts index 5f3130560..4f589875f 100644 --- a/agents/src/telemetry/logging.ts +++ b/agents/src/telemetry/logging.ts @@ -1,9 +1,9 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { Attributes } from '@opentelemetry/api'; import type { LogRecord, LogRecordProcessor } from '@opentelemetry/sdk-logs'; -import { ThrowsPromise } from '@livekit/throws-transformer/throws'; /** * Metadata log processor that injects metadata (room_id, job_id) into all log records. diff --git a/agents/src/transcription.ts b/agents/src/transcription.ts index 2c50cdf5b..bee01e20a 100644 --- a/agents/src/transcription.ts +++ b/agents/src/transcription.ts @@ -3,9 +3,9 @@ // SPDX-License-Identifier: Apache-2.0 import { TranscriptionSegment } from '@livekit/protocol'; import { AudioFrame } from '@livekit/rtc-node'; +import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import { EventEmitter } from 'node:events'; -import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import { basic } from './tokenize/index.js'; import type { SentenceStream, SentenceTokenizer } from './tokenize/tokenizer.js'; import { AsyncIterableQueue, Future, shortuuid } from './utils.js'; From 97be24ab69c9bd590a15b9e7d6961be69926372b Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 14 Apr 2026 14:29:00 +0200 Subject: [PATCH 08/13] Create warm-papayas-smash.md --- .changeset/warm-papayas-smash.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/warm-papayas-smash.md diff --git a/.changeset/warm-papayas-smash.md b/.changeset/warm-papayas-smash.md new file mode 100644 index 000000000..3e4fdd61d --- /dev/null +++ b/.changeset/warm-papayas-smash.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents": patch +--- + +Use ThrowsPromise helper across agent package From c171ca94dcfb68ae697ab18c18115212d5238880 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 14 Apr 2026 14:49:01 +0200 Subject: [PATCH 09/13] extend to cancellable promise --- agents/src/transcription.ts | 11 ++++++----- agents/src/utils.ts | 31 +++++++++++++++++-------------- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/agents/src/transcription.ts b/agents/src/transcription.ts index bee01e20a..f46ff952e 100644 --- a/agents/src/transcription.ts +++ b/agents/src/transcription.ts @@ -3,7 +3,8 @@ // SPDX-License-Identifier: Apache-2.0 import { TranscriptionSegment } from '@livekit/protocol'; import { AudioFrame } from '@livekit/rtc-node'; -import { ThrowsPromise } from '@livekit/throws-transformer/throws'; +import { ThrowsPromise } from '@livekit/throws-transformer/promise'; +import type { Throws } from '@livekit/throws-transformer/throws'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import { EventEmitter } from 'node:events'; import { basic } from './tokenize/index.js'; @@ -60,7 +61,7 @@ export class TextAudioSynchronizer extends (EventEmitter as new () => TypedEmitt #closed = false; #interrupted = false; - #closeFut = new Future(); + #closeFut = new Future(); #playingSegIndex = -1; #finishedSegIndex = -1; @@ -289,10 +290,10 @@ export class TextAudioSynchronizer extends (EventEmitter as new () => TypedEmitt textData.forwardedSentences++; } - async #sleepIfNotClosed(delay: number) { - await Promise.race([ + async #sleepIfNotClosed(delay: number): Promise> { + await ThrowsPromise.race([ this.#closeFut.await, - new Promise((resolve) => setTimeout(resolve, delay)), + new ThrowsPromise((resolve) => setTimeout(resolve, delay)), ]); } diff --git a/agents/src/utils.ts b/agents/src/utils.ts index f8ac11342..034fd0963 100644 --- a/agents/src/utils.ts +++ b/agents/src/utils.ts @@ -39,9 +39,12 @@ export type AudioBuffer = AudioFrame[] | AudioFrame; export const noop = () => {}; -export const isPending = async (promise: Promise): Promise => { +export const isPending = async (promise: Promise): Promise> => { const sentinel = Symbol('sentinel'); - const result = await Promise.race([promise, Promise.resolve(sentinel)]); + const result = await Promise.race([ + ThrowsPromise.fromPromise(promise), + ThrowsPromise.resolve(sentinel), + ]); return result === sentinel; }; @@ -225,27 +228,27 @@ export class Event { } /** @internal */ -export class CancellablePromise { - #promise: Promise; +export class CancellablePromise { + #promise: ThrowsPromise; #cancelFn: () => void; #isCancelled: boolean = false; - #error: Error | null = null; + #error: E | null = null; constructor( executor: ( resolve: (value: T | PromiseLike) => void, - reject: (reason?: unknown) => void, + reject: (reason: E) => void, onCancel: (cancelFn: () => void) => void, ) => void, ) { let cancel: () => void; - this.#promise = new ThrowsPromise((resolve, reject) => { + this.#promise = new ThrowsPromise((resolve, reject) => { executor( resolve, (reason) => { - this.#error = asError(reason); - reject(this.#error); + this.#error = reason; + reject(reason); }, (cancelFn) => { cancel = () => { @@ -269,18 +272,18 @@ export class CancellablePromise { then( onfulfilled?: ((value: T) => TResult1 | Promise) | null, - onrejected?: ((reason: unknown) => TResult2 | Promise) | null, + onrejected?: ((reason: E) => TResult2 | Promise) | null, ): Promise { return this.#promise.then(onfulfilled, onrejected); } catch( - onrejected?: ((reason: unknown) => TResult | Promise) | null, - ): Promise { - return this.#promise.catch(onrejected); + onrejected?: ((reason: E) => TResult | Promise) | null, + ): Promise> { + return this.#promise.catch((e) => onrejected?.(e)); } - finally(onfinally?: (() => void) | null): Promise { + finally(onfinally?: (() => void) | null): Promise> { return this.#promise.finally(onfinally); } From e51a52edaaca474a1f52613c4f4b14554dbf9b29 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 14 Apr 2026 14:51:50 +0200 Subject: [PATCH 10/13] Update agents/src/beta/workflows/task_group.ts Co-authored-by: devin-ai-integration[bot] <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- agents/src/beta/workflows/task_group.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agents/src/beta/workflows/task_group.ts b/agents/src/beta/workflows/task_group.ts index 73a336f7b..48e96eccb 100644 --- a/agents/src/beta/workflows/task_group.ts +++ b/agents/src/beta/workflows/task_group.ts @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2026 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { asError } from 'agents/src/utils.js'; +import { asError } from '../../utils.js'; import { z } from 'zod'; import type { ChatContext } from '../../llm/chat_context.js'; import { LLM, ToolError, ToolFlag, tool } from '../../llm/index.js'; From 8a8b9e7ffe23538ef17bf2005e1b948142a87308 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 14 Apr 2026 14:51:59 +0200 Subject: [PATCH 11/13] Update agents/src/inference/interruption/interruption_detector.ts Co-authored-by: devin-ai-integration[bot] <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- agents/src/inference/interruption/interruption_detector.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agents/src/inference/interruption/interruption_detector.ts b/agents/src/inference/interruption/interruption_detector.ts index 9bb39318d..a7f0a5e33 100644 --- a/agents/src/inference/interruption/interruption_detector.ts +++ b/agents/src/inference/interruption/interruption_detector.ts @@ -3,7 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { TypedEventEmitter } from '@livekit/typed-emitter'; -import { asError } from 'agents/src/utils.js'; +import { asError } from '../../utils.js'; import EventEmitter from 'events'; import { log } from '../../log.js'; import type { InterruptionMetrics } from '../../metrics/base.js'; From 038a70238a1a31920085413b94b5b8eb46777073 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 14 Apr 2026 14:55:20 +0200 Subject: [PATCH 12/13] format --- agents/src/beta/workflows/task_group.ts | 2 +- agents/src/inference/interruption/interruption_detector.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/agents/src/beta/workflows/task_group.ts b/agents/src/beta/workflows/task_group.ts index 48e96eccb..8147c814b 100644 --- a/agents/src/beta/workflows/task_group.ts +++ b/agents/src/beta/workflows/task_group.ts @@ -1,10 +1,10 @@ // SPDX-FileCopyrightText: 2026 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { asError } from '../../utils.js'; import { z } from 'zod'; import type { ChatContext } from '../../llm/chat_context.js'; import { LLM, ToolError, ToolFlag, tool } from '../../llm/index.js'; +import { asError } from '../../utils.js'; import { AgentTask } from '../../voice/agent.js'; interface FactoryInfo { diff --git a/agents/src/inference/interruption/interruption_detector.ts b/agents/src/inference/interruption/interruption_detector.ts index a7f0a5e33..99778181c 100644 --- a/agents/src/inference/interruption/interruption_detector.ts +++ b/agents/src/inference/interruption/interruption_detector.ts @@ -3,10 +3,10 @@ // SPDX-License-Identifier: Apache-2.0 import { ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { TypedEventEmitter } from '@livekit/typed-emitter'; -import { asError } from '../../utils.js'; import EventEmitter from 'events'; import { log } from '../../log.js'; import type { InterruptionMetrics } from '../../metrics/base.js'; +import { asError } from '../../utils.js'; import { DEFAULT_INFERENCE_URL, STAGING_INFERENCE_URL, getDefaultInferenceUrl } from '../utils.js'; import { FRAMES_PER_SECOND, SAMPLE_RATE, interruptionOptionDefaults } from './defaults.js'; import { InterruptionDetectionError } from './errors.js'; From 994d83a7a2089a02c6f13ff5a9b2fb092ef148b8 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 14 Apr 2026 15:44:41 +0200 Subject: [PATCH 13/13] fix onrejected --- agents/src/utils.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/agents/src/utils.ts b/agents/src/utils.ts index 034fd0963..50f559377 100644 --- a/agents/src/utils.ts +++ b/agents/src/utils.ts @@ -280,7 +280,7 @@ export class CancellablePromise { catch( onrejected?: ((reason: E) => TResult | Promise) | null, ): Promise> { - return this.#promise.catch((e) => onrejected?.(e)); + return this.#promise.catch(onrejected); } finally(onfinally?: (() => void) | null): Promise> { @@ -291,6 +291,8 @@ export class CancellablePromise { this.#cancelFn(); } + static from(promise: Promise>): CancellablePromise; + static from(promise: Promise): CancellablePromise; static from(promise: Promise): CancellablePromise { return new CancellablePromise((resolve, reject) => { promise.then(resolve).catch(reject);