From 44ca09c7d0b89134deb7002828c793b8d4c9a091 Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 13 Nov 2025 12:54:47 +0100 Subject: [PATCH 1/7] Audio Mixer ported from python --- packages/livekit-rtc/src/audio_mixer.ts | 329 ++++++++++++++++++++++++ packages/livekit-rtc/src/index.ts | 1 + 2 files changed, 330 insertions(+) create mode 100644 packages/livekit-rtc/src/audio_mixer.ts diff --git a/packages/livekit-rtc/src/audio_mixer.ts b/packages/livekit-rtc/src/audio_mixer.ts new file mode 100644 index 00000000..510cba2c --- /dev/null +++ b/packages/livekit-rtc/src/audio_mixer.ts @@ -0,0 +1,329 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { AudioFrame } from './audio_frame.js'; +import { log } from './log.js'; + +type AudioStream = AsyncIterator; + +interface Contribution { + stream: AudioStream; + data: Int16Array; + buffer: Int16Array; + hadData: boolean; + exhausted: boolean; +} + +/** + * AudioMixer accepts multiple async audio streams and mixes them into a single output stream. + * + * Each output frame is generated with a fixed chunk size determined by the blocksize (in samples). + * If blocksize is not provided (or 0), it defaults to 100ms worth of audio. + * + * Each input stream is processed in parallel, accumulating audio data until at least one chunk + * of samples is available. If an input stream does not provide data within the specified timeout, + * a warning is logged. + * + * @example + * ```typescript + * const mixer = new AudioMixer(48000, 1, { blocksize: 4800 }); + * mixer.addStream(stream1); + * mixer.addStream(stream2); + * + * for await (const frame of mixer) { + * await audioSource.captureFrame(frame); + * } + * ``` + */ +export class AudioMixer { + private streams: Set = new Set(); + private buffers: Map = new Map(); + private readonly sampleRate: number; + private readonly numChannels: number; + private readonly chunkSize: number; + private readonly streamTimeoutMs: number; + private queue: AudioFrame[] = []; + private readonly capacity: number; + private ending = false; + private mixerTask?: Promise; + private queueResolvers: Array<() => void> = []; + + constructor( + sampleRate: number, + numChannels: number, + options?: { + blocksize?: number; + streamTimeoutMs?: number; + capacity?: number; + }, + ) { + this.sampleRate = sampleRate; + this.numChannels = numChannels; + this.chunkSize = options?.blocksize && options.blocksize > 0 ? options.blocksize : Math.floor(sampleRate / 10); + this.streamTimeoutMs = options?.streamTimeoutMs ?? 100; + this.capacity = options?.capacity ?? 100; + + // Start the mixer task + this.mixerTask = this.mixer(); + } + + /** + * Add an audio stream to the mixer. + * + * The stream is added to the internal set of streams and an empty buffer is initialized for it, + * if not already present. + * + * @param stream - An async iterator that produces AudioFrame objects + */ + addStream(stream: AudioStream): void { + if (this.ending) { + throw new Error('Cannot add stream after mixer has been closed'); + } + + this.streams.add(stream); + if (!this.buffers.has(stream)) { + this.buffers.set(stream, new Int16Array(0)); + } + } + + /** + * Remove an audio stream from the mixer. + * + * This method removes the specified stream and its associated buffer from the mixer. 
+ * + * @param stream - The audio stream to remove + */ + removeStream(stream: AudioStream): void { + this.streams.delete(stream); + this.buffers.delete(stream); + } + + /** + * Async iterator implementation to allow consuming mixed frames. + */ + async *[Symbol.asyncIterator](): AsyncIterator { + while (true) { + const frame = await this.getNextFrame(); + if (frame === null) { + break; + } + yield frame; + } + } + + /** + * Immediately stop mixing and close the mixer. + * + * This stops the mixing task, and any unconsumed output in the queue may be dropped. + */ + async aclose(): Promise { + this.ending = true; + if (this.mixerTask) { + await this.mixerTask; + } + // Resolve any waiting consumers + this.queueResolvers.forEach((resolve) => resolve()); + this.queueResolvers = []; + } + + /** + * Signal that no more streams will be added. + * + * This method marks the mixer as ending so that it flushes any remaining buffered output before ending. + * Note that existing streams will still be processed until exhausted. + */ + endInput(): void { + this.ending = true; + } + + private async getNextFrame(): Promise { + while (this.queue.length === 0) { + if (this.ending && this.streams.size === 0) { + return null; + } + + // Wait for frames to be added to the queue + await new Promise((resolve) => { + this.queueResolvers.push(resolve); + }); + + if (this.ending && this.streams.size === 0 && this.queue.length === 0) { + return null; + } + } + + return this.queue.shift()!; + } + + private async mixer(): Promise { + while (true) { + // If we're in ending mode and there are no more streams, exit + if (this.ending && this.streams.size === 0) { + break; + } + + if (this.streams.size === 0) { + await this.sleep(10); + continue; + } + + // Get contributions from all streams in parallel + const streamList = Array.from(this.streams); + const tasks = streamList.map((stream) => { + const buffer = this.buffers.get(stream) || new Int16Array(0); + return this.getContribution(stream, buffer); + }); + + const results = await Promise.allSettled(tasks); + const contributions: Int16Array[] = []; + let anyData = false; + const removals: AudioStream[] = []; + + for (const result of results) { + if (result.status !== 'fulfilled') { + continue; + } + + const contrib = result.value; + if (!contrib) continue; + + // Convert Int16 to Float32 for mixing + const float32Data = new Float32Array(contrib.data.length); + for (let i = 0; i < contrib.data.length; i++) { + float32Data[i] = contrib.data[i]!; + } + contributions.push(float32Data as unknown as Int16Array); + + this.buffers.set(contrib.stream, contrib.buffer); + + if (contrib.hadData) { + anyData = true; + } + + if (contrib.exhausted && contrib.buffer.length === 0) { + removals.push(contrib.stream); + } + } + + // Remove exhausted streams + for (const stream of removals) { + this.removeStream(stream); + } + + if (!anyData) { + await this.sleep(1); + continue; + } + + // Mix all contributions by summing + const mixed = new Float32Array(this.chunkSize * this.numChannels); + for (const contribution of contributions) { + for (let i = 0; i < mixed.length; i++) { + mixed[i] = (mixed[i] || 0) + ((contribution as unknown as Float32Array)[i] || 0); + } + } + + // Clip and convert back to Int16 + const int16Data = new Int16Array(mixed.length); + for (let i = 0; i < mixed.length; i++) { + const clipped = Math.max(-32768, Math.min(32767, mixed[i]!)); + int16Data[i] = Math.round(clipped); + } + + const frame = new AudioFrame(int16Data, this.sampleRate, this.numChannels, 
this.chunkSize); + + // Add to queue if there's space + if (this.queue.length < this.capacity) { + this.queue.push(frame); + // Notify waiting consumers + const resolver = this.queueResolvers.shift(); + if (resolver) { + resolver(); + } + } + } + + // Signal end of stream + this.queueResolvers.forEach((resolve) => resolve()); + this.queueResolvers = []; + } + + private async getContribution( + stream: AudioStream, + buf: Int16Array, + ): Promise { + let hadData = buf.length > 0; + let exhausted = false; + let buffer = buf; + + const samplesNeeded = this.chunkSize * this.numChannels; + + while (buffer.length < samplesNeeded && !exhausted) { + try { + const frame = await this.timeoutPromise( + stream.next(), + this.streamTimeoutMs, + ); + + if (frame.done) { + exhausted = true; + break; + } + + const audioFrame = frame.value; + const newData = audioFrame.data; + + // Concatenate buffers + const combined = new Int16Array(buffer.length + newData.length); + combined.set(buffer, 0); + combined.set(newData, buffer.length); + buffer = combined; + hadData = true; + } catch (error) { + if (error instanceof Error && error.message === 'timeout') { + log.warn(`AudioMixer: stream timeout, ignoring`); + break; + } + // Stream ended + exhausted = true; + break; + } + } + + let contrib: Int16Array; + if (buffer.length >= samplesNeeded) { + contrib = buffer.slice(0, samplesNeeded); + buffer = buffer.slice(samplesNeeded); + } else { + // Pad with zeros if we don't have enough data + const padded = new Int16Array(samplesNeeded); + padded.set(buffer, 0); + contrib = padded; + buffer = new Int16Array(0); + } + + return { + stream, + data: contrib, + buffer, + hadData, + exhausted, + }; + } + + private async timeoutPromise( + promise: Promise, + timeoutMs: number, + ): Promise { + return Promise.race([ + promise, + new Promise((_, reject) => + setTimeout(() => reject(new Error('timeout')), timeoutMs), + ), + ]); + } + + private sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } +} diff --git a/packages/livekit-rtc/src/index.ts b/packages/livekit-rtc/src/index.ts index 332e902e..4304d5ab 100644 --- a/packages/livekit-rtc/src/index.ts +++ b/packages/livekit-rtc/src/index.ts @@ -8,6 +8,7 @@ export { AudioSource } from './audio_source.js'; export { AudioStream } from './audio_stream.js'; export type { NoiseCancellationOptions } from './audio_stream.js'; export { AudioFilter } from './audio_filter.js'; +export { AudioMixer } from './audio_mixer.js'; export * from './data_streams/index.js'; export { E2EEManager, FrameCryptor, KeyProvider } from './e2ee.js'; export type { E2EEOptions, KeyProviderOptions } from './e2ee.js'; From f01e5564e2712ca2b7732c2698c15822d04f69c1 Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 13 Nov 2025 14:23:44 +0100 Subject: [PATCH 2/7] AsyncQueue-based AudioMixer based on the work of @toubatbrian --- packages/livekit-rtc/src/audio_mixer.test.ts | 180 ++++++++ packages/livekit-rtc/src/audio_mixer.ts | 451 +++++++++++++------ packages/livekit-rtc/src/index.ts | 2 +- 3 files changed, 489 insertions(+), 144 deletions(-) create mode 100644 packages/livekit-rtc/src/audio_mixer.test.ts diff --git a/packages/livekit-rtc/src/audio_mixer.test.ts b/packages/livekit-rtc/src/audio_mixer.test.ts new file mode 100644 index 00000000..b66b7a5e --- /dev/null +++ b/packages/livekit-rtc/src/audio_mixer.test.ts @@ -0,0 +1,180 @@ +// SPDX-FileCopyrightText: 2024 LiveKit, Inc. 
+// +// SPDX-License-Identifier: Apache-2.0 + +import { describe, expect, it } from 'vitest'; +import { AudioFrame } from './audio_frame.js'; +import { AudioMixer } from './audio_mixer.js'; + +// Helper to create a mock audio stream that yields frames +async function* createMockAudioStream( + frameCount: number, + sampleRate: number, + numChannels: number, + samplesPerChannel: number, + value: number, +): AsyncGenerator { + for (let i = 0; i < frameCount; i++) { + const data = new Int16Array(numChannels * samplesPerChannel); + // Fill with a specific value + for (let j = 0; j < data.length; j++) { + data[j] = value; + } + yield new AudioFrame(data, sampleRate, numChannels, samplesPerChannel); + // Small delay to simulate real stream + await new Promise((resolve) => setTimeout(resolve, 1)); + } +} + +describe('AudioMixer', () => { + it('mixes two audio streams', async () => { + const sampleRate = 48000; + const numChannels = 1; + const samplesPerChannel = 480; // 10ms at 48kHz + const mixer = new AudioMixer(sampleRate, numChannels, { + blocksize: samplesPerChannel, + }); + + // Create two streams with different values + const stream1 = createMockAudioStream(3, sampleRate, numChannels, samplesPerChannel, 100); + const stream2 = createMockAudioStream(3, sampleRate, numChannels, samplesPerChannel, 200); + + mixer.addStream(stream1); + mixer.addStream(stream2); + + // Collect first frame + const frames: AudioFrame[] = []; + for await (const frame of mixer) { + frames.push(frame); + if (frames.length >= 1) { + break; + } + } + + await mixer.aclose(); + + expect(frames.length).toBe(1); + const frame = frames[0]!; + expect(frame.sampleRate).toBe(sampleRate); + expect(frame.channels).toBe(numChannels); + expect(frame.samplesPerChannel).toBe(samplesPerChannel); + + // Each sample should be 100 + 200 = 300 + for (let i = 0; i < frame.data.length; i++) { + expect(frame.data[i]).toBe(300); + } + }); + + it('handles stream removal', async () => { + const sampleRate = 48000; + const numChannels = 1; + const samplesPerChannel = 480; + const mixer = new AudioMixer(sampleRate, numChannels, { + blocksize: samplesPerChannel, + }); + + const stream1 = createMockAudioStream(10, sampleRate, numChannels, samplesPerChannel, 100); + const stream2 = createMockAudioStream(10, sampleRate, numChannels, samplesPerChannel, 200); + + mixer.addStream(stream1); + mixer.addStream(stream2); + + // Get one frame + const iterator = mixer[Symbol.asyncIterator](); + const result1 = await iterator.next(); + expect(result1.done).toBe(false); + expect(result1.value?.data[0]).toBe(300); + + // Remove one stream + mixer.removeStream(stream2); + + // Next frame should only have stream1's value + const result2 = await iterator.next(); + expect(result2.done).toBe(false); + // Note: there might be buffered data, so this test is simplified + + await mixer.aclose(); + }); + + it('handles empty mixer', async () => { + const sampleRate = 48000; + const numChannels = 1; + const mixer = new AudioMixer(sampleRate, numChannels); + + // Signal end without adding any streams + mixer.endInput(); + + const frames: AudioFrame[] = []; + for await (const frame of mixer) { + frames.push(frame); + } + + expect(frames.length).toBe(0); + }); + + it('clips audio values to int16 range', async () => { + const sampleRate = 48000; + const numChannels = 1; + const samplesPerChannel = 480; + const mixer = new AudioMixer(sampleRate, numChannels, { + blocksize: samplesPerChannel, + }); + + // Create streams with values that will overflow + const stream1 = 
createMockAudioStream( + 2, + sampleRate, + numChannels, + samplesPerChannel, + 20000, + ); + const stream2 = createMockAudioStream( + 2, + sampleRate, + numChannels, + samplesPerChannel, + 20000, + ); + + mixer.addStream(stream1); + mixer.addStream(stream2); + + const iterator = mixer[Symbol.asyncIterator](); + const result = await iterator.next(); + + await mixer.aclose(); + + expect(result.done).toBe(false); + // 20000 + 20000 = 40000, which should be clipped to 32767 + for (let i = 0; i < result.value!.data.length; i++) { + expect(result.value!.data[i]).toBe(32767); + } + }); + + it('handles exhausted streams', async () => { + const sampleRate = 48000; + const numChannels = 1; + const samplesPerChannel = 480; + const mixer = new AudioMixer(sampleRate, numChannels, { + blocksize: samplesPerChannel, + }); + + // Short stream + const stream = createMockAudioStream(2, sampleRate, numChannels, samplesPerChannel, 100); + + mixer.addStream(stream); + + const frames: AudioFrame[] = []; + for await (const frame of mixer) { + frames.push(frame); + if (frames.length >= 3) { + break; + } + } + + await mixer.aclose(); + + // Should get at least 2 frames (stream exhausts after 2) + expect(frames.length).toBeGreaterThanOrEqual(2); + }); +}); diff --git a/packages/livekit-rtc/src/audio_mixer.ts b/packages/livekit-rtc/src/audio_mixer.ts index 510cba2c..9af57115 100644 --- a/packages/livekit-rtc/src/audio_mixer.ts +++ b/packages/livekit-rtc/src/audio_mixer.ts @@ -1,10 +1,72 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 + import { AudioFrame } from './audio_frame.js'; -import { log } from './log.js'; -type AudioStream = AsyncIterator; +class AsyncQueue { + private items: T[] = []; + private waitingProducers: (() => void)[] = []; + private waitingConsumers: (() => void)[] = []; + closed = false; + + constructor(private capacity: number = Infinity) {} + + async put(item: T) { + if (this.closed) throw new Error("Queue closed"); + + while (this.items.length >= this.capacity) { + await new Promise(resolve => this.waitingProducers.push(resolve)); + } + + this.items.push(item); + + // Wake up one waiting consumer + if (this.waitingConsumers.length > 0) { + const resolve = this.waitingConsumers.shift()!; + resolve(); + } + } + + get(): T | undefined { + const item = this.items.shift(); + if (this.waitingProducers.length > 0) { + const resolve = this.waitingProducers.shift()!; + resolve(); // wakes up one waiting producer + } + return item; + } + + /** + * Wait until an item is available or the queue is closed. + * Returns immediately if items are already available. + */ + async waitForItem(): Promise { + if (this.items.length > 0 || this.closed) { + return; + } + await new Promise(resolve => this.waitingConsumers.push(resolve)); + } + + close() { + this.closed = true; + for (const resolve of this.waitingProducers) resolve(); + for (const resolve of this.waitingConsumers) resolve(); + this.waitingProducers = []; + this.waitingConsumers = []; + } + + get length() { + return this.items.length; + } +} + +// Define types for async iteration (since lib: es2015 doesn't include them) +type AudioStream = { + [Symbol.asyncIterator](): { + next(): Promise>; + }; +}; interface Contribution { stream: AudioStream; @@ -14,54 +76,83 @@ interface Contribution { exhausted: boolean; } +export interface AudioMixerOptions { + /** + * The size of the audio block (in samples) for mixing. + * If not provided, defaults to sampleRate / 10 (100ms). 
+ */ + blocksize?: number; + + /** + * The maximum wait time in milliseconds for each stream to provide + * audio data before timing out. Defaults to 100 ms. + */ + streamTimeoutMs?: number; + + /** + * The maximum number of mixed frames to store in the output queue. + * Defaults to 100. + */ + capacity?: number; +} + /** - * AudioMixer accepts multiple async audio streams and mixes them into a single output stream. + * AudioMixer combines multiple async audio streams into a single output stream. * + * The mixer accepts multiple async audio streams and mixes them into a single output stream. * Each output frame is generated with a fixed chunk size determined by the blocksize (in samples). - * If blocksize is not provided (or 0), it defaults to 100ms worth of audio. + * If blocksize is not provided (or 0), it defaults to 100ms. * * Each input stream is processed in parallel, accumulating audio data until at least one chunk * of samples is available. If an input stream does not provide data within the specified timeout, - * a warning is logged. + * a warning is logged. The mixer can be closed immediately + * (dropping unconsumed frames) or allowed to flush remaining data using endInput(). * * @example * ```typescript - * const mixer = new AudioMixer(48000, 1, { blocksize: 4800 }); + * const mixer = new AudioMixer(48000, 2); * mixer.addStream(stream1); * mixer.addStream(stream2); * * for await (const frame of mixer) { - * await audioSource.captureFrame(frame); + * // Process mixed audio frame * } * ``` */ export class AudioMixer { - private streams: Set = new Set(); - private buffers: Map = new Map(); - private readonly sampleRate: number; - private readonly numChannels: number; - private readonly chunkSize: number; - private readonly streamTimeoutMs: number; - private queue: AudioFrame[] = []; - private readonly capacity: number; - private ending = false; + private streams: Set; + private buffers: Map; + private streamIterators: Map> }>; + private sampleRate: number; + private numChannels: number; + private chunkSize: number; + private streamTimeoutMs: number; + private queue: AsyncQueue; + private streamSignal: AsyncQueue; // Signals when streams are added + private ending: boolean; private mixerTask?: Promise; - private queueResolvers: Array<() => void> = []; - - constructor( - sampleRate: number, - numChannels: number, - options?: { - blocksize?: number; - streamTimeoutMs?: number; - capacity?: number; - }, - ) { + private closed: boolean; + + /** + * Initialize the AudioMixer. + * + * @param sampleRate - The audio sample rate in Hz. + * @param numChannels - The number of audio channels. + * @param options - Optional configuration for the mixer. + */ + constructor(sampleRate: number, numChannels: number, options: AudioMixerOptions = {}) { + this.streams = new Set(); + this.buffers = new Map(); + this.streamIterators = new Map(); this.sampleRate = sampleRate; this.numChannels = numChannels; - this.chunkSize = options?.blocksize && options.blocksize > 0 ? options.blocksize : Math.floor(sampleRate / 10); - this.streamTimeoutMs = options?.streamTimeoutMs ?? 100; - this.capacity = options?.capacity ?? 100; + this.chunkSize = + options.blocksize && options.blocksize > 0 ? options.blocksize : Math.floor(sampleRate / 10); + this.streamTimeoutMs = options.streamTimeoutMs ?? 100; + this.queue = new AsyncQueue(options.capacity ?? 
100); + this.streamSignal = new AsyncQueue(1); // there should only be one mixer + this.ending = false; + this.closed = false; // Start the mixer task this.mixerTask = this.mixer(); @@ -73,17 +164,24 @@ export class AudioMixer { * The stream is added to the internal set of streams and an empty buffer is initialized for it, * if not already present. * - * @param stream - An async iterator that produces AudioFrame objects + * @param stream - An async iterable that produces AudioFrame objects. + * @throws Error if the mixer has been closed. */ addStream(stream: AudioStream): void { if (this.ending) { throw new Error('Cannot add stream after mixer has been closed'); } + console.log(`AudioMixer: Adding stream (total streams: ${this.streams.size + 1})`); this.streams.add(stream); if (!this.buffers.has(stream)) { this.buffers.set(stream, new Int16Array(0)); } + + // Signal that a stream was added (non-blocking) + this.streamSignal.put(undefined).catch(() => { + // Ignore errors if signal queue is closed + }); } /** @@ -91,24 +189,28 @@ export class AudioMixer { * * This method removes the specified stream and its associated buffer from the mixer. * - * @param stream - The audio stream to remove + * @param stream - The audio stream to remove. */ removeStream(stream: AudioStream): void { + console.log(`AudioMixer: Removing stream (total streams: ${this.streams.size - 1})`); this.streams.delete(stream); this.buffers.delete(stream); + this.streamIterators.delete(stream); } /** - * Async iterator implementation to allow consuming mixed frames. + * Returns an async iterator for the mixed audio frames. */ - async *[Symbol.asyncIterator](): AsyncIterator { - while (true) { - const frame = await this.getNextFrame(); - if (frame === null) { - break; - } - yield frame; - } + [Symbol.asyncIterator]() { + return { + next: async (): Promise> => { + const frame = await this.getNextFrame(); + if (frame === null) { + return { done: true, value: undefined }; + } + return { done: false, value: frame }; + }, + }; } /** @@ -117,19 +219,23 @@ export class AudioMixer { * This stops the mixing task, and any unconsumed output in the queue may be dropped. */ async aclose(): Promise { - this.ending = true; - if (this.mixerTask) { - await this.mixerTask; + if (this.closed) { + return; } - // Resolve any waiting consumers - this.queueResolvers.forEach((resolve) => resolve()); - this.queueResolvers = []; + this.closed = true; + this.ending = true; + + // Close both queues to wake up any waiting operations + this.streamSignal.close(); + this.queue.close(); + + await this.mixerTask; } /** * Signal that no more streams will be added. * - * This method marks the mixer as ending so that it flushes any remaining buffered output before ending. + * This method marks the mixer as closed so that it flushes any remaining buffered output before ending. * Note that existing streams will still be processed until exhausted. 
*/ endInput(): void { @@ -137,63 +243,79 @@ export class AudioMixer { } private async getNextFrame(): Promise { - while (this.queue.length === 0) { - if (this.ending && this.streams.size === 0) { - return null; - } + while (true) { + // Try to get an item from the queue (non-blocking) + const frame = this.queue.get(); - // Wait for frames to be added to the queue - await new Promise((resolve) => { - this.queueResolvers.push(resolve); - }); + if (frame !== undefined) { + console.log(`AudioMixer: Returning frame from queue (${this.queue.length} remaining)`); + return frame; + } - if (this.ending && this.streams.size === 0 && this.queue.length === 0) { + // Check if mixer is closed or ending + if (this.queue.closed || (this.ending && this.streams.size === 0)) { + console.log('AudioMixer: getNextFrame returning null (closed or ending)'); return null; } - } - return this.queue.shift()!; + // Queue is empty but mixer is still running - wait for an item to be added + console.log('AudioMixer: Queue empty, waiting for frames...'); + await this.queue.waitForItem(); + console.log('AudioMixer: Woken up, checking queue again...'); + } } private async mixer(): Promise { + console.log('AudioMixer: mixer() task started'); + let iterationCount = 0; + while (true) { - // If we're in ending mode and there are no more streams, exit + iterationCount++; + if (iterationCount % 100 === 0) { + console.log(`AudioMixer: mixer iteration ${iterationCount}, streams: ${this.streams.size}, queue: ${this.queue.length}`); + } + + // If we're in ending mode and there are no more streams, exit. if (this.ending && this.streams.size === 0) { + console.log('AudioMixer: MIXER ENDING (no more streams)'); break; } if (this.streams.size === 0) { - await this.sleep(10); + console.log('AudioMixer: No streams available, waiting...'); + // Wait for a stream to be added (signal queue will have an item) + await this.streamSignal.waitForItem(); + // Consume the signal + this.streamSignal.get(); + console.log('AudioMixer: Woken up, checking streams again...'); continue; } - // Get contributions from all streams in parallel - const streamList = Array.from(this.streams); - const tasks = streamList.map((stream) => { - const buffer = this.buffers.get(stream) || new Int16Array(0); - return this.getContribution(stream, buffer); - }); + console.log(`AudioMixer: Processing ${this.streams.size} streams...`); + + // Process all streams in parallel + const streamArray = Array.from(this.streams); + const promises = streamArray.map((stream) => this.getContribution(stream)); + const results = await Promise.all( + promises.map((p) => + p + .then((value) => ({ status: 'fulfilled' as const, value })) + .catch((reason) => ({ status: 'rejected' as const, reason })), + ), + ); - const results = await Promise.allSettled(tasks); const contributions: Int16Array[] = []; let anyData = false; const removals: AudioStream[] = []; for (const result of results) { if (result.status !== 'fulfilled') { + console.warn('AudioMixer: Stream contribution failed:', result.reason); continue; } const contrib = result.value; - if (!contrib) continue; - - // Convert Int16 to Float32 for mixing - const float32Data = new Float32Array(contrib.data.length); - for (let i = 0; i < contrib.data.length; i++) { - float32Data[i] = contrib.data[i]!; - } - contributions.push(float32Data as unknown as Int16Array); - + contributions.push(contrib.data); this.buffers.set(contrib.stream, contrib.buffer); if (contrib.hadData) { @@ -201,6 +323,7 @@ export class AudioMixer { } if (contrib.exhausted && 
contrib.buffer.length === 0) { + console.log('AudioMixer: Stream exhausted, will be removed'); removals.push(contrib.stream); } } @@ -211,119 +334,161 @@ export class AudioMixer { } if (!anyData) { + console.log('AudioMixer: No data from any stream, sleeping...'); await this.sleep(1); continue; } - // Mix all contributions by summing - const mixed = new Float32Array(this.chunkSize * this.numChannels); - for (const contribution of contributions) { - for (let i = 0; i < mixed.length; i++) { - mixed[i] = (mixed[i] || 0) + ((contribution as unknown as Float32Array)[i] || 0); - } - } + console.log(`AudioMixer: Mixing ${contributions.length} contributions`); - // Clip and convert back to Int16 - const int16Data = new Int16Array(mixed.length); - for (let i = 0; i < mixed.length; i++) { - const clipped = Math.max(-32768, Math.min(32767, mixed[i]!)); - int16Data[i] = Math.round(clipped); - } + // Mix the audio data + const mixed = this.mixAudio(contributions); + const frame = new AudioFrame(mixed, this.sampleRate, this.numChannels, this.chunkSize); - const frame = new AudioFrame(int16Data, this.sampleRate, this.numChannels, this.chunkSize); + if (this.closed) { + console.log('AudioMixer: Mixer closed, exiting'); + break; + } - // Add to queue if there's space - if (this.queue.length < this.capacity) { - this.queue.push(frame); - // Notify waiting consumers - const resolver = this.queueResolvers.shift(); - if (resolver) { - resolver(); - } + try { + // Add to queue + await this.queue.put(frame); + console.log(`AudioMixer: Frame added to queue (queue size: ${this.queue.length})`); + } catch { + console.log('AudioMixer: Queue closed while trying to add frame, exiting'); + break; } } - // Signal end of stream - this.queueResolvers.forEach((resolve) => resolve()); - this.queueResolvers = []; + // Close the queue to signal end of stream + console.log('AudioMixer: Closing queue'); + this.queue.close(); } - private async getContribution( - stream: AudioStream, - buf: Int16Array, - ): Promise { - let hadData = buf.length > 0; + private async getContribution(stream: AudioStream): Promise { + let buf = this.buffers.get(stream) ?? 
new Int16Array(0); + const initialBufferLength = buf.length; let exhausted = false; - let buffer = buf; + let receivedDataInThisCall = false; - const samplesNeeded = this.chunkSize * this.numChannels; + console.log(`AudioMixer: getContribution - initial buffer length: ${buf.length}, needed: ${this.chunkSize * this.numChannels}`); - while (buffer.length < samplesNeeded && !exhausted) { + // Get or create iterator for this stream + let iterator = this.streamIterators.get(stream); + if (!iterator) { + console.log('AudioMixer: Creating new iterator for stream'); + iterator = stream[Symbol.asyncIterator](); + this.streamIterators.set(stream, iterator); + } + + // Accumulate data until we have at least chunkSize samples + let iterCount = 0; + while (buf.length < this.chunkSize * this.numChannels && !exhausted && !this.closed) { + iterCount++; try { - const frame = await this.timeoutPromise( - stream.next(), - this.streamTimeoutMs, - ); + console.log(`AudioMixer: Calling iterator.next() (iteration ${iterCount})...`); + const result = await Promise.race([iterator.next(), this.timeout(this.streamTimeoutMs)]); + + if (result === 'timeout') { + console.warn(`AudioMixer: stream timeout after ${this.streamTimeoutMs}ms`); + break; + } - if (frame.done) { + if (result.done) { + console.log('AudioMixer: Stream iterator done (exhausted)'); exhausted = true; break; } - const audioFrame = frame.value; - const newData = audioFrame.data; + const frame = result.value; + const newData = frame.data; + console.log(`AudioMixer: Received frame with ${newData.length} samples`); + + // Mark that we received data in this call + receivedDataInThisCall = true; // Concatenate buffers - const combined = new Int16Array(buffer.length + newData.length); - combined.set(buffer, 0); - combined.set(newData, buffer.length); - buffer = combined; - hadData = true; - } catch (error) { - if (error instanceof Error && error.message === 'timeout') { - log.warn(`AudioMixer: stream timeout, ignoring`); - break; + if (buf.length === 0) { + buf = newData; + } else { + const combined = new Int16Array(buf.length + newData.length); + combined.set(buf); + combined.set(newData, buf.length); + buf = combined; } - // Stream ended + console.log(`AudioMixer: Buffer now has ${buf.length} samples`); + } catch (error) { + console.error(`AudioMixer: Error reading from stream:`, error); exhausted = true; break; } } + // Extract contribution and update buffer let contrib: Int16Array; - if (buffer.length >= samplesNeeded) { - contrib = buffer.slice(0, samplesNeeded); - buffer = buffer.slice(samplesNeeded); + const samplesNeeded = this.chunkSize * this.numChannels; + + if (buf.length >= samplesNeeded) { + contrib = buf.subarray(0, samplesNeeded); + buf = buf.subarray(samplesNeeded); + console.log(`AudioMixer: Extracted ${samplesNeeded} samples, ${buf.length} remaining in buffer`); } else { // Pad with zeros if we don't have enough data const padded = new Int16Array(samplesNeeded); - padded.set(buffer, 0); + padded.set(buf); contrib = padded; - buffer = new Int16Array(0); + buf = new Int16Array(0); + console.log(`AudioMixer: Padded contribution (had ${buf.length} samples, needed ${samplesNeeded})`); } + // hadData means: we had data at start OR we received data during this call OR we have data remaining + const hadData = initialBufferLength > 0 || receivedDataInThisCall || buf.length > 0; + return { stream, data: contrib, - buffer, + buffer: buf, hadData, exhausted, }; } - private async timeoutPromise( - promise: Promise, - timeoutMs: number, - ): Promise { 
- return Promise.race([ - promise, - new Promise((_, reject) => - setTimeout(() => reject(new Error('timeout')), timeoutMs), - ), - ]); + private mixAudio(contributions: Int16Array[]): Int16Array { + if (contributions.length === 0) { + return new Int16Array(this.chunkSize * this.numChannels); + } + + const length = this.chunkSize * this.numChannels; + const mixed = new Int16Array(length); + + // Sum all contributions + for (const contrib of contributions) { + for (let i = 0; i < length; i++) { + const val = contrib[i]; + if (val !== undefined) { + mixed[i] = (mixed[i] ?? 0) + val; + } + } + } + + // Clip to Int16 range + for (let i = 0; i < length; i++) { + const val = mixed[i] ?? 0; + if (val > 32767) { + mixed[i] = 32767; + } else if (val < -32768) { + mixed[i] = -32768; + } + } + + return mixed; } private sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); } + + private timeout(ms: number): Promise<'timeout'> { + return new Promise((resolve) => setTimeout(() => resolve('timeout'), ms)); + } } diff --git a/packages/livekit-rtc/src/index.ts b/packages/livekit-rtc/src/index.ts index 4304d5ab..7e5d6433 100644 --- a/packages/livekit-rtc/src/index.ts +++ b/packages/livekit-rtc/src/index.ts @@ -8,7 +8,7 @@ export { AudioSource } from './audio_source.js'; export { AudioStream } from './audio_stream.js'; export type { NoiseCancellationOptions } from './audio_stream.js'; export { AudioFilter } from './audio_filter.js'; -export { AudioMixer } from './audio_mixer.js'; +export { AudioMixer, type AudioMixerOptions } from './audio_mixer.js'; export * from './data_streams/index.js'; export { E2EEManager, FrameCryptor, KeyProvider } from './e2ee.js'; export type { E2EEOptions, KeyProviderOptions } from './e2ee.js'; From f9b5b092ce9ad540c0dd6374a75df6ce4920ebe6 Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 13 Nov 2025 14:27:21 +0100 Subject: [PATCH 3/7] prettier + changeset --- .changeset/plenty-garlics-watch.md | 5 +++++ packages/livekit-rtc/src/audio_mixer.test.ts | 19 +++------------- packages/livekit-rtc/src/audio_mixer.ts | 23 +++++++++++++------- 3 files changed, 23 insertions(+), 24 deletions(-) create mode 100644 .changeset/plenty-garlics-watch.md diff --git a/.changeset/plenty-garlics-watch.md b/.changeset/plenty-garlics-watch.md new file mode 100644 index 00000000..8d391586 --- /dev/null +++ b/.changeset/plenty-garlics-watch.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +added AsyncQueue-based AudioMixer \ No newline at end of file diff --git a/packages/livekit-rtc/src/audio_mixer.test.ts b/packages/livekit-rtc/src/audio_mixer.test.ts index b66b7a5e..45cc9ffd 100644 --- a/packages/livekit-rtc/src/audio_mixer.test.ts +++ b/packages/livekit-rtc/src/audio_mixer.test.ts @@ -1,7 +1,6 @@ -// SPDX-FileCopyrightText: 2024 LiveKit, Inc. +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. 
// // SPDX-License-Identifier: Apache-2.0 - import { describe, expect, it } from 'vitest'; import { AudioFrame } from './audio_frame.js'; import { AudioMixer } from './audio_mixer.js'; @@ -121,20 +120,8 @@ describe('AudioMixer', () => { }); // Create streams with values that will overflow - const stream1 = createMockAudioStream( - 2, - sampleRate, - numChannels, - samplesPerChannel, - 20000, - ); - const stream2 = createMockAudioStream( - 2, - sampleRate, - numChannels, - samplesPerChannel, - 20000, - ); + const stream1 = createMockAudioStream(2, sampleRate, numChannels, samplesPerChannel, 20000); + const stream2 = createMockAudioStream(2, sampleRate, numChannels, samplesPerChannel, 20000); mixer.addStream(stream1); mixer.addStream(stream2); diff --git a/packages/livekit-rtc/src/audio_mixer.ts b/packages/livekit-rtc/src/audio_mixer.ts index 9af57115..63ede203 100644 --- a/packages/livekit-rtc/src/audio_mixer.ts +++ b/packages/livekit-rtc/src/audio_mixer.ts @@ -1,7 +1,6 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 - import { AudioFrame } from './audio_frame.js'; class AsyncQueue { @@ -13,10 +12,10 @@ class AsyncQueue { constructor(private capacity: number = Infinity) {} async put(item: T) { - if (this.closed) throw new Error("Queue closed"); + if (this.closed) throw new Error('Queue closed'); while (this.items.length >= this.capacity) { - await new Promise(resolve => this.waitingProducers.push(resolve)); + await new Promise((resolve) => this.waitingProducers.push(resolve)); } this.items.push(item); @@ -45,7 +44,7 @@ class AsyncQueue { if (this.items.length > 0 || this.closed) { return; } - await new Promise(resolve => this.waitingConsumers.push(resolve)); + await new Promise((resolve) => this.waitingConsumers.push(resolve)); } close() { @@ -272,7 +271,9 @@ export class AudioMixer { while (true) { iterationCount++; if (iterationCount % 100 === 0) { - console.log(`AudioMixer: mixer iteration ${iterationCount}, streams: ${this.streams.size}, queue: ${this.queue.length}`); + console.log( + `AudioMixer: mixer iteration ${iterationCount}, streams: ${this.streams.size}, queue: ${this.queue.length}`, + ); } // If we're in ending mode and there are no more streams, exit. 
@@ -371,7 +372,9 @@ export class AudioMixer { let exhausted = false; let receivedDataInThisCall = false; - console.log(`AudioMixer: getContribution - initial buffer length: ${buf.length}, needed: ${this.chunkSize * this.numChannels}`); + console.log( + `AudioMixer: getContribution - initial buffer length: ${buf.length}, needed: ${this.chunkSize * this.numChannels}`, + ); // Get or create iterator for this stream let iterator = this.streamIterators.get(stream); @@ -431,14 +434,18 @@ export class AudioMixer { if (buf.length >= samplesNeeded) { contrib = buf.subarray(0, samplesNeeded); buf = buf.subarray(samplesNeeded); - console.log(`AudioMixer: Extracted ${samplesNeeded} samples, ${buf.length} remaining in buffer`); + console.log( + `AudioMixer: Extracted ${samplesNeeded} samples, ${buf.length} remaining in buffer`, + ); } else { // Pad with zeros if we don't have enough data const padded = new Int16Array(samplesNeeded); padded.set(buf); contrib = padded; buf = new Int16Array(0); - console.log(`AudioMixer: Padded contribution (had ${buf.length} samples, needed ${samplesNeeded})`); + console.log( + `AudioMixer: Padded contribution (had ${buf.length} samples, needed ${samplesNeeded})`, + ); } // hadData means: we had data at start OR we received data during this call OR we have data remaining From 801bf8ad7a6b2cb3d6be592599dbc777a26dfa39 Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 13 Nov 2025 15:55:05 +0100 Subject: [PATCH 4/7] remove console logs and and add some comments --- packages/livekit-rtc/src/audio_mixer.ts | 66 +++++++------------------ 1 file changed, 18 insertions(+), 48 deletions(-) diff --git a/packages/livekit-rtc/src/audio_mixer.ts b/packages/livekit-rtc/src/audio_mixer.ts index 63ede203..343a8e66 100644 --- a/packages/livekit-rtc/src/audio_mixer.ts +++ b/packages/livekit-rtc/src/audio_mixer.ts @@ -3,6 +3,17 @@ // SPDX-License-Identifier: Apache-2.0 import { AudioFrame } from './audio_frame.js'; +/** + * AsyncQueue is a bounded queue with async support for both producers and consumers. + * + * This queue simplifies the AudioMixer implementation by handling backpressure and + * synchronization automatically: + * - Producers can await put() until the queue has space (when queue is full) + * - Consumers can await waitForItem() until data is available (when queue is empty) + * + * This eliminates the need for manual coordination logic, polling loops, and + * complex state management throughout the rest of the codebase. + */ class AsyncQueue { private items: T[] = []; private waitingProducers: (() => void)[] = []; @@ -171,7 +182,6 @@ export class AudioMixer { throw new Error('Cannot add stream after mixer has been closed'); } - console.log(`AudioMixer: Adding stream (total streams: ${this.streams.size + 1})`); this.streams.add(stream); if (!this.buffers.has(stream)) { this.buffers.set(stream, new Int16Array(0)); @@ -191,7 +201,6 @@ export class AudioMixer { * @param stream - The audio stream to remove. 
*/ removeStream(stream: AudioStream): void { - console.log(`AudioMixer: Removing stream (total streams: ${this.streams.size - 1})`); this.streams.delete(stream); this.buffers.delete(stream); this.streamIterators.delete(stream); @@ -247,53 +256,35 @@ export class AudioMixer { const frame = this.queue.get(); if (frame !== undefined) { - console.log(`AudioMixer: Returning frame from queue (${this.queue.length} remaining)`); return frame; } // Check if mixer is closed or ending if (this.queue.closed || (this.ending && this.streams.size === 0)) { - console.log('AudioMixer: getNextFrame returning null (closed or ending)'); return null; } // Queue is empty but mixer is still running - wait for an item to be added - console.log('AudioMixer: Queue empty, waiting for frames...'); await this.queue.waitForItem(); - console.log('AudioMixer: Woken up, checking queue again...'); } } private async mixer(): Promise { - console.log('AudioMixer: mixer() task started'); - let iterationCount = 0; - + // Main mixing loop that continuously processes streams and produces output frames while (true) { - iterationCount++; - if (iterationCount % 100 === 0) { - console.log( - `AudioMixer: mixer iteration ${iterationCount}, streams: ${this.streams.size}, queue: ${this.queue.length}`, - ); - } - - // If we're in ending mode and there are no more streams, exit. + // If we're in ending mode and there are no more streams, exit if (this.ending && this.streams.size === 0) { - console.log('AudioMixer: MIXER ENDING (no more streams)'); break; } if (this.streams.size === 0) { - console.log('AudioMixer: No streams available, waiting...'); // Wait for a stream to be added (signal queue will have an item) await this.streamSignal.waitForItem(); // Consume the signal this.streamSignal.get(); - console.log('AudioMixer: Woken up, checking streams again...'); continue; } - console.log(`AudioMixer: Processing ${this.streams.size} streams...`); - // Process all streams in parallel const streamArray = Array.from(this.streams); const promises = streamArray.map((stream) => this.getContribution(stream)); @@ -323,8 +314,8 @@ export class AudioMixer { anyData = true; } + // Mark exhausted streams with no remaining buffer for removal if (contrib.exhausted && contrib.buffer.length === 0) { - console.log('AudioMixer: Stream exhausted, will be removed'); removals.push(contrib.stream); } } @@ -335,34 +326,29 @@ export class AudioMixer { } if (!anyData) { - console.log('AudioMixer: No data from any stream, sleeping...'); + // No data available from any stream, wait briefly before trying again await this.sleep(1); continue; } - console.log(`AudioMixer: Mixing ${contributions.length} contributions`); - // Mix the audio data const mixed = this.mixAudio(contributions); const frame = new AudioFrame(mixed, this.sampleRate, this.numChannels, this.chunkSize); if (this.closed) { - console.log('AudioMixer: Mixer closed, exiting'); break; } try { - // Add to queue + // Add mixed frame to output queue await this.queue.put(frame); - console.log(`AudioMixer: Frame added to queue (queue size: ${this.queue.length})`); } catch { - console.log('AudioMixer: Queue closed while trying to add frame, exiting'); + // Queue closed while trying to add frame break; } } // Close the queue to signal end of stream - console.log('AudioMixer: Closing queue'); this.queue.close(); } @@ -372,24 +358,16 @@ export class AudioMixer { let exhausted = false; let receivedDataInThisCall = false; - console.log( - `AudioMixer: getContribution - initial buffer length: ${buf.length}, needed: 
${this.chunkSize * this.numChannels}`, - ); - // Get or create iterator for this stream let iterator = this.streamIterators.get(stream); if (!iterator) { - console.log('AudioMixer: Creating new iterator for stream'); iterator = stream[Symbol.asyncIterator](); this.streamIterators.set(stream, iterator); } // Accumulate data until we have at least chunkSize samples - let iterCount = 0; while (buf.length < this.chunkSize * this.numChannels && !exhausted && !this.closed) { - iterCount++; try { - console.log(`AudioMixer: Calling iterator.next() (iteration ${iterCount})...`); const result = await Promise.race([iterator.next(), this.timeout(this.streamTimeoutMs)]); if (result === 'timeout') { @@ -398,14 +376,12 @@ export class AudioMixer { } if (result.done) { - console.log('AudioMixer: Stream iterator done (exhausted)'); exhausted = true; break; } const frame = result.value; const newData = frame.data; - console.log(`AudioMixer: Received frame with ${newData.length} samples`); // Mark that we received data in this call receivedDataInThisCall = true; @@ -419,7 +395,6 @@ export class AudioMixer { combined.set(newData, buf.length); buf = combined; } - console.log(`AudioMixer: Buffer now has ${buf.length} samples`); } catch (error) { console.error(`AudioMixer: Error reading from stream:`, error); exhausted = true; @@ -432,20 +407,15 @@ export class AudioMixer { const samplesNeeded = this.chunkSize * this.numChannels; if (buf.length >= samplesNeeded) { + // Extract the needed samples and keep the remainder in the buffer contrib = buf.subarray(0, samplesNeeded); buf = buf.subarray(samplesNeeded); - console.log( - `AudioMixer: Extracted ${samplesNeeded} samples, ${buf.length} remaining in buffer`, - ); } else { // Pad with zeros if we don't have enough data const padded = new Int16Array(samplesNeeded); padded.set(buf); contrib = padded; buf = new Int16Array(0); - console.log( - `AudioMixer: Padded contribution (had ${buf.length} samples, needed ${samplesNeeded})`, - ); } // hadData means: we had data at start OR we received data during this call OR we have data remaining From cbf09e84908ee42886c04c7024f120076842a5ed Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 13 Nov 2025 16:15:59 +0100 Subject: [PATCH 5/7] use deque --- packages/livekit-rtc/package.json | 5 +++-- packages/livekit-rtc/src/audio_mixer.ts | 25 +++++++++++++------------ pnpm-lock.yaml | 8 ++++++++ 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/packages/livekit-rtc/package.json b/packages/livekit-rtc/package.json index be6883b2..4effa94c 100644 --- a/packages/livekit-rtc/package.json +++ b/packages/livekit-rtc/package.json @@ -47,18 +47,19 @@ }, "dependencies": { "@bufbuild/protobuf": "^1.10.1", + "@datastructures-js/deque": "^1.0.8", "@livekit/mutex": "^1.0.0", "@livekit/typed-emitter": "^3.0.0", "pino": "^9.0.0", "pino-pretty": "^13.0.0" }, "devDependencies": { + "@bufbuild/protoc-gen-es": "^1.10.1", "@napi-rs/cli": "^2.18.0", "@types/node": "^22.13.10", "prettier": "^3.0.3", "tsup": "^8.3.5", - "typescript": "5.8.2", - "@bufbuild/protoc-gen-es": "^1.10.1" + "typescript": "5.8.2" }, "optionalDependencies": { "@livekit/rtc-node-darwin-arm64": "workspace:*", diff --git a/packages/livekit-rtc/src/audio_mixer.ts b/packages/livekit-rtc/src/audio_mixer.ts index 343a8e66..b353b28a 100644 --- a/packages/livekit-rtc/src/audio_mixer.ts +++ b/packages/livekit-rtc/src/audio_mixer.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. 
// // SPDX-License-Identifier: Apache-2.0 +import { Deque } from '@datastructures-js/deque'; import { AudioFrame } from './audio_frame.js'; /** @@ -16,8 +17,8 @@ import { AudioFrame } from './audio_frame.js'; */ class AsyncQueue { private items: T[] = []; - private waitingProducers: (() => void)[] = []; - private waitingConsumers: (() => void)[] = []; + private waitingProducers = new Deque<() => void>(); + private waitingConsumers = new Deque<() => void>(); closed = false; constructor(private capacity: number = Infinity) {} @@ -26,22 +27,22 @@ class AsyncQueue { if (this.closed) throw new Error('Queue closed'); while (this.items.length >= this.capacity) { - await new Promise((resolve) => this.waitingProducers.push(resolve)); + await new Promise((resolve) => this.waitingProducers.pushBack(resolve)); } this.items.push(item); // Wake up one waiting consumer - if (this.waitingConsumers.length > 0) { - const resolve = this.waitingConsumers.shift()!; + if (this.waitingConsumers.size() > 0) { + const resolve = this.waitingConsumers.popFront()!; resolve(); } } get(): T | undefined { const item = this.items.shift(); - if (this.waitingProducers.length > 0) { - const resolve = this.waitingProducers.shift()!; + if (this.waitingProducers.size() > 0) { + const resolve = this.waitingProducers.popFront()!; resolve(); // wakes up one waiting producer } return item; @@ -55,15 +56,15 @@ class AsyncQueue { if (this.items.length > 0 || this.closed) { return; } - await new Promise((resolve) => this.waitingConsumers.push(resolve)); + await new Promise((resolve) => this.waitingConsumers.pushBack(resolve)); } close() { this.closed = true; - for (const resolve of this.waitingProducers) resolve(); - for (const resolve of this.waitingConsumers) resolve(); - this.waitingProducers = []; - this.waitingConsumers = []; + this.waitingProducers.toArray().forEach((resolve) => resolve()); + this.waitingConsumers.toArray().forEach((resolve) => resolve()); + this.waitingProducers.clear(); + this.waitingConsumers.clear(); } get length() { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0dac0036..1fceed8c 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -203,6 +203,9 @@ importers: '@bufbuild/protobuf': specifier: ^1.10.1 version: 1.10.1 + '@datastructures-js/deque': + specifier: ^1.0.8 + version: 1.0.8 '@livekit/mutex': specifier: ^1.0.0 version: 1.1.1 @@ -426,6 +429,9 @@ packages: '@changesets/write@0.4.0': resolution: {integrity: sha512-CdTLvIOPiCNuH71pyDu3rA+Q0n65cmAbXnwWH84rKGiFumFzkmHNT8KHTMEchcxN+Kl8I54xGUhJ7l3E7X396Q==} + '@datastructures-js/deque@1.0.8': + resolution: {integrity: sha512-PSBhJ2/SmeRPRHuBv7i/fHWIdSC3JTyq56qb+Rq0wjOagi0/fdV5/B/3Md5zFZus/W6OkSPMaxMKKMNMrSmubg==} + '@edge-runtime/primitives@6.0.0': resolution: {integrity: sha512-FqoxaBT+prPBHBwE1WXS1ocnu/VLTQyZ6NMUBAdbP7N2hsFTTxMC/jMu2D/8GAlMQfxeuppcPuCUk/HO3fpIvA==} engines: {node: '>=18'} @@ -4043,6 +4049,8 @@ snapshots: human-id: 4.1.1 prettier: 2.8.8 + '@datastructures-js/deque@1.0.8': {} + '@edge-runtime/primitives@6.0.0': {} '@edge-runtime/vm@5.0.0': From 97d349235fdbf5678f71f7a7f3d284e5b3b61eab Mon Sep 17 00:00:00 2001 From: simon Date: Wed, 19 Nov 2025 15:08:23 +0100 Subject: [PATCH 6/7] add test for async_queue, fix close, and pin deque --- packages/livekit-rtc/package.json | 2 +- packages/livekit-rtc/src/async_queue.test.ts | 250 +++++++++++++++++++ packages/livekit-rtc/src/async_queue.ts | 78 ++++++ packages/livekit-rtc/src/audio_mixer.ts | 71 +----- pnpm-lock.yaml | 2 +- 5 files changed, 333 insertions(+), 70 deletions(-) create mode 100644 
packages/livekit-rtc/src/async_queue.test.ts create mode 100644 packages/livekit-rtc/src/async_queue.ts diff --git a/packages/livekit-rtc/package.json b/packages/livekit-rtc/package.json index 4effa94c..521a5820 100644 --- a/packages/livekit-rtc/package.json +++ b/packages/livekit-rtc/package.json @@ -47,7 +47,7 @@ }, "dependencies": { "@bufbuild/protobuf": "^1.10.1", - "@datastructures-js/deque": "^1.0.8", + "@datastructures-js/deque": "1.0.8", "@livekit/mutex": "^1.0.0", "@livekit/typed-emitter": "^3.0.0", "pino": "^9.0.0", diff --git a/packages/livekit-rtc/src/async_queue.test.ts b/packages/livekit-rtc/src/async_queue.test.ts new file mode 100644 index 00000000..8a2fb106 --- /dev/null +++ b/packages/livekit-rtc/src/async_queue.test.ts @@ -0,0 +1,250 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { describe, expect, it } from 'vitest'; +import { AsyncQueue } from './async_queue.js'; + +describe('AsyncQueue', () => { + it('allows basic put and get operations', async () => { + const queue = new AsyncQueue(); + + await queue.put(1); + await queue.put(2); + await queue.put(3); + + expect(queue.get()).toBe(1); + expect(queue.get()).toBe(2); + expect(queue.get()).toBe(3); + expect(queue.get()).toBe(undefined); + }); + + it('respects capacity limits', async () => { + const queue = new AsyncQueue(2); + + // Fill the queue to capacity + await queue.put(1); + await queue.put(2); + + // Try to put a third item - this should block + let putCompleted = false; + const putPromise = queue.put(3).then(() => { + putCompleted = true; + }); + + // Wait a bit to ensure put() is blocked + await new Promise((resolve) => setTimeout(resolve, 10)); + expect(putCompleted).toBe(false); + + // Get an item to make space + expect(queue.get()).toBe(1); + + // Now the put should complete + await putPromise; + expect(putCompleted).toBe(true); + + expect(queue.get()).toBe(2); + expect(queue.get()).toBe(3); + }); + + it('blocks consumers when queue is empty', async () => { + const queue = new AsyncQueue(); + + // Start waiting for an item + let itemAvailable = false; + const waitPromise = queue.waitForItem().then(() => { + itemAvailable = true; + }); + + // Wait a bit to ensure waitForItem() is blocked + await new Promise((resolve) => setTimeout(resolve, 10)); + expect(itemAvailable).toBe(false); + + // Put an item + await queue.put(42); + + // Now waitForItem should resolve + await waitPromise; + expect(itemAvailable).toBe(true); + + expect(queue.get()).toBe(42); + }); + + it('returns immediately from waitForItem if items exist', async () => { + const queue = new AsyncQueue(); + + await queue.put(1); + await queue.put(2); + + // Should return immediately since items are available + await queue.waitForItem(); + expect(queue.get()).toBe(1); + }); + + it('handles close correctly', async () => { + const queue = new AsyncQueue(); + + // Add some items + await queue.put(1); + await queue.put(2); + + // Close the queue + queue.close(); + + // Should be able to get existing items + expect(queue.get()).toBe(1); + expect(queue.get()).toBe(2); + + // Trying to put should throw + await expect(queue.put(3)).rejects.toThrow('Queue closed'); + + // waitForItem should return immediately when closed + await queue.waitForItem(); + expect(queue.closed).toBe(true); + }); + + it('wakes up waiting producers when closed', async () => { + const queue = new AsyncQueue(1); + + // Fill the queue + await queue.put(1); + + // Try to put another item (will block) + let putRejected = false; + const 
putPromise = queue.put(2).catch(() => { + putRejected = true; + }); + + // Wait a bit + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Close the queue + queue.close(); + + // The blocked put should reject + await putPromise; + expect(putRejected).toBe(true); + }); + + it('wakes up waiting consumers when closed', async () => { + const queue = new AsyncQueue(); + + // Start waiting for an item + const waitPromise = queue.waitForItem(); + + // Wait a bit + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Close the queue + queue.close(); + + // waitForItem should resolve + await waitPromise; + expect(queue.closed).toBe(true); + }); + + it('handles multiple waiting producers', async () => { + const queue = new AsyncQueue(1); + + // Fill the queue + await queue.put(1); + + // Start multiple producers waiting + const put2 = queue.put(2); + const put3 = queue.put(3); + + // Get items to allow producers to proceed + expect(queue.get()).toBe(1); + await put2; + expect(queue.get()).toBe(2); + await put3; + expect(queue.get()).toBe(3); + }); + + it('handles multiple waiting consumers', async () => { + const queue = new AsyncQueue(); + + // Start multiple consumers waiting + const wait1 = queue.waitForItem(); + const wait2 = queue.waitForItem(); + + // Put items + await queue.put(1); + await queue.put(2); + + // Both waits should resolve + await Promise.all([wait1, wait2]); + + expect(queue.length).toBe(2); + }); + + it('reports length correctly', async () => { + const queue = new AsyncQueue(); + + expect(queue.length).toBe(0); + + await queue.put(1); + expect(queue.length).toBe(1); + + await queue.put(2); + expect(queue.length).toBe(2); + + queue.get(); + expect(queue.length).toBe(1); + + queue.get(); + expect(queue.length).toBe(0); + }); + + it('handles unbounded queue (infinite capacity)', async () => { + const queue = new AsyncQueue(); // No capacity specified + + // Should be able to add many items without blocking + for (let i = 0; i < 1000; i++) { + await queue.put(i); + } + + expect(queue.length).toBe(1000); + + // Get them all back + for (let i = 0; i < 1000; i++) { + expect(queue.get()).toBe(i); + } + + expect(queue.length).toBe(0); + }); + + it('handles concurrent put and get operations', async () => { + const queue = new AsyncQueue(5); + + const consumed: number[] = []; + + // Start concurrent producers + const producers = Array.from({ length: 10 }, (_, i) => + (async () => { + await queue.put(i); + })(), + ); + + // Start concurrent consumers - each consumer tries to get items until queue is empty + const consumers = Array.from({ length: 10 }, () => + (async () => { + while (true) { + await queue.waitForItem(); + const item = queue.get(); + if (item !== undefined) { + consumed.push(item); + break; // Each consumer gets one item + } + // If item is undefined, another consumer got it first, try again + } + })(), + ); + + // Wait for all to complete + await Promise.all([...producers, ...consumers]); + + // Should have consumed all items + expect(consumed.length).toBe(10); + expect(consumed.sort((a, b) => a - b)).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + }); +}); diff --git a/packages/livekit-rtc/src/async_queue.ts b/packages/livekit-rtc/src/async_queue.ts new file mode 100644 index 00000000..2aaf1105 --- /dev/null +++ b/packages/livekit-rtc/src/async_queue.ts @@ -0,0 +1,78 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. 
+//
+// SPDX-License-Identifier: Apache-2.0
+import { Deque } from '@datastructures-js/deque';
+
+/**
+ * AsyncQueue is a bounded queue with async support for both producers and consumers.
+ *
+ * This queue simplifies the AudioMixer implementation by handling backpressure and
+ * synchronization automatically:
+ * - Producers can await put() until the queue has space (when queue is full)
+ * - Consumers can await waitForItem() until data is available (when queue is empty)
+ *
+ * This eliminates the need for manual coordination logic, polling loops, and
+ * complex state management throughout the rest of the codebase.
+ */
+export class AsyncQueue<T> {
+  private items: T[] = [];
+  private waitingProducers = new Deque<{ resolve: () => void; reject: (err: Error) => void }>();
+  private waitingConsumers = new Deque<() => void>();
+  closed = false;
+
+  constructor(private capacity: number = Infinity) {}
+
+  async put(item: T) {
+    if (this.closed) throw new Error('Queue closed');
+
+    while (this.items.length >= this.capacity) {
+      await new Promise((resolve, reject) =>
+        this.waitingProducers.pushBack({ resolve, reject }),
+      );
+      // Re-check if closed after waking up
+      if (this.closed) throw new Error('Queue closed');
+    }
+
+    this.items.push(item);
+
+    // Wake up one waiting consumer
+    if (this.waitingConsumers.size() > 0) {
+      const resolve = this.waitingConsumers.popFront()!;
+      resolve();
+    }
+  }
+
+  get(): T | undefined {
+    const item = this.items.shift();
+    if (this.waitingProducers.size() > 0) {
+      const producer = this.waitingProducers.popFront()!;
+      producer.resolve(); // wakes up one waiting producer
+    }
+    return item;
+  }
+
+  /**
+   * Wait until an item is available or the queue is closed.
+   * Returns immediately if items are already available.
+   */
+  async waitForItem(): Promise<void> {
+    if (this.items.length > 0 || this.closed) {
+      return;
+    }
+    await new Promise((resolve) => this.waitingConsumers.pushBack(resolve));
+  }
+
+  close() {
+    this.closed = true;
+    // Reject all waiting producers with an error
+    this.waitingProducers.toArray().forEach((producer) => producer.reject(new Error('Queue closed')));
+    // Resolve all waiting consumers so they can see the queue is closed
+    this.waitingConsumers.toArray().forEach((resolve) => resolve());
+    this.waitingProducers.clear();
+    this.waitingConsumers.clear();
+  }
+
+  get length() {
+    return this.items.length;
+  }
+}
diff --git a/packages/livekit-rtc/src/audio_mixer.ts b/packages/livekit-rtc/src/audio_mixer.ts
index b353b28a..7f63a446 100644
--- a/packages/livekit-rtc/src/audio_mixer.ts
+++ b/packages/livekit-rtc/src/audio_mixer.ts
@@ -1,76 +1,11 @@
 // SPDX-FileCopyrightText: 2025 LiveKit, Inc.
 //
 // SPDX-License-Identifier: Apache-2.0
-import { Deque } from '@datastructures-js/deque';
 import { AudioFrame } from './audio_frame.js';
+import { AsyncQueue } from './async_queue.js';
 
-/**
- * AsyncQueue is a bounded queue with async support for both producers and consumers.
- *
- * This queue simplifies the AudioMixer implementation by handling backpressure and
- * synchronization automatically:
- * - Producers can await put() until the queue has space (when queue is full)
- * - Consumers can await waitForItem() until data is available (when queue is empty)
- *
- * This eliminates the need for manual coordination logic, polling loops, and
- * complex state management throughout the rest of the codebase.
- */
-class AsyncQueue<T> {
-  private items: T[] = [];
-  private waitingProducers = new Deque<() => void>();
-  private waitingConsumers = new Deque<() => void>();
-  closed = false;
-
-  constructor(private capacity: number = Infinity) {}
-
-  async put(item: T) {
-    if (this.closed) throw new Error('Queue closed');
-
-    while (this.items.length >= this.capacity) {
-      await new Promise((resolve) => this.waitingProducers.pushBack(resolve));
-    }
-
-    this.items.push(item);
-
-    // Wake up one waiting consumer
-    if (this.waitingConsumers.size() > 0) {
-      const resolve = this.waitingConsumers.popFront()!;
-      resolve();
-    }
-  }
-
-  get(): T | undefined {
-    const item = this.items.shift();
-    if (this.waitingProducers.size() > 0) {
-      const resolve = this.waitingProducers.popFront()!;
-      resolve(); // wakes up one waiting producer
-    }
-    return item;
-  }
-
-  /**
-   * Wait until an item is available or the queue is closed.
-   * Returns immediately if items are already available.
-   */
-  async waitForItem(): Promise<void> {
-    if (this.items.length > 0 || this.closed) {
-      return;
-    }
-    await new Promise((resolve) => this.waitingConsumers.pushBack(resolve));
-  }
-
-  close() {
-    this.closed = true;
-    this.waitingProducers.toArray().forEach((resolve) => resolve());
-    this.waitingConsumers.toArray().forEach((resolve) => resolve());
-    this.waitingProducers.clear();
-    this.waitingConsumers.clear();
-  }
-
-  get length() {
-    return this.items.length;
-  }
-}
+// Re-export AsyncQueue for backward compatibility
+export { AsyncQueue } from './async_queue.js';
 
 // Define types for async iteration (since lib: es2015 doesn't include them)
 type AudioStream = {
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 1fceed8c..3b2a5da8 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -204,7 +204,7 @@ importers:
         specifier: ^1.10.1
         version: 1.10.1
       '@datastructures-js/deque':
-        specifier: ^1.0.8
+        specifier: 1.0.8
        version: 1.0.8
       '@livekit/mutex':
        specifier: ^1.0.0

From 511c4f1f2b3863cbeea27f83aee7bdbdbc48d63c Mon Sep 17 00:00:00 2001
From: simon
Date: Wed, 19 Nov 2025 16:22:18 +0100
Subject: [PATCH 7/7] lint

---
 packages/livekit-rtc/src/async_queue.ts | 4 +++-
 packages/livekit-rtc/src/audio_mixer.ts | 2 +-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/packages/livekit-rtc/src/async_queue.ts b/packages/livekit-rtc/src/async_queue.ts
index 2aaf1105..d3d0360a 100644
--- a/packages/livekit-rtc/src/async_queue.ts
+++ b/packages/livekit-rtc/src/async_queue.ts
@@ -65,7 +65,9 @@ export class AsyncQueue<T> {
   close() {
     this.closed = true;
     // Reject all waiting producers with an error
-    this.waitingProducers.toArray().forEach((producer) => producer.reject(new Error('Queue closed')));
+    this.waitingProducers
+      .toArray()
+      .forEach((producer) => producer.reject(new Error('Queue closed')));
     // Resolve all waiting consumers so they can see the queue is closed
     this.waitingConsumers.toArray().forEach((resolve) => resolve());
     this.waitingProducers.clear();
diff --git a/packages/livekit-rtc/src/audio_mixer.ts b/packages/livekit-rtc/src/audio_mixer.ts
index 7f63a446..71885b6a 100644
--- a/packages/livekit-rtc/src/audio_mixer.ts
+++ b/packages/livekit-rtc/src/audio_mixer.ts
@@ -1,8 +1,8 @@
 // SPDX-FileCopyrightText: 2025 LiveKit, Inc.
 //
 // SPDX-License-Identifier: Apache-2.0
-import { AudioFrame } from './audio_frame.js';
 import { AsyncQueue } from './async_queue.js';
+import { AudioFrame } from './audio_frame.js';
 
 // Re-export AsyncQueue for backward compatibility
 export { AsyncQueue } from './async_queue.js';
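
The AsyncQueue docblock above describes the backpressure contract (put() suspends once the queue is full, waitForItem() suspends while it is empty), but the patches only exercise it through the test file. Below is a minimal, self-contained sketch of that contract using only the API added in async_queue.ts (put, get, waitForItem, close, closed, length); the capacity of 8, the item count, and the main() wrapper are illustrative and not part of the patch.

```typescript
// Usage sketch for the AsyncQueue introduced in this patch set (illustrative only).
import { AsyncQueue } from './async_queue.js';

async function main() {
  // Capacity 8: put() suspends the producer once 8 items are buffered (backpressure).
  const queue = new AsyncQueue<number>(8);

  const producer = (async () => {
    for (let i = 0; i < 100; i++) {
      await queue.put(i); // waits here whenever the consumer falls behind
    }
    queue.close(); // wakes any blocked waiters; further put() calls reject
  })();

  const consumer = (async () => {
    while (true) {
      await queue.waitForItem(); // resolves when an item arrives or the queue closes
      const item = queue.get();
      if (item === undefined) {
        if (queue.closed) break; // drained and closed: done
        continue; // woken but another consumer took the item; wait again
      }
      console.log('got', item);
    }
  })();

  await Promise.all([producer, consumer]);
}

main();
```

Bounding the queue is the point of the design: a fast producer (such as the mixer loop the docblock refers to) cannot buffer output without limit while a slower consumer catches up.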
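
The re-export at the bottom of audio_mixer.ts keeps the old import path working after AsyncQueue moved into its own module. A short sketch, assuming only the two relative module paths touched by the patch; the alias name and the assertion are illustrative:

```typescript
// Both bindings refer to the same class after this patch set.
import { AsyncQueue } from './async_queue.js'; // new home of the class
import { AsyncQueue as AsyncQueueViaMixer } from './audio_mixer.js'; // backward-compatible re-export

// Hypothetical sanity check, not part of the patch:
console.assert(AsyncQueueViaMixer === AsyncQueue);
```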