Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 118 additions & 1 deletion agents/src/utils.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { AudioFrame } from '@livekit/rtc-node';
import { delay } from '@std/async';
import { ReadableStream } from 'node:stream/web';
import { describe, expect, it } from 'vitest';
import { initializeLogger } from '../src/log.js';
import { Event, TASK_TIMEOUT_ERROR, Task, TaskResult, isPending } from '../src/utils.js';
import {
Event,
TASK_TIMEOUT_ERROR,
Task,
TaskResult,
isPending,
resampleStream,
} from '../src/utils.js';

describe('utils', () => {
// initialize logger
Expand Down Expand Up @@ -538,4 +547,112 @@ describe('utils', () => {
expect(await waiterAfterSet).toBe(true);
});
});

describe('resampleStream', () => {
const createAudioFrame = (sampleRate: number, samples: number, channels = 1): AudioFrame => {
const data = new Int16Array(samples * channels);
for (let i = 0; i < data.length; i++) {
data[i] = Math.sin((i / samples) * Math.PI * 2) * 16000;
}
return new AudioFrame(data, sampleRate, channels, samples);
};

const streamToArray = async (stream: ReadableStream<AudioFrame>): Promise<AudioFrame[]> => {
const reader = stream.getReader();
const chunks: AudioFrame[] = [];
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
}
} finally {
reader.releaseLock();
}
return chunks;
};

it('should resample audio frames to target sample rate', async () => {
const inputRate = 48000;
const outputRate = 16000;
const inputFrame = createAudioFrame(inputRate, 960); // 20ms at 48kHz

const inputStream = new ReadableStream<AudioFrame>({
start(controller) {
controller.enqueue(inputFrame);
controller.close();
},
});

const outputStream = resampleStream({ stream: inputStream, outputRate });
const outputFrames = await streamToArray(outputStream);

expect(outputFrames.length).toBeGreaterThan(0);

for (const frame of outputFrames) {
expect(frame.sampleRate).toBe(outputRate);
expect(frame.channels).toBe(inputFrame.channels);
}
});

it('should handle same input and output rate', async () => {
const sampleRate = 44100;
const inputFrame = createAudioFrame(sampleRate, 1024);

const inputStream = new ReadableStream<AudioFrame>({
start(controller) {
controller.enqueue(inputFrame);
controller.close();
},
});

const outputStream = resampleStream({ stream: inputStream, outputRate: sampleRate });
const outputFrames = await streamToArray(outputStream);

expect(outputFrames.length).toBeGreaterThan(0);

for (const frame of outputFrames) {
expect(frame.sampleRate).toBe(sampleRate);
expect(frame.channels).toBe(inputFrame.channels);
}
});

it('should handle multiple input frames', async () => {
const inputRate = 32000;
const outputRate = 48000;
const frame1 = createAudioFrame(inputRate, 640);
const frame2 = createAudioFrame(inputRate, 640);

const inputStream = new ReadableStream<AudioFrame>({
start(controller) {
controller.enqueue(frame1);
controller.enqueue(frame2);
controller.close();
},
});

const outputStream = resampleStream({ stream: inputStream, outputRate });
const outputFrames = await streamToArray(outputStream);

expect(outputFrames.length).toBeGreaterThan(0);

for (const frame of outputFrames) {
expect(frame.sampleRate).toBe(outputRate);
expect(frame.channels).toBe(frame1.channels);
}
});

it('should handle empty stream', async () => {
const inputStream = new ReadableStream<AudioFrame>({
start(controller) {
controller.close();
},
});

const outputStream = resampleStream({ stream: inputStream, outputRate: 44100 });
const outputFrames = await streamToArray(outputStream);

expect(outputFrames).toEqual([]);
});
});
});
36 changes: 36 additions & 0 deletions agents/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
// SPDX-License-Identifier: Apache-2.0

