Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions livekit-agents/livekit/agents/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
```
"""

ATTRIBUTE_AGENT_NAME = "lk.agent.name"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when should we start changing it to lk.dispatch_name?

"""
The name of the agent, stored in the agent's attributes.
This is set when the agent joins a room and can be used to identify the agent type.
"""

TOPIC_CHAT = "lk.chat"
TOPIC_TRANSCRIPTION = "lk.transcription"
TOPIC_CLIENT_EVENTS = "lk.agent.events"
Expand Down
3 changes: 2 additions & 1 deletion livekit-agents/livekit/agents/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from .log import log_exceptions
from .misc import is_given, nodename, shortuuid, time_ms
from .moving_average import MovingAverage
from .participant import wait_for_participant, wait_for_track_publication
from .participant import wait_for_agent, wait_for_participant, wait_for_track_publication

EventEmitter = rtc.EventEmitter

Expand All @@ -33,6 +33,7 @@
"hw",
"is_given",
"ConnectionPool",
"wait_for_agent",
"wait_for_participant",
"wait_for_track_publication",
]
Expand Down
56 changes: 56 additions & 0 deletions livekit-agents/livekit/agents/utils/participant.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,62 @@

from livekit import rtc

from ..types import ATTRIBUTE_AGENT_NAME


async def wait_for_agent(
room: rtc.Room,
*,
agent_name: str | None = None,
) -> rtc.RemoteParticipant:
"""
Wait for an agent participant to join the room.

Args:
room: The room to wait for the agent in.
agent_name: If provided, waits for an agent with matching lk.agent.name attribute.
If None, returns the first agent participant found.

Returns:
The agent participant.

Raises:
RuntimeError: If the room is not connected.
"""
if not room.isconnected():
raise RuntimeError("room is not connected")

fut: asyncio.Future[rtc.RemoteParticipant] = asyncio.Future()

def matches_agent(p: rtc.RemoteParticipant) -> bool:
if p.kind != rtc.ParticipantKind.PARTICIPANT_KIND_AGENT:
return False
if agent_name is None:
return True
return p.attributes.get(ATTRIBUTE_AGENT_NAME) == agent_name

def on_participant_connected(p: rtc.RemoteParticipant) -> None:
if matches_agent(p) and not fut.done():
fut.set_result(p)

def on_attributes_changed(changed: list[str], p: rtc.Participant) -> None:
if isinstance(p, rtc.RemoteParticipant) and matches_agent(p) and not fut.done():
fut.set_result(p)

room.on("participant_connected", on_participant_connected)
room.on("participant_attributes_changed", on_attributes_changed)

try:
# Check existing participants
for p in room.remote_participants.values():
if matches_agent(p):
return p

return await fut
finally:
room.off("participant_connected", on_participant_connected)
room.off("participant_attributes_changed", on_attributes_changed)


async def wait_for_participant(
room: rtc.Room,
Expand Down
5 changes: 4 additions & 1 deletion livekit-agents/livekit/agents/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
)
from .log import DEV_LEVEL, logger
from .plugin import Plugin
from .types import NOT_GIVEN, NotGivenOr
from .types import ATTRIBUTE_AGENT_NAME, NOT_GIVEN, NotGivenOr
from .utils import http_server, is_given
from .utils.hw import get_cpu_monitor
from .version import __version__
Expand Down Expand Up @@ -1172,6 +1172,9 @@ async def _on_accept(args: JobAcceptArguments) -> None:
availability_resp.availability.participant_identity = args.identity
availability_resp.availability.participant_name = args.name
availability_resp.availability.participant_metadata = args.metadata
availability_resp.availability.participant_attributes[ATTRIBUTE_AGENT_NAME] = (
self._agent_name
)
if args.attributes:
availability_resp.availability.participant_attributes.update(args.attributes)
await self._queue_msg(availability_resp)
Expand Down
Loading