Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/warm-papayas-smash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/agents": patch
---

Use ThrowsPromise helper across agent package
2 changes: 1 addition & 1 deletion agents/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
"@livekit/mutex": "^1.1.1",
"@livekit/protocol": "^1.45.3",
"@livekit/typed-emitter": "^3.0.0",
"@livekit/throws-transformer": "0.0.0-20260320165515",
"@livekit/throws-transformer": "0.1.8",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/api-logs": "^0.54.0",
"@opentelemetry/core": "^2.2.0",
Expand Down
3 changes: 2 additions & 1 deletion agents/src/beta/workflows/task_group.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import { z } from 'zod';
import type { ChatContext } from '../../llm/chat_context.js';
import { LLM, ToolError, ToolFlag, tool } from '../../llm/index.js';
import { asError } from '../../utils.js';
import { AgentTask } from '../../voice/agent.js';

interface FactoryInfo {
Expand Down Expand Up @@ -114,7 +115,7 @@ export class TaskGroup extends AgentTask<TaskGroupResult> {
taskResults[taskId] = e;
continue;
} else {
this.complete(e instanceof Error ? e : new Error(String(e)));
this.complete(asError(e));
return;
}
}
Expand Down
3 changes: 2 additions & 1 deletion agents/src/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0
import { Mutex } from '@livekit/mutex';
import { ThrowsPromise } from '@livekit/throws-transformer/throws';
import { waitForAbort } from './utils.js';

