diff --git a/agents/src/utils.test.ts b/agents/src/utils.test.ts index d3a347ab7..fab0d89a6 100644 --- a/agents/src/utils.test.ts +++ b/agents/src/utils.test.ts @@ -1,10 +1,19 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { AudioFrame } from '@livekit/rtc-node'; import { delay } from '@std/async'; +import { ReadableStream } from 'node:stream/web'; import { describe, expect, it } from 'vitest'; import { initializeLogger } from '../src/log.js'; -import { Event, TASK_TIMEOUT_ERROR, Task, TaskResult, isPending } from '../src/utils.js'; +import { + Event, + TASK_TIMEOUT_ERROR, + Task, + TaskResult, + isPending, + resampleStream, +} from '../src/utils.js'; describe('utils', () => { // initialize logger @@ -538,4 +547,112 @@ describe('utils', () => { expect(await waiterAfterSet).toBe(true); }); }); + + describe('resampleStream', () => { + const createAudioFrame = (sampleRate: number, samples: number, channels = 1): AudioFrame => { + const data = new Int16Array(samples * channels); + for (let i = 0; i < data.length; i++) { + data[i] = Math.sin((i / samples) * Math.PI * 2) * 16000; + } + return new AudioFrame(data, sampleRate, channels, samples); + }; + + const streamToArray = async (stream: ReadableStream): Promise => { + const reader = stream.getReader(); + const chunks: AudioFrame[] = []; + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + chunks.push(value); + } + } finally { + reader.releaseLock(); + } + return chunks; + }; + + it('should resample audio frames to target sample rate', async () => { + const inputRate = 48000; + const outputRate = 16000; + const inputFrame = createAudioFrame(inputRate, 960); // 20ms at 48kHz + + const inputStream = new ReadableStream({ + start(controller) { + controller.enqueue(inputFrame); + controller.close(); + }, + }); + + const outputStream = resampleStream({ stream: inputStream, outputRate }); + const outputFrames = await streamToArray(outputStream); + + expect(outputFrames.length).toBeGreaterThan(0); + + for (const frame of outputFrames) { + expect(frame.sampleRate).toBe(outputRate); + expect(frame.channels).toBe(inputFrame.channels); + } + }); + + it('should handle same input and output rate', async () => { + const sampleRate = 44100; + const inputFrame = createAudioFrame(sampleRate, 1024); + + const inputStream = new ReadableStream({ + start(controller) { + controller.enqueue(inputFrame); + controller.close(); + }, + }); + + const outputStream = resampleStream({ stream: inputStream, outputRate: sampleRate }); + const outputFrames = await streamToArray(outputStream); + + expect(outputFrames.length).toBeGreaterThan(0); + + for (const frame of outputFrames) { + expect(frame.sampleRate).toBe(sampleRate); + expect(frame.channels).toBe(inputFrame.channels); + } + }); + + it('should handle multiple input frames', async () => { + const inputRate = 32000; + const outputRate = 48000; + const frame1 = createAudioFrame(inputRate, 640); + const frame2 = createAudioFrame(inputRate, 640); + + const inputStream = new ReadableStream({ + start(controller) { + controller.enqueue(frame1); + controller.enqueue(frame2); + controller.close(); + }, + }); + + const outputStream = resampleStream({ stream: inputStream, outputRate }); + const outputFrames = await streamToArray(outputStream); + + expect(outputFrames.length).toBeGreaterThan(0); + + for (const frame of outputFrames) { + expect(frame.sampleRate).toBe(outputRate); + expect(frame.channels).toBe(frame1.channels); + } + }); + + it('should handle empty stream', async () => { + const inputStream = new ReadableStream({ + start(controller) { + controller.close(); + }, + }); + + const outputStream = resampleStream({ stream: inputStream, outputRate: 44100 }); + const outputFrames = await streamToArray(outputStream); + + expect(outputFrames).toEqual([]); + }); + }); }); diff --git a/agents/src/utils.ts b/agents/src/utils.ts index 1aabf2d89..1f4ab4dcd 100644 --- a/agents/src/utils.ts +++ b/agents/src/utils.ts @@ -3,9 +3,12 @@ // SPDX-License-Identifier: Apache-2.0 /* eslint-disable @typescript-eslint/no-explicit-any */ +import { AudioResampler } from '@livekit/rtc-node'; import { AudioFrame } from '@livekit/rtc-node'; import { delay } from '@std/async'; import { EventEmitter, once } from 'node:events'; +import type { ReadableStream } from 'node:stream/web'; +import { TransformStream, type TransformStreamDefaultController } from 'node:stream/web'; import { v4 as uuidv4 } from 'uuid'; import { log } from './log.js'; @@ -587,3 +590,36 @@ export function isImmutableArray(array: unknown): boolean { // eslint-disable-next-line @typescript-eslint/no-explicit-any return typeof array === 'object' && !!(array as any)[READONLY_SYMBOL]; } + +/** + * Resamples an audio stream to a target sample rate. + * + * WARINING: The input stream will be locked until the resampled stream is closed. + * + * @param stream - The input stream to resample. + * @param outputRate - The target sample rate. + * @returns A new stream with the resampled audio. + */ +export function resampleStream({ + stream, + outputRate, +}: { + stream: ReadableStream; + outputRate: number; +}): ReadableStream { + let resampler: AudioResampler | null = null; + const transformStream = new TransformStream({ + transform(chunk: AudioFrame, controller: TransformStreamDefaultController) { + if (!resampler) { + resampler = new AudioResampler(chunk.sampleRate, outputRate); + } + for (const frame of resampler.push(chunk)) { + controller.enqueue(frame); + } + for (const frame of resampler.flush()) { + controller.enqueue(frame); + } + }, + }); + return stream.pipeThrough(transformStream); +} diff --git a/agents/src/voice/room_io/_input.ts b/agents/src/voice/room_io/_input.ts index 51ee0c380..65abb18f4 100644 --- a/agents/src/voice/room_io/_input.ts +++ b/agents/src/voice/room_io/_input.ts @@ -15,6 +15,7 @@ import { import type { ReadableStream } from 'node:stream/web'; import { log } from '../../log.js'; import { DeferredReadableStream } from '../../stream/deferred_stream.js'; +import { resampleStream } from '../../utils.js'; export class ParticipantAudioInputStream { private room: Room; @@ -83,7 +84,12 @@ export class ParticipantAudioInputStream { } this.publication = publication; this.logger.debug({ track, publication, participant }, 'track subscribed'); - this.deferredStream.setSource(this.createStream(track)); + this.deferredStream.setSource( + resampleStream({ + stream: this.createStream(track), + outputRate: this.sampleRate, + }), + ); return true; };