From 58726f402f4fa38f2ae6f081b07c4bc5d5f1c062 Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Wed, 1 Apr 2026 15:38:59 -0700 Subject: [PATCH 1/9] Support reusable rt session & phonic say --- agents/src/llm/realtime.ts | 20 +- agents/src/stream/deferred_stream.ts | 7 + agents/src/voice/agent_activity.ts | 304 ++++++++++---- .../src/voice/agent_activity_handoff.test.ts | 247 +++++++++++- agents/src/voice/agent_session.ts | 41 +- .../src/voice/agent_session_handoff.test.ts | 79 ++-- examples/src/survey_agent.ts | 376 ++++++++++++++++++ .../google/src/beta/realtime/realtime_api.ts | 2 + plugins/openai/src/realtime/realtime_model.ts | 3 + .../src/realtime/realtime_model_beta.ts | 3 + plugins/phonic/src/realtime/realtime_model.ts | 52 +++ 11 files changed, 994 insertions(+), 140 deletions(-) create mode 100644 examples/src/survey_agent.ts diff --git a/agents/src/llm/realtime.ts b/agents/src/llm/realtime.ts index 864e25d2d..99939b8ba 100644 --- a/agents/src/llm/realtime.ts +++ b/agents/src/llm/realtime.ts @@ -49,6 +49,9 @@ export interface RealtimeCapabilities { autoToolReplyGeneration: boolean; audioOutput: boolean; manualFunctionCalls: boolean; + midSessionContextUpdate?: boolean; + midSessionInstructionsUpdate?: boolean; + midSessionToolsUpdate?: boolean; } export interface InputTranscriptionCompleted { @@ -155,6 +158,13 @@ export abstract class RealtimeSession extends EventEmitter { return; } + say( + _text: string | ReadableStream, + _options?: { allowInterruptions?: boolean }, + ): Promise { + throw new Error(`${this.constructor.name} does not implement say(). use a TTS model instead`); + } + private async _mainTaskImpl(signal: AbortSignal): Promise { const reader = this.deferredInputStream.stream.getReader(); while (true) { @@ -167,6 +177,14 @@ export abstract class RealtimeSession extends EventEmitter { } setInputAudioStream(audioStream: ReadableStream): void { - this.deferredInputStream.setSource(audioStream); + if (this.deferredInputStream.isSourceSet) { + // Reused sessions must detach the previous audio source before rebinding. + void this.deferredInputStream.detachSource(); + } + try { + this.deferredInputStream.setSource(audioStream); + } catch (error) { + throw error; + } } } diff --git a/agents/src/stream/deferred_stream.ts b/agents/src/stream/deferred_stream.ts index 47ef0c2b1..8f4b611ac 100644 --- a/agents/src/stream/deferred_stream.ts +++ b/agents/src/stream/deferred_stream.ts @@ -41,6 +41,7 @@ export class DeferredReadableStream { private transform: IdentityTransform; private writer: WritableStreamDefaultWriter; private sourceReader?: ReadableStreamDefaultReader; + private detachRequested = false; constructor() { this.transform = new IdentityTransform(); @@ -83,6 +84,11 @@ export class DeferredReadableStream { sourceError = e; } finally { + if (this.detachRequested) { + this.detachRequested = false; + return; + } + // any other error from source will be propagated to the consumer if (sourceError) { try { @@ -124,6 +130,7 @@ export class DeferredReadableStream { } const sourceReader = this.sourceReader!; + this.detachRequested = true; // Clear source first so future setSource() calls can reattach cleanly. this.sourceReader = undefined; diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index e672b9e3f..391bd17b5 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -96,6 +96,22 @@ interface OnEnterData { agent: Agent; } +export interface ReusableResources { + sttPipeline?: STTPipeline; + rtSession?: RealtimeSession; +} + +export async function cleanupReusableResources(resources: ReusableResources): Promise { + if (resources.sttPipeline) { + await resources.sttPipeline.close(); + resources.sttPipeline = undefined; + } + if (resources.rtSession) { + await resources.rtSession.close(); + resources.rtSession = undefined; + } +} + interface PreemptiveGeneration { speechHandle: SpeechHandle; userMessage: ChatMessage; @@ -293,26 +309,26 @@ export class AgentActivity implements RecognitionHooks { this.isDefaultInterruptionByAudioActivityEnabled = this.isInterruptionByAudioActivityEnabled; } - async start(options?: { reuseSttPipeline?: STTPipeline }): Promise { + async start(options?: { reuseResources?: ReusableResources }): Promise { const unlock = await this.lock.lock(); try { await this._startSession({ spanName: 'start_agent_activity', runOnEnter: true, - reuseSttPipeline: options?.reuseSttPipeline, + reuseResources: options?.reuseResources, }); } finally { unlock(); } } - async resume(options?: { reuseSttPipeline?: STTPipeline }): Promise { + async resume(options?: { reuseResources?: ReusableResources }): Promise { const unlock = await this.lock.lock(); try { await this._startSession({ spanName: 'resume_agent_activity', runOnEnter: false, - reuseSttPipeline: options?.reuseSttPipeline, + reuseResources: options?.reuseResources, }); } finally { unlock(); @@ -322,9 +338,9 @@ export class AgentActivity implements RecognitionHooks { private async _startSession(options: { spanName: 'start_agent_activity' | 'resume_agent_activity'; runOnEnter: boolean; - reuseSttPipeline?: STTPipeline; + reuseResources?: ReusableResources; }): Promise { - const { spanName, runOnEnter, reuseSttPipeline } = options; + const { spanName, runOnEnter, reuseResources } = options; const startSpan = tracer.startSpan({ name: spanName, attributes: { [traceTypes.ATTR_AGENT_LABEL]: this.agent.id }, @@ -334,38 +350,60 @@ export class AgentActivity implements RecognitionHooks { this.agent._agentActivity = this; if (this.llm instanceof RealtimeModel) { - this.realtimeSession = this.llm.session(); + const rtReused = reuseResources?.rtSession !== undefined; + if (rtReused) { + this.logger.debug('reusing realtime session from previous activity'); + this.realtimeSession = reuseResources!.rtSession; + reuseResources!.rtSession = undefined; // ownership transferred + + // clear any stale audio/generation state + await this.realtimeSession!.interrupt(); + await this.realtimeSession!.clearAudio(); + } else { + this.realtimeSession = this.llm.session(); + } + this.realtimeSpans = new Map(); - this.realtimeSession.on('generation_created', this.onRealtimeGenerationCreated); - this.realtimeSession.on('input_speech_started', this.onRealtimeInputSpeechStarted); - this.realtimeSession.on('input_speech_stopped', this.onRealtimeInputSpeechStopped); - this.realtimeSession.on( + this.realtimeSession!.on('generation_created', this.onRealtimeGenerationCreated); + this.realtimeSession!.on('input_speech_started', this.onRealtimeInputSpeechStarted); + this.realtimeSession!.on('input_speech_stopped', this.onRealtimeInputSpeechStopped); + this.realtimeSession!.on( 'input_audio_transcription_completed', this.onRealtimeInputAudioTranscriptionCompleted, ); - this.realtimeSession.on('metrics_collected', this.onMetricsCollected); - this.realtimeSession.on('error', this.onModelError); + this.realtimeSession!.on('metrics_collected', this.onMetricsCollected); + this.realtimeSession!.on('error', this.onModelError); removeInstructions(this.agent._chatCtx); - try { - await this.realtimeSession.updateInstructions(this.agent.instructions); - } catch (error) { - this.logger.error(error, 'failed to update the instructions'); + + // skip the update if the session is reused and no mid-session update is supported + // this means the content is the same as the previous session + const capabilities = this.llm.capabilities; + if (!rtReused || capabilities.midSessionInstructionsUpdate) { + try { + await this.realtimeSession!.updateInstructions(this.agent.instructions); + } catch (error) { + this.logger.error(error, 'failed to update the instructions'); + } } - try { - await this.realtimeSession.updateChatCtx(this.agent.chatCtx); - } catch (error) { - this.logger.error(error, 'failed to update the chat context'); + if (!rtReused || capabilities.midSessionContextUpdate) { + try { + await this.realtimeSession!.updateChatCtx(this.agent.chatCtx); + } catch (error) { + this.logger.error(error, 'failed to update the chat context'); + } } - try { - await this.realtimeSession.updateTools(this.tools); - } catch (error) { - this.logger.error(error, 'failed to update the tools'); + if (!rtReused || capabilities.midSessionToolsUpdate) { + try { + await this.realtimeSession!.updateTools(this.tools); + } catch (error) { + this.logger.error(error, 'failed to update the tools'); + } } - if (!this.llm.capabilities.audioOutput && !this.tts && this.agentSession.output.audio) { + if (!capabilities.audioOutput && !this.tts && this.agentSession.output.audio) { this.logger.error( 'audio output is enabled but RealtimeModel has no audio modality ' + 'and no TTS is set. Either enable audio modality in the RealtimeModel ' + @@ -426,9 +464,10 @@ export class AgentActivity implements RecognitionHooks { getLinkedParticipant: () => this.agentSession._roomIO?.linkedParticipant, }); - if (reuseSttPipeline) { - this.logger.debug('Reusing STT pipeline from previous activity'); - await this.audioRecognition.start({ sttPipeline: reuseSttPipeline }); + if (reuseResources?.sttPipeline) { + this.logger.debug('reusing STT pipeline from previous activity'); + await this.audioRecognition.start({ sttPipeline: reuseResources.sttPipeline }); + reuseResources.sttPipeline = undefined; // ownership transferred } else { await this.audioRecognition.start(); } @@ -454,28 +493,61 @@ export class AgentActivity implements RecognitionHooks { startSpan.end(); } - async _detachSttPipelineIfReusable(newActivity: AgentActivity): Promise { - const hasAudioRecognition = !!this.audioRecognition; - const hasSttOld = !!this.stt; - const hasSttNew = !!newActivity.stt; - const sameSttInstance = this.stt === newActivity.stt; - const sameSttNode = - Object.getPrototypeOf(this.agent).sttNode === - Object.getPrototypeOf(newActivity.agent).sttNode; + async _detachReusableResources(newActivity: AgentActivity): Promise { + const resources: ReusableResources = {}; - if (!hasAudioRecognition || !hasSttOld || !hasSttNew) { - return undefined; + // stt pipeline + if ( + this.audioRecognition && + this.stt && + newActivity.stt && + this.stt === newActivity.stt && + Object.getPrototypeOf(this.agent).sttNode === Object.getPrototypeOf(newActivity.agent).sttNode + ) { + resources.sttPipeline = await this.audioRecognition.detachSttPipeline(); } - if (!sameSttInstance) { - return undefined; - } + // rt session + if (this.realtimeSession && this.llm instanceof RealtimeModel && this.llm === newActivity.llm) { + const capabilities = this.llm.capabilities; - if (!sameSttNode) { - return undefined; + // context update is supported or chat context is equivalent + let reusable = + capabilities.midSessionContextUpdate || + this.realtimeSession.chatCtx + .copy({ excludeInstructions: true, excludeHandoff: true }) + .isEquivalent( + newActivity.agent.chatCtx.copy({ excludeInstructions: true, excludeHandoff: true }), + ); + + // instructions update is supported or instructions are the same + reusable = + reusable && + (capabilities.midSessionInstructionsUpdate || + this.agent.instructions === newActivity.agent.instructions); + + // tools update is supported or tools are the same + reusable = + reusable && + (capabilities.midSessionToolsUpdate || isSameToolContext(this.tools, newActivity.tools)); + + if (reusable) { + // detach: remove event listeners but don't close the session + this.realtimeSession.off('generation_created', this.onRealtimeGenerationCreated); + this.realtimeSession.off('input_speech_started', this.onRealtimeInputSpeechStarted); + this.realtimeSession.off('input_speech_stopped', this.onRealtimeInputSpeechStopped); + this.realtimeSession.off( + 'input_audio_transcription_completed', + this.onRealtimeInputAudioTranscriptionCompleted, + ); + this.realtimeSession.off('metrics_collected', this.onMetricsCollected); + this.realtimeSession.off('error', this.onModelError); + resources.rtSession = this.realtimeSession; + this.realtimeSession = undefined; // prevent _closeSessionResources from closing it + } } - return await this.audioRecognition!.detachSttPipeline(); + return resources; } get currentSpeech(): SpeechHandle | undefined { @@ -711,18 +783,10 @@ export class AgentActivity implements RecognitionHooks { } = options ?? {}; let allowInterruptions = defaultAllowInterruptions; - if ( - !audio && - !this.tts && - this.agentSession.output.audio && - this.agentSession.output.audioEnabled - ) { - throw new Error('trying to generate speech from text without a TTS model'); - } - if ( this.llm instanceof RealtimeModel && this.llm.capabilities.turnDetection && + this.tts && allowInterruptions === false ) { this.logger.warn( @@ -732,6 +796,20 @@ export class AgentActivity implements RecognitionHooks { allowInterruptions = true; } + if ( + !audio && + !this.tts && + this.realtimeSession === undefined && + this.agentSession.output.audio && + this.agentSession.output.audioEnabled + ) { + const modelInfo = + this.llm instanceof RealtimeModel + ? 'a RealtimeSession that implements say()' + : 'a TTS model'; + throw new Error(`trying to generate speech from text without ${modelInfo}`); + } + const handle = SpeechHandle.create({ allowInterruptions: allowInterruptions ?? this.allowInterruptions, }); @@ -744,14 +822,29 @@ export class AgentActivity implements RecognitionHooks { speechHandle: handle, }), ); - const task = this.createSpeechTask({ - taskFn: (abortController: AbortController) => - this.ttsTask(handle, text, addToChatCtx, {}, abortController, audio), - ownedSpeechHandle: handle, - name: 'AgentActivity.say_tts', - }); - task.result.finally(() => this.onPipelineReplyDone()); + let task: Task; + if (this.realtimeSession !== undefined && !audio && !this.tts) { + task = this.createSpeechTask({ + taskFn: (abortController: AbortController) => + this.realtimeSayTask(handle, text, {}, abortController), + ownedSpeechHandle: handle, + name: 'AgentActivity.realtime_say', + }); + } else { + task = this.createSpeechTask({ + taskFn: (abortController: AbortController) => + this.ttsTask(handle, text, addToChatCtx, {}, abortController, audio), + ownedSpeechHandle: handle, + name: 'AgentActivity.tts_say', + }); + } + + // Avoid duplicate state transitions for realtime say path: realtimeGenerationTask already + // performs end-of-speech transitions internally. + if (this.realtimeSession === undefined || audio !== undefined || this.tts) { + task.result.finally(() => this.onPipelineReplyDone()); + } this.scheduleSpeech(handle, SpeechHandle.SPEECH_PRIORITY_NORMAL); return handle; } @@ -2766,6 +2859,54 @@ export class AgentActivity implements RecognitionHooks { }; } + private async realtimeSayTask( + speechHandle: SpeechHandle, + text: string | ReadableStream, + modelSettings: ModelSettings, + replyAbortController: AbortController, + ): Promise { + speechHandleStorage.enterWith(speechHandle); + + if (!this.realtimeSession) { + throw new Error('realtimeSession is not available'); + } + + // Parity gap (python->ts): Python also waits on a user-silence event when interruptions + // are enabled. TS voice runtime currently does not expose an equivalent wait primitive here. + // Behavior equivalent: we gate on tool/speech authorization and interruption status before + // issuing realtime say(). + await speechHandle.waitIfNotInterrupted([speechHandle._waitForAuthorization()]); + + if (speechHandle.interrupted) { + return; + } + + let generationEv: GenerationCreatedEvent; + try { + generationEv = await this.realtimeSession.say(text, { + allowInterruptions: speechHandle.allowInterruptions, + }); + } catch (e) { + if (e instanceof Error && e.message.includes('does not implement say()')) { + this.logger.error( + 'say() is not implemented for %s; use a TTS model instead', + this.realtimeSession.realtimeModel.provider, + ); + return; + } + this.logger.error('failed to say text: %s', String(e)); + this.agentSession._updateAgentState('listening'); + return; + } + + await this.realtimeGenerationTask( + speechHandle, + generationEv, + modelSettings, + replyAbortController, + ); + } + private async realtimeReplyTask({ speechHandle, modelSettings: { toolChoice }, @@ -2858,8 +2999,13 @@ export class AgentActivity implements RecognitionHooks { this._mainTask = Task.from(({ signal }) => this.mainTask(signal)); } - async pause(options: { blockedTasks?: Task[] } = {}): Promise { - const { blockedTasks = [] } = options; + async pause( + options: { + blockedTasks?: Task[]; + newActivity?: AgentActivity; + } = {}, + ): Promise { + const { blockedTasks = [], newActivity } = options; const unlock = await this.lock.lock(); try { @@ -2867,31 +3013,49 @@ export class AgentActivity implements RecognitionHooks { name: 'pause_agent_activity', attributes: { [traceTypes.ATTR_AGENT_LABEL]: this.agent.id }, }); + + let resources: ReusableResources | undefined; try { await this._pauseSchedulingTask(blockedTasks); + + // detach after speech tasks are done but before _closeSessionResources + if (newActivity) { + resources = await this._detachReusableResources(newActivity); + } + await this._closeSessionResources(); + } catch (error) { + if (resources) { + await cleanupReusableResources(resources); + } + throw error; } finally { span.end(); } + + return resources; } finally { unlock(); } } - async drain(): Promise { + async drain(options?: { newActivity?: AgentActivity }): Promise { // Create drain_agent_activity as a ROOT span (new trace) to match Python behavior - return tracer.startActiveSpan(async (span) => this._drainImpl(span), { + return tracer.startActiveSpan(async (span) => this._drainImpl(span, options?.newActivity), { name: 'drain_agent_activity', context: ROOT_CONTEXT, }); } - private async _drainImpl(span: Span): Promise { + private async _drainImpl( + span: Span, + newActivity?: AgentActivity, + ): Promise { span.setAttribute(traceTypes.ATTR_AGENT_LABEL, this.agent.id); const unlock = await this.lock.lock(); try { - if (this._schedulingPaused) return; + if (this._schedulingPaused) return undefined; this._onExitTask = this.createSpeechTask({ taskFn: () => @@ -2907,6 +3071,12 @@ export class AgentActivity implements RecognitionHooks { await this._onExitTask.result; await this._pauseSchedulingTask([]); + + // detach after speech tasks are done but before _closeSessionResources + if (newActivity) { + return await this._detachReusableResources(newActivity); + } + return undefined; } finally { unlock(); } diff --git a/agents/src/voice/agent_activity_handoff.test.ts b/agents/src/voice/agent_activity_handoff.test.ts index e27fc507a..25188cdb5 100644 --- a/agents/src/voice/agent_activity_handoff.test.ts +++ b/agents/src/voice/agent_activity_handoff.test.ts @@ -3,14 +3,23 @@ // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame } from '@livekit/rtc-node'; import { describe, expect, it, vi } from 'vitest'; -import { type SpeechEvent } from '../stt/stt.js'; +import { ChatContext } from '../llm/chat_context.js'; +import { type RealtimeCapabilities, RealtimeModel, type RealtimeSession } from '../llm/realtime.js'; +import type { SpeechEvent } from '../stt/stt.js'; import { Agent } from './agent.js'; -import { AgentActivity } from './agent_activity.js'; +import { + AgentActivity, + type ReusableResources, + cleanupReusableResources, +} from './agent_activity.js'; type FakeActivity = { agent: Agent; audioRecognition: { detachSttPipeline: ReturnType } | undefined; stt: unknown; + llm: unknown; + tools: unknown; + realtimeSession: unknown; }; function createFakeActivity(agent: Agent, stt: unknown) { @@ -21,13 +30,19 @@ function createFakeActivity(agent: Agent, stt: unknown) { detachSttPipeline: vi.fn(async () => detachedPipeline), }, stt, + llm: undefined, + tools: [], + realtimeSession: undefined, } as FakeActivity; return { activity, detachedPipeline }; } -async function detachIfReusable(oldActivity: FakeActivity, newActivity: FakeActivity) { - return await (AgentActivity.prototype as any)._detachSttPipelineIfReusable.call( +async function detachResources( + oldActivity: FakeActivity, + newActivity: FakeActivity, +): Promise { + return await (AgentActivity.prototype as any)._detachReusableResources.call( oldActivity, newActivity, ); @@ -39,9 +54,9 @@ describe('AgentActivity STT handoff reuse eligibility', () => { const oldActivity = createFakeActivity(new Agent({ instructions: 'a' }), sharedStt); const newActivity = createFakeActivity(new Agent({ instructions: 'b' }), sharedStt); - const result = await detachIfReusable(oldActivity.activity, newActivity.activity); + const resources = await detachResources(oldActivity.activity, newActivity.activity); - expect(result).toBe(oldActivity.detachedPipeline); + expect(resources.sttPipeline).toBe(oldActivity.detachedPipeline); expect(oldActivity.activity.audioRecognition?.detachSttPipeline).toHaveBeenCalledTimes(1); }); @@ -49,9 +64,9 @@ describe('AgentActivity STT handoff reuse eligibility', () => { const oldActivity = createFakeActivity(new Agent({ instructions: 'a' }), { id: 'stt-a' }); const newActivity = createFakeActivity(new Agent({ instructions: 'b' }), { id: 'stt-b' }); - const result = await detachIfReusable(oldActivity.activity, newActivity.activity); + const resources = await detachResources(oldActivity.activity, newActivity.activity); - expect(result).toBeUndefined(); + expect(resources.sttPipeline).toBeUndefined(); expect(oldActivity.activity.audioRecognition?.detachSttPipeline).not.toHaveBeenCalled(); }); @@ -60,9 +75,9 @@ describe('AgentActivity STT handoff reuse eligibility', () => { const oldActivity = createFakeActivity(new Agent({ instructions: 'a' }), undefined); const newActivity = createFakeActivity(new Agent({ instructions: 'b' }), sharedStt); - const result = await detachIfReusable(oldActivity.activity, newActivity.activity); + const resources = await detachResources(oldActivity.activity, newActivity.activity); - expect(result).toBeUndefined(); + expect(resources.sttPipeline).toBeUndefined(); expect(oldActivity.activity.audioRecognition?.detachSttPipeline).not.toHaveBeenCalled(); }); @@ -84,9 +99,9 @@ describe('AgentActivity STT handoff reuse eligibility', () => { const oldActivity = createFakeActivity(new AgentA({ instructions: 'a' }), sharedStt); const newActivity = createFakeActivity(new AgentB({ instructions: 'b' }), sharedStt); - const result = await detachIfReusable(oldActivity.activity, newActivity.activity); + const resources = await detachResources(oldActivity.activity, newActivity.activity); - expect(result).toBeUndefined(); + expect(resources.sttPipeline).toBeUndefined(); expect(oldActivity.activity.audioRecognition?.detachSttPipeline).not.toHaveBeenCalled(); }); @@ -104,9 +119,9 @@ describe('AgentActivity STT handoff reuse eligibility', () => { const oldActivity = createFakeActivity(new AgentA({ instructions: 'a' }), sharedStt); const newActivity = createFakeActivity(new AgentB({ instructions: 'b' }), sharedStt); - const result = await detachIfReusable(oldActivity.activity, newActivity.activity); + const resources = await detachResources(oldActivity.activity, newActivity.activity); - expect(result).toBe(oldActivity.detachedPipeline); + expect(resources.sttPipeline).toBe(oldActivity.detachedPipeline); expect(oldActivity.activity.audioRecognition?.detachSttPipeline).toHaveBeenCalledTimes(1); }); @@ -116,8 +131,208 @@ describe('AgentActivity STT handoff reuse eligibility', () => { const newActivity = createFakeActivity(new Agent({ instructions: 'b' }), sharedStt); oldActivity.activity.audioRecognition = undefined; - const result = await detachIfReusable(oldActivity.activity, newActivity.activity); + const resources = await detachResources(oldActivity.activity, newActivity.activity); - expect(result).toBeUndefined(); + expect(resources.sttPipeline).toBeUndefined(); + }); +}); + +describe('AgentActivity RT session reuse eligibility', () => { + function createFakeRtSession(): RealtimeSession { + return { + chatCtx: ChatContext.empty(), + off: vi.fn(), + on: vi.fn(), + interrupt: vi.fn(), + clearAudio: vi.fn(), + close: vi.fn(async () => {}), + } as unknown as RealtimeSession; + } + + class FakeRealtimeModel extends RealtimeModel { + get model() { + return 'fake'; + } + session(): RealtimeSession { + throw new Error('not implemented'); + } + async close() {} + } + + function createFakeRealtimeModel(capabilitiesOverrides: Partial = {}) { + const capabilities: RealtimeCapabilities = { + messageTruncation: false, + turnDetection: false, + userTranscription: false, + autoToolReplyGeneration: false, + audioOutput: true, + manualFunctionCalls: false, + midSessionContextUpdate: false, + midSessionInstructionsUpdate: false, + midSessionToolsUpdate: false, + ...capabilitiesOverrides, + }; + return new FakeRealtimeModel(capabilities); + } + + function createRtActivity(agent: Agent, llm: unknown, rtSession?: RealtimeSession): FakeActivity { + return { + agent, + audioRecognition: undefined, + stt: undefined, + llm, + tools: agent.toolCtx, + realtimeSession: rtSession, + }; + } + + it('reuses RT session when same LLM, same instructions, equivalent context, and same tools', async () => { + const sharedLlm = createFakeRealtimeModel(); + const rtSession = createFakeRtSession(); + + const oldAgent = new Agent({ instructions: 'hello' }); + const newAgent = new Agent({ instructions: 'hello' }); + const oldActivity = createRtActivity(oldAgent, sharedLlm, rtSession); + const newActivity = createRtActivity(newAgent, sharedLlm); + + const resources = await (AgentActivity.prototype as any)._detachReusableResources.call( + oldActivity, + newActivity, + ); + + expect(resources.rtSession).toBe(rtSession); + expect(oldActivity.realtimeSession).toBeUndefined(); + expect(rtSession.off).toHaveBeenCalled(); + }); + + it('does not reuse RT session when LLM instances differ', async () => { + const rtSession = createFakeRtSession(); + + const oldLlm = createFakeRealtimeModel(); + const newLlm = createFakeRealtimeModel(); + const oldActivity = createRtActivity(new Agent({ instructions: 'a' }), oldLlm, rtSession); + const newActivity = createRtActivity(new Agent({ instructions: 'a' }), newLlm); + + const resources = await (AgentActivity.prototype as any)._detachReusableResources.call( + oldActivity, + newActivity, + ); + + expect(resources.rtSession).toBeUndefined(); + }); + + it('does not reuse RT session when instructions differ and midSessionInstructionsUpdate is false', async () => { + const sharedLlm = createFakeRealtimeModel({ midSessionInstructionsUpdate: false }); + const rtSession = createFakeRtSession(); + + const oldActivity = createRtActivity(new Agent({ instructions: 'old' }), sharedLlm, rtSession); + const newActivity = createRtActivity(new Agent({ instructions: 'new' }), sharedLlm); + + const resources = await (AgentActivity.prototype as any)._detachReusableResources.call( + oldActivity, + newActivity, + ); + + expect(resources.rtSession).toBeUndefined(); + }); + + it('reuses RT session when instructions differ but midSessionInstructionsUpdate is true', async () => { + const sharedLlm = createFakeRealtimeModel({ midSessionInstructionsUpdate: true }); + const rtSession = createFakeRtSession(); + + const oldActivity = createRtActivity(new Agent({ instructions: 'old' }), sharedLlm, rtSession); + const newActivity = createRtActivity(new Agent({ instructions: 'new' }), sharedLlm); + + const resources = await (AgentActivity.prototype as any)._detachReusableResources.call( + oldActivity, + newActivity, + ); + + expect(resources.rtSession).toBe(rtSession); + }); + + it('reuses RT session when context differs but midSessionContextUpdate is true', async () => { + const sharedLlm = createFakeRealtimeModel({ midSessionContextUpdate: true }); + const rtSession = createFakeRtSession(); + // Give the session a non-empty chat context + (rtSession as any).chatCtx = ChatContext.empty(); + + const oldActivity = createRtActivity(new Agent({ instructions: 'same' }), sharedLlm, rtSession); + const newActivity = createRtActivity(new Agent({ instructions: 'same' }), sharedLlm); + + const resources = await (AgentActivity.prototype as any)._detachReusableResources.call( + oldActivity, + newActivity, + ); + + expect(resources.rtSession).toBe(rtSession); + }); + + it('does not reuse when no RT session exists', async () => { + const sharedLlm = createFakeRealtimeModel(); + const oldActivity = createRtActivity(new Agent({ instructions: 'a' }), sharedLlm, undefined); + const newActivity = createRtActivity(new Agent({ instructions: 'a' }), sharedLlm); + + const resources = await (AgentActivity.prototype as any)._detachReusableResources.call( + oldActivity, + newActivity, + ); + + expect(resources.rtSession).toBeUndefined(); + }); + + it('does not reuse when LLM is not a RealtimeModel', async () => { + const rtSession = createFakeRtSession(); + const nonRealtimeLlm = { id: 'plain-llm' }; + + const oldActivity = createRtActivity( + new Agent({ instructions: 'a' }), + nonRealtimeLlm, + rtSession, + ); + const newActivity = createRtActivity(new Agent({ instructions: 'a' }), nonRealtimeLlm); + + const resources = await (AgentActivity.prototype as any)._detachReusableResources.call( + oldActivity, + newActivity, + ); + + expect(resources.rtSession).toBeUndefined(); + }); +}); + +describe('cleanupReusableResources', () => { + it('closes both STT pipeline and RT session', async () => { + const sttClose = vi.fn(async () => {}); + const rtClose = vi.fn(async () => {}); + const resources: ReusableResources = { + sttPipeline: { close: sttClose } as any, + rtSession: { close: rtClose } as any, + }; + + await cleanupReusableResources(resources); + + expect(sttClose).toHaveBeenCalledTimes(1); + expect(rtClose).toHaveBeenCalledTimes(1); + expect(resources.sttPipeline).toBeUndefined(); + expect(resources.rtSession).toBeUndefined(); + }); + + it('handles partial resources (only STT)', async () => { + const sttClose = vi.fn(async () => {}); + const resources: ReusableResources = { + sttPipeline: { close: sttClose } as any, + }; + + await cleanupReusableResources(resources); + + expect(sttClose).toHaveBeenCalledTimes(1); + expect(resources.sttPipeline).toBeUndefined(); + }); + + it('handles empty resources', async () => { + const resources: ReusableResources = {}; + await cleanupReusableResources(resources); + // should not throw }); }); diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index 3cc11c79a..2a024c7a6 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -39,8 +39,12 @@ import { import { Task } from '../utils.js'; import type { VAD } from '../vad.js'; import type { Agent } from './agent.js'; -import { AgentActivity } from './agent_activity.js'; -import type { STTPipeline, _TurnDetector } from './audio_recognition.js'; +import { + AgentActivity, + type ReusableResources, + cleanupReusableResources, +} from './agent_activity.js'; +import type { _TurnDetector } from './audio_recognition.js'; import { type AgentEvent, AgentSessionEventTypes, @@ -762,7 +766,7 @@ export class AgentSession< const runWithContext = async () => { const unlock = await this.activityLock.lock(); let onEnterTask: Task | undefined; - let reusedSttPipeline: STTPipeline | undefined; + let reusableResources: ReusableResources | undefined; try { this.agent = agent; @@ -785,15 +789,16 @@ export class AgentSession< this.nextActivity = agent._agentActivity; } - if (prevActivityObj && this.nextActivity && prevActivityObj !== this.nextActivity) { - reusedSttPipeline = await prevActivityObj._detachSttPipelineIfReusable(this.nextActivity); - } - if (prevActivityObj && prevActivityObj !== this.nextActivity) { if (previousActivity === 'pause') { - await prevActivityObj.pause({ blockedTasks }); + reusableResources = await prevActivityObj.pause({ + blockedTasks, + newActivity: this.nextActivity, + }); } else { - await prevActivityObj.drain(); + reusableResources = await prevActivityObj.drain({ + newActivity: this.nextActivity, + }); await prevActivityObj.close(); } } @@ -803,8 +808,10 @@ export class AgentSession< { agentId: this.nextActivity?.agent.id }, 'Session is closing, skipping start of next activity', ); - await reusedSttPipeline?.close(); - reusedSttPipeline = undefined; + if (reusableResources) { + await cleanupReusableResources(reusableResources); + reusableResources = undefined; + } this.nextActivity = undefined; this.activity = undefined; return; @@ -834,11 +841,11 @@ export class AgentSession< ); if (newActivity === 'start') { - await this.activity!.start({ reuseSttPipeline: reusedSttPipeline }); + await this.activity!.start({ reuseResources: reusableResources }); } else { - await this.activity!.resume({ reuseSttPipeline: reusedSttPipeline }); + await this.activity!.resume({ reuseResources: reusableResources }); } - reusedSttPipeline = undefined; + reusableResources = undefined; onEnterTask = this.activity!._onEnterTask; @@ -846,9 +853,11 @@ export class AgentSession< this.activity!.attachAudioInput(this._input.audio.stream); } } catch (error) { - // JS safeguard: session cleanup owns the detached pipeline until the next activity + // JS safeguard: session cleanup owns the detached resources until the next activity // starts successfully, preventing leaks when handoff fails mid-transition. - await reusedSttPipeline?.close(); + if (reusableResources) { + await cleanupReusableResources(reusableResources); + } throw error; } finally { unlock(); diff --git a/agents/src/voice/agent_session_handoff.test.ts b/agents/src/voice/agent_session_handoff.test.ts index d3e054f3c..ffacb1ab2 100644 --- a/agents/src/voice/agent_session_handoff.test.ts +++ b/agents/src/voice/agent_session_handoff.test.ts @@ -4,7 +4,7 @@ import { describe, expect, it, vi } from 'vitest'; import { ChatContext } from '../llm/chat_context.js'; import { Agent } from './agent.js'; -import { AgentActivity } from './agent_activity.js'; +import { AgentActivity, type ReusableResources } from './agent_activity.js'; import { AgentSession } from './agent_session.js'; function createFakeLock() { @@ -50,19 +50,18 @@ function createFakeSession() { } as unknown as AgentSession; } -describe('AgentSession STT pipeline handoff', () => { - it('passes a detached STT pipeline into the next resumed activity', async () => { - const pipeline = { - close: vi.fn(async () => {}), +describe('AgentSession reusable resources handoff', () => { + it('passes reusable resources from drain into the next resumed activity', async () => { + const resources: ReusableResources = { + sttPipeline: { close: vi.fn(async () => {}) } as any, }; const previousAgent = new Agent({ instructions: 'old' }); const nextAgent = new Agent({ instructions: 'new' }); const previousActivity = { agent: previousAgent, - _detachSttPipelineIfReusable: vi.fn(async () => pipeline), - drain: vi.fn(async () => {}), + drain: vi.fn(async () => resources), close: vi.fn(async () => {}), - pause: vi.fn(async () => {}), + pause: vi.fn(async () => resources), }; const nextActivity = { agent: nextAgent, @@ -81,23 +80,22 @@ describe('AgentSession STT pipeline handoff', () => { waitOnEnter: false, }); - expect(previousActivity._detachSttPipelineIfReusable).toHaveBeenCalledWith(nextActivity); - expect(nextActivity.resume).toHaveBeenCalledWith({ reuseSttPipeline: pipeline }); - expect(pipeline.close).not.toHaveBeenCalled(); + expect(previousActivity.drain).toHaveBeenCalledWith({ newActivity: nextActivity }); + expect(nextActivity.resume).toHaveBeenCalledWith({ reuseResources: resources }); }); - it('closes the detached pipeline if the next activity fails to start', async () => { - const pipeline = { - close: vi.fn(async () => {}), + it('cleans up reusable resources if the next activity fails to start', async () => { + const closeFn = vi.fn(async () => {}); + const resources: ReusableResources = { + sttPipeline: { close: closeFn } as any, }; const previousAgent = new Agent({ instructions: 'old' }); const nextAgent = new Agent({ instructions: 'new' }); const previousActivity = { agent: previousAgent, - _detachSttPipelineIfReusable: vi.fn(async () => pipeline), - drain: vi.fn(async () => {}), + drain: vi.fn(async () => resources), close: vi.fn(async () => {}), - pause: vi.fn(async () => {}), + pause: vi.fn(async () => resources), }; const nextActivity = { agent: nextAgent, @@ -120,21 +118,21 @@ describe('AgentSession STT pipeline handoff', () => { }), ).rejects.toThrow('resume failed'); - expect(pipeline.close).toHaveBeenCalledTimes(1); + expect(closeFn).toHaveBeenCalledTimes(1); }); - it('does not close the adopted pipeline after the next activity starts successfully', async () => { - const pipeline = { - close: vi.fn(async () => {}), + it('does not cleanup reusable resources after the next activity starts successfully', async () => { + const closeFn = vi.fn(async () => {}); + const resources: ReusableResources = { + sttPipeline: { close: closeFn } as any, }; const previousAgent = new Agent({ instructions: 'old' }); const nextAgent = new Agent({ instructions: 'new' }); const previousActivity = { agent: previousAgent, - _detachSttPipelineIfReusable: vi.fn(async () => pipeline), - drain: vi.fn(async () => {}), + drain: vi.fn(async () => resources), close: vi.fn(async () => {}), - pause: vi.fn(async () => {}), + pause: vi.fn(async () => resources), }; const nextActivity = { agent: nextAgent, @@ -158,18 +156,18 @@ describe('AgentSession STT pipeline handoff', () => { }), ).rejects.toThrow('attach failed'); - expect(nextActivity.resume).toHaveBeenCalledWith({ reuseSttPipeline: pipeline }); - expect(pipeline.close).not.toHaveBeenCalled(); + expect(nextActivity.resume).toHaveBeenCalledWith({ reuseResources: resources }); + // pipeline was already transferred, so cleanup should NOT have been called + expect(closeFn).not.toHaveBeenCalled(); }); - it('skips STT detach when the same activity object is reused', async () => { + it('skips detach when the same activity object is reused', async () => { const agent = new Agent({ instructions: 'same' }); const activity = { agent, - _detachSttPipelineIfReusable: vi.fn(async () => undefined), - drain: vi.fn(async () => {}), + drain: vi.fn(async () => undefined), close: vi.fn(async () => {}), - pause: vi.fn(async () => {}), + pause: vi.fn(async () => undefined), resume: vi.fn(async () => {}), start: vi.fn(async () => {}), attachAudioInput: vi.fn(), @@ -185,22 +183,23 @@ describe('AgentSession STT pipeline handoff', () => { waitOnEnter: false, }); - expect(activity._detachSttPipelineIfReusable).not.toHaveBeenCalled(); - expect(activity.resume).toHaveBeenCalledWith({ reuseSttPipeline: undefined }); + expect(activity.drain).not.toHaveBeenCalled(); + expect(activity.pause).not.toHaveBeenCalled(); + expect(activity.resume).toHaveBeenCalledWith({ reuseResources: undefined }); }); - it('skips starting a new activity while the session is closing and closes the detached pipeline', async () => { - const pipeline = { - close: vi.fn(async () => {}), + it('skips starting a new activity while the session is closing and cleans up resources', async () => { + const closeFn = vi.fn(async () => {}); + const resources: ReusableResources = { + sttPipeline: { close: closeFn } as any, }; const previousAgent = new Agent({ instructions: 'old' }); const nextAgent = new Agent({ instructions: 'new' }); const previousActivity = { agent: previousAgent, - _detachSttPipelineIfReusable: vi.fn(async () => pipeline), - drain: vi.fn(async () => {}), + drain: vi.fn(async () => resources), close: vi.fn(async () => {}), - pause: vi.fn(async () => {}), + pause: vi.fn(async () => resources), }; const startSpy = vi.spyOn(AgentActivity.prototype, 'start').mockResolvedValue(undefined); @@ -215,9 +214,9 @@ describe('AgentSession STT pipeline handoff', () => { waitOnEnter: false, }); - expect(previousActivity._detachSttPipelineIfReusable).toHaveBeenCalledTimes(1); + expect(previousActivity.drain).toHaveBeenCalledTimes(1); expect(previousActivity.close).toHaveBeenCalledTimes(1); - expect(pipeline.close).toHaveBeenCalledTimes(1); + expect(closeFn).toHaveBeenCalledTimes(1); expect(startSpy).not.toHaveBeenCalled(); expect((session as any).activity).toBeUndefined(); expect((session as any).nextActivity).toBeUndefined(); diff --git a/examples/src/survey_agent.ts b/examples/src/survey_agent.ts new file mode 100644 index 000000000..a5f230435 --- /dev/null +++ b/examples/src/survey_agent.ts @@ -0,0 +1,376 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { + type JobContext, + ServerOptions, + beta, + cli, + defineAgent, + llm, + voice, +} from '@livekit/agents'; +// import * as phonic from '@livekit/agents-plugin-phonic'; +import { access, appendFile } from 'node:fs/promises'; +import { fileURLToPath } from 'node:url'; +import { z } from 'zod'; + +type SurveyUserData = { + filename: string; + candidateName: string; + taskResults: Record; +}; + +type IntroResults = { + name: string; + intro: string; +}; + +type EmailResults = { + email: string; +}; + +type CommuteResults = { + canCommute: boolean; + commuteMethod: 'driving' | 'bus' | 'subway' | 'none'; +}; + +type ExperienceResults = { + yearsOfExperience: number; + experienceDescription: string; +}; + +type BehavioralResults = { + strengths: string; + weaknesses: string; + workStyle: 'independent' | 'team_player'; +}; + +function toCsvValue(value: unknown): string { + const raw = typeof value === 'string' ? value : JSON.stringify(value); + return `"${raw.replace(/"/g, '""')}"`; +} + +async function writeCsvRow(path: string, data: Record): Promise { + let hasFile = true; + try { + await access(path); + } catch { + hasFile = false; + } + + const keys = Object.keys(data); + const row = keys.map((key) => toCsvValue(data[key])).join(',') + '\n'; + + if (!hasFile) { + await appendFile(path, keys.join(',') + '\n', 'utf8'); + } + await appendFile(path, row, 'utf8'); +} + +function disqualifyTool() { + return llm.tool({ + description: + 'End the interview if the candidate refuses to cooperate, provides inappropriate answers, or is not a fit.', + parameters: z.object({ + disqualificationReason: z.string().describe('Why the interview should end immediately'), + }), + execute: async ({ disqualificationReason }, { ctx }: llm.ToolOptions) => { + const reason = `[DISQUALIFIED] ${disqualificationReason}`; + await writeCsvRow(ctx.userData.filename, { + name: ctx.userData.candidateName || 'unknown', + disqualificationReason: reason, + }); + await ctx.session.say( + `Thanks for your time today. We are ending the interview now. Reason: ${disqualificationReason}.`, + ); + ctx.session.shutdown(); + return 'Interview ended and disqualification saved.'; + }, + }); +} + +class IntroTask extends voice.AgentTask { + constructor() { + super({ + instructions: + 'You are Alex, an interviewer screening a software engineer candidate. Gather the candidate name and short self-introduction.', + tools: { + saveIntro: llm.tool({ + description: 'Save candidate name and intro notes.', + parameters: z.object({ + name: z.string().describe('Candidate name'), + intro: z.string().describe('Short notes from their introduction'), + }), + execute: async ({ name, intro }) => { + (this.session.userData as SurveyUserData).candidateName = name; + this.complete({ name, intro }); + return `Saved intro for ${name}.`; + }, + }), + }, + }); + } + + async onEnter() { + await this.session.generateReply({ + instructions: + 'Welcome the candidate and collect their name plus a brief self-introduction, then call saveIntro.', + }); + } +} + +class EmailTask extends voice.AgentTask { + constructor() { + const disqualify = disqualifyTool(); + super({ + instructions: + 'Collect a valid email address. If the candidate refuses, call disqualify immediately.', + tools: { + disqualify, + saveEmail: llm.tool({ + description: 'Save candidate email address.', + parameters: z.object({ + email: z.string().describe('Candidate email'), + }), + execute: async ({ email }) => { + this.complete({ email }); + return `Saved email: ${email}`; + }, + }), + }, + }); + } + + async onEnter() { + await this.session.generateReply({ + instructions: 'Ask for the candidate email and call saveEmail as soon as you get it.', + }); + } +} + +class CommuteTask extends voice.AgentTask { + constructor() { + const disqualify = disqualifyTool(); + super({ + instructions: + 'Collect commute flexibility. The role expects office attendance three days per week.', + tools: { + disqualify, + saveCommute: llm.tool({ + description: 'Save candidate commute information.', + parameters: z.object({ + canCommute: z.boolean().describe('Whether the candidate can commute to office'), + commuteMethod: z + .enum(['driving', 'bus', 'subway', 'none']) + .describe('Main commute method'), + }), + execute: async ({ canCommute, commuteMethod }) => { + this.complete({ canCommute, commuteMethod }); + return 'Saved commute flexibility.'; + }, + }), + }, + }); + } + + async onEnter() { + await this.session.generateReply({ + instructions: + 'Ask if the candidate can commute to office regularly and capture the commute method, then call saveCommute.', + }); + } +} + +class ExperienceTask extends voice.AgentTask { + constructor() { + const disqualify = disqualifyTool(); + super({ + instructions: + 'Collect years of experience and a concise timeline of previous roles relevant to software engineering.', + tools: { + disqualify, + saveExperience: llm.tool({ + description: 'Save candidate experience details.', + parameters: z.object({ + yearsOfExperience: z + .number() + .describe('Total years of professional software experience'), + experienceDescription: z.string().describe('Summary of previous roles and employers'), + }), + execute: async ({ yearsOfExperience, experienceDescription }) => { + this.complete({ yearsOfExperience, experienceDescription }); + return 'Saved experience details.'; + }, + }), + }, + }); + } + + async onEnter() { + await this.session.generateReply({ + instructions: + 'Ask about years of experience and previous roles, then call saveExperience once gathered.', + }); + } +} + +class BehavioralTask extends voice.AgentTask { + private partial: Partial = {}; + + constructor() { + const disqualify = disqualifyTool(); + super({ + instructions: + 'Collect strengths, weaknesses, and work style. Keep a natural conversational tone and avoid bullet lists.', + tools: { + disqualify, + saveStrengths: llm.tool({ + description: "Save a concise summary of the candidate's strengths.", + parameters: z.object({ + strengths: z.string().describe('Strengths summary'), + }), + execute: async ({ strengths }) => { + this.partial.strengths = strengths; + this.checkCompletion(); + return 'Saved strengths.'; + }, + }), + saveWeaknesses: llm.tool({ + description: "Save a concise summary of the candidate's weaknesses.", + parameters: z.object({ + weaknesses: z.string().describe('Weaknesses summary'), + }), + execute: async ({ weaknesses }) => { + this.partial.weaknesses = weaknesses; + this.checkCompletion(); + return 'Saved weaknesses.'; + }, + }), + saveWorkStyle: llm.tool({ + description: "Save candidate's work style.", + parameters: z.object({ + workStyle: z.enum(['independent', 'team_player']).describe('Primary work style'), + }), + execute: async ({ workStyle }) => { + this.partial.workStyle = workStyle; + this.checkCompletion(); + return 'Saved work style.'; + }, + }), + }, + }); + } + + async onEnter() { + await this.session.generateReply({ + instructions: + 'In a conversational way, gather strengths, weaknesses, and work style, then call save* tools.', + }); + } + + private checkCompletion() { + if (this.partial.strengths && this.partial.weaknesses && this.partial.workStyle) { + this.complete({ + strengths: this.partial.strengths, + weaknesses: this.partial.weaknesses, + workStyle: this.partial.workStyle, + }); + return; + } + + this.session.generateReply({ + instructions: + 'Continue gathering missing behavioral details in a concise, natural dialogue and use save* tools.', + }); + } +} + +class SurveyAgent extends voice.Agent { + constructor() { + super({ + instructions: + 'You are a survey interviewer for a software engineer screening. Be concise, professional, and natural. Call endScreening when the process is complete.', + tools: { + endScreening: llm.tool({ + description: 'End interview and hang up.', + execute: async (_, { ctx }: llm.ToolOptions) => { + ctx.session.shutdown(); + return 'Interview concluded.'; + }, + }), + }, + }); + } + + async onEnter() { + const group = new beta.TaskGroup({ + summarizeChatCtx: false, + }); + + group.add(() => new IntroTask(), { + id: 'intro_task', + description: 'Collect candidate name and intro', + }); + group.add(() => new EmailTask(), { + id: 'email_task', + description: 'Collect candidate email', + }); + group.add(() => new CommuteTask(), { + id: 'commute_task', + description: 'Collect commute flexibility and method', + }); + group.add(() => new ExperienceTask(), { + id: 'experience_task', + description: 'Collect years of experience and role history', + }); + group.add(() => new BehavioralTask(), { + id: 'behavioral_task', + description: 'Collect strengths, weaknesses, and work style', + }); + + const result = await group.run(); + const summaryItem = this.chatCtx.items[this.chatCtx.items.length - 1]; + let summaryText = ''; + if (summaryItem && 'content' in summaryItem) { + summaryText = + typeof summaryItem.content === 'string' + ? summaryItem.content + : JSON.stringify(summaryItem.content ?? ''); + } + + const mergedResults: Record = { + ...result.taskResults, + summary: summaryText, + }; + this.session.userData.taskResults = mergedResults; + await writeCsvRow(this.session.userData.filename, mergedResults); + + await this.session.say( + 'The interview is now complete. Thank you for your time. We will follow up within three business days.', + ); + } +} + +export default defineAgent({ + entry: async (ctx: JobContext) => { + const session = new voice.AgentSession({ + // Manual testing target for #5293 follow-up: realtime say path using Phonic. + llm: 'openai/gpt-4.1', + stt: 'deepgram/nova-3', + tts: 'cartesia/sonic-3', + userData: { + filename: 'survey_results.csv', + candidateName: '', + taskResults: {}, + }, + }); + + await session.start({ + agent: new SurveyAgent(), + room: ctx.room, + }); + }, +}); + +cli.runApp(new ServerOptions({ agent: fileURLToPath(import.meta.url) })); diff --git a/plugins/google/src/beta/realtime/realtime_api.ts b/plugins/google/src/beta/realtime/realtime_api.ts index 89af9e805..f20b23efc 100644 --- a/plugins/google/src/beta/realtime/realtime_api.ts +++ b/plugins/google/src/beta/realtime/realtime_api.ts @@ -311,6 +311,8 @@ export class RealtimeModel extends llm.RealtimeModel { autoToolReplyGeneration: true, audioOutput: options.modalities?.includes(Modality.AUDIO) ?? true, manualFunctionCalls: false, + midSessionInstructionsUpdate: true, + midSessionToolsUpdate: false, }); // Environment variable fallbacks diff --git a/plugins/openai/src/realtime/realtime_model.ts b/plugins/openai/src/realtime/realtime_model.ts index c7e9eb7b3..d32b1e1d9 100644 --- a/plugins/openai/src/realtime/realtime_model.ts +++ b/plugins/openai/src/realtime/realtime_model.ts @@ -185,6 +185,9 @@ export class RealtimeModel extends llm.RealtimeModel { autoToolReplyGeneration: false, audioOutput: modalities.includes('audio'), manualFunctionCalls: true, + midSessionContextUpdate: true, + midSessionInstructionsUpdate: true, + midSessionToolsUpdate: true, }); const isAzure = !!(options.apiVersion || options.entraToken || options.azureDeployment); diff --git a/plugins/openai/src/realtime/realtime_model_beta.ts b/plugins/openai/src/realtime/realtime_model_beta.ts index ce7a2a51b..4b1763546 100644 --- a/plugins/openai/src/realtime/realtime_model_beta.ts +++ b/plugins/openai/src/realtime/realtime_model_beta.ts @@ -177,6 +177,9 @@ export class RealtimeModel extends llm.RealtimeModel { autoToolReplyGeneration: false, audioOutput: modalities.includes('audio'), manualFunctionCalls: true, + midSessionContextUpdate: true, + midSessionInstructionsUpdate: true, + midSessionToolsUpdate: true, }); const isAzure = !!(options.apiVersion || options.entraToken || options.azureDeployment); diff --git a/plugins/phonic/src/realtime/realtime_model.ts b/plugins/phonic/src/realtime/realtime_model.ts index 55bcb3f0e..15ea27cca 100644 --- a/plugins/phonic/src/realtime/realtime_model.ts +++ b/plugins/phonic/src/realtime/realtime_model.ts @@ -12,6 +12,7 @@ import { stream, } from '@livekit/agents'; import { AudioFrame, AudioResampler } from '@livekit/rtc-node'; +import type { ReadableStream } from 'node:stream/web'; import type { Phonic } from 'phonic'; import { PhonicClient } from 'phonic'; import type { ServerEvent, Voice } from './api_proto.js'; @@ -181,6 +182,7 @@ interface GenerationState { * Realtime session for Phonic (https://docs.phonic.co/) */ export class RealtimeSession extends llm.RealtimeSession { + private static readonly SAY_TIMEOUT_MS = 10_000; private _tools: llm.ToolContext = {}; private _chatCtx = llm.ChatContext.empty(); @@ -378,6 +380,56 @@ export class RealtimeSession extends llm.RealtimeSession { return this.startNewAssistantTurn({ userInitiated: true }); } + async say( + text: string | ReadableStream, + options?: { allowInterruptions?: boolean }, + ): Promise { + await Promise.race([ + this.sendSayAsync(text, options?.allowInterruptions), + new Promise((_, reject) => { + setTimeout(() => reject(new Error('say() timed out.')), RealtimeSession.SAY_TIMEOUT_MS); + }), + ]); + + this.closeCurrentGeneration({ interrupted: false }); + return this.startNewAssistantTurn({ userInitiated: true }); + } + + private async sendSayAsync( + text: string | ReadableStream, + allowInterruptions?: boolean, + ): Promise { + if (this.closed) return; + + let fullText: string; + if (typeof text === 'string') { + fullText = text; + } else { + const chunks: string[] = []; + const reader = text.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + chunks.push(value); + } + } finally { + reader.releaseLock(); + } + fullText = chunks.join(''); + } + + if (!this.socket) { + throw new Error('Cannot send say: WebSocket not available'); + } + + const payload: Record = { type: 'say', text: fullText }; + if (allowInterruptions !== undefined) { + payload.interruptible = allowInterruptions; + } + this.socket.socket.send(JSON.stringify(payload)); + } + async commitAudio(): Promise { this.logger.warn('commitAudio is not supported by the Phonic realtime model.'); } From 5c71660c8ff1c7715412d5b8f754c751ed3c1410 Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Wed, 1 Apr 2026 18:25:24 -0700 Subject: [PATCH 2/9] fix from comments --- agents/src/llm/realtime.ts | 19 ++++++++----------- agents/src/stream/deferred_stream.ts | 13 ++++--------- agents/src/utils.ts | 4 ++++ agents/src/voice/agent_activity.ts | 13 ++++++++++--- examples/src/survey_agent.ts | 1 - plugins/openai/src/realtime/realtime_model.ts | 12 +++++++++--- 6 files changed, 35 insertions(+), 27 deletions(-) diff --git a/agents/src/llm/realtime.ts b/agents/src/llm/realtime.ts index 99939b8ba..debbe2a1b 100644 --- a/agents/src/llm/realtime.ts +++ b/agents/src/llm/realtime.ts @@ -4,7 +4,7 @@ import type { AudioFrame } from '@livekit/rtc-node'; import { EventEmitter } from 'events'; import type { ReadableStream } from 'node:stream/web'; -import { DeferredReadableStream } from '../stream/deferred_stream.js'; +import { MultiInputStream } from '../stream/multi_input_stream.js'; import { Task } from '../utils.js'; import type { TimedString } from '../voice/io.js'; import type { ChatContext, FunctionCall } from './chat_context.js'; @@ -87,7 +87,8 @@ export abstract class RealtimeModel { export abstract class RealtimeSession extends EventEmitter { protected _realtimeModel: RealtimeModel; - private deferredInputStream = new DeferredReadableStream(); + private inputAudioStream = new MultiInputStream(); + private inputAudioStreamId?: string; private _mainTask: Task; constructor(realtimeModel: RealtimeModel) { @@ -149,6 +150,7 @@ export abstract class RealtimeSession extends EventEmitter { async close(): Promise { this._mainTask.cancel(); + await this.inputAudioStream.close(); } /** @@ -166,7 +168,7 @@ export abstract class RealtimeSession extends EventEmitter { } private async _mainTaskImpl(signal: AbortSignal): Promise { - const reader = this.deferredInputStream.stream.getReader(); + const reader = this.inputAudioStream.stream.getReader(); while (true) { const { done, value } = await reader.read(); if (done || signal.aborted) { @@ -177,14 +179,9 @@ export abstract class RealtimeSession extends EventEmitter { } setInputAudioStream(audioStream: ReadableStream): void { - if (this.deferredInputStream.isSourceSet) { - // Reused sessions must detach the previous audio source before rebinding. - void this.deferredInputStream.detachSource(); - } - try { - this.deferredInputStream.setSource(audioStream); - } catch (error) { - throw error; + if (this.inputAudioStreamId !== undefined) { + void this.inputAudioStream.removeInputStream(this.inputAudioStreamId); } + this.inputAudioStreamId = this.inputAudioStream.addInputStream(audioStream); } } diff --git a/agents/src/stream/deferred_stream.ts b/agents/src/stream/deferred_stream.ts index 8f4b611ac..6ca9baa48 100644 --- a/agents/src/stream/deferred_stream.ts +++ b/agents/src/stream/deferred_stream.ts @@ -41,7 +41,7 @@ export class DeferredReadableStream { private transform: IdentityTransform; private writer: WritableStreamDefaultWriter; private sourceReader?: ReadableStreamDefaultReader; - private detachRequested = false; + private sourceAttached = false; constructor() { this.transform = new IdentityTransform(); @@ -60,10 +60,11 @@ export class DeferredReadableStream { * Call once the actual source is ready. */ setSource(source: ReadableStream) { - if (this.isSourceSet) { + if (this.sourceAttached) { throw new Error('Stream source already set'); } + this.sourceAttached = true; const sourceReader = source.getReader(); this.sourceReader = sourceReader; void this.pump(sourceReader); @@ -84,11 +85,6 @@ export class DeferredReadableStream { sourceError = e; } finally { - if (this.detachRequested) { - this.detachRequested = false; - return; - } - // any other error from source will be propagated to the consumer if (sourceError) { try { @@ -130,8 +126,7 @@ export class DeferredReadableStream { } const sourceReader = this.sourceReader!; - this.detachRequested = true; - // Clear source first so future setSource() calls can reattach cleanly. + // Clear active source reader reference before releasing lock. this.sourceReader = undefined; // release lock will make any pending read() throw TypeError diff --git a/agents/src/utils.ts b/agents/src/utils.ts index 64f19bacf..fe0cf5316 100644 --- a/agents/src/utils.ts +++ b/agents/src/utils.ts @@ -983,6 +983,10 @@ export async function* readStream( } export async function waitForAbort(signal: AbortSignal) { + if (signal.aborted) { + return; + } + const abortFuture = new Future(); const handler = () => { abortFuture.resolve(); diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index 391bd17b5..2c2095f8a 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -827,7 +827,7 @@ export class AgentActivity implements RecognitionHooks { if (this.realtimeSession !== undefined && !audio && !this.tts) { task = this.createSpeechTask({ taskFn: (abortController: AbortController) => - this.realtimeSayTask(handle, text, {}, abortController), + this.realtimeSayTask(handle, text, addToChatCtx, {}, abortController), ownedSpeechHandle: handle, name: 'AgentActivity.realtime_say', }); @@ -2365,6 +2365,7 @@ export class AgentActivity implements RecognitionHooks { ev: GenerationCreatedEvent, modelSettings: ModelSettings, replyAbortController: AbortController, + addToChatCtx: boolean = true, ): Promise { return tracer.startActiveSpan( async (span) => @@ -2373,6 +2374,7 @@ export class AgentActivity implements RecognitionHooks { ev, modelSettings, replyAbortController, + addToChatCtx, span, }), { @@ -2387,12 +2389,14 @@ export class AgentActivity implements RecognitionHooks { ev, modelSettings, replyAbortController, + addToChatCtx, span, }: { speechHandle: SpeechHandle; ev: GenerationCreatedEvent; modelSettings: ModelSettings; replyAbortController: AbortController; + addToChatCtx: boolean; span: Span; }): Promise { speechHandle._agentTurnContext = otelContext.active(); @@ -2664,7 +2668,7 @@ export class AgentActivity implements RecognitionHooks { }); } - if (forwardedText) { + if (forwardedText && addToChatCtx) { const message = ChatMessage.create({ role: 'assistant', content: forwardedText, @@ -2689,7 +2693,7 @@ export class AgentActivity implements RecognitionHooks { return; } - if (messageOutputs.length > 0) { + if (messageOutputs.length > 0 && addToChatCtx) { // there should be only one message const [msgId, textOut, _, __] = messageOutputs[0]!; const message = ChatMessage.create({ @@ -2862,6 +2866,7 @@ export class AgentActivity implements RecognitionHooks { private async realtimeSayTask( speechHandle: SpeechHandle, text: string | ReadableStream, + addToChatCtx: boolean, modelSettings: ModelSettings, replyAbortController: AbortController, ): Promise { @@ -2892,6 +2897,7 @@ export class AgentActivity implements RecognitionHooks { 'say() is not implemented for %s; use a TTS model instead', this.realtimeSession.realtimeModel.provider, ); + this.agentSession._updateAgentState('listening'); return; } this.logger.error('failed to say text: %s', String(e)); @@ -2904,6 +2910,7 @@ export class AgentActivity implements RecognitionHooks { generationEv, modelSettings, replyAbortController, + addToChatCtx, ); } diff --git a/examples/src/survey_agent.ts b/examples/src/survey_agent.ts index a5f230435..d455bdf64 100644 --- a/examples/src/survey_agent.ts +++ b/examples/src/survey_agent.ts @@ -355,7 +355,6 @@ class SurveyAgent extends voice.Agent { export default defineAgent({ entry: async (ctx: JobContext) => { const session = new voice.AgentSession({ - // Manual testing target for #5293 follow-up: realtime say path using Phonic. llm: 'openai/gpt-4.1', stt: 'deepgram/nova-3', tts: 'cartesia/sonic-3', diff --git a/plugins/openai/src/realtime/realtime_model.ts b/plugins/openai/src/realtime/realtime_model.ts index d32b1e1d9..f728a70ff 100644 --- a/plugins/openai/src/realtime/realtime_model.ts +++ b/plugins/openai/src/realtime/realtime_model.ts @@ -484,12 +484,18 @@ export class RealtimeSession extends llm.RealtimeSession { const futures: Future[] = []; for (const event of events) { - const future = new Future(); - futures.push(future); - if (event.type === 'conversation.item.create') { + const future = new Future(); + futures.push(future); this.itemCreateFutures[event.item.id] = future; } else if (event.type == 'conversation.item.delete') { + const existingDeleteFuture = this.itemDeleteFutures[event.item_id]; + if (existingDeleteFuture) { + futures.push(existingDeleteFuture); + continue; + } + const future = new Future(); + futures.push(future); this.itemDeleteFutures[event.item_id] = future; } From 3e1a320816daa839f9c5150774f1068d31d91b0c Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Wed, 1 Apr 2026 18:26:03 -0700 Subject: [PATCH 3/9] Create sharp-apples-appear.md --- .changeset/sharp-apples-appear.md | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 .changeset/sharp-apples-appear.md diff --git a/.changeset/sharp-apples-appear.md b/.changeset/sharp-apples-appear.md new file mode 100644 index 000000000..0ff50ec96 --- /dev/null +++ b/.changeset/sharp-apples-appear.md @@ -0,0 +1,9 @@ +--- +"@livekit/agents": patch +"@livekit/agents-plugin-google": patch +"@livekit/agents-plugin-openai": patch +"@livekit/agents-plugin-phonic": patch +--- + +- Make reusable Realtime Session across Handoffs & Agent Tasks +- Add say() capability to phonic realtime model From 7ec88ebf91e10d266e02c2239621856eb1da937c Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Wed, 1 Apr 2026 18:32:08 -0700 Subject: [PATCH 4/9] Update agent_activity.ts --- agents/src/voice/agent_activity.ts | 8 -------- 1 file changed, 8 deletions(-) diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index 2c2095f8a..1b6e8992a 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -2892,14 +2892,6 @@ export class AgentActivity implements RecognitionHooks { allowInterruptions: speechHandle.allowInterruptions, }); } catch (e) { - if (e instanceof Error && e.message.includes('does not implement say()')) { - this.logger.error( - 'say() is not implemented for %s; use a TTS model instead', - this.realtimeSession.realtimeModel.provider, - ); - this.agentSession._updateAgentState('listening'); - return; - } this.logger.error('failed to say text: %s', String(e)); this.agentSession._updateAgentState('listening'); return; From dd61a2aa6837577356279422cdb1d72f9e4c4bbe Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Wed, 1 Apr 2026 18:52:28 -0700 Subject: [PATCH 5/9] Update agent_activity.ts --- agents/src/voice/agent_activity.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index 1b6e8992a..52913ba81 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -2876,10 +2876,6 @@ export class AgentActivity implements RecognitionHooks { throw new Error('realtimeSession is not available'); } - // Parity gap (python->ts): Python also waits on a user-silence event when interruptions - // are enabled. TS voice runtime currently does not expose an equivalent wait primitive here. - // Behavior equivalent: we gate on tool/speech authorization and interruption status before - // issuing realtime say(). await speechHandle.waitIfNotInterrupted([speechHandle._waitForAuthorization()]); if (speechHandle.interrupted) { From eb2b58bc3e0b380465bdb626c6c60789bc016395 Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Wed, 8 Apr 2026 17:36:06 -0700 Subject: [PATCH 6/9] harden handoff reply scheduling and clean up chat-context capability surface --- agents/src/llm/index.ts | 4 + agents/src/llm/realtime.ts | 3 +- agents/src/llm/utils.test.ts | 88 ++++++++- agents/src/llm/utils.ts | 169 ++++++++++++++++++ agents/src/voice/agent_activity.ts | 148 +++++++++------ .../src/voice/agent_activity_handoff.test.ts | 6 +- agents/src/voice/agent_session.ts | 22 ++- .../google/src/beta/realtime/realtime_api.ts | 2 + plugins/openai/src/realtime/realtime_model.ts | 48 ++++- .../src/realtime/realtime_model_beta.ts | 3 +- 10 files changed, 430 insertions(+), 63 deletions(-) diff --git a/agents/src/llm/index.ts b/agents/src/llm/index.ts index a8b157293..6d9bad4f4 100644 --- a/agents/src/llm/index.ts +++ b/agents/src/llm/index.ts @@ -68,6 +68,10 @@ export { oaiParams, serializeImage, toJsonSchema, + validateChatContextStructure, + type ChatContextValidationIssue, + type ChatContextValidationResult, + type ChatContextValidationSeverity, type FormatChatHistoryOptions, type OpenAIFunctionParameters, type SerializedImage, diff --git a/agents/src/llm/realtime.ts b/agents/src/llm/realtime.ts index debbe2a1b..e1866c319 100644 --- a/agents/src/llm/realtime.ts +++ b/agents/src/llm/realtime.ts @@ -49,9 +49,10 @@ export interface RealtimeCapabilities { autoToolReplyGeneration: boolean; audioOutput: boolean; manualFunctionCalls: boolean; - midSessionContextUpdate?: boolean; + midSessionChatCtxUpdate?: boolean; midSessionInstructionsUpdate?: boolean; midSessionToolsUpdate?: boolean; + perResponseToolChoice?: boolean; } export interface InputTranscriptionCompleted { diff --git a/agents/src/llm/utils.test.ts b/agents/src/llm/utils.test.ts index 68a6c2b61..964b520a7 100644 --- a/agents/src/llm/utils.test.ts +++ b/agents/src/llm/utils.test.ts @@ -12,7 +12,12 @@ import { FunctionCallOutput, type ImageContent, } from './chat_context.js'; -import { computeChatCtxDiff, formatChatHistory, serializeImage } from './utils.js'; +import { + computeChatCtxDiff, + formatChatHistory, + serializeImage, + validateChatContextStructure, +} from './utils.js'; function createChatMessage( id: string, @@ -457,6 +462,87 @@ describe('formatChatHistory', () => { }); }); +describe('validateChatContextStructure', () => { + it('returns valid=true for well-formed chat context', () => { + const ctx = new ChatContext([ + ChatMessage.create({ + id: 'msg_user', + role: 'user', + content: ['hello'], + createdAt: 1, + }), + FunctionCall.create({ + id: 'fn_call', + callId: 'call_1', + name: 'lookup_order', + args: '{"orderId":"123"}', + createdAt: 2, + }), + FunctionCallOutput.create({ + id: 'fn_output', + callId: 'call_1', + name: 'lookup_order', + output: '{"ok":true}', + isError: false, + createdAt: 3, + }), + ]); + + const result = validateChatContextStructure(ctx); + expect(result.valid).toBe(true); + expect(result.errors).toBe(0); + expect(result.warnings).toBe(0); + expect(result.issues).toEqual([]); + }); + + it('detects duplicate ids and timestamp ordering issues', () => { + const m1 = ChatMessage.create({ + id: 'dup_id', + role: 'user', + content: ['hello'], + createdAt: 10, + }); + const m2 = ChatMessage.create({ + id: 'dup_id', + role: 'assistant', + content: ['world'], + createdAt: 5, + }); + const ctx = new ChatContext([m1, m2]); + + const result = validateChatContextStructure(ctx); + expect(result.valid).toBe(false); + expect(result.errors).toBeGreaterThanOrEqual(2); + expect(result.issues.some((i) => i.code === 'duplicate_id')).toBe(true); + expect(result.issues.some((i) => i.code === 'timestamp_order')).toBe(true); + }); + + it('detects malformed terms and orphan function outputs', () => { + const msg = ChatMessage.create({ + id: 'msg_1', + role: 'user', + content: [' '], + createdAt: 1, + }); + const output = FunctionCallOutput.create({ + id: 'fn_out_1', + callId: 'call_missing', + name: 'lookup_order', + output: 'ok', + isError: false, + createdAt: 2, + }); + const ctx = new ChatContext([msg, output]); + + const result = validateChatContextStructure(ctx); + expect(result.valid).toBe(true); + expect(result.errors).toBe(0); + expect(result.warnings).toBeGreaterThanOrEqual(2); + expect(result.issues.some((i) => i.code === 'empty_text_term')).toBe(true); + expect(result.issues.some((i) => i.code === 'orphan_function_call_output')).toBe(true); + }); +}); + describe('serializeImage', () => { let consoleWarnSpy: ReturnType; diff --git a/agents/src/llm/utils.ts b/agents/src/llm/utils.ts index 76975d150..8865d1056 100644 --- a/agents/src/llm/utils.ts +++ b/agents/src/llm/utils.ts @@ -247,6 +247,33 @@ export interface FormatChatHistoryOptions { includeTimestamps?: boolean; } +export type ChatContextValidationSeverity = 'error' | 'warning'; + +export interface ChatContextValidationIssue { + severity: ChatContextValidationSeverity; + code: + | 'duplicate_id' + | 'timestamp_order' + | 'empty_message_content' + | 'empty_text_term' + | 'missing_image_term' + | 'invalid_audio_term' + | 'invalid_function_call' + | 'invalid_function_call_args' + | 'invalid_function_call_output' + | 'orphan_function_call_output'; + index: number; + itemId: string; + message: string; +} + +export interface ChatContextValidationResult { + valid: boolean; + errors: number; + warnings: number; + issues: ChatContextValidationIssue[]; +} + /** * Render a chat context into a readable multiline string for debugging and logging. */ @@ -273,6 +300,148 @@ export function formatChatHistory( ].join('\n'); } +/** + * Validate structural integrity of chat context items/terms for realtime usage. + */ +export function validateChatContextStructure(chatCtx: ChatContext): ChatContextValidationResult { + const issues: ChatContextValidationIssue[] = []; + const ids = new Set(); + const seenFunctionCallIds = new Set(); + let previousCreatedAt = -Infinity; + + const pushIssue = (issue: ChatContextValidationIssue) => { + issues.push(issue); + }; + + for (let index = 0; index < chatCtx.items.length; index += 1) { + const item = chatCtx.items[index]!; + + if (ids.has(item.id)) { + pushIssue({ + severity: 'error', + code: 'duplicate_id', + index, + itemId: item.id, + message: `Duplicate item id '${item.id}'`, + }); + } else { + ids.add(item.id); + } + + if (item.createdAt < previousCreatedAt) { + pushIssue({ + severity: 'error', + code: 'timestamp_order', + index, + itemId: item.id, + message: `Item createdAt (${item.createdAt}) is older than previous item (${previousCreatedAt})`, + }); + } + previousCreatedAt = item.createdAt; + + if (item.type === 'message') { + if (item.content.length === 0) { + pushIssue({ + severity: 'warning', + code: 'empty_message_content', + index, + itemId: item.id, + message: 'Message has empty content array', + }); + } + + item.content.forEach((term, termIndex) => { + if (typeof term === 'string') { + if (term.trim().length === 0) { + pushIssue({ + severity: 'warning', + code: 'empty_text_term', + index, + itemId: item.id, + message: `Message term[${termIndex}] is empty text`, + }); + } + return; + } + + if (term.type === 'image_content') { + if (!term.id || term.image === undefined || term.image === null) { + pushIssue({ + severity: 'error', + code: 'missing_image_term', + index, + itemId: item.id, + message: `Message term[${termIndex}] has invalid image content`, + }); + } + return; + } + + if (!Array.isArray(term.frame)) { + pushIssue({ + severity: 'error', + code: 'invalid_audio_term', + index, + itemId: item.id, + message: `Message term[${termIndex}] has invalid audio content`, + }); + } + }); + } else if (item.type === 'function_call') { + if (!item.name || !item.callId) { + pushIssue({ + severity: 'error', + code: 'invalid_function_call', + index, + itemId: item.id, + message: 'Function call is missing name or callId', + }); + } else { + seenFunctionCallIds.add(item.callId); + } + + try { + JSON.parse(item.args); + } catch { + pushIssue({ + severity: 'warning', + code: 'invalid_function_call_args', + index, + itemId: item.id, + message: 'Function call args are not valid JSON', + }); + } + } else if (item.type === 'function_call_output') { + if (!item.callId) { + pushIssue({ + severity: 'error', + code: 'invalid_function_call_output', + index, + itemId: item.id, + message: 'Function call output is missing callId', + }); + } else if (!seenFunctionCallIds.has(item.callId)) { + pushIssue({ + severity: 'warning', + code: 'orphan_function_call_output', + index, + itemId: item.id, + message: `Function call output references unknown callId '${item.callId}'`, + }); + } + } + } + + const errors = issues.filter((issue) => issue.severity === 'error').length; + const warnings = issues.length - errors; + return { + valid: errors === 0, + errors, + warnings, + issues, + }; +} + function formatChatHistoryItem( item: ChatItem, index: number, diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index 52913ba81..22655d5e5 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -8,6 +8,7 @@ import { ROOT_CONTEXT, context as otelContext, trace } from '@opentelemetry/api' import { Heap } from 'heap-js'; import { AsyncLocalStorage } from 'node:async_hooks'; import { ReadableStream, TransformStream } from 'node:stream/web'; +import type { Logger } from 'pino'; import type { InterruptionDetectionError } from '../inference/interruption/errors.js'; import { AdaptiveInterruptionDetector } from '../inference/interruption/interruption_detector.js'; import type { OverlappingSpeechEvent } from '../inference/interruption/types.js'; @@ -101,15 +102,43 @@ export interface ReusableResources { rtSession?: RealtimeSession; } -export async function cleanupReusableResources(resources: ReusableResources): Promise { +export class SchedulingPausedError extends Error { + constructor() { + super('cannot schedule new speech, the speech scheduling is draining/pausing'); + this.name = 'SchedulingPausedError'; + } +} + +export function isSchedulingPausedError(error: unknown): error is SchedulingPausedError { + return error instanceof SchedulingPausedError; +} + +export async function cleanupReusableResources( + resources: ReusableResources, + logger: Logger, +): Promise { + const tasks: Promise[] = []; if (resources.sttPipeline) { - await resources.sttPipeline.close(); + tasks.push(resources.sttPipeline.close()); resources.sttPipeline = undefined; } if (resources.rtSession) { - await resources.rtSession.close(); + tasks.push(resources.rtSession.close()); resources.rtSession = undefined; } + + if (tasks.length > 0) { + const outputs = await Promise.allSettled(tasks); + for (const output of outputs) { + if (output.status === 'rejected') { + if (logger) { + logger.error({ error: output.reason }, 'error cleaning up reusable resources'); + } else { + console.error('error cleaning up reusable resources', output.reason); + } + } + } + } } interface PreemptiveGeneration { @@ -387,7 +416,7 @@ export class AgentActivity implements RecognitionHooks { } } - if (!rtReused || capabilities.midSessionContextUpdate) { + if (!rtReused || capabilities.midSessionChatCtxUpdate) { try { await this.realtimeSession!.updateChatCtx(this.agent.chatCtx); } catch (error) { @@ -495,56 +524,65 @@ export class AgentActivity implements RecognitionHooks { async _detachReusableResources(newActivity: AgentActivity): Promise { const resources: ReusableResources = {}; + try { + // stt pipeline + if ( + this.audioRecognition && + this.stt && + newActivity.stt && + this.stt === newActivity.stt && + Object.getPrototypeOf(this.agent).sttNode === + Object.getPrototypeOf(newActivity.agent).sttNode + ) { + resources.sttPipeline = await this.audioRecognition.detachSttPipeline(); + } - // stt pipeline - if ( - this.audioRecognition && - this.stt && - newActivity.stt && - this.stt === newActivity.stt && - Object.getPrototypeOf(this.agent).sttNode === Object.getPrototypeOf(newActivity.agent).sttNode - ) { - resources.sttPipeline = await this.audioRecognition.detachSttPipeline(); - } - - // rt session - if (this.realtimeSession && this.llm instanceof RealtimeModel && this.llm === newActivity.llm) { - const capabilities = this.llm.capabilities; + // rt session + if ( + this.realtimeSession && + this.llm instanceof RealtimeModel && + this.llm === newActivity.llm + ) { + const capabilities = this.llm.capabilities; + + // context update is supported or chat context is equivalent + let reusable = + capabilities.midSessionChatCtxUpdate || + this.realtimeSession.chatCtx + .copy({ excludeInstructions: true, excludeHandoff: true }) + .isEquivalent( + newActivity.agent.chatCtx.copy({ excludeInstructions: true, excludeHandoff: true }), + ); - // context update is supported or chat context is equivalent - let reusable = - capabilities.midSessionContextUpdate || - this.realtimeSession.chatCtx - .copy({ excludeInstructions: true, excludeHandoff: true }) - .isEquivalent( - newActivity.agent.chatCtx.copy({ excludeInstructions: true, excludeHandoff: true }), + // instructions update is supported or instructions are the same + reusable = + reusable && + (capabilities.midSessionInstructionsUpdate || + this.agent.instructions === newActivity.agent.instructions); + + // tools update is supported or tools are the same + reusable = + reusable && + (capabilities.midSessionToolsUpdate || isSameToolContext(this.tools, newActivity.tools)); + + if (reusable) { + // detach: remove event listeners but don't close the session + this.realtimeSession.off('generation_created', this.onRealtimeGenerationCreated); + this.realtimeSession.off('input_speech_started', this.onRealtimeInputSpeechStarted); + this.realtimeSession.off('input_speech_stopped', this.onRealtimeInputSpeechStopped); + this.realtimeSession.off( + 'input_audio_transcription_completed', + this.onRealtimeInputAudioTranscriptionCompleted, ); - - // instructions update is supported or instructions are the same - reusable = - reusable && - (capabilities.midSessionInstructionsUpdate || - this.agent.instructions === newActivity.agent.instructions); - - // tools update is supported or tools are the same - reusable = - reusable && - (capabilities.midSessionToolsUpdate || isSameToolContext(this.tools, newActivity.tools)); - - if (reusable) { - // detach: remove event listeners but don't close the session - this.realtimeSession.off('generation_created', this.onRealtimeGenerationCreated); - this.realtimeSession.off('input_speech_started', this.onRealtimeInputSpeechStarted); - this.realtimeSession.off('input_speech_stopped', this.onRealtimeInputSpeechStopped); - this.realtimeSession.off( - 'input_audio_transcription_completed', - this.onRealtimeInputAudioTranscriptionCompleted, - ); - this.realtimeSession.off('metrics_collected', this.onMetricsCollected); - this.realtimeSession.off('error', this.onModelError); - resources.rtSession = this.realtimeSession; - this.realtimeSession = undefined; // prevent _closeSessionResources from closing it + this.realtimeSession.off('metrics_collected', this.onMetricsCollected); + this.realtimeSession.off('error', this.onModelError); + resources.rtSession = this.realtimeSession; + this.realtimeSession = undefined; // prevent _closeSessionResources from closing it + } } + } catch (error) { + await cleanupReusableResources(resources, this.logger); + throw error; } return resources; @@ -2963,7 +3001,7 @@ export class AgentActivity implements RecognitionHooks { // when force=true, we allow tool responses to bypass scheduling pause // This allows for tool responses to be generated before the AgentActivity is finalized if (this.schedulingPaused && !force) { - throw new Error('cannot schedule new speech, the speech scheduling is draining/pausing'); + throw new SchedulingPausedError(); } // Monotonic time to avoid near 0 collisions @@ -3021,7 +3059,7 @@ export class AgentActivity implements RecognitionHooks { await this._closeSessionResources(); } catch (error) { if (resources) { - await cleanupReusableResources(resources); + await cleanupReusableResources(resources, this.logger); } throw error; } finally { @@ -3069,7 +3107,11 @@ export class AgentActivity implements RecognitionHooks { // detach after speech tasks are done but before _closeSessionResources if (newActivity) { - return await this._detachReusableResources(newActivity); + try { + return await this._detachReusableResources(newActivity); + } catch (error) { + this.logger.error(error, 'failed to detach reusable resources'); + } } return undefined; } finally { diff --git a/agents/src/voice/agent_activity_handoff.test.ts b/agents/src/voice/agent_activity_handoff.test.ts index 25188cdb5..f9422b34d 100644 --- a/agents/src/voice/agent_activity_handoff.test.ts +++ b/agents/src/voice/agent_activity_handoff.test.ts @@ -167,7 +167,7 @@ describe('AgentActivity RT session reuse eligibility', () => { autoToolReplyGeneration: false, audioOutput: true, manualFunctionCalls: false, - midSessionContextUpdate: false, + midSessionChatCtxUpdate: false, midSessionInstructionsUpdate: false, midSessionToolsUpdate: false, ...capabilitiesOverrides, @@ -251,8 +251,8 @@ describe('AgentActivity RT session reuse eligibility', () => { expect(resources.rtSession).toBe(rtSession); }); - it('reuses RT session when context differs but midSessionContextUpdate is true', async () => { - const sharedLlm = createFakeRealtimeModel({ midSessionContextUpdate: true }); + it('reuses RT session when context differs but midSessionChatCtxUpdate is true', async () => { + const sharedLlm = createFakeRealtimeModel({ midSessionChatCtxUpdate: true }); const rtSession = createFakeRtSession(); // Give the session a non-empty chat context (rtSession as any).chatCtx = ChatContext.empty(); diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index 2a024c7a6..3f4e813d0 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -43,6 +43,7 @@ import { AgentActivity, type ReusableResources, cleanupReusableResources, + isSchedulingPausedError, } from './agent_activity.js'; import type { _TurnDetector } from './audio_recognition.js'; import { @@ -684,7 +685,22 @@ export class AgentSession< } return nextActivity.generateReply({ userMessage, ...options }); } - return activity.generateReply({ userMessage, ...options }); + + // Handoff can race with scheduling pause between the check above and generateReply(). + // If that happens, retry on the next activity instead of surfacing an avoidable error. + try { + return activity.generateReply({ userMessage, ...options }); + } catch (error) { + const canFallback = nextActivity !== undefined && isSchedulingPausedError(error); + if (!canFallback) { + throw error; + } + this.logger.debug( + { error }, + 'generateReply scheduling raced with handoff drain; retrying on next activity', + ); + return nextActivity.generateReply({ userMessage, ...options }); + } }; // attach to the session span if called outside of the AgentSession @@ -809,7 +825,7 @@ export class AgentSession< 'Session is closing, skipping start of next activity', ); if (reusableResources) { - await cleanupReusableResources(reusableResources); + await cleanupReusableResources(reusableResources, this.logger); reusableResources = undefined; } this.nextActivity = undefined; @@ -856,7 +872,7 @@ export class AgentSession< // JS safeguard: session cleanup owns the detached resources until the next activity // starts successfully, preventing leaks when handoff fails mid-transition. if (reusableResources) { - await cleanupReusableResources(reusableResources); + await cleanupReusableResources(reusableResources, this.logger); } throw error; } finally { diff --git a/plugins/google/src/beta/realtime/realtime_api.ts b/plugins/google/src/beta/realtime/realtime_api.ts index 00e773630..da0ebba19 100644 --- a/plugins/google/src/beta/realtime/realtime_api.ts +++ b/plugins/google/src/beta/realtime/realtime_api.ts @@ -311,8 +311,10 @@ export class RealtimeModel extends llm.RealtimeModel { autoToolReplyGeneration: true, audioOutput: options.modalities?.includes(Modality.AUDIO) ?? true, manualFunctionCalls: false, + midSessionChatCtxUpdate: false, midSessionInstructionsUpdate: true, midSessionToolsUpdate: false, + perResponseToolChoice: false, }); // Environment variable fallbacks diff --git a/plugins/openai/src/realtime/realtime_model.ts b/plugins/openai/src/realtime/realtime_model.ts index 4496028c4..41b148eff 100644 --- a/plugins/openai/src/realtime/realtime_model.ts +++ b/plugins/openai/src/realtime/realtime_model.ts @@ -185,9 +185,10 @@ export class RealtimeModel extends llm.RealtimeModel { autoToolReplyGeneration: false, audioOutput: modalities.includes('audio'), manualFunctionCalls: true, - midSessionContextUpdate: true, + midSessionChatCtxUpdate: true, midSessionInstructionsUpdate: true, midSessionToolsUpdate: true, + perResponseToolChoice: true, }); const isAzure = !!(options.apiVersion || options.entraToken || options.azureDeployment); @@ -480,8 +481,52 @@ export class RealtimeSession extends llm.RealtimeSession { async updateChatCtx(_chatCtx: llm.ChatContext): Promise { const unlock = await this.updateChatCtxLock.lock(); try { + const validation = llm.validateChatContextStructure(_chatCtx); + const blockingErrors = validation.issues.filter( + (issue: llm.ChatContextValidationIssue) => + issue.severity === 'error' && issue.code !== 'timestamp_order', + ); + const timestampOrderIssue = validation.issues.find( + (issue: llm.ChatContextValidationIssue) => issue.code === 'timestamp_order', + ); + if (blockingErrors.length > 0) { + this.#logger.error( + { issues: validation.issues, blockingErrors }, + 'Invalid chat context supplied to updateChatCtx', + ); + throw new Error( + `Invalid chat context: ${validation.errors} errors, ${validation.warnings} warnings`, + ); + } + if (timestampOrderIssue) { + this.#logger.warn( + { timestampOrderIssue }, + 'Proceeding with non-monotonic createdAt ordering in realtime chat context', + ); + } + if (lkOaiDebug > 0 && validation.warnings > 0) { + this.#logger.debug( + { + warnings: validation.warnings, + issues: validation.issues, + }, + 'Chat context warnings detected before realtime update', + ); + } + const events = await this.createChatCtxUpdateEvents(_chatCtx); const futures: Future[] = []; + const cleanupTimedOutFutures = () => { + // remove timed-out entries so late server acks + // don't resolve stale futures from a previous updateChatCtx call. + for (const event of events) { + if (event.type === 'conversation.item.delete') { + delete this.itemDeleteFutures[event.item_id]; + } else if (event.type === 'conversation.item.create') { + delete this.itemCreateFutures[event.item.id]; + } + } + }; for (const event of events) { if (event.type === 'conversation.item.create') { @@ -510,6 +555,7 @@ export class RealtimeSession extends llm.RealtimeSession { await Promise.race([ Promise.all(futures), delay(5000).then(() => { + cleanupTimedOutFutures(); throw new Error('Chat ctx update events timed out'); }), ]); diff --git a/plugins/openai/src/realtime/realtime_model_beta.ts b/plugins/openai/src/realtime/realtime_model_beta.ts index 4b1763546..d81720d67 100644 --- a/plugins/openai/src/realtime/realtime_model_beta.ts +++ b/plugins/openai/src/realtime/realtime_model_beta.ts @@ -177,9 +177,10 @@ export class RealtimeModel extends llm.RealtimeModel { autoToolReplyGeneration: false, audioOutput: modalities.includes('audio'), manualFunctionCalls: true, - midSessionContextUpdate: true, + midSessionChatCtxUpdate: true, midSessionInstructionsUpdate: true, midSessionToolsUpdate: true, + perResponseToolChoice: true, }); const isAzure = !!(options.apiVersion || options.entraToken || options.azureDeployment); From 782a916fe8df024179c3f55ac17e458a8113f418 Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Wed, 8 Apr 2026 17:46:26 -0700 Subject: [PATCH 7/9] minor fix --- agents/src/utils.ts | 1 + agents/src/voice/agent_activity.ts | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/agents/src/utils.ts b/agents/src/utils.ts index ab6a499c0..9ea71de61 100644 --- a/agents/src/utils.ts +++ b/agents/src/utils.ts @@ -970,6 +970,7 @@ export async function* readStream( stream: ReadableStream, signal?: AbortSignal, ): AsyncGenerator { + if (signal?.aborted) return; const reader = stream.getReader(); try { if (signal) { diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index 22655d5e5..8216a79e8 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -2927,7 +2927,8 @@ export class AgentActivity implements RecognitionHooks { }); } catch (e) { this.logger.error('failed to say text: %s', String(e)); - this.agentSession._updateAgentState('listening'); + // Keep state transition logic centralized so queued/planned speeches are respected. + this.onPipelineReplyDone(); return; } From ab5c52851daff828900b3c09c4f7d3e1bd1ee602 Mon Sep 17 00:00:00 2001 From: Tina Nguyen <72938484+tinalenguyen@users.noreply.github.com> Date: Thu, 9 Apr 2026 20:37:20 -0400 Subject: [PATCH 8/9] (phonic): add logic for maintaining a single ws conn and remove say() (#1220) Co-authored-by: Qiong Zhou Huang Co-authored-by: Brian Yin --- .changeset/happy-yaks-bet.md | 5 + .changeset/plenty-baths-hug.md | 5 + agents/src/llm/realtime.ts | 7 - agents/src/voice/agent_activity.ts | 135 +++------- plugins/hedra/package.json | 4 +- plugins/phonic/package.json | 2 +- plugins/phonic/src/realtime/realtime_model.ts | 241 ++++++++++++------ pnpm-lock.yaml | 130 +++++----- pnpm-workspace.yaml | 2 +- 9 files changed, 280 insertions(+), 251 deletions(-) create mode 100644 .changeset/happy-yaks-bet.md create mode 100644 .changeset/plenty-baths-hug.md diff --git a/.changeset/happy-yaks-bet.md b/.changeset/happy-yaks-bet.md new file mode 100644 index 000000000..887dd511c --- /dev/null +++ b/.changeset/happy-yaks-bet.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents-plugin-phonic": patch +--- + +Update phonic plugin to reuse session for handoffs diff --git a/.changeset/plenty-baths-hug.md b/.changeset/plenty-baths-hug.md new file mode 100644 index 000000000..611510e3a --- /dev/null +++ b/.changeset/plenty-baths-hug.md @@ -0,0 +1,5 @@ +--- +'@livekit/agents': patch +--- + +remove rt session say logic and add phonic logic for resetting ws conn diff --git a/agents/src/llm/realtime.ts b/agents/src/llm/realtime.ts index e1866c319..abfef0d46 100644 --- a/agents/src/llm/realtime.ts +++ b/agents/src/llm/realtime.ts @@ -161,13 +161,6 @@ export abstract class RealtimeSession extends EventEmitter { return; } - say( - _text: string | ReadableStream, - _options?: { allowInterruptions?: boolean }, - ): Promise { - throw new Error(`${this.constructor.name} does not implement say(). use a TTS model instead`); - } - private async _mainTaskImpl(signal: AbortSignal): Promise { const reader = this.inputAudioStream.stream.getReader(); while (true) { diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index 8216a79e8..167287cd5 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -380,6 +380,7 @@ export class AgentActivity implements RecognitionHooks { if (this.llm instanceof RealtimeModel) { const rtReused = reuseResources?.rtSession !== undefined; + if (rtReused) { this.logger.debug('reusing realtime session from previous activity'); this.realtimeSession = reuseResources!.rtSession; @@ -408,27 +409,41 @@ export class AgentActivity implements RecognitionHooks { // skip the update if the session is reused and no mid-session update is supported // this means the content is the same as the previous session const capabilities = this.llm.capabilities; - if (!rtReused || capabilities.midSessionInstructionsUpdate) { + if (rtReused && this.realtimeSession?.realtimeModel.provider == 'phonic') { + // if the session is being reused, then call phonic's _updateSession to send a full mid-session config update. + // otherwise, call the separate update_* functions to build the initial config. try { - await this.realtimeSession!.updateInstructions(this.agent.instructions); + await (this.realtimeSession as any)._updateSession( + this.agent.instructions, + this.agent.chatCtx, + this.tools, + ); } catch (error) { - this.logger.error(error, 'failed to update the instructions'); + this.logger.error(error, 'failed to update phonic session'); + } + } else { + if (!rtReused || capabilities.midSessionInstructionsUpdate) { + try { + await this.realtimeSession!.updateInstructions(this.agent.instructions); + } catch (error) { + this.logger.error(error, 'failed to update the instructions'); + } } - } - if (!rtReused || capabilities.midSessionChatCtxUpdate) { - try { - await this.realtimeSession!.updateChatCtx(this.agent.chatCtx); - } catch (error) { - this.logger.error(error, 'failed to update the chat context'); + if (!rtReused || capabilities.midSessionChatCtxUpdate) { + try { + await this.realtimeSession!.updateChatCtx(this.agent.chatCtx); + } catch (error) { + this.logger.error(error, 'failed to update the chat context'); + } } - } - if (!rtReused || capabilities.midSessionToolsUpdate) { - try { - await this.realtimeSession!.updateTools(this.tools); - } catch (error) { - this.logger.error(error, 'failed to update the tools'); + if (!rtReused || capabilities.midSessionToolsUpdate) { + try { + await this.realtimeSession!.updateTools(this.tools); + } catch (error) { + this.logger.error(error, 'failed to update the tools'); + } } } @@ -819,33 +834,15 @@ export class AgentActivity implements RecognitionHooks { allowInterruptions: defaultAllowInterruptions, addToChatCtx = true, } = options ?? {}; - let allowInterruptions = defaultAllowInterruptions; - - if ( - this.llm instanceof RealtimeModel && - this.llm.capabilities.turnDetection && - this.tts && - allowInterruptions === false - ) { - this.logger.warn( - 'the RealtimeModel uses a server-side turn detection, allowInterruptions cannot be false when using VoiceAgent.say(), ' + - 'disable turnDetection in the RealtimeModel and use VAD on the AgentTask/VoiceAgent instead', - ); - allowInterruptions = true; - } + const allowInterruptions = defaultAllowInterruptions; if ( !audio && !this.tts && - this.realtimeSession === undefined && this.agentSession.output.audio && this.agentSession.output.audioEnabled ) { - const modelInfo = - this.llm instanceof RealtimeModel - ? 'a RealtimeSession that implements say()' - : 'a TTS model'; - throw new Error(`trying to generate speech from text without ${modelInfo}`); + throw new Error('trying to generate speech from text without a TTS model'); } const handle = SpeechHandle.create({ @@ -861,28 +858,14 @@ export class AgentActivity implements RecognitionHooks { }), ); - let task: Task; - if (this.realtimeSession !== undefined && !audio && !this.tts) { - task = this.createSpeechTask({ - taskFn: (abortController: AbortController) => - this.realtimeSayTask(handle, text, addToChatCtx, {}, abortController), - ownedSpeechHandle: handle, - name: 'AgentActivity.realtime_say', - }); - } else { - task = this.createSpeechTask({ - taskFn: (abortController: AbortController) => - this.ttsTask(handle, text, addToChatCtx, {}, abortController, audio), - ownedSpeechHandle: handle, - name: 'AgentActivity.tts_say', - }); - } + const task = this.createSpeechTask({ + taskFn: (abortController: AbortController) => + this.ttsTask(handle, text, addToChatCtx, {}, abortController, audio), + ownedSpeechHandle: handle, + name: 'AgentActivity.tts_say', + }); - // Avoid duplicate state transitions for realtime say path: realtimeGenerationTask already - // performs end-of-speech transitions internally. - if (this.realtimeSession === undefined || audio !== undefined || this.tts) { - task.result.finally(() => this.onPipelineReplyDone()); - } + task.result.finally(() => this.onPipelineReplyDone()); this.scheduleSpeech(handle, SpeechHandle.SPEECH_PRIORITY_NORMAL); return handle; } @@ -2901,46 +2884,6 @@ export class AgentActivity implements RecognitionHooks { }; } - private async realtimeSayTask( - speechHandle: SpeechHandle, - text: string | ReadableStream, - addToChatCtx: boolean, - modelSettings: ModelSettings, - replyAbortController: AbortController, - ): Promise { - speechHandleStorage.enterWith(speechHandle); - - if (!this.realtimeSession) { - throw new Error('realtimeSession is not available'); - } - - await speechHandle.waitIfNotInterrupted([speechHandle._waitForAuthorization()]); - - if (speechHandle.interrupted) { - return; - } - - let generationEv: GenerationCreatedEvent; - try { - generationEv = await this.realtimeSession.say(text, { - allowInterruptions: speechHandle.allowInterruptions, - }); - } catch (e) { - this.logger.error('failed to say text: %s', String(e)); - // Keep state transition logic centralized so queued/planned speeches are respected. - this.onPipelineReplyDone(); - return; - } - - await this.realtimeGenerationTask( - speechHandle, - generationEv, - modelSettings, - replyAbortController, - addToChatCtx, - ); - } - private async realtimeReplyTask({ speechHandle, modelSettings: { toolChoice }, diff --git a/plugins/hedra/package.json b/plugins/hedra/package.json index 0ef1e78d5..6003f1171 100644 --- a/plugins/hedra/package.json +++ b/plugins/hedra/package.json @@ -35,7 +35,7 @@ }, "devDependencies": { "@livekit/agents": "workspace:*", - "@livekit/rtc-node": "^0.13.22", + "@livekit/rtc-node": "catalog:", "@microsoft/api-extractor": "^7.35.0", "pino": "^8.19.0", "tsup": "^8.3.5", @@ -46,6 +46,6 @@ }, "peerDependencies": { "@livekit/agents": "workspace:*", - "@livekit/rtc-node": "^0.13.22" + "@livekit/rtc-node": "catalog:" } } diff --git a/plugins/phonic/package.json b/plugins/phonic/package.json index 600cfb37e..7d5593459 100644 --- a/plugins/phonic/package.json +++ b/plugins/phonic/package.json @@ -41,7 +41,7 @@ "typescript": "^5.0.0" }, "dependencies": { - "phonic": "^0.31.8" + "phonic": "^0.31.10" }, "peerDependencies": { "@livekit/agents": "workspace:*", diff --git a/plugins/phonic/src/realtime/realtime_model.ts b/plugins/phonic/src/realtime/realtime_model.ts index ee626bf76..d1c7af962 100644 --- a/plugins/phonic/src/realtime/realtime_model.ts +++ b/plugins/phonic/src/realtime/realtime_model.ts @@ -12,7 +12,6 @@ import { stream, } from '@livekit/agents'; import { AudioFrame, AudioResampler } from '@livekit/rtc-node'; -import type { ReadableStream } from 'node:stream/web'; import type { Phonic } from 'phonic'; import { PhonicClient } from 'phonic'; import type { ServerEvent, Voice } from './api_proto.js'; @@ -24,6 +23,10 @@ const PHONIC_INPUT_FRAME_MS = 20; const DEFAULT_MODEL = 'merritt'; const WS_CLOSE_NORMAL = 1000; const TOOL_CALL_OUTPUT_TIMEOUT_MS = 60_000; +const CONVERSATION_HISTORY_PREFIX = + '\n\nThis conversation is being continued from an existing ' + + 'conversation. You are the assistant speaking to the user. ' + + 'The following is the conversation history:\n'; export interface RealtimeModelOptions { apiKey: string; @@ -57,6 +60,10 @@ export class RealtimeModel extends llm.RealtimeModel { return this._options.model; } + get provider(): string { + return 'phonic'; + } + constructor( options: { /** @@ -147,6 +154,10 @@ export class RealtimeModel extends llm.RealtimeModel { autoToolReplyGeneration: true, manualFunctionCalls: false, audioOutput: true, + midSessionChatCtxUpdate: true, + midSessionInstructionsUpdate: true, + midSessionToolsUpdate: true, + perResponseToolChoice: false, }); const apiKey = options.apiKey || process.env.PHONIC_API_KEY; @@ -215,7 +226,6 @@ interface GenerationState { * Realtime session for Phonic (https://docs.phonic.co/) */ export class RealtimeSession extends llm.RealtimeSession { - private static readonly SAY_TIMEOUT_MS = 10_000; private _tools: llm.ToolContext = {}; private _chatCtx = llm.ChatContext.empty(); @@ -238,7 +248,8 @@ export class RealtimeSession extends llm.RealtimeSession { private connectTask: Promise; private toolDefinitions: Record[] = []; private pendingToolCallIds = new Set(); - private readyToStart = false; + private readyToStart = new Future(); + private pendingGenerateReplyFut?: Future; private systemPromptPostfix = ''; constructor(realtimeModel: RealtimeModel) { @@ -296,9 +307,7 @@ export class RealtimeSession extends llm.RealtimeSession { this.logger.debug( 'updateChatCtx called with messages prior to config being sent to Phonic. Including conversation state in system instructions.', ); - this.systemPromptPostfix = - '\n\nThis conversation is being continued from an existing conversation. You are the assistant speaking to the user. The following is the conversation history:\n' + - turnHistory; + this.systemPromptPostfix = CONVERSATION_HISTORY_PREFIX + turnHistory; } this._chatCtx = chatCtx.copy(); } @@ -377,12 +386,69 @@ export class RealtimeSession extends llm.RealtimeSession { this.toolsReady.resolve(); } + async _updateSession( + instructions?: string, + chatCtx?: llm.ChatContext, + tools?: llm.ToolContext, + ): Promise { + await this.readyToStart.await; + if (instructions !== undefined) { + this.options.instructions = instructions; + } + if (tools !== undefined) { + this._tools = { ...tools }; + this.toolDefinitions = Object.entries(tools) + .filter(([, tool]) => llm.isFunctionTool(tool)) + .map(([name, tool]) => ({ + type: 'custom_websocket', + tool_schema: { + type: 'function', + function: { + name, + description: tool.description, + parameters: llm.toJsonSchema(tool.parameters), + strict: true, + }, + }, + tool_call_output_timeout_ms: TOOL_CALL_OUTPUT_TIMEOUT_MS, + wait_for_speech_before_tool_call: true, + allow_tool_chaining: false, + })); + } + if (chatCtx !== undefined) { + this._chatCtx = chatCtx.copy(); + } + + let systemPrompt = this.options.instructions ?? ''; + if (chatCtx !== undefined) { + const history = this.buildTurnHistory(chatCtx); + if (history) { + systemPrompt += CONVERSATION_HISTORY_PREFIX + history; + } + } + + this.closeCurrentGeneration({ interrupted: true }); + + const toolsPayload: Phonic.ConfigOptions.Tools.Item[] = [ + ...(this.options.phonicTools ?? []), + ...this.toolDefinitions, + ]; + + if (this.socket) { + this.logger.info('Sending mid-session reset to Phonic'); + this.socket.sendReset({ + type: 'reset', + config: this.buildConfigOptions({ systemPrompt, toolsPayload }), + }); + } + } + updateOptions(_options: { toolChoice?: llm.ToolChoice | null }): void { this.logger.warn('updateOptions is not supported by the Phonic realtime model.'); } pushAudio(frame: AudioFrame): void { - if (this.closed || !this.readyToStart) { + if (this.closed || !this.readyToStart.done) { return; } @@ -403,64 +469,24 @@ export class RealtimeSession extends llm.RealtimeSession { } async generateReply(instructions?: string): Promise { - if (this.socket) { - this.socket.sendGenerateReply({ type: 'generate_reply', system_message: instructions }); - } else { - this.logger.warn('Cannot send generate_reply: WebSocket not available'); + if (this.closed) { + return Promise.reject(new Error('session is closed')); } - this.closeCurrentGeneration({ interrupted: false }); - return this.startNewAssistantTurn({ userInitiated: true }); - } + this.pendingGenerateReplyFut = new Future(); + this.sendGenerateReply(instructions); - async say( - text: string | ReadableStream, - options?: { allowInterruptions?: boolean }, - ): Promise { - await Promise.race([ - this.sendSayAsync(text, options?.allowInterruptions), - new Promise((_, reject) => { - setTimeout(() => reject(new Error('say() timed out.')), RealtimeSession.SAY_TIMEOUT_MS); - }), - ]); - - this.closeCurrentGeneration({ interrupted: false }); - return this.startNewAssistantTurn({ userInitiated: true }); + return this.pendingGenerateReplyFut.await; } - private async sendSayAsync( - text: string | ReadableStream, - allowInterruptions?: boolean, - ): Promise { - if (this.closed) return; - - let fullText: string; - if (typeof text === 'string') { - fullText = text; - } else { - const chunks: string[] = []; - const reader = text.getReader(); - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - chunks.push(value); - } - } finally { - reader.releaseLock(); - } - fullText = chunks.join(''); - } - - if (!this.socket) { - throw new Error('Cannot send say: WebSocket not available'); - } - - const payload: Record = { type: 'say', text: fullText }; - if (allowInterruptions !== undefined) { - payload.interruptible = allowInterruptions; + private async sendGenerateReply(instructions?: string): Promise { + await this.readyToStart.await; + if (this.closed || !this.socket) { + this.pendingGenerateReplyFut?.reject(new Error('session is closed')); + this.pendingGenerateReplyFut = undefined; + return; } - this.socket.socket.send(JSON.stringify(payload)); + this.socket.sendGenerateReply({ type: 'generate_reply', system_message: instructions }); } async commitAudio(): Promise { @@ -485,7 +511,10 @@ export class RealtimeSession extends llm.RealtimeSession { this.closedFuture.resolve(); this.instructionsReady.resolve(); this.toolsReady.resolve(); + this.readyToStart.resolve(); this.closeCurrentGeneration({ interrupted: false }); + this.rejectPendingGenerateReply(); + this.inputResampler = undefined; this.socket?.close(); await this.connectTask; await super.close(); @@ -507,6 +536,7 @@ export class RealtimeSession extends llm.RealtimeSession { this.socket.on('error', (error: Error) => this.emitError(error, false)); this.socket.on('close', (event: { code?: number }) => { this.closeCurrentGeneration({ interrupted: false }); + this.rejectPendingGenerateReply(); if (!this.closed && event.code !== WS_CLOSE_NORMAL) { this.emitError(new Error(`Phonic STS socket closed with code ${event.code ?? -1}`), false); } @@ -534,30 +564,10 @@ export class RealtimeSession extends llm.RealtimeSession { this.socket.sendConfig({ type: 'config', model: this.options.model as Phonic.ConfigPayload['model'], - agent: this.options.phonicAgent, - project: this.options.project, - welcome_message: this.options.welcomeMessage, - generate_welcome_message: this.options.generateWelcomeMessage, - system_prompt: this.options.instructions + this.systemPromptPostfix, - voice_id: this.options.voice, - input_format: 'pcm_44100', - output_format: 'pcm_44100', - ...(this.options.defaultLanguage !== undefined && { - default_language: this.options.defaultLanguage, + ...this.buildConfigOptions({ + systemPrompt: this.options.instructions + this.systemPromptPostfix, + toolsPayload: [...(this.options.phonicTools ?? []), ...this.toolDefinitions], }), - ...(this.options.additionalLanguages !== undefined && { - additional_languages: this.options.additionalLanguages, - }), - ...(this.options.multilingualMode !== undefined && { - multilingual_mode: this.options.multilingualMode, - }), - audio_speed: this.options.audioSpeed, - tools: [...(this.options.phonicTools ?? []), ...this.toolDefinitions], - boosted_keywords: this.options.boostedKeywords, - generate_no_input_poke_text: this.options.generateNoInputPokeText, - no_input_poke_sec: this.options.noInputPokeSec, - no_input_poke_text: this.options.noInputPokeText, - no_input_end_conversation_sec: this.options.noInputEndConversationSec, }); } @@ -609,7 +619,7 @@ export class RealtimeSession extends llm.RealtimeSession { this.handleToolCallInterrupted(message); break; case 'ready_to_start_conversation': - this.readyToStart = true; + this.readyToStart.resolve(); break; case 'assistant_chose_not_to_respond': case 'input_cancelled': @@ -750,6 +760,12 @@ export class RealtimeSession extends llm.RealtimeSession { responseId, }; + if (this.pendingGenerateReplyFut && !this.pendingGenerateReplyFut.done) { + generationEvent.userInitiated = true; + this.pendingGenerateReplyFut.resolve(generationEvent); + this.pendingGenerateReplyFut = undefined; + } + this.emit('generation_created', generationEvent); return generationEvent; } @@ -778,6 +794,13 @@ export class RealtimeSession extends llm.RealtimeSession { this.currentGeneration = undefined; } + private rejectPendingGenerateReply(): void { + if (this.pendingGenerateReplyFut && !this.pendingGenerateReplyFut.done) { + this.pendingGenerateReplyFut.reject(new Error('session is closed')); + this.pendingGenerateReplyFut = undefined; + } + } + private emitError(error: Error, recoverable: boolean): void { this.emit('error', { timestamp: Date.now(), @@ -788,9 +811,61 @@ export class RealtimeSession extends llm.RealtimeSession { } satisfies llm.RealtimeModelError); } + private buildConfigOptions({ + systemPrompt, + toolsPayload, + }: { + systemPrompt: string; + toolsPayload: Phonic.ConfigOptions.Tools.Item[]; + }): Phonic.ConfigOptions { + return { + agent: this.options.phonicAgent, + project: this.options.project, + welcome_message: this.options.welcomeMessage, + generate_welcome_message: this.options.generateWelcomeMessage, + system_prompt: systemPrompt, + voice_id: this.options.voice, + input_format: 'pcm_44100', + output_format: 'pcm_44100', + ...(this.options.defaultLanguage !== undefined && { + default_language: this.options.defaultLanguage, + }), + ...(this.options.additionalLanguages !== undefined && { + additional_languages: this.options.additionalLanguages, + }), + ...(this.options.multilingualMode !== undefined && { + multilingual_mode: this.options.multilingualMode, + }), + audio_speed: this.options.audioSpeed, + tools: toolsPayload, + boosted_keywords: this.options.boostedKeywords, + // ...(this.options.minWordsToInterrupt !== undefined && { + // min_words_to_interrupt: this.options.minWordsToInterrupt, + // }), + generate_no_input_poke_text: this.options.generateNoInputPokeText, + no_input_poke_sec: this.options.noInputPokeSec, + no_input_poke_text: this.options.noInputPokeText, + no_input_end_conversation_sec: this.options.noInputEndConversationSec, + }; + } + + private buildTurnHistory(chatCtx: llm.ChatContext): string | undefined { + const messages = chatCtx.items.filter( + (item): item is llm.ChatMessage => + item.type === 'message' && + 'textContent' in item && + item.textContent !== undefined && + item.textContent.trim() !== '', + ); + if (messages.length === 0) return undefined; + const history = messages.map((m) => `${m.role}: ${m.textContent}`).join('\n'); + return history.trim() || undefined; + } + private *resampleAudio(frame: AudioFrame): Generator { if (this.inputResampler) { if (frame.sampleRate !== this.inputResamplerInputRate) { + this.inputResampler.close(); this.inputResampler = undefined; this.inputResamplerInputRate = undefined; } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 861ed9e85..4f81a174b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -7,8 +7,8 @@ settings: catalogs: default: '@livekit/rtc-node': - specifier: ^0.13.24 - version: 0.13.24 + specifier: ^0.13.25 + version: 0.13.25 patchedDependencies: '@changesets/assemble-release-plan': @@ -205,7 +205,7 @@ importers: devDependencies: '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.15.30) @@ -295,7 +295,7 @@ importers: version: 0.1.9 '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@opentelemetry/api': specifier: ^1.9.0 version: 1.9.0 @@ -369,7 +369,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -406,7 +406,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -434,7 +434,7 @@ importers: version: link:../../agents '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -468,7 +468,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -502,7 +502,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -536,7 +536,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -576,7 +576,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -597,8 +597,8 @@ importers: specifier: workspace:* version: link:../../agents '@livekit/rtc-node': - specifier: ^0.13.22 - version: 0.13.24 + specifier: 'catalog:' + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -629,7 +629,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -654,7 +654,7 @@ importers: version: link:../../agents '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -713,7 +713,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -753,7 +753,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -770,15 +770,15 @@ importers: plugins/phonic: dependencies: phonic: - specifier: ^0.31.8 - version: 0.31.8 + specifier: ^0.31.10 + version: 0.31.10 devDependencies: '@livekit/agents': specifier: workspace:* version: link:../../agents '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -806,7 +806,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -840,7 +840,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -874,7 +874,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -902,7 +902,7 @@ importers: version: link:../../agents '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -933,7 +933,7 @@ importers: version: link:../../agents '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@types/node': specifier: ^22.5.5 version: 22.15.30 @@ -958,7 +958,7 @@ importers: version: link:../../agents '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -986,7 +986,7 @@ importers: version: link:../openai '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -2063,38 +2063,42 @@ packages: '@livekit/protocol@1.45.1': resolution: {integrity: sha512-sr6p0TwKofHO5KW6kUzjq4hH2de4Al5scQo824xFnyI1XYo0qQn6fTG+bdr+Uj4EedjYAOqjezwUju5OErVIRA==} - '@livekit/rtc-node-darwin-arm64@0.13.24': - resolution: {integrity: sha512-gm5xOpGu6Rj/mNU2jEijcGhQGN2GdxV2dNYQm3NCKN7ow0BmMFZvXSCAWOWf+9oTutPXHnrc7EN1mt2v+lfqhA==} - engines: {node: '>= 10'} + '@livekit/rtc-ffi-bindings-darwin-arm64@0.12.52-patch.0': + resolution: {integrity: sha512-IKUir6goV8yVRR7E2qrAP0JtH7gUyMkO0TG8G+dopO/fkXAsPpSealgI9fLcBJl0zhKK+eGCr741r6xR+xxsVw==} + engines: {node: '>= 18'} cpu: [arm64] os: [darwin] - '@livekit/rtc-node-darwin-x64@0.13.24': - resolution: {integrity: sha512-jZSK5lHDp7+u0jby7PEWMzbxc0F0nLx6FT3FVjuMlT13ZY6QWKDUUCFbfDOtbdhiOZJYc5A4SwvubY6woEJXTg==} - engines: {node: '>= 10'} + '@livekit/rtc-ffi-bindings-darwin-x64@0.12.52-patch.0': + resolution: {integrity: sha512-h2oKdGvK4E4nYxHc+hsHkYu+oJIhKKqrC96v1XSGa5fgIEcq4Bve6tNEwUCBTkvuGh/I2tOI83udgcF4P4+mhQ==} + engines: {node: '>= 18'} cpu: [x64] os: [darwin] - '@livekit/rtc-node-linux-arm64-gnu@0.13.24': - resolution: {integrity: sha512-I+IeZET2h+viZ48moEFF0EWDHa+kLii5yuEsw38ya4mHZaZtlfbzrYKGKdONqbI9M9ldvv8XXuD0wFPjuH5CZw==} - engines: {node: '>= 10'} + '@livekit/rtc-ffi-bindings-linux-arm64-gnu@0.12.52-patch.0': + resolution: {integrity: sha512-bWtJ3r+wQ1Fd8s8jiM7GnBMfaKepSYk5c4bgimMIC4mkz+puChpfaj9avz1M271FUgxnIAKCTz6fPN2ygIAFnw==} + engines: {node: '>= 18'} cpu: [arm64] os: [linux] - '@livekit/rtc-node-linux-x64-gnu@0.13.24': - resolution: {integrity: sha512-vKOxzN/SsrtV8zIVwZCi31bZUhlb6RhJZ0NnY5MwKGSRFPi7Dwt8fmr0Vh0YmsY/p+4eZjKxvFmy7L3WVE54zw==} - engines: {node: '>= 10'} + '@livekit/rtc-ffi-bindings-linux-x64-gnu@0.12.52-patch.0': + resolution: {integrity: sha512-y1j4ciiCMaUrii0/XYwLFyRBRHDvx4202YCK5ePF3xB+9tW3Fuwexd/z4GuupCpP9eadGkpALCQt60wnLnFDnw==} + engines: {node: '>= 18'} cpu: [x64] os: [linux] - '@livekit/rtc-node-win32-x64-msvc@0.13.24': - resolution: {integrity: sha512-yTzqwndq2oKLUkXW2i/BkZMJC6kZOpRO/DKvkkKQvqc3Q+JuWz1m48GmyjIwTOKF28QjqEU3+IrnD65Uu+mFOg==} - engines: {node: '>= 10'} + '@livekit/rtc-ffi-bindings-win32-x64-msvc@0.12.52-patch.0': + resolution: {integrity: sha512-a7eoTor7KgN4JDPqZjyBQjgkVIZcxkyP5Iau3O/1qDaYKboLMqSYHfSAk84Un4r0SsSFvxUXXDY3boMLJ7QYow==} + engines: {node: '>= 18'} cpu: [x64] os: [win32] - '@livekit/rtc-node@0.13.24': - resolution: {integrity: sha512-06pF8YJlJk11R6J7kFXFpwV8etpbmCskoXFvwfwcDDixMqaP6qtS5srq3G23mDaRjx7ofz/PXg2GtiZbqNGT5A==} + '@livekit/rtc-ffi-bindings@0.12.52-patch.0': + resolution: {integrity: sha512-e01PH3AAS0/oN93LzgLDycDWzLGCpHqvZ35qzSuBWrG7V9mmQpdW/bOc6r9UFGZx/BcUXov4OTtao4OyDVVyHw==} + engines: {node: '>= 18'} + + '@livekit/rtc-node@0.13.25': + resolution: {integrity: sha512-4tL58O2DdTDP+g1ajyP5mgEOzjymD/u06IxWWVKBee1goEwDSQlMqEog/DJW34FoNNqXp1yRMCsphI4V/T1ILg==} engines: {node: '>= 18'} '@livekit/throws-transformer@0.0.0-20260320165515': @@ -4482,8 +4486,8 @@ packages: resolution: {integrity: sha512-vE7JKRyES09KiunauX7nd2Q9/L7lhok4smP9RZTDeD4MVs72Dp2qNFVz39Nz5a0FVEW0BJR6C0DYrq6unoziZA==} engines: {node: '>= 14.16'} - phonic@0.31.8: - resolution: {integrity: sha512-BeUqRbr0Ta0uB+6OCB770BNJ/r77HvgF+5vE2c69HEylqO93IMQkeWVeXb56Vh8mT5D3LGoO6NRssG5ChtreMw==} + phonic@0.31.10: + resolution: {integrity: sha512-MMEbfgBnjdZ0j8dkRMCl3TQTpIiKtqaJY6U1DMJhl8F8diOVh92Q9XXbBeBCoRsrKhoHpfDxRrnKVunX6NlF1w==} engines: {node: '>=18.0.0'} picocolors@1.0.1: @@ -4965,7 +4969,7 @@ packages: tar@7.4.3: resolution: {integrity: sha512-5S7Va8hKfV7W5U6g3aYxXmlPoZVAwUMy9AOKyF2fVuZa2UD3qZjg578OrLRt8PcNN1PleVaL/5/yYATNL0ICUw==} engines: {node: '>=18'} - deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me + deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exhorbitant rates) by contacting i@izs.me term-size@2.2.1: resolution: {integrity: sha512-wK0Ri4fOGjv/XPy8SBHZChl8CM7uMc5VML7SqiQ0zG7+J5Vr+RMQDoHa2CNT6KHUnTGIXH34UDMkPzAUyapBZg==} @@ -6353,7 +6357,7 @@ snapshots: '@livekit/noise-cancellation-node@0.1.9': dependencies: - '@livekit/rtc-node': 0.13.24 + '@livekit/rtc-node': 0.13.25 tsx: 4.21.0 optionalDependencies: '@livekit/noise-cancellation-darwin-arm64': 0.1.9 @@ -6369,35 +6373,39 @@ snapshots: dependencies: '@bufbuild/protobuf': 1.10.1 - '@livekit/rtc-node-darwin-arm64@0.13.24': + '@livekit/rtc-ffi-bindings-darwin-arm64@0.12.52-patch.0': optional: true - '@livekit/rtc-node-darwin-x64@0.13.24': + '@livekit/rtc-ffi-bindings-darwin-x64@0.12.52-patch.0': optional: true - '@livekit/rtc-node-linux-arm64-gnu@0.13.24': + '@livekit/rtc-ffi-bindings-linux-arm64-gnu@0.12.52-patch.0': optional: true - '@livekit/rtc-node-linux-x64-gnu@0.13.24': + '@livekit/rtc-ffi-bindings-linux-x64-gnu@0.12.52-patch.0': optional: true - '@livekit/rtc-node-win32-x64-msvc@0.13.24': + '@livekit/rtc-ffi-bindings-win32-x64-msvc@0.12.52-patch.0': optional: true - '@livekit/rtc-node@0.13.24': + '@livekit/rtc-ffi-bindings@0.12.52-patch.0': dependencies: '@bufbuild/protobuf': 1.10.1 + optionalDependencies: + '@livekit/rtc-ffi-bindings-darwin-arm64': 0.12.52-patch.0 + '@livekit/rtc-ffi-bindings-darwin-x64': 0.12.52-patch.0 + '@livekit/rtc-ffi-bindings-linux-arm64-gnu': 0.12.52-patch.0 + '@livekit/rtc-ffi-bindings-linux-x64-gnu': 0.12.52-patch.0 + '@livekit/rtc-ffi-bindings-win32-x64-msvc': 0.12.52-patch.0 + + '@livekit/rtc-node@0.13.25': + dependencies: '@datastructures-js/deque': 1.0.8 '@livekit/mutex': 1.1.1 + '@livekit/rtc-ffi-bindings': 0.12.52-patch.0 '@livekit/typed-emitter': 3.0.0 pino: 9.6.0 pino-pretty: 13.0.0 - optionalDependencies: - '@livekit/rtc-node-darwin-arm64': 0.13.24 - '@livekit/rtc-node-darwin-x64': 0.13.24 - '@livekit/rtc-node-linux-arm64-gnu': 0.13.24 - '@livekit/rtc-node-linux-x64-gnu': 0.13.24 - '@livekit/rtc-node-win32-x64-msvc': 0.13.24 '@livekit/throws-transformer@0.0.0-20260320165515(typescript@5.4.5)': dependencies: @@ -9121,7 +9129,7 @@ snapshots: pathval@2.0.0: {} - phonic@0.31.8: + phonic@0.31.10: dependencies: ws: 8.20.0 transitivePeerDependencies: diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index d073c55c8..a396d66df 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -8,4 +8,4 @@ minimumReleaseAgeExclude: - '@livekit/*' catalog: - '@livekit/rtc-node': ^0.13.24 + '@livekit/rtc-node': ^0.13.25 From 4b4f2515a3150d5707d7fda82c3be5cb2f3a0f0a Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Thu, 9 Apr 2026 18:19:16 -0700 Subject: [PATCH 9/9] fix PR comments --- plugins/openai/src/realtime/realtime_model.ts | 41 +++++++++++++------ plugins/phonic/src/realtime/realtime_model.ts | 20 ++++++++- 2 files changed, 46 insertions(+), 15 deletions(-) diff --git a/plugins/openai/src/realtime/realtime_model.ts b/plugins/openai/src/realtime/realtime_model.ts index 41b148eff..633100e49 100644 --- a/plugins/openai/src/realtime/realtime_model.ts +++ b/plugins/openai/src/realtime/realtime_model.ts @@ -516,14 +516,20 @@ export class RealtimeSession extends llm.RealtimeSession { const events = await this.createChatCtxUpdateEvents(_chatCtx); const futures: Future[] = []; + const ownedCreateFutures: { [id: string]: Future } = {}; + const ownedDeleteFutures: { [id: string]: Future } = {}; + const cleanupTimedOutFutures = () => { // remove timed-out entries so late server acks // don't resolve stale futures from a previous updateChatCtx call. - for (const event of events) { - if (event.type === 'conversation.item.delete') { - delete this.itemDeleteFutures[event.item_id]; - } else if (event.type === 'conversation.item.create') { - delete this.itemCreateFutures[event.item.id]; + for (const [itemId, future] of Object.entries(ownedDeleteFutures)) { + if (this.itemDeleteFutures[itemId] === future) { + delete this.itemDeleteFutures[itemId]; + } + } + for (const [itemId, future] of Object.entries(ownedCreateFutures)) { + if (this.itemCreateFutures[itemId] === future) { + delete this.itemCreateFutures[itemId]; } } }; @@ -533,6 +539,7 @@ export class RealtimeSession extends llm.RealtimeSession { const future = new Future(); futures.push(future); this.itemCreateFutures[event.item.id] = future; + ownedCreateFutures[event.item.id] = future; } else if (event.type == 'conversation.item.delete') { const existingDeleteFuture = this.itemDeleteFutures[event.item_id]; if (existingDeleteFuture) { @@ -542,6 +549,7 @@ export class RealtimeSession extends llm.RealtimeSession { const future = new Future(); futures.push(future); this.itemDeleteFutures[event.item_id] = future; + ownedDeleteFutures[event.item_id] = future; } this.sendEvent(event); @@ -551,14 +559,21 @@ export class RealtimeSession extends llm.RealtimeSession { return; } - // wait for futures to resolve or timeout - await Promise.race([ - Promise.all(futures), - delay(5000).then(() => { - cleanupTimedOutFutures(); - throw new Error('Chat ctx update events timed out'); - }), - ]); + // wait for futures to resolve or timeout. + // Cancel the timeout branch once futures resolve to avoid stale cleanup. + const timeoutController = new AbortController(); + const timeoutPromise = delay(5000, { signal: timeoutController.signal }).then(() => { + cleanupTimedOutFutures(); + throw new Error('Chat ctx update events timed out'); + }); + + try { + await Promise.race([Promise.all(futures), timeoutPromise]); + } finally { + if (!timeoutController.signal.aborted) { + timeoutController.abort(); + } + } } catch (e) { this.#logger.error((e as Error).message); throw e; diff --git a/plugins/phonic/src/realtime/realtime_model.ts b/plugins/phonic/src/realtime/realtime_model.ts index d1c7af962..542749fb1 100644 --- a/plugins/phonic/src/realtime/realtime_model.ts +++ b/plugins/phonic/src/realtime/realtime_model.ts @@ -250,6 +250,7 @@ export class RealtimeSession extends llm.RealtimeSession { private pendingToolCallIds = new Set(); private readyToStart = new Future(); private pendingGenerateReplyFut?: Future; + private generateReplyRequestId = 0; private systemPromptPostfix = ''; constructor(realtimeModel: RealtimeModel) { @@ -472,15 +473,30 @@ export class RealtimeSession extends llm.RealtimeSession { if (this.closed) { return Promise.reject(new Error('session is closed')); } + + if (this.pendingGenerateReplyFut && !this.pendingGenerateReplyFut.done) { + this.pendingGenerateReplyFut.reject( + new Error('generateReply superseded by a newer generateReply call'), + ); + } + + const requestId = ++this.generateReplyRequestId; this.closeCurrentGeneration({ interrupted: false }); this.pendingGenerateReplyFut = new Future(); - this.sendGenerateReply(instructions); + this.sendGenerateReply(instructions, requestId); return this.pendingGenerateReplyFut.await; } - private async sendGenerateReply(instructions?: string): Promise { + private async sendGenerateReply( + instructions: string | undefined, + requestId: number, + ): Promise { await this.readyToStart.await; + if (requestId !== this.generateReplyRequestId) { + return; + } + if (this.closed || !this.socket) { this.pendingGenerateReplyFut?.reject(new Error('session is closed')); this.pendingGenerateReplyFut = undefined;