diff --git a/.changeset/plenty-garlics-watch.md b/.changeset/plenty-garlics-watch.md new file mode 100644 index 00000000..8d391586 --- /dev/null +++ b/.changeset/plenty-garlics-watch.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +added AsyncQueue-based AudioMixer \ No newline at end of file diff --git a/packages/livekit-rtc/package.json b/packages/livekit-rtc/package.json index be6883b2..521a5820 100644 --- a/packages/livekit-rtc/package.json +++ b/packages/livekit-rtc/package.json @@ -47,18 +47,19 @@ }, "dependencies": { "@bufbuild/protobuf": "^1.10.1", + "@datastructures-js/deque": "1.0.8", "@livekit/mutex": "^1.0.0", "@livekit/typed-emitter": "^3.0.0", "pino": "^9.0.0", "pino-pretty": "^13.0.0" }, "devDependencies": { + "@bufbuild/protoc-gen-es": "^1.10.1", "@napi-rs/cli": "^2.18.0", "@types/node": "^22.13.10", "prettier": "^3.0.3", "tsup": "^8.3.5", - "typescript": "5.8.2", - "@bufbuild/protoc-gen-es": "^1.10.1" + "typescript": "5.8.2" }, "optionalDependencies": { "@livekit/rtc-node-darwin-arm64": "workspace:*", diff --git a/packages/livekit-rtc/src/async_queue.test.ts b/packages/livekit-rtc/src/async_queue.test.ts new file mode 100644 index 00000000..8a2fb106 --- /dev/null +++ b/packages/livekit-rtc/src/async_queue.test.ts @@ -0,0 +1,250 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { describe, expect, it } from 'vitest'; +import { AsyncQueue } from './async_queue.js'; + +describe('AsyncQueue', () => { + it('allows basic put and get operations', async () => { + const queue = new AsyncQueue(); + + await queue.put(1); + await queue.put(2); + await queue.put(3); + + expect(queue.get()).toBe(1); + expect(queue.get()).toBe(2); + expect(queue.get()).toBe(3); + expect(queue.get()).toBe(undefined); + }); + + it('respects capacity limits', async () => { + const queue = new AsyncQueue(2); + + // Fill the queue to capacity + await queue.put(1); + await queue.put(2); + + // Try to put a third item - this should block + let putCompleted = false; + const putPromise = queue.put(3).then(() => { + putCompleted = true; + }); + + // Wait a bit to ensure put() is blocked + await new Promise((resolve) => setTimeout(resolve, 10)); + expect(putCompleted).toBe(false); + + // Get an item to make space + expect(queue.get()).toBe(1); + + // Now the put should complete + await putPromise; + expect(putCompleted).toBe(true); + + expect(queue.get()).toBe(2); + expect(queue.get()).toBe(3); + }); + + it('blocks consumers when queue is empty', async () => { + const queue = new AsyncQueue(); + + // Start waiting for an item + let itemAvailable = false; + const waitPromise = queue.waitForItem().then(() => { + itemAvailable = true; + }); + + // Wait a bit to ensure waitForItem() is blocked + await new Promise((resolve) => setTimeout(resolve, 10)); + expect(itemAvailable).toBe(false); + + // Put an item + await queue.put(42); + + // Now waitForItem should resolve + await waitPromise; + expect(itemAvailable).toBe(true); + + expect(queue.get()).toBe(42); + }); + + it('returns immediately from waitForItem if items exist', async () => { + const queue = new AsyncQueue(); + + await queue.put(1); + await queue.put(2); + + // Should return immediately since items are available + await queue.waitForItem(); + expect(queue.get()).toBe(1); + }); + + it('handles close correctly', async () => { + const queue = new AsyncQueue(); + + // Add some items + await queue.put(1); + await queue.put(2); + + // Close the queue + queue.close(); + + // Should be able to get existing items + expect(queue.get()).toBe(1); + expect(queue.get()).toBe(2); + + // Trying to put should throw + await expect(queue.put(3)).rejects.toThrow('Queue closed'); + + // waitForItem should return immediately when closed + await queue.waitForItem(); + expect(queue.closed).toBe(true); + }); + + it('wakes up waiting producers when closed', async () => { + const queue = new AsyncQueue(1); + + // Fill the queue + await queue.put(1); + + // Try to put another item (will block) + let putRejected = false; + const putPromise = queue.put(2).catch(() => { + putRejected = true; + }); + + // Wait a bit + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Close the queue + queue.close(); + + // The blocked put should reject + await putPromise; + expect(putRejected).toBe(true); + }); + + it('wakes up waiting consumers when closed', async () => { + const queue = new AsyncQueue(); + + // Start waiting for an item + const waitPromise = queue.waitForItem(); + + // Wait a bit + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Close the queue + queue.close(); + + // waitForItem should resolve + await waitPromise; + expect(queue.closed).toBe(true); + }); + + it('handles multiple waiting producers', async () => { + const queue = new AsyncQueue(1); + + // Fill the queue + await queue.put(1); + + // Start multiple producers waiting + const put2 = queue.put(2); + const put3 = queue.put(3); + + // Get items to allow producers to proceed + expect(queue.get()).toBe(1); + await put2; + expect(queue.get()).toBe(2); + await put3; + expect(queue.get()).toBe(3); + }); + + it('handles multiple waiting consumers', async () => { + const queue = new AsyncQueue(); + + // Start multiple consumers waiting + const wait1 = queue.waitForItem(); + const wait2 = queue.waitForItem(); + + // Put items + await queue.put(1); + await queue.put(2); + + // Both waits should resolve + await Promise.all([wait1, wait2]); + + expect(queue.length).toBe(2); + }); + + it('reports length correctly', async () => { + const queue = new AsyncQueue(); + + expect(queue.length).toBe(0); + + await queue.put(1); + expect(queue.length).toBe(1); + + await queue.put(2); + expect(queue.length).toBe(2); + + queue.get(); + expect(queue.length).toBe(1); + + queue.get(); + expect(queue.length).toBe(0); + }); + + it('handles unbounded queue (infinite capacity)', async () => { + const queue = new AsyncQueue(); // No capacity specified + + // Should be able to add many items without blocking + for (let i = 0; i < 1000; i++) { + await queue.put(i); + } + + expect(queue.length).toBe(1000); + + // Get them all back + for (let i = 0; i < 1000; i++) { + expect(queue.get()).toBe(i); + } + + expect(queue.length).toBe(0); + }); + + it('handles concurrent put and get operations', async () => { + const queue = new AsyncQueue(5); + + const consumed: number[] = []; + + // Start concurrent producers + const producers = Array.from({ length: 10 }, (_, i) => + (async () => { + await queue.put(i); + })(), + ); + + // Start concurrent consumers - each consumer tries to get items until queue is empty + const consumers = Array.from({ length: 10 }, () => + (async () => { + while (true) { + await queue.waitForItem(); + const item = queue.get(); + if (item !== undefined) { + consumed.push(item); + break; // Each consumer gets one item + } + // If item is undefined, another consumer got it first, try again + } + })(), + ); + + // Wait for all to complete + await Promise.all([...producers, ...consumers]); + + // Should have consumed all items + expect(consumed.length).toBe(10); + expect(consumed.sort((a, b) => a - b)).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + }); +}); diff --git a/packages/livekit-rtc/src/async_queue.ts b/packages/livekit-rtc/src/async_queue.ts new file mode 100644 index 00000000..d3d0360a --- /dev/null +++ b/packages/livekit-rtc/src/async_queue.ts @@ -0,0 +1,80 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { Deque } from '@datastructures-js/deque'; + +/** + * AsyncQueue is a bounded queue with async support for both producers and consumers. + * + * This queue simplifies the AudioMixer implementation by handling backpressure and + * synchronization automatically: + * - Producers can await put() until the queue has space (when queue is full) + * - Consumers can await waitForItem() until data is available (when queue is empty) + * + * This eliminates the need for manual coordination logic, polling loops, and + * complex state management throughout the rest of the codebase. + */ +export class AsyncQueue { + private items: T[] = []; + private waitingProducers = new Deque<{ resolve: () => void; reject: (err: Error) => void }>(); + private waitingConsumers = new Deque<() => void>(); + closed = false; + + constructor(private capacity: number = Infinity) {} + + async put(item: T) { + if (this.closed) throw new Error('Queue closed'); + + while (this.items.length >= this.capacity) { + await new Promise((resolve, reject) => + this.waitingProducers.pushBack({ resolve, reject }), + ); + // Re-check if closed after waking up + if (this.closed) throw new Error('Queue closed'); + } + + this.items.push(item); + + // Wake up one waiting consumer + if (this.waitingConsumers.size() > 0) { + const resolve = this.waitingConsumers.popFront()!; + resolve(); + } + } + + get(): T | undefined { + const item = this.items.shift(); + if (this.waitingProducers.size() > 0) { + const producer = this.waitingProducers.popFront()!; + producer.resolve(); // wakes up one waiting producer + } + return item; + } + + /** + * Wait until an item is available or the queue is closed. + * Returns immediately if items are already available. + */ + async waitForItem(): Promise { + if (this.items.length > 0 || this.closed) { + return; + } + await new Promise((resolve) => this.waitingConsumers.pushBack(resolve)); + } + + close() { + this.closed = true; + // Reject all waiting producers with an error + this.waitingProducers + .toArray() + .forEach((producer) => producer.reject(new Error('Queue closed'))); + // Resolve all waiting consumers so they can see the queue is closed + this.waitingConsumers.toArray().forEach((resolve) => resolve()); + this.waitingProducers.clear(); + this.waitingConsumers.clear(); + } + + get length() { + return this.items.length; + } +} diff --git a/packages/livekit-rtc/src/audio_mixer.test.ts b/packages/livekit-rtc/src/audio_mixer.test.ts new file mode 100644 index 00000000..45cc9ffd --- /dev/null +++ b/packages/livekit-rtc/src/audio_mixer.test.ts @@ -0,0 +1,167 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { describe, expect, it } from 'vitest'; +import { AudioFrame } from './audio_frame.js'; +import { AudioMixer } from './audio_mixer.js'; + +// Helper to create a mock audio stream that yields frames +async function* createMockAudioStream( + frameCount: number, + sampleRate: number, + numChannels: number, + samplesPerChannel: number, + value: number, +): AsyncGenerator { + for (let i = 0; i < frameCount; i++) { + const data = new Int16Array(numChannels * samplesPerChannel); + // Fill with a specific value + for (let j = 0; j < data.length; j++) { + data[j] = value; + } + yield new AudioFrame(data, sampleRate, numChannels, samplesPerChannel); + // Small delay to simulate real stream + await new Promise((resolve) => setTimeout(resolve, 1)); + } +} + +describe('AudioMixer', () => { + it('mixes two audio streams', async () => { + const sampleRate = 48000; + const numChannels = 1; + const samplesPerChannel = 480; // 10ms at 48kHz + const mixer = new AudioMixer(sampleRate, numChannels, { + blocksize: samplesPerChannel, + }); + + // Create two streams with different values + const stream1 = createMockAudioStream(3, sampleRate, numChannels, samplesPerChannel, 100); + const stream2 = createMockAudioStream(3, sampleRate, numChannels, samplesPerChannel, 200); + + mixer.addStream(stream1); + mixer.addStream(stream2); + + // Collect first frame + const frames: AudioFrame[] = []; + for await (const frame of mixer) { + frames.push(frame); + if (frames.length >= 1) { + break; + } + } + + await mixer.aclose(); + + expect(frames.length).toBe(1); + const frame = frames[0]!; + expect(frame.sampleRate).toBe(sampleRate); + expect(frame.channels).toBe(numChannels); + expect(frame.samplesPerChannel).toBe(samplesPerChannel); + + // Each sample should be 100 + 200 = 300 + for (let i = 0; i < frame.data.length; i++) { + expect(frame.data[i]).toBe(300); + } + }); + + it('handles stream removal', async () => { + const sampleRate = 48000; + const numChannels = 1; + const samplesPerChannel = 480; + const mixer = new AudioMixer(sampleRate, numChannels, { + blocksize: samplesPerChannel, + }); + + const stream1 = createMockAudioStream(10, sampleRate, numChannels, samplesPerChannel, 100); + const stream2 = createMockAudioStream(10, sampleRate, numChannels, samplesPerChannel, 200); + + mixer.addStream(stream1); + mixer.addStream(stream2); + + // Get one frame + const iterator = mixer[Symbol.asyncIterator](); + const result1 = await iterator.next(); + expect(result1.done).toBe(false); + expect(result1.value?.data[0]).toBe(300); + + // Remove one stream + mixer.removeStream(stream2); + + // Next frame should only have stream1's value + const result2 = await iterator.next(); + expect(result2.done).toBe(false); + // Note: there might be buffered data, so this test is simplified + + await mixer.aclose(); + }); + + it('handles empty mixer', async () => { + const sampleRate = 48000; + const numChannels = 1; + const mixer = new AudioMixer(sampleRate, numChannels); + + // Signal end without adding any streams + mixer.endInput(); + + const frames: AudioFrame[] = []; + for await (const frame of mixer) { + frames.push(frame); + } + + expect(frames.length).toBe(0); + }); + + it('clips audio values to int16 range', async () => { + const sampleRate = 48000; + const numChannels = 1; + const samplesPerChannel = 480; + const mixer = new AudioMixer(sampleRate, numChannels, { + blocksize: samplesPerChannel, + }); + + // Create streams with values that will overflow + const stream1 = createMockAudioStream(2, sampleRate, numChannels, samplesPerChannel, 20000); + const stream2 = createMockAudioStream(2, sampleRate, numChannels, samplesPerChannel, 20000); + + mixer.addStream(stream1); + mixer.addStream(stream2); + + const iterator = mixer[Symbol.asyncIterator](); + const result = await iterator.next(); + + await mixer.aclose(); + + expect(result.done).toBe(false); + // 20000 + 20000 = 40000, which should be clipped to 32767 + for (let i = 0; i < result.value!.data.length; i++) { + expect(result.value!.data[i]).toBe(32767); + } + }); + + it('handles exhausted streams', async () => { + const sampleRate = 48000; + const numChannels = 1; + const samplesPerChannel = 480; + const mixer = new AudioMixer(sampleRate, numChannels, { + blocksize: samplesPerChannel, + }); + + // Short stream + const stream = createMockAudioStream(2, sampleRate, numChannels, samplesPerChannel, 100); + + mixer.addStream(stream); + + const frames: AudioFrame[] = []; + for await (const frame of mixer) { + frames.push(frame); + if (frames.length >= 3) { + break; + } + } + + await mixer.aclose(); + + // Should get at least 2 frames (stream exhausts after 2) + expect(frames.length).toBeGreaterThanOrEqual(2); + }); +}); diff --git a/packages/livekit-rtc/src/audio_mixer.ts b/packages/livekit-rtc/src/audio_mixer.ts new file mode 100644 index 00000000..71885b6a --- /dev/null +++ b/packages/livekit-rtc/src/audio_mixer.ts @@ -0,0 +1,407 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { AsyncQueue } from './async_queue.js'; +import { AudioFrame } from './audio_frame.js'; + +// Re-export AsyncQueue for backward compatibility +export { AsyncQueue } from './async_queue.js'; + +// Define types for async iteration (since lib: es2015 doesn't include them) +type AudioStream = { + [Symbol.asyncIterator](): { + next(): Promise>; + }; +}; + +interface Contribution { + stream: AudioStream; + data: Int16Array; + buffer: Int16Array; + hadData: boolean; + exhausted: boolean; +} + +export interface AudioMixerOptions { + /** + * The size of the audio block (in samples) for mixing. + * If not provided, defaults to sampleRate / 10 (100ms). + */ + blocksize?: number; + + /** + * The maximum wait time in milliseconds for each stream to provide + * audio data before timing out. Defaults to 100 ms. + */ + streamTimeoutMs?: number; + + /** + * The maximum number of mixed frames to store in the output queue. + * Defaults to 100. + */ + capacity?: number; +} + +/** + * AudioMixer combines multiple async audio streams into a single output stream. + * + * The mixer accepts multiple async audio streams and mixes them into a single output stream. + * Each output frame is generated with a fixed chunk size determined by the blocksize (in samples). + * If blocksize is not provided (or 0), it defaults to 100ms. + * + * Each input stream is processed in parallel, accumulating audio data until at least one chunk + * of samples is available. If an input stream does not provide data within the specified timeout, + * a warning is logged. The mixer can be closed immediately + * (dropping unconsumed frames) or allowed to flush remaining data using endInput(). + * + * @example + * ```typescript + * const mixer = new AudioMixer(48000, 2); + * mixer.addStream(stream1); + * mixer.addStream(stream2); + * + * for await (const frame of mixer) { + * // Process mixed audio frame + * } + * ``` + */ +export class AudioMixer { + private streams: Set; + private buffers: Map; + private streamIterators: Map> }>; + private sampleRate: number; + private numChannels: number; + private chunkSize: number; + private streamTimeoutMs: number; + private queue: AsyncQueue; + private streamSignal: AsyncQueue; // Signals when streams are added + private ending: boolean; + private mixerTask?: Promise; + private closed: boolean; + + /** + * Initialize the AudioMixer. + * + * @param sampleRate - The audio sample rate in Hz. + * @param numChannels - The number of audio channels. + * @param options - Optional configuration for the mixer. + */ + constructor(sampleRate: number, numChannels: number, options: AudioMixerOptions = {}) { + this.streams = new Set(); + this.buffers = new Map(); + this.streamIterators = new Map(); + this.sampleRate = sampleRate; + this.numChannels = numChannels; + this.chunkSize = + options.blocksize && options.blocksize > 0 ? options.blocksize : Math.floor(sampleRate / 10); + this.streamTimeoutMs = options.streamTimeoutMs ?? 100; + this.queue = new AsyncQueue(options.capacity ?? 100); + this.streamSignal = new AsyncQueue(1); // there should only be one mixer + this.ending = false; + this.closed = false; + + // Start the mixer task + this.mixerTask = this.mixer(); + } + + /** + * Add an audio stream to the mixer. + * + * The stream is added to the internal set of streams and an empty buffer is initialized for it, + * if not already present. + * + * @param stream - An async iterable that produces AudioFrame objects. + * @throws Error if the mixer has been closed. + */ + addStream(stream: AudioStream): void { + if (this.ending) { + throw new Error('Cannot add stream after mixer has been closed'); + } + + this.streams.add(stream); + if (!this.buffers.has(stream)) { + this.buffers.set(stream, new Int16Array(0)); + } + + // Signal that a stream was added (non-blocking) + this.streamSignal.put(undefined).catch(() => { + // Ignore errors if signal queue is closed + }); + } + + /** + * Remove an audio stream from the mixer. + * + * This method removes the specified stream and its associated buffer from the mixer. + * + * @param stream - The audio stream to remove. + */ + removeStream(stream: AudioStream): void { + this.streams.delete(stream); + this.buffers.delete(stream); + this.streamIterators.delete(stream); + } + + /** + * Returns an async iterator for the mixed audio frames. + */ + [Symbol.asyncIterator]() { + return { + next: async (): Promise> => { + const frame = await this.getNextFrame(); + if (frame === null) { + return { done: true, value: undefined }; + } + return { done: false, value: frame }; + }, + }; + } + + /** + * Immediately stop mixing and close the mixer. + * + * This stops the mixing task, and any unconsumed output in the queue may be dropped. + */ + async aclose(): Promise { + if (this.closed) { + return; + } + this.closed = true; + this.ending = true; + + // Close both queues to wake up any waiting operations + this.streamSignal.close(); + this.queue.close(); + + await this.mixerTask; + } + + /** + * Signal that no more streams will be added. + * + * This method marks the mixer as closed so that it flushes any remaining buffered output before ending. + * Note that existing streams will still be processed until exhausted. + */ + endInput(): void { + this.ending = true; + } + + private async getNextFrame(): Promise { + while (true) { + // Try to get an item from the queue (non-blocking) + const frame = this.queue.get(); + + if (frame !== undefined) { + return frame; + } + + // Check if mixer is closed or ending + if (this.queue.closed || (this.ending && this.streams.size === 0)) { + return null; + } + + // Queue is empty but mixer is still running - wait for an item to be added + await this.queue.waitForItem(); + } + } + + private async mixer(): Promise { + // Main mixing loop that continuously processes streams and produces output frames + while (true) { + // If we're in ending mode and there are no more streams, exit + if (this.ending && this.streams.size === 0) { + break; + } + + if (this.streams.size === 0) { + // Wait for a stream to be added (signal queue will have an item) + await this.streamSignal.waitForItem(); + // Consume the signal + this.streamSignal.get(); + continue; + } + + // Process all streams in parallel + const streamArray = Array.from(this.streams); + const promises = streamArray.map((stream) => this.getContribution(stream)); + const results = await Promise.all( + promises.map((p) => + p + .then((value) => ({ status: 'fulfilled' as const, value })) + .catch((reason) => ({ status: 'rejected' as const, reason })), + ), + ); + + const contributions: Int16Array[] = []; + let anyData = false; + const removals: AudioStream[] = []; + + for (const result of results) { + if (result.status !== 'fulfilled') { + console.warn('AudioMixer: Stream contribution failed:', result.reason); + continue; + } + + const contrib = result.value; + contributions.push(contrib.data); + this.buffers.set(contrib.stream, contrib.buffer); + + if (contrib.hadData) { + anyData = true; + } + + // Mark exhausted streams with no remaining buffer for removal + if (contrib.exhausted && contrib.buffer.length === 0) { + removals.push(contrib.stream); + } + } + + // Remove exhausted streams + for (const stream of removals) { + this.removeStream(stream); + } + + if (!anyData) { + // No data available from any stream, wait briefly before trying again + await this.sleep(1); + continue; + } + + // Mix the audio data + const mixed = this.mixAudio(contributions); + const frame = new AudioFrame(mixed, this.sampleRate, this.numChannels, this.chunkSize); + + if (this.closed) { + break; + } + + try { + // Add mixed frame to output queue + await this.queue.put(frame); + } catch { + // Queue closed while trying to add frame + break; + } + } + + // Close the queue to signal end of stream + this.queue.close(); + } + + private async getContribution(stream: AudioStream): Promise { + let buf = this.buffers.get(stream) ?? new Int16Array(0); + const initialBufferLength = buf.length; + let exhausted = false; + let receivedDataInThisCall = false; + + // Get or create iterator for this stream + let iterator = this.streamIterators.get(stream); + if (!iterator) { + iterator = stream[Symbol.asyncIterator](); + this.streamIterators.set(stream, iterator); + } + + // Accumulate data until we have at least chunkSize samples + while (buf.length < this.chunkSize * this.numChannels && !exhausted && !this.closed) { + try { + const result = await Promise.race([iterator.next(), this.timeout(this.streamTimeoutMs)]); + + if (result === 'timeout') { + console.warn(`AudioMixer: stream timeout after ${this.streamTimeoutMs}ms`); + break; + } + + if (result.done) { + exhausted = true; + break; + } + + const frame = result.value; + const newData = frame.data; + + // Mark that we received data in this call + receivedDataInThisCall = true; + + // Concatenate buffers + if (buf.length === 0) { + buf = newData; + } else { + const combined = new Int16Array(buf.length + newData.length); + combined.set(buf); + combined.set(newData, buf.length); + buf = combined; + } + } catch (error) { + console.error(`AudioMixer: Error reading from stream:`, error); + exhausted = true; + break; + } + } + + // Extract contribution and update buffer + let contrib: Int16Array; + const samplesNeeded = this.chunkSize * this.numChannels; + + if (buf.length >= samplesNeeded) { + // Extract the needed samples and keep the remainder in the buffer + contrib = buf.subarray(0, samplesNeeded); + buf = buf.subarray(samplesNeeded); + } else { + // Pad with zeros if we don't have enough data + const padded = new Int16Array(samplesNeeded); + padded.set(buf); + contrib = padded; + buf = new Int16Array(0); + } + + // hadData means: we had data at start OR we received data during this call OR we have data remaining + const hadData = initialBufferLength > 0 || receivedDataInThisCall || buf.length > 0; + + return { + stream, + data: contrib, + buffer: buf, + hadData, + exhausted, + }; + } + + private mixAudio(contributions: Int16Array[]): Int16Array { + if (contributions.length === 0) { + return new Int16Array(this.chunkSize * this.numChannels); + } + + const length = this.chunkSize * this.numChannels; + const mixed = new Int16Array(length); + + // Sum all contributions + for (const contrib of contributions) { + for (let i = 0; i < length; i++) { + const val = contrib[i]; + if (val !== undefined) { + mixed[i] = (mixed[i] ?? 0) + val; + } + } + } + + // Clip to Int16 range + for (let i = 0; i < length; i++) { + const val = mixed[i] ?? 0; + if (val > 32767) { + mixed[i] = 32767; + } else if (val < -32768) { + mixed[i] = -32768; + } + } + + return mixed; + } + + private sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } + + private timeout(ms: number): Promise<'timeout'> { + return new Promise((resolve) => setTimeout(() => resolve('timeout'), ms)); + } +} diff --git a/packages/livekit-rtc/src/index.ts b/packages/livekit-rtc/src/index.ts index 332e902e..7e5d6433 100644 --- a/packages/livekit-rtc/src/index.ts +++ b/packages/livekit-rtc/src/index.ts @@ -8,6 +8,7 @@ export { AudioSource } from './audio_source.js'; export { AudioStream } from './audio_stream.js'; export type { NoiseCancellationOptions } from './audio_stream.js'; export { AudioFilter } from './audio_filter.js'; +export { AudioMixer, type AudioMixerOptions } from './audio_mixer.js'; export * from './data_streams/index.js'; export { E2EEManager, FrameCryptor, KeyProvider } from './e2ee.js'; export type { E2EEOptions, KeyProviderOptions } from './e2ee.js'; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0dac0036..3b2a5da8 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -203,6 +203,9 @@ importers: '@bufbuild/protobuf': specifier: ^1.10.1 version: 1.10.1 + '@datastructures-js/deque': + specifier: 1.0.8 + version: 1.0.8 '@livekit/mutex': specifier: ^1.0.0 version: 1.1.1 @@ -426,6 +429,9 @@ packages: '@changesets/write@0.4.0': resolution: {integrity: sha512-CdTLvIOPiCNuH71pyDu3rA+Q0n65cmAbXnwWH84rKGiFumFzkmHNT8KHTMEchcxN+Kl8I54xGUhJ7l3E7X396Q==} + '@datastructures-js/deque@1.0.8': + resolution: {integrity: sha512-PSBhJ2/SmeRPRHuBv7i/fHWIdSC3JTyq56qb+Rq0wjOagi0/fdV5/B/3Md5zFZus/W6OkSPMaxMKKMNMrSmubg==} + '@edge-runtime/primitives@6.0.0': resolution: {integrity: sha512-FqoxaBT+prPBHBwE1WXS1ocnu/VLTQyZ6NMUBAdbP7N2hsFTTxMC/jMu2D/8GAlMQfxeuppcPuCUk/HO3fpIvA==} engines: {node: '>=18'} @@ -4043,6 +4049,8 @@ snapshots: human-id: 4.1.1 prettier: 2.8.8 + '@datastructures-js/deque@1.0.8': {} + '@edge-runtime/primitives@6.0.0': {} '@edge-runtime/vm@5.0.0':