client-events: add RemoteSession
#4643
Important: Review skipped. Auto reviews are disabled on base/target branches other than the default branch.
📝 Walkthrough
The PR introduces a comprehensive evaluation framework enhancement and client event streaming infrastructure.
Sequence Diagram(s)
sequenceDiagram
participant Client
participant RemoteSession
participant RoomIO
participant ClientEventsHandler
participant AgentSession
participant LLM as LLM/Judge
Client->>RemoteSession: start()
RemoteSession->>RoomIO: subscribe to TOPIC_CLIENT_EVENTS
Client->>RemoteSession: send_message(text)
RemoteSession->>RoomIO: RPC_SEND_MESSAGE
RoomIO->>ClientEventsHandler: route RPC
ClientEventsHandler->>AgentSession: queue text input
AgentSession->>LLM: process message
LLM-->>AgentSession: response
AgentSession->>ClientEventsHandler: emit events
ClientEventsHandler->>RoomIO: publish ClientEvent
RoomIO-->>RemoteSession: stream event
RemoteSession-->>Client: receive event
Client->>RemoteSession: fetch_session_state()
RemoteSession->>RoomIO: RPC_GET_SESSION_STATE
RoomIO->>ClientEventsHandler: retrieve state
ClientEventsHandler-->>RoomIO: ClientSessionState
RoomIO-->>RemoteSession: response
RemoteSession-->>Client: ClientSessionState
sequenceDiagram
participant Evaluator as External Evaluator
participant JudgeGroup
participant Judge1
participant Judge2
participant LLM
participant AgentSession
Evaluator->>JudgeGroup: evaluate(chat_ctx, reference)
JudgeGroup->>Judge1: concurrent evaluate
JudgeGroup->>Judge2: concurrent evaluate
Judge1->>AgentSession: extract conversation items
Judge1->>LLM: get judgment via LLM
LLM-->>Judge1: verdict + reasoning
Judge1-->>JudgeGroup: JudgmentResult
Judge2->>AgentSession: extract conversation items
Judge2->>LLM: get judgment via LLM
LLM-->>Judge2: verdict + reasoning
Judge2-->>JudgeGroup: JudgmentResult
JudgeGroup->>JudgeGroup: aggregate results
JudgeGroup-->>Evaluator: EvaluationResult
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
class RemoteSession(rtc.EventEmitter[RemoteSessionEventTypes]):
Should we also provide this in JS or another client SDK?
It's quite experimental, so let's keep it in Python for now.
But ideally, it would also be in every client SDK.
@coderabbitai review
✅ Actions performed
Review triggered.
Actionable comments posted: 3
🧹 Nitpick comments (1)
livekit-agents/livekit/agents/evals/evaluation.py (1)
174-181: Consider using logger instead of print() for verbose output.
Using print() bypasses the logging infrastructure, making it harder to control output in production environments. Consider using logger.info() or logger.debug() instead, which respect log levels and handlers.
♻️ Suggested refactor
 if _evals_verbose:
-    print("\n+ JudgeGroup evaluation results:")
+    logger.info("JudgeGroup evaluation results:")
     for name, result in results:
         if isinstance(result, JudgmentResult):
-            print(f" [{name}] verdict={result.verdict}")
-            print(f" reasoning: {result.reasoning}\n")
+            logger.info(f" [{name}] verdict={result.verdict}")
+            logger.info(f" reasoning: {result.reasoning}")
         else:
-            print(f" [{name}] ERROR: {result}\n")
+            logger.info(f" [{name}] ERROR: {result}")
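To illustrate why the logger route gives callers more control, here is a small standalone sketch. The logger name `evals_sketch` and the helper `log_results` are assumptions made for this example, not names from the PR. Unlike the suggested refactor, which logs everything at INFO, the sketch sends failures to `logger.error` so they survive a stricter log level.

```python
import logging

logger = logging.getLogger("evals_sketch")  # hypothetical logger name


def log_results(results: list[tuple[str, object]]) -> None:
    """Report per-judge outcomes through logging instead of print()."""
    logger.info("JudgeGroup evaluation results:")
    for name, result in results:
        if isinstance(result, Exception):
            # Failures use ERROR so they are kept when INFO is filtered out.
            logger.error("[%s] ERROR: %s", name, result)
        else:
            logger.info("[%s] verdict=%s", name, result)
```

With the logger level set to `WARNING`, only the error line is emitted; with `INFO`, the header and verdict lines appear as well. A bare `print()` offers no such switch.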
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lock is excluded by !**/*.lock
📒 Files selected for processing (10)
- livekit-agents/livekit/agents/evals/evaluation.py
- livekit-agents/livekit/agents/evals/judge.py
- livekit-agents/livekit/agents/job.py
- livekit-agents/livekit/agents/llm/chat_context.py
- livekit-agents/livekit/agents/observability.py
- livekit-agents/livekit/agents/telemetry/traces.py
- livekit-agents/livekit/agents/types.py
- livekit-agents/livekit/agents/voice/agent_session.py
- livekit-agents/livekit/agents/voice/client_events.py
- livekit-agents/livekit/agents/voice/room_io/room_io.py
💤 Files with no reviewable changes (2)
- livekit-agents/livekit/agents/voice/room_io/room_io.py
- livekit-agents/livekit/agents/telemetry/traces.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Format code with ruff
Run ruff linter and auto-fix issues
Run mypy type checker in strict mode
Maintain line length of 100 characters maximum
Ensure Python 3.9+ compatibility
Use Google-style docstrings
Files:
- livekit-agents/livekit/agents/job.py
- livekit-agents/livekit/agents/evals/judge.py
- livekit-agents/livekit/agents/types.py
- livekit-agents/livekit/agents/llm/chat_context.py
- livekit-agents/livekit/agents/observability.py
- livekit-agents/livekit/agents/voice/client_events.py
- livekit-agents/livekit/agents/voice/agent_session.py
- livekit-agents/livekit/agents/evals/evaluation.py
🧠 Learnings (1)
📚 Learning: 2026-01-22T03:28:16.289Z
Learnt from: longcw
Repo: livekit/agents PR: 4563
File: livekit-agents/livekit/agents/beta/tools/end_call.py:65-65
Timestamp: 2026-01-22T03:28:16.289Z
Learning: In code paths that check capabilities or behavior of the LLM processing the current interaction, prefer using the activity's LLM obtained via ctx.session.current_agent._get_activity_or_raise().llm instead of ctx.session.llm. The session-level LLM may be a fallback and not reflect the actual agent handling the interaction. Use the activity LLM to determine capabilities and to make capability checks or feature toggles relevant to the current processing agent.
Applied to files:
- livekit-agents/livekit/agents/job.py
- livekit-agents/livekit/agents/evals/judge.py
- livekit-agents/livekit/agents/types.py
- livekit-agents/livekit/agents/llm/chat_context.py
- livekit-agents/livekit/agents/observability.py
- livekit-agents/livekit/agents/voice/client_events.py
- livekit-agents/livekit/agents/voice/agent_session.py
- livekit-agents/livekit/agents/evals/evaluation.py
🧬 Code graph analysis (3)
livekit-agents/livekit/agents/evals/judge.py (1)
  livekit-agents/livekit/agents/llm/chat_context.py (2)
    items (242-243)
    items (246-247)
livekit-agents/livekit/agents/observability.py (2)
  livekit-agents/livekit/agents/evals/evaluation.py (1)
    name (21-23)
  livekit-agents/livekit/agents/evals/judge.py (3)
    name (155-156)
    name (202-203)
    name (265-266)
livekit-agents/livekit/agents/voice/client_events.py (2)
  livekit-agents/livekit/agents/llm/tool_context.py (4)
    FunctionTool (163-166)
    RawFunctionTool (169-172)
    Toolset (41-46)
    info (142-143)
  livekit-agents/livekit/agents/voice/room_io/types.py (1)
    TextInputEvent (32-35)
🔇 Additional comments (21)
livekit-agents/livekit/agents/observability.py (1)
97-103: LGTM! The multi-line formatting for the dictionary append improves readability without changing behavior.
livekit-agents/livekit/agents/job.py (1)
29-41: LGTM! Clean removal of unused imports (cast, contextlib, opentelemetry.trace, trace_types, tracer). This reduces the module's dependency surface.
livekit-agents/livekit/agents/evals/judge.py (3)
13-34: LGTM! Well-designed JudgmentResult dataclass with clear field documentation and convenient boolean properties for checking verdict status.
88-90: LGTM! The refactored _has_handoffs function is more concise while maintaining the same logic.
149-149: LGTM! Minor formatting adjustment to the constructor signature.
livekit-agents/livekit/agents/evals/evaluation.py (3)
99-119: LGTM! Clean initialization with flexible LLM input (string or instance) and proper type handling via dynamic import.
151-163: LGTM! Good error handling pattern: catching exceptions per judge prevents one failing judge from blocking the entire evaluation.
183-188: LGTM! Good pattern for optional auto-tagging: it gracefully handles the case where code runs outside a job context.
livekit-agents/livekit/agents/llm/chat_context.py (1)
226-230: LGTM! Clean discriminated union type alias using Pydantic's Field(discriminator="type"). This enables proper polymorphic serialization/deserialization of chat items.
livekit-agents/livekit/agents/types.py (1)
27-50: LGTM! Well-documented constants for the new client events infrastructure. The consistent lk.agent.* naming convention is maintained throughout.
livekit-agents/livekit/agents/voice/client_events.py (7)
50-106: LGTM! Well-structured client event models with consistent discriminator pattern. The ClientEvent discriminated union enables proper deserialization on the client side.
146-153: LGTM! Clean recursive helper for extracting tool names from various tool types.
199-245: LGTM! Proper lifecycle management with idempotency checks and thorough cleanup in aclose().
384-408: LGTM! Sound event streaming implementation with proper task management and error handling. Creating a new stream per event ensures reliability even if the previous stream encountered issues.
445-456: Note: _handle_send_message blocks until the agent fully responds. The RPC handler awaits session.run() synchronously, which may cause long response times for complex agent interactions. This is appropriate for RPC semantics (request-response), but clients should be aware of potential timeout considerations.
458-486: LGTM! Proper text input handling with participant validation and support for both sync and async callbacks.
586-625: LGTM! Clean RPC wrapper methods with proper request/response serialization using Pydantic models.
livekit-agents/livekit/agents/voice/agent_session.py (4)
44-44: LGTM! Import of the new ClientEventsHandler from the sibling module is straightforward.
338-338: LGTM! Attribute properly typed and initialized to None, consistent with the pattern used for other optional handlers like _room_io and _recorder_io.
529-529: LGTM! Reset is consistent with the existing pattern for _room_io and _recorder_io, enabling proper session restart behavior.
846-849: LGTM! Correct shutdown ordering: the handler is closed before room_io since it depends on it. The explicit None assignment after close is a good practice for preventing accidental reuse.
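The timeout consideration raised in the note on _handle_send_message (445-456) can be handled on the caller's side. The following is a sketch under assumptions: `send_with_timeout` is a hypothetical helper, and `send_message` stands for any coroutine function that blocks until the agent replies, not the actual RemoteSession signature.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def send_with_timeout(
    send_message: Callable[[str], Awaitable[str]],
    text: str,
    timeout: float,
) -> Optional[str]:
    """Await a request/response call, giving up after `timeout` seconds."""
    try:
        return await asyncio.wait_for(send_message(text), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the local call only; whether the remote agent
        # stops processing the turn is outside the scope of this sketch.
        return None
```

Returning `None` on timeout is one possible design; a caller that needs to distinguish a timeout from an empty reply might prefer to let the exception propagate.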
# Initialize the client events handler for exposing session state to clients
self._client_events_handler = ClientEventsHandler(
    session=self,
    room=room,
    room_io=self._room_io,
)

# Register text input handler if configured
text_input_opts = room_options.get_text_input_options()
if text_input_opts:
    self._client_events_handler.register_text_input(text_input_opts.text_input_cb)

await self._client_events_handler.start()
Potential resource leak if start() fails after handler initialization.
If an exception occurs between line 594 (handler started) and line 674 (_started = True), the _client_events_handler won't be closed. The _aclose_impl method returns early when _started is False (lines 778-779), leaving the handler running.
Consider wrapping the handler lifecycle in try/except or ensuring cleanup on failure:
🛡️ Proposed fix to ensure handler cleanup on failure
 self._room_io = room_io.RoomIO(room=room, agent_session=self, options=room_options)
 await self._room_io.start()

 # Initialize the client events handler for exposing session state to clients
 self._client_events_handler = ClientEventsHandler(
     session=self,
     room=room,
     room_io=self._room_io,
 )

 # Register text input handler if configured
 text_input_opts = room_options.get_text_input_options()
 if text_input_opts:
     self._client_events_handler.register_text_input(text_input_opts.text_input_cb)

-await self._client_events_handler.start()
+try:
+    await self._client_events_handler.start()
+except Exception:
+    await self._client_events_handler.aclose()
+    self._client_events_handler = None
+    raise
Alternatively, a more comprehensive approach would wrap the entire initialization block and clean up all resources (including _room_io) on failure.
🤖 Prompt for AI Agents
In `@livekit-agents/livekit/agents/voice/agent_session.py` around lines 582 - 594,
The ClientEventsHandler is initialized and started but not cleaned up if start()
fails, leaving a resource leak because _aclose_impl returns early when _started
is False; wrap the initialization and start sequence (the ClientEventsHandler
creation, optional register_text_input call, and await
self._client_events_handler.start()) in a try/except/finally so that on any
exception you explicitly stop/close the handler (e.g., call its
stop/close/aclose method) and null out self._client_events_handler before
re-raising the error; also consider extending the same cleanup to _room_io if
the broader initialization fails so no partially-initialized resources remain.
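The cleanup-on-failure pattern described in this comment can be exercised in isolation. This is a minimal sketch, assuming a hypothetical `FakeHandler` whose `start()` always fails and a stripped-down `Session`; neither mirrors the real `ClientEventsHandler` or `AgentSession` beyond the `start()`/`aclose()` pair.

```python
import asyncio
from typing import Optional


class FakeHandler:
    """Hypothetical stand-in for ClientEventsHandler; start() always fails."""

    def __init__(self) -> None:
        self.closed = False

    async def start(self) -> None:
        raise ConnectionError("room unavailable")

    async def aclose(self) -> None:
        self.closed = True


class Session:
    """Hypothetical owner of the handler, reduced to the start-up path."""

    def __init__(self) -> None:
        self._handler: Optional[FakeHandler] = None

    async def start(self, handler: FakeHandler) -> None:
        self._handler = handler
        try:
            await self._handler.start()
        except Exception:
            # Close the half-started handler, drop the reference, re-raise.
            await self._handler.aclose()
            self._handler = None
            raise
```

After a failed `Session.start()`, the handler has been closed and the session no longer holds a reference to it, so a later close path that returns early has nothing left to leak.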
class ChatHistoryResponse(BaseModel):
    """Response containing the agent<>user conversation turns."""

    items: list[ChatMessage]
Type mismatch: ChatHistoryResponse.items may receive non-ChatMessage items.
ChatHistoryResponse declares items: list[ChatMessage], but _handle_get_history (line 424) passes self._session.history.items which contains ChatItem types (including FunctionCall, FunctionCallOutput, etc.). This will cause Pydantic validation errors when non-message items are present.
🐛 Proposed fix
 class ChatHistoryResponse(BaseModel):
     """Response containing the agent<>user conversation turns."""

-    items: list[ChatMessage]
+    items: list[ChatItem]
🤖 Prompt for AI Agents
In `@livekit-agents/livekit/agents/voice/client_events.py` around lines 119 - 123,
The ChatHistoryResponse model is typed as items: list[ChatMessage] but
_handle_get_history passes self._session.history.items which contains
heterogeneous ChatItem instances (e.g., FunctionCall, FunctionCallOutput),
causing Pydantic validation errors; update ChatHistoryResponse to accept the
correct union by changing its items field to list[ChatItem] (or a suitable Union
of ChatMessage and the other ChatItem subclasses) and ensure ChatItem is
imported/defined in the module so that _handle_get_history can pass
self._session.history.items without runtime validation failures.
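The mismatch can be illustrated without Pydantic. The sketch below is dependency-free and purely illustrative: the dataclasses are simplified stand-ins for the real chat item types, and `only_messages` / `any_chat_item` are hypothetical helpers that mimic what validating against `list[ChatMessage]` versus `list[ChatItem]` would accept.

```python
from dataclasses import dataclass
from typing import Any, Union


@dataclass
class ChatMessage:
    role: str
    content: str


@dataclass
class FunctionCall:
    name: str
    arguments: str


# Simplified stand-in for the ChatItem union; the real one has more members.
ChatItem = Union[ChatMessage, FunctionCall]


def only_messages(items: list[Any]) -> list[ChatMessage]:
    """Mimic validating against list[ChatMessage]: reject anything else."""
    for item in items:
        if not isinstance(item, ChatMessage):
            raise TypeError(f"expected ChatMessage, got {type(item).__name__}")
    return items


def any_chat_item(items: list[Any]) -> list[ChatItem]:
    """Mimic validating against list[ChatItem]: accept every union member."""
    for item in items:
        if not isinstance(item, (ChatMessage, FunctionCall)):
            raise TypeError(f"expected ChatItem, got {type(item).__name__}")
    return items
```

A history that contains a tool call passes `any_chat_item` but raises in `only_messages`, which is the same shape of failure the comment predicts for the narrower field type.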
def _on_event_stream(self, reader: rtc.TextStreamReader, participant_identity: str) -> None:
    if participant_identity != self._agent_identity:
        return

    asyncio.create_task(self._read_event(reader))
Untracked background task may cause issues during shutdown.
asyncio.create_task() without tracking means the task won't be cancelled when aclose() is called. If the room disconnects while _read_event is running, it may raise unhandled exceptions.
🔧 Proposed fix to track and cancel tasks
Add task tracking similar to ClientEventsHandler:
 def __init__(
     self,
     room: rtc.Room,
     agent_identity: str,
 ) -> None:
     super().__init__()
     self._room = room
     self._agent_identity = agent_identity
     self._started = False
+    self._tasks: set[asyncio.Task[Any]] = set()

 async def aclose(self) -> None:
     if not self._started:
         return
     self._started = False

     try:
         self._room.unregister_text_stream_handler(TOPIC_CLIENT_EVENTS)
     except ValueError:
         pass
+
+    for task in self._tasks:
+        task.cancel()
+    self._tasks.clear()

 def _on_event_stream(self, reader: rtc.TextStreamReader, participant_identity: str) -> None:
     if participant_identity != self._agent_identity:
         return

-    asyncio.create_task(self._read_event(reader))
+    task = asyncio.create_task(self._read_event(reader))
+    self._tasks.add(task)
+    task.add_done_callback(self._tasks.discard)
🤖 Prompt for AI Agents
In `@livekit-agents/livekit/agents/voice/client_events.py` around lines 562 - 567,
The handler _on_event_stream creates an untracked background task for
self._read_event which can survive shutdown and raise unhandled exceptions;
change it to create and track the task (e.g., add to a set attribute like
self._background_tasks or self._tasks), attach a done callback to remove the
task from the set when finished, and ensure your class teardown
(aclose/close/disconnect method) iterates over that set to cancel and await each
task; update _on_event_stream to add the created task to the set and remove it
on completion, and update the existing aclose/cleanup method to cancel and await
tasks to avoid untracked background work.
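The track, cancel, and await sequence described in the prompt can be sketched as a standalone helper. `TaskTracker` and its `spawn` method are hypothetical names for this example; the PR keeps the set directly on the session class. Unlike the proposed diff, which only cancels, this variant also awaits the cancelled tasks.

```python
import asyncio
from typing import Any, Coroutine


class TaskTracker:
    """Hypothetical helper that owns background tasks until shutdown."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task[Any]] = set()

    def spawn(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task[Any]:
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        # Drop the reference once the task finishes on its own.
        task.add_done_callback(self._tasks.discard)
        return task

    async def aclose(self) -> None:
        tasks = list(self._tasks)
        for task in tasks:
            task.cancel()
        # Await the cancelled tasks so their CancelledError is consumed here.
        await asyncio.gather(*tasks, return_exceptions=True)
        self._tasks.clear()
```

Copying the set into a list before cancelling avoids mutating it while the done callbacks discard entries during the `gather`.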
chat_ctx: list[ChatItem]


class SendMessageRequest(BaseModel):
Should we use a name like UserMessageRequest or ClientMessageRequest?
I didn't prefix RPC messages, only the events
I think it's fine, knowing those structs are going to be in the client SDKs.
Awesome work! Is there any plan to implement it in the frontend SDK?
Summary by CodeRabbit
New Features
Refactor