Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion livekit-agents/livekit/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@
RunResult,
mock_tools,
)
from .voice.turn import EndpointingOptions, InterruptionOptions, TurnHandlingOptions
from .voice.turn import (
EndpointingOptions,
InterruptionOptions,
PreemptiveGenerationOptions,
TurnHandlingOptions,
)
from .worker import (
AgentServer,
WorkerOptions,
Expand Down Expand Up @@ -231,6 +236,7 @@ def __getattr__(name: str) -> typing.Any:
"TurnHandlingOptions",
"EndpointingOptions",
"InterruptionOptions",
"PreemptiveGenerationOptions",
]

# Cleanup docs of unexported modules
Expand Down
55 changes: 44 additions & 11 deletions livekit-agents/livekit/agents/voice/agent_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ def __init__(self, agent: Agent, sess: AgentSession) -> None:
self._speech_tasks: list[asyncio.Task[Any]] = []

self._preemptive_generation: _PreemptiveGeneration | None = None
self._preemptive_generation_count: int = 0
self._authorization_allowed = asyncio.Event()
self._authorization_allowed.set()

Expand Down Expand Up @@ -1795,8 +1796,9 @@ def on_final_transcript(self, ev: stt.SpeechEvent, *, speaking: bool | None = No
)

def on_preemptive_generation(self, info: _PreemptiveGenerationInfo) -> None:
preemptive_opts = self._session.options.preemptive_generation
if (
not self._session.options.preemptive_generation
not preemptive_opts["enabled"]
or self._scheduling_paused
or self._new_turns_blocked
or (self._current_speech is not None and not self._current_speech.interrupted)
Expand All @@ -1806,6 +1808,17 @@ def on_preemptive_generation(self, info: _PreemptiveGenerationInfo) -> None:

self._cancel_preemptive_generation()

if (
info.started_speaking_at is not None
and time.time() - info.started_speaking_at > preemptive_opts["max_speech_duration"]
):
return

if self._preemptive_generation_count >= preemptive_opts["max_retries"]:
return

self._preemptive_generation_count += 1
Comment on lines 1809 to +1820
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot Apr 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Existing preemptive generation is cancelled before max_retries check, discarding valid work

In on_preemptive_generation, _cancel_preemptive_generation() is called unconditionally on line 1783 before the max_retries check on line 1791. When _preemptive_generation_count >= max_retries, the method returns early without starting a new generation — but the previous (most recent) preemptive generation has already been cancelled and set to None. This means the last successful preemptive generation is destroyed without replacement. Later, in _user_turn_completed_task at line 1995, self._preemptive_generation is None, so the preemptive result can never be used and a fresh (non-preemptive) LLM call is always made instead. This defeats the purpose of the max_retries limit, which should keep the last generation alive when retries are exhausted.

The fix is to move _cancel_preemptive_generation() after the early-return checks (or at least after the max_retries check), so the existing generation is only cancelled when it will actually be replaced by a new one.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is expected, on_preemptive_generation is called when user transcript changed, so the previous preemptive generation is invalid, we should cancel it asap.


user_message = llm.ChatMessage(
role="user",
content=[info.new_transcript],
Expand Down Expand Up @@ -1887,6 +1900,8 @@ async def _user_turn_completed_task(
# is detected. So the previous execution should complete quickly.
await old_task

self._preemptive_generation_count = 0

# When the audio recognition detects the end of a user turn:
# - check if realtime model server-side turn detection is enabled
# - check if there is no current generation happening
Expand Down Expand Up @@ -2394,8 +2409,8 @@ async def _pipeline_reply_task_impl(

tts_task: asyncio.Task[bool] | None = None
tts_gen_data: _TTSGenerationData | None = None
read_transcript_from_tts = False
if audio_output is not None:

async def _start_tts_inference() -> tuple[asyncio.Task[bool], _TTSGenerationData]:
await llm_gen_data.started_fut # make sure tts span starts after llm span
tts_task, tts_gen_data = perform_tts_inference(
node=self._agent.tts_node,
Expand All @@ -2405,15 +2420,17 @@ async def _pipeline_reply_task_impl(
model=self.tts.model if self.tts else None,
provider=self.tts.provider if self.tts else None,
)
return tts_task, tts_gen_data

# start preemptive tts inference if enabled
preemptive_opts = self._session.options.preemptive_generation
if (
audio_output is not None
and preemptive_opts["enabled"]
and preemptive_opts["preemptive_tts"]
):
tts_task, tts_gen_data = await _start_tts_inference()
tasks.append(tts_task)
if (
self.use_tts_aligned_transcript
and (tts := self.tts)
and (tts.capabilities.aligned_transcript or not tts.capabilities.streaming)
and (timed_texts := await tts_gen_data.timed_texts_fut)
):
tr_input = timed_texts
read_transcript_from_tts = True

wait_for_scheduled = asyncio.ensure_future(speech_handle._wait_for_scheduled())
await speech_handle.wait_if_not_interrupted([wait_for_scheduled])
Expand All @@ -2432,6 +2449,22 @@ async def _pipeline_reply_task_impl(
await text_tee.aclose()
return

# start tts inference if not already started and audio output is enabled
if audio_output is not None and tts_task is None:
tts_task, tts_gen_data = await _start_tts_inference()
tasks.append(tts_task)

read_transcript_from_tts = False
if (
tts_gen_data is not None
and self.use_tts_aligned_transcript
and (tts := self.tts)
and (tts.capabilities.aligned_transcript or not tts.capabilities.streaming)
and (timed_texts := await tts_gen_data.timed_texts_fut)
):
tr_input = timed_texts
read_transcript_from_tts = True

self._session._update_agent_state("thinking")

authorization_tasks: list[asyncio.Future[Any]] = [
Expand Down
34 changes: 18 additions & 16 deletions livekit-agents/livekit/agents/voice/agent_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@
from .turn import (
EndpointingOptions,
InterruptionOptions,
PreemptiveGenerationOptions,
TurnDetectionMode,
TurnHandlingOptions,
_migrate_turn_handling,
_resolve_endpointing,
_resolve_interruption,
_resolve_preemptive_generation,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -135,7 +137,6 @@ class AgentSessionOptions:
turn_handling: TurnHandlingOptions
max_tool_steps: int
user_away_timeout: float | None
preemptive_generation: bool
min_consecutive_speech_delay: float
use_tts_aligned_transcript: bool | None
tts_text_transforms: Sequence[TextTransforms] | None
Expand All @@ -151,6 +152,10 @@ def endpointing(self) -> EndpointingOptions:
def interruption(self) -> InterruptionOptions:
return self.turn_handling["interruption"]

@property
def preemptive_generation(self) -> PreemptiveGenerationOptions:
return self.turn_handling["preemptive_generation"]
Comment on lines +155 to +157
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to expose it if we expose turn_handling

Suggested change
@property
def preemptive_generation(self) -> PreemptiveGenerationOptions:
return self.turn_handling["preemptive_generation"]



Userdata_T = TypeVar("Userdata_T")
Run_T = TypeVar("Run_T")
Expand Down Expand Up @@ -202,6 +207,7 @@ class AgentSession(rtc.EventEmitter[EventTypes], Generic[Userdata_T]):
"allow_interruptions": "Use turn_handling=TurnHandlingOptions(...) instead",
"discard_audio_if_uninterruptible": "Use turn_handling=TurnHandlingOptions(...) instead",
"min_interruption_duration": "Use turn_handling=TurnHandlingOptions(...) instead",
"preemptive_generation": "Use turn_handling=TurnHandlingOptions(...) instead",
"min_interruption_words": "Use turn_handling=TurnHandlingOptions(...) instead",
"turn_detection": "Use turn_handling=TurnHandlingOptions(...) instead",
"agent_false_interruption_timeout": "Use turn_handling=TurnHandlingOptions(...) instead",
Expand All @@ -227,7 +233,6 @@ def __init__(
# Misc settings
userdata: NotGivenOr[Userdata_T] = NOT_GIVEN,
video_sampler: NotGivenOr[_VideoSampler | None] = NOT_GIVEN,
preemptive_generation: bool = True,
aec_warmup_duration: float | None = 3.0,
ivr_detection: bool = False,
user_away_timeout: float | None = 15.0,
Expand All @@ -236,6 +241,7 @@ def __init__(
conn_options: NotGivenOr[SessionConnectOptions] = NOT_GIVEN,
loop: asyncio.AbstractEventLoop | None = None,
# deprecated
preemptive_generation: NotGivenOr[bool] = NOT_GIVEN,
min_endpointing_delay: NotGivenOr[float] = NOT_GIVEN,
max_endpointing_delay: NotGivenOr[float] = NOT_GIVEN,
false_interruption_timeout: NotGivenOr[float | None] = NOT_GIVEN,
Expand Down Expand Up @@ -294,20 +300,14 @@ def __init__(
user_away_timeout (float, optional): If set, set the user state as
"away" after this amount of time after user and agent are silent.
Defaults to ``15.0`` s, set to ``None`` to disable.
preemptive_generation (bool):
Whether to speculatively begin LLM and TTS requests before an end-of-turn is
detected. When True, the agent sends inference calls as soon as a user
transcript is received rather than waiting for a definitive turn boundary. This
can reduce response latency by overlapping model inference with user audio,
but may incur extra compute if the user interrupts or revises mid-utterance.
Defaults to ``True``.
aec_warmup_duration (float, optional): The duration in seconds that the agent
will ignore user's audio interruptions after the agent starts speaking.
This is useful to prevent the agent from being interrupted by echo before AEC is ready.
Set to ``None`` to disable. Default ``3.0`` s.
session_close_transcript_timeout (float, optional): Seconds to wait for the
final STT transcript when closing the session (after audio is detached).
Default ``2.0`` s (independent of ``commit_user_turn``'s ``transcript_timeout``).
preemptive_generation (NotGivenOr[bool | PreemptiveGenerationOptions]): Deprecated, use turn_handling=TurnHandlingOptions(...) instead.
min_endpointing_delay (NotGivenOr[float]): Deprecated, use turn_handling=TurnHandlingOptions(...) instead.
max_endpointing_delay (NotGivenOr[float]): Deprecated, use turn_handling=TurnHandlingOptions(...) instead.
false_interruption_timeout (NotGivenOr[float | None]): Deprecated, use turn_handling=TurnHandlingOptions(...) instead.
Expand All @@ -330,12 +330,12 @@ def __init__(
turn_handling = (
_migrate_turn_handling(
# backward compatibility for deprecated parameters that had default values
min_endpointing_delay=min_endpointing_delay
if is_given(min_endpointing_delay)
else 0.5,
max_endpointing_delay=max_endpointing_delay
if is_given(max_endpointing_delay)
else 3.0,
min_endpointing_delay=(
min_endpointing_delay if is_given(min_endpointing_delay) else 0.5
),
max_endpointing_delay=(
max_endpointing_delay if is_given(max_endpointing_delay) else 3.0
),
false_interruption_timeout=false_interruption_timeout,
turn_detection=turn_detection,
discard_audio_if_uninterruptible=discard_audio_if_uninterruptible,
Expand All @@ -344,13 +344,15 @@ def __init__(
allow_interruptions=allow_interruptions,
resume_false_interruption=resume_false_interruption,
agent_false_interruption_timeout=agent_false_interruption_timeout,
preemptive_generation=preemptive_generation,
)
if not is_given(turn_handling)
else turn_handling
)

endpointing = _resolve_endpointing(turn_handling.get("endpointing"))
interruption = _resolve_interruption(turn_handling.get("interruption"))
preemptive_gen = _resolve_preemptive_generation(turn_handling.get("preemptive_generation"))
raw_turn_detection = turn_handling.get("turn_detection", None)

# This is the "global" chat_context, it holds the entire conversation history
Expand All @@ -360,10 +362,10 @@ def __init__(
endpointing=endpointing,
interruption=interruption,
turn_detection=raw_turn_detection,
preemptive_generation=preemptive_gen,
),
max_tool_steps=max_tool_steps,
user_away_timeout=user_away_timeout,
preemptive_generation=preemptive_generation,
min_consecutive_speech_delay=min_consecutive_speech_delay,
tts_text_transforms=(
tts_text_transforms
Expand Down
2 changes: 1 addition & 1 deletion livekit-agents/livekit/agents/voice/remote_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ def _serialize_options(opts: AgentSessionOptions) -> dict[str, str]:
"interruption": str(dict(opts.interruption)),
"max_tool_steps": str(opts.max_tool_steps),
"user_away_timeout": str(opts.user_away_timeout),
"preemptive_generation": str(opts.preemptive_generation),
"preemptive_generation": str(dict(opts.preemptive_generation)),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Existing test mock uses bool for preemptive_generation, but _serialize_options now calls dict() on it

The change from str(opts.preemptive_generation) to str(dict(opts.preemptive_generation)) will crash with TypeError: cannot convert 'bool' object to dict items when called with the mock in tests/test_session_host.py:447, which sets options.preemptive_generation = False. The test_get_session_state test exercises this path via _handle_request(get_session_state) → _serialize_options(self._session.options) at livekit-agents/livekit/agents/voice/remote_session.py:662. The mock needs to be updated to use a PreemptiveGenerationOptions dict (e.g., MagicMock(__iter__=lambda s: iter([])) or a real dict like {"enabled": False}).

Prompt for agents
The test file tests/test_session_host.py at line 447 sets options.preemptive_generation = False (a plain bool). After the change on remote_session.py:331 from str(opts.preemptive_generation) to str(dict(opts.preemptive_generation)), calling dict(False) raises TypeError. The fix should update the mock in tests/test_session_host.py _make_mock_session() to use a dict-like object for preemptive_generation, for example: options.preemptive_generation = {"enabled": False, "preemptive_tts": False, "max_speech_duration": 10.0, "max_retries": 3} (matching PreemptiveGenerationOptions structure), or use MagicMock(__iter__=lambda s: iter([])) like the endpointing and interruption mocks do.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

"min_consecutive_speech_delay": str(opts.min_consecutive_speech_delay),
"use_tts_aligned_transcript": str(opts.use_tts_aligned_transcript),
"ivr_detection": str(opts.ivr_detection),
Expand Down
2 changes: 1 addition & 1 deletion livekit-agents/livekit/agents/voice/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def to_dict(self) -> dict:
"max_tool_steps": self.options.max_tool_steps,
"user_away_timeout": self.options.user_away_timeout,
"min_consecutive_speech_delay": self.options.min_consecutive_speech_delay,
"preemptive_generation": self.options.preemptive_generation,
"preemptive_generation": dict(self.options.preemptive_generation),
},
"chat_history": self.chat_history.to_dict(exclude_timestamp=False),
"timestamp": self.timestamp,
Expand Down
46 changes: 46 additions & 0 deletions livekit-agents/livekit/agents/voice/turn.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,36 @@ class InterruptionOptions(TypedDict, total=False):
}


class PreemptiveGenerationOptions(TypedDict, total=False):
    """Options controlling speculative (preemptive) LLM/TTS generation.

    When enabled, the agent begins inference as soon as an interim user
    transcript is available rather than waiting for a confirmed end of
    turn, trading possible wasted compute for lower response latency.
    """

    enabled: bool
    """Enable or disable preemptive generation. Defaults to ``True``."""

    preemptive_tts: bool
    """Also start TTS speculatively before the user turn is confirmed.
    Defaults to ``False``, in which case only the LLM runs preemptively
    and TTS begins once the turn is confirmed and the speech is
    scheduled."""

    max_speech_duration: float
    """Skip preemptive generation once the user has been speaking for
    longer than this many seconds; long utterances are more likely to
    keep changing, and users tend to expect slower responses to them.
    Defaults to ``10.0``."""

    max_retries: int
    """Cap on the number of preemptive generation attempts within a
    single user turn; the counter is reset when the turn completes.
    Defaults to ``3``."""


# Default values applied by _resolve_preemptive_generation for any keys the
# caller left unspecified.
_PREEMPTIVE_GENERATION_DEFAULTS: PreemptiveGenerationOptions = {
    "enabled": True,
    "preemptive_tts": False,
    "max_speech_duration": 10.0,
    "max_retries": 3,
}


class TurnHandlingOptions(TypedDict, total=False):
"""Configuration for the turn handling system.

Expand All @@ -121,6 +151,7 @@ class TurnHandlingOptions(TypedDict, total=False):
turn_handling={
"endpointing": {"min_delay": 0.3},
"interruption": {"enabled": False},
"preemptive_generation": {"preemptive_tts": True},
},
)

Expand All @@ -134,6 +165,17 @@ class TurnHandlingOptions(TypedDict, total=False):
"""Endpointing configuration. Defaults to ``{"min_delay": 0.5, "max_delay": 3.0}``."""
interruption: InterruptionOptions
"""Interruption handling configuration. Use ``{"enabled": False}`` to disable."""
preemptive_generation: PreemptiveGenerationOptions
"""Preemptive generation configuration. Use ``{"enabled": False}`` to disable."""


def _resolve_preemptive_generation(
    config: PreemptiveGenerationOptions | None = None,
) -> PreemptiveGenerationOptions:
    """Return a fully-populated options dict: *config* merged over the defaults."""
    merged = dict(_PREEMPTIVE_GENERATION_DEFAULTS)
    if config is not None:
        merged.update(config)
    return PreemptiveGenerationOptions(**merged)


def _resolve_endpointing(config: EndpointingOptions | None = None) -> EndpointingOptions:
Expand Down Expand Up @@ -163,6 +205,7 @@ def _migrate_turn_handling(
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
resume_false_interruption: NotGivenOr[bool] = NOT_GIVEN,
agent_false_interruption_timeout: NotGivenOr[float | None] = NOT_GIVEN,
preemptive_generation: NotGivenOr[bool] = NOT_GIVEN,
) -> TurnHandlingOptions:
"""Build a TurnHandlingOptions from deprecated keyword arguments."""
if is_given(agent_false_interruption_timeout):
Expand Down Expand Up @@ -199,4 +242,7 @@ def _migrate_turn_handling(
if is_given(turn_detection):
result["turn_detection"] = turn_detection

if is_given(preemptive_generation):
result["preemptive_generation"] = {"enabled": preemptive_generation}

return result
Loading
Loading