/**
Expand Down Expand Up @@ -273,7 +274,7 @@ export class ConnectionPool<T> {
try {
const fnPromise = fn(conn);
const result = signal
? await Promise.race([
? await ThrowsPromise.race([
fnPromise.then((value) => ({ type: 'result' as const, value })),
waitForAbort(signal).then(() => ({ type: 'abort' as const })),
]).then((r) => {
Expand Down
5 changes: 3 additions & 2 deletions agents/src/http_server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { ThrowsPromise } from '@livekit/throws-transformer/throws';
import { type IncomingMessage, type Server, type ServerResponse, createServer } from 'node:http';
import { log } from './log.js';

Expand Down Expand Up @@ -53,7 +54,7 @@ export class HTTPServer {
}

async run(): Promise<void> {
return new Promise((resolve, reject) => {
return new ThrowsPromise<void, Error>((resolve, reject) => {
this.app.listen(this.port, this.host, (err?: Error) => {
if (err) reject(err);
const address = this.app.address();
Expand All @@ -66,7 +67,7 @@ export class HTTPServer {
}

async close(): Promise<void> {
return new Promise((resolve, reject) => {
return new ThrowsPromise<void, Error>((resolve, reject) => {
this.app.close((err?: Error) => {
if (err) reject(err);
resolve();
Expand Down
6 changes: 4 additions & 2 deletions agents/src/inference/interruption/interruption_detector.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { ThrowsPromise } from '@livekit/throws-transformer/throws';
import type { TypedEventEmitter } from '@livekit/typed-emitter';
import EventEmitter from 'events';
import { log } from '../../log.js';
import type { InterruptionMetrics } from '../../metrics/base.js';
import { asError } from '../../utils.js';
import { DEFAULT_INFERENCE_URL, STAGING_INFERENCE_URL, getDefaultInferenceUrl } from '../utils.js';
import { FRAMES_PER_SECOND, SAMPLE_RATE, interruptionOptionDefaults } from './defaults.js';
import { InterruptionDetectionError } from './errors.js';
Expand Down Expand Up @@ -165,7 +167,7 @@ export class AdaptiveInterruptionDetector extends (EventEmitter as new () => Typ
this.streams.add(streamBase);
return streamBase;
} catch (e) {
const cause = e instanceof Error ? e : new Error(String(e));
const cause = asError(e);
this.emitError(new InterruptionDetectionError(cause.message, Date.now(), this._label, false));
throw e;
}
Expand Down Expand Up @@ -199,6 +201,6 @@ export class AdaptiveInterruptionDetector extends (EventEmitter as new () => Typ
for (const stream of this.streams) {
updatePromises.push(stream.updateOptions(options));
}
await Promise.all(updatePromises);
await ThrowsPromise.all(updatePromises);
}
}
5 changes: 2 additions & 3 deletions agents/src/inference/interruption/ws_transport.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import type { Throws } from '@livekit/throws-transformer/throws';
import { type Throws, ThrowsPromise } from '@livekit/throws-transformer/throws';
import { TransformStream } from 'stream/web';
import WebSocket from 'ws';
import { z } from 'zod';
import { APIConnectionError, APIStatusError, APITimeoutError } from '../../_exceptions.js';
import { log } from '../../log.js';
import TypedPromise from '../../typed_promise.js';
import { buildMetadataHeaders, createAccessToken } from '../utils.js';
import { InterruptionCacheEntry } from './interruption_cache_entry.js';
import type { OverlappingSpeechEvent } from './types.js';
Expand Down Expand Up @@ -83,7 +82,7 @@ async function connectWebSocket(
headers: { ...buildMetadataHeaders(), Authorization: `Bearer ${token}` },
});

await new TypedPromise<void, APIStatusError | APITimeoutError | APIConnectionError>(
await new ThrowsPromise<void, APIStatusError | APITimeoutError | APIConnectionError>(
(resolve, reject) => {
const timeout = setTimeout(() => {
ws.terminate();
Expand Down
13 changes: 7 additions & 6 deletions agents/src/inference/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0
import { type AudioFrame } from '@livekit/rtc-node';
import { ThrowsPromise } from '@livekit/throws-transformer/throws';
import type { WebSocket } from 'ws';
import { APIError, APIStatusError } from '../_exceptions.js';
import { AudioByteStream } from '../audio.js';
Expand Down Expand Up @@ -403,7 +404,7 @@ export class SpeechStream<TModel extends STTModels> extends BaseSpeechStream {
};

const createWsListener = async (ws: WebSocket, signal: AbortSignal) => {
return new Promise<void>((resolve, reject) => {
return new ThrowsPromise<void, Error | APIStatusError>((resolve, reject) => {
const onAbort = () => {
resourceCleanup();
reject(new Error('WebSocket connection aborted'));
Expand Down Expand Up @@ -446,7 +447,7 @@ export class SpeechStream<TModel extends STTModels> extends BaseSpeechStream {
);

// Create abort promise once to avoid memory leak
const abortPromise = new Promise<never>((_, reject) => {
const abortPromise = new ThrowsPromise<never, Error>((_, reject) => {
if (signal.aborted) {
return reject(new Error('Send aborted'));
}
Expand All @@ -458,7 +459,7 @@ export class SpeechStream<TModel extends STTModels> extends BaseSpeechStream {
const iterator = this.input[Symbol.asyncIterator]();
try {
while (true) {
const result = await Promise.race([iterator.next(), abortPromise]);
const result = await ThrowsPromise.race([iterator.next(), abortPromise]);

if (result.done) break;
const ev = result.value;
Expand Down Expand Up @@ -560,13 +561,13 @@ export class SpeechStream<TModel extends STTModels> extends BaseSpeechStream {
);
const recvTask = Task.from(({ signal }) => recv(signal), connController);
const waitReconnectTask = Task.from(
({ signal }) => Promise.race([this.reconnectEvent.wait(), waitForAbort(signal)]),
({ signal }) => ThrowsPromise.race([this.reconnectEvent.wait(), waitForAbort(signal)]),
connController,
);

try {
await Promise.race([
Promise.all([sendTask.result, wsListenerTask.result, recvTask.result]),
await ThrowsPromise.race([
ThrowsPromise.all([sendTask.result, wsListenerTask.result, recvTask.result]),
waitReconnectTask.result,
]);

Expand Down
3 changes: 2 additions & 1 deletion agents/src/inference/tts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0
import type { AudioFrame } from '@livekit/rtc-node';
import { ThrowsPromise } from '@livekit/throws-transformer/throws';
import { WebSocket } from 'ws';
import { APIError, APIStatusError, APITimeoutError } from '../_exceptions.js';
import { AudioByteStream } from '../audio.js';
Expand Down Expand Up @@ -710,7 +711,7 @@ export class SynthesizeStream<TModel extends TTSModels> extends BaseSynthesizeSt
];

try {
await Promise.all(tasks.map((t) => t.result));
await ThrowsPromise.all(tasks.map((t) => t.result));
} finally {
// Mirror python finally: unblock recv and cancel all tasks.
inputSentEvent.set();
Expand Down
3 changes: 2 additions & 1 deletion agents/src/inference/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { ThrowsPromise } from '@livekit/throws-transformer/throws';
import { AccessToken } from 'livekit-server-sdk';
import { WebSocket } from 'ws';
import { APIConnectionError, APIStatusError } from '../_exceptions.js';
Expand Down Expand Up @@ -78,7 +79,7 @@ export async function connectWs(
headers: Record<string, string>,
timeoutMs: number,
): Promise<WebSocket> {
return new Promise<WebSocket>((resolve, reject) => {
return new ThrowsPromise<WebSocket, APIConnectionError | APIStatusError>((resolve, reject) => {
const socket = new WebSocket(url, { headers: { ...buildMetadataHeaders(), ...headers } });

const timeout = setTimeout(() => {
Expand Down
9 changes: 6 additions & 3 deletions agents/src/ipc/inference_proc_executor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { ThrowsPromise } from '@livekit/throws-transformer/throws';
import type { ChildProcess } from 'node:child_process';
import { fork } from 'node:child_process';
import { extname } from 'node:path';
Expand All @@ -11,9 +12,11 @@ import type { IPCMessage } from './message.js';
import { SupervisedProc } from './supervised_proc.js';

class PendingInference {
promise = new Promise<{ requestId: string; data: unknown; error?: Error }>((resolve) => {
this.resolve = resolve;
});
promise = new ThrowsPromise<{ requestId: string; data: unknown; error?: Error }, never>(
(resolve) => {
this.resolve = resolve;
},
);
resolve(arg: { requestId: string; data: unknown; error?: Error }) {
arg;
}
Expand Down
7 changes: 4 additions & 3 deletions agents/src/ipc/inference_proc_lazy_main.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { ThrowsPromise } from '@livekit/throws-transformer/throws';
import { once } from 'node:events';
import type { InferenceRunner } from '../inference_runner.js';
import { initializeLogger, log } from '../log.js';
Expand Down Expand Up @@ -34,7 +35,7 @@ const ORPHANED_TIMEOUT = 15 * 1000;
});
const logger = log().child({ pid: process.pid });

const runners: { [id: string]: InferenceRunner } = await Promise.all(
const runners: { [id: string]: InferenceRunner } = await ThrowsPromise.all(
Object.entries(JSON.parse(process.argv[2]!)).map(async ([k, v]) => {
return [
k,
Expand All @@ -52,7 +53,7 @@ const ORPHANED_TIMEOUT = 15 * 1000;
}),
).then(Object.fromEntries);

await Promise.all(
await ThrowsPromise.all(
Object.entries(runners).map(async ([runner, v]) => {
logger.child({ runner }).debug('initializing inference runner');
await v.initialize();
Expand Down Expand Up @@ -101,7 +102,7 @@ const ORPHANED_TIMEOUT = 15 * 1000;
clearTimeout(orphanedTimeout);
// Remove our message handler to stop processing new messages
process.off('message', messageHandler);
Promise.all(Object.values(runners).map((r) => r.close()))
ThrowsPromise.all(Object.values(runners).map((r) => r.close()))
.then(() => {
logger.debug('Inference runners closed');
process.send!({ case: 'done' });
Expand Down
11 changes: 7 additions & 4 deletions agents/src/ipc/job_proc_lazy_main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0
import { Room, RoomEvent, dispose } from '@livekit/rtc-node';
import { ThrowsPromise } from '@livekit/throws-transformer/throws';
import { EventEmitter, once } from 'node:events';
import { pathToFileURL } from 'node:url';
import type { Logger } from 'pino';
Expand Down Expand Up @@ -40,9 +41,11 @@ type JobTask = {
};

class PendingInference {
promise = new Promise<{ requestId: string; data: unknown; error?: Error }>((resolve) => {
this.resolve = resolve; // this is how JavaScript lets you resolve promises externally
});
promise = new ThrowsPromise<{ requestId: string; data: unknown; error?: Error }, never>(
(resolve) => {
this.resolve = resolve; // this is how JavaScript lets you resolve promises externally
},
);
resolve(arg: { requestId: string; data: unknown; error?: Error }) {
arg; // useless call to counteract TypeScript E6133
}
Expand Down Expand Up @@ -177,7 +180,7 @@ const startJob = (
for (const callback of ctx.shutdownCallbacks) {
shutdownTasks.push(callback());
}
await Promise.all(shutdownTasks).catch((error) =>
await ThrowsPromise.all(shutdownTasks).catch((error) =>
logger.error({ error }, 'error while shutting down the job'),
);

Expand Down
4 changes: 2 additions & 2 deletions agents/src/ipc/proc_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0
import { MultiMutex, Mutex } from '@livekit/mutex';
import type { Throws } from '@livekit/throws-transformer/throws';
import { type Throws, ThrowsPromise } from '@livekit/throws-transformer/throws';
import type { RunningJobInfo } from '../job.js';
import { Queue } from '../utils.js';
import type { InferenceExecutor } from './inference_executor.js';
Expand Down Expand Up @@ -174,6 +174,6 @@ export class ProcPool {
e.proc.close();
});
this.executors.forEach((e) => e.close());
await Promise.allSettled(this.tasks);
await ThrowsPromise.allSettled(this.tasks);
}
}
3 changes: 2 additions & 1 deletion agents/src/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type {
RtcConfiguration,
} from '@livekit/rtc-node';
import { ParticipantKind, RoomEvent, TrackKind } from '@livekit/rtc-node';
import { ThrowsPromise } from '@livekit/throws-transformer/throws';
import { AsyncLocalStorage } from 'node:async_hooks';
import * as os from 'node:os';
import * as path from 'node:path';
Expand Down Expand Up @@ -188,7 +189,7 @@ export class JobContext {
}
}

return new Promise((resolve, reject) => {
return new ThrowsPromise<RemoteParticipant, Error>((resolve, reject) => {
const onParticipantConnected = (participant: RemoteParticipant) => {
if (
(!identity || participant.identity === identity) &&
Expand Down
7 changes: 4 additions & 3 deletions agents/src/stream/merge_readable_streams.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { ThrowsPromise } from '@livekit/throws-transformer/throws';
import { ReadableStream } from 'node:stream/web';
import { withResolvers } from '../utils.js';
import { asError, withResolvers } from '../utils.js';

// Adapted from https://github.com/denoland/std/blob/main/streams/merge_readable_streams.ts
// we manually adapted to make ReadableStream<T> typing compatible with our current node
Expand All @@ -12,7 +13,7 @@ export function mergeReadableStreams<T>(...streams: ReadableStream<T>[]): Readab
return new ReadableStream<T>({
start(controller) {
let mustClose = false;
Promise.all(resolvePromises.map(({ promise }) => promise))
ThrowsPromise.all(resolvePromises.map(({ promise }) => promise))
.then(() => {
controller.close();
})
Expand All @@ -31,7 +32,7 @@ export function mergeReadableStreams<T>(...streams: ReadableStream<T>[]): Readab
}
resolvePromises[index]!.resolve();
} catch (error) {
resolvePromises[index]!.reject(error);
resolvePromises[index]!.reject(asError(error));
}
})();
}
Expand Down
3 changes: 2 additions & 1 deletion agents/src/stream/multi_input_stream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { ThrowsPromise } from '@livekit/throws-transformer/throws';
import type {
ReadableStream,
ReadableStreamDefaultReader,
Expand Down Expand Up @@ -115,7 +116,7 @@ export class MultiInputStream<T> {
this.inputs.clear();

// Wait for every pump loop to finish before touching the writer.
await Promise.allSettled([...this.pumpPromises.values()]);
await ThrowsPromise.allSettled([...this.pumpPromises.values()]);
this.pumpPromises.clear();

// Close the output writer + writable side of the transform.
Expand Down
Loading
Loading