/* eslint-disable @typescript-eslint/no-explicit-any */
import { AudioResampler } from '@livekit/rtc-node';
import { AudioFrame } from '@livekit/rtc-node';
import { delay } from '@std/async';
import { EventEmitter, once } from 'node:events';
import type { ReadableStream } from 'node:stream/web';
import { TransformStream, type TransformStreamDefaultController } from 'node:stream/web';
import { v4 as uuidv4 } from 'uuid';
import { log } from './log.js';

Expand Down Expand Up @@ -587,3 +590,36 @@ export function isImmutableArray(array: unknown): boolean {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
return typeof array === 'object' && !!(array as any)[READONLY_SYMBOL];
}

/**
* Resamples an audio stream to a target sample rate.
*
* WARINING: The input stream will be locked until the resampled stream is closed.
*
* @param stream - The input stream to resample.
* @param outputRate - The target sample rate.
* @returns A new stream with the resampled audio.
*/
export function resampleStream({
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will probably need this in the plugins. That's why I kept it a separate utility.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a brief function spec to this function?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like a JS doc you mean?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, just a one or two line brief description would be fine

stream,
outputRate,
}: {
stream: ReadableStream<AudioFrame>;
outputRate: number;
}): ReadableStream<AudioFrame> {
let resampler: AudioResampler | null = null;
const transformStream = new TransformStream<AudioFrame, AudioFrame>({
transform(chunk: AudioFrame, controller: TransformStreamDefaultController<AudioFrame>) {
if (!resampler) {
resampler = new AudioResampler(chunk.sampleRate, outputRate);
}
for (const frame of resampler.push(chunk)) {
controller.enqueue(frame);
}
for (const frame of resampler.flush()) {
controller.enqueue(frame);
}
},
});
return stream.pipeThrough(transformStream);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note that this will lock the input stream argument. When doing resource cleanup, we need to keep this in mind to avoid potential locking issue.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this a good callout. I'll add a comment for the function.

Copy link
Copy Markdown
Contributor

@toubatbrian toubatbrian Jul 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we make it a class like "ResampledStream" and we expose method for unlock, cancel, just like deferred readable stream.

I think instead of using pipeThrough we should probably use manual getReader() to pipe the input stream to the transformStream just to make things consistent, because we might need to deal with bugs if using pipeThrough as we lose control of input stream's lock. What do you think?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can just use the streampipe options?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't tried it yet, but feel free to try it.

}
8 changes: 7 additions & 1 deletion agents/src/voice/room_io/_input.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
import type { ReadableStream } from 'node:stream/web';
import { log } from '../../log.js';
import { DeferredReadableStream } from '../../stream/deferred_stream.js';
import { resampleStream } from '../../utils.js';

export class ParticipantAudioInputStream {
private room: Room;
Expand Down Expand Up @@ -83,7 +84,12 @@ export class ParticipantAudioInputStream {
}
this.publication = publication;
this.logger.debug({ track, publication, participant }, 'track subscribed');
this.deferredStream.setSource(this.createStream(track));
this.deferredStream.setSource(
resampleStream({
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^ same, we need to take care of un-pipeThrough whenever we are going to use it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean "un-pipeThrough"? If you call cancel on the output stream then it will signal it back down the chain?

Copy link
Copy Markdown
Contributor

@toubatbrian toubatbrian Jul 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we cancel the pipeThrough the source and destination stream will be cancelled as well and stop stream immediately, but in some situation, like deferred readable stream, when we"detach" source stream we still want to keep source stream "alive" and pipe to other stream later. Not sure if we'd ever want to "detach" source stream here tho

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great point. Let me look through the code base and see what makes sense.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when we"detach" source stream we still want to keep source stream "alive" and pipe to other stream later

you can do this via preventCancel/Abort? [source]

Copy link
Copy Markdown
Contributor Author

@Shubhrakanti Shubhrakanti Jul 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we would set preventCancel to true in this case? Screenshot 2025-07-15 at 4 19 30 PM

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think you're right. Going to test it with restaurant agent and handoff to make sure it works okay. Then I'll let you know.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hand off worked correctly so I think we don't need it here.

stream: this.createStream(track),
outputRate: this.sampleRate,
}),
);
return true;
};

Expand Down