Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions examples/voice_agents/mcp/mcp-agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def __init__(self) -> None:
async def on_enter(self):
# when the agent is added to the session, it'll generate a reply
# according to its instructions
self.session.generate_reply()
self.session.generate_reply(instructions="greeting the user and introducing yourself")


server = AgentServer()
Expand All @@ -37,8 +37,10 @@ async def entrypoint(ctx: JobContext):
llm=inference.LLM("openai/gpt-4.1-mini"),
tts=inference.TTS("cartesia/sonic-3"),
turn_detection=MultilingualModel(),
mcp_servers=[
mcp.MCPServerHTTP(url="http://localhost:8000/sse"),
tools=[
mcp.MCPToolset(
id="mcp_toolset_1", mcp_server=mcp.MCPServerHTTP(url="http://localhost:8000/sse")
)
],
)

Expand Down
19 changes: 8 additions & 11 deletions livekit-agents/livekit/agents/beta/tools/end_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Any

from ...job import get_job_context
from ...llm import RealtimeModel, Tool, Toolset, function_tool
from ...llm import RealtimeModel, Toolset, function_tool
from ...log import logger
from ...voice.events import CloseEvent, RunContext, SpeechCreatedEvent
from ...voice.speech_handle import SpeechHandle
Expand Down Expand Up @@ -45,19 +45,20 @@ def __init__(
on_tool_called: Callback to call when the tool is called.
on_tool_completed: Callback to call when the tool is completed.
"""
super().__init__(id="end_call")
end_call_tool = function_tool(
self._end_call,
name="end_call",
description=f"{END_CALL_DESCRIPTION}\n{extra_description}",
)
super().__init__(id="end_call", tools=[end_call_tool])

self._delete_room = delete_room
self._extra_description = extra_description

self._end_instructions = end_instructions
self._on_tool_called = on_tool_called
self._on_tool_completed = on_tool_completed

self._end_call_tool = function_tool(
self._end_call,
name="end_call",
description=f"{END_CALL_DESCRIPTION}\n{extra_description}",
)
self._shutdown_session_task: asyncio.Task[None] | None = None

async def _end_call(self, ctx: RunContext) -> Any | None:
Expand Down Expand Up @@ -126,7 +127,3 @@ async def _on_shutdown() -> None:

# shutdown the job process
job_ctx.shutdown(reason=ev.reason.value)

@property
def tools(self) -> list[Tool]:
return [self._end_call_tool]
60 changes: 59 additions & 1 deletion livekit-agents/livekit/agents/llm/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
from urllib.parse import urlparse

from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from typing_extensions import Self

from .tool_context import Toolset

try:
import httpx
Expand Down Expand Up @@ -106,7 +109,6 @@ async def initialize(self) -> None:
)
)
await self._client.initialize() # type: ignore[union-attr]
self._initialized = True
except Exception:
await self.aclose()
raise
Expand Down Expand Up @@ -369,3 +371,59 @@ def client_streams(

def __repr__(self) -> str:
return f"MCPServerStdio(command={self.command}, args={self.args}, cwd={self.cwd})"


class MCPToolset(Toolset):
    """A toolset that exposes tools from a Model Context Protocol (MCP) server.

    MCPToolset wraps an ``MCPServer`` instance and makes its tools available for
    use by an ``Agent``. On ``setup()``, it connects to the MCP server (if not
    already connected), fetches the available tools, and caches them locally.
    """

    def __init__(self, *, id: str, mcp_server: MCPServer) -> None:
        """Create a toolset backed by *mcp_server*.

        Args:
            id: Unique identifier for this toolset (forwarded to ``Toolset``).
            mcp_server: The MCP server whose tools this toolset exposes. It is
                not connected here; connection happens lazily in ``setup()``.
        """
        super().__init__(id=id)
        self._mcp_server = mcp_server
        # True once setup() has successfully fetched and cached the tool list.
        self._initialized = False
        # Serializes concurrent setup() calls so the server is initialized and
        # the tool cache populated exactly once.
        self._lock = asyncio.Lock()

    async def setup(self, *, reload: bool = False) -> Self:
        """Initialize the MCP server connection and fetch available tools.

        If the MCP server is not yet connected, this will call
        ``MCPServer.initialize()``. Subsequent calls are no-ops unless
        ``reload=True``.

        Args:
            reload: If ``True``, invalidate the tool cache and re-fetch
                tools from the MCP server even if already initialized.

        Returns:
            This toolset, to allow fluent ``await toolset.setup()`` usage.
        """
        # Let the base class set up any nested toolsets first.
        await super().setup()
        async with self._lock:
            # Fast path: already initialized and no reload requested.
            if not reload and self._initialized:
                return self

            if not self._mcp_server.initialized:
                await self._mcp_server.initialize()
            elif reload:
                # Server already connected: drop its cached tool list so
                # list_tools() below re-fetches from the server.
                self._mcp_server.invalidate_cache()

            tools = await self._mcp_server.list_tools()
            # _tools is the cache read by the base Toolset.tools property.
            self._tools = tools
            self._initialized = True
            return self

    def filter_tools(self, filter_fn: Callable[[MCPTool], bool]) -> Self:
        """Filter the toolset's tools in-place using a predicate."""
        # NOTE: any cached entry that is not an MCPTool is dropped regardless
        # of the predicate; only MCPTool instances are passed to filter_fn.
        self._tools = [
            tool for tool in self._tools if isinstance(tool, MCPTool) and filter_fn(tool)
        ]
        return self

    async def aclose(self) -> None:
        """Close the toolset and the underlying MCP server connection.

        The cached tool list and the initialized flag are reset even if
        closing the base toolset or the server raises.
        """
        try:
            await super().aclose()
            await self._mcp_server.aclose()
        finally:
            # Always reset local state so a subsequent setup() starts fresh.
            self._initialized = False
            self._tools = []
32 changes: 28 additions & 4 deletions livekit-agents/livekit/agents/llm/tool_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from __future__ import annotations

import asyncio
import functools
import inspect
import itertools
Expand Down Expand Up @@ -47,7 +48,7 @@ def id(self) -> str:
return self._id


class Toolset(ABC):
class Toolset:
@dataclass
class ToolCalledEvent:
ctx: RunContext
Expand All @@ -58,16 +59,39 @@ class ToolCompletedEvent:
ctx: RunContext
output: Any | Exception | None

def __init__(self, *, id: str) -> None:
def __init__(self, *, id: str, tools: list[Tool | Toolset] | None = None) -> None:
self._id = id
self._tools: Sequence[Tool | Toolset] = tools or []

@property
def id(self) -> str:
return self._id

@property
@abstractmethod
def tools(self) -> list[Tool]: ...
def tools(self) -> Sequence[Tool | Toolset]:
return self._tools

async def setup(self) -> Self:
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
"""Initialize the toolset and any nested toolsets.

Called automatically by ``AgentActivity`` when an agent starts.
"""
toolsets = [tool for tool in self.tools if isinstance(tool, Toolset)]
if toolsets:
await asyncio.gather(*(toolset.setup() for toolset in toolsets))
return self

async def aclose(self) -> None:
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
"""Close the toolset and release any held resources.

Agent-scoped toolsets (passed to ``Agent(tools=...)``) are closed when the
``AgentActivity`` ends (on agent transition or session close). Session-scoped
toolsets (passed to ``AgentSession(tools=...)``) are closed only when the
``AgentSession`` shuts down.
"""
toolsets = [tool for tool in self.tools if isinstance(tool, Toolset)]
if toolsets:
await asyncio.gather(*(toolset.aclose() for toolset in toolsets))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add return_exceptions=True for both gather calls here? I guess fail fast is good too.



# Used by ToolChoice
Expand Down
50 changes: 26 additions & 24 deletions livekit-agents/livekit/agents/voice/agent_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
from ..llm import mcp
from .agent_session import AgentSession


_AgentActivityContextVar = contextvars.ContextVar["AgentActivity"]("agents_activity")
_SpeechHandleContextVar = contextvars.ContextVar["SpeechHandle"]("agents_speech_handle")

Expand Down Expand Up @@ -142,7 +143,7 @@ def __init__(self, agent: Agent, sess: AgentSession) -> None:
self._preemptive_generation: _PreemptiveGeneration | None = None

self._drain_blocked_tasks: list[asyncio.Task[Any]] = []
self._mcp_tools: list[mcp.MCPTool] = []
self._mcp_tools: list[mcp.MCPToolset] = []

self._on_enter_task: asyncio.Task | None = None
self._on_exit_task: asyncio.Task | None = None
Expand Down Expand Up @@ -582,32 +583,28 @@ async def _start_session(self) -> None:
self._interruption_detector.on("overlapping_speech", self._on_overlap_speech_ended)

if self.mcp_servers:
from ..llm.mcp import MCPToolset

@utils.log_exceptions(logger=logger)
async def _list_mcp_tools_task(
mcp_server: mcp.MCPServer,
) -> list[mcp.MCPTool]:
if not mcp_server.initialized:
await mcp_server.initialize()
logger.warning(
"passing MCP servers to AgentSession or Agent is deprecated "
"and will be removed in a future version. Use `MCPToolset` instead."
)
self._mcp_tools = [
MCPToolset(id=utils.shortuuid("mcp_toolset_"), mcp_server=server)
for server in self.mcp_servers
]

return await mcp_server.list_tools()
toolsets = [tool for tool in self.tools if isinstance(tool, llm.Toolset)]
if toolsets:

gathered = await asyncio.gather(
*(_list_mcp_tools_task(s) for s in self.mcp_servers),
@utils.log_exceptions(logger=logger)
async def _setup_toolset(toolset: llm.Toolset) -> None:
await toolset.setup()

await asyncio.gather(
*(_setup_toolset(toolset) for toolset in toolsets),
return_exceptions=True,
)
tools: list[mcp.MCPTool] = []
for mcp_server, res in zip(self.mcp_servers, gathered, strict=False):
if isinstance(res, BaseException):
logger.error(
f"failed to list tools from MCP server {mcp_server}",
exc_info=res,
)
continue

tools.extend(res)

self._mcp_tools = tools

if isinstance(self.llm, llm.RealtimeModel):
self._rt_session = self.llm.session()
Expand Down Expand Up @@ -822,9 +819,14 @@ async def _close_session(self) -> None:
if self._audio_recognition is not None:
await self._audio_recognition.aclose()

if self.mcp_servers:
# close the toolsets created internally and the ones from the agent
# leave the ones from the session open, they will be closed by the session
toolsets = self._mcp_tools + [
tool for tool in self._agent.tools if isinstance(tool, llm.Toolset)
]
if toolsets:
await asyncio.gather(
*(mcp_server.aclose() for mcp_server in self.mcp_servers), return_exceptions=True
*(toolset.aclose() for toolset in toolsets), return_exceptions=True
)

await self._cancel_speech_pause(
Expand Down
7 changes: 7 additions & 0 deletions livekit-agents/livekit/agents/voice/agent_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,13 @@ async def _aclose_impl(
if self._ivr_activity is not None:
await self._ivr_activity.aclose()

toolsets = [tool for tool in self._tools if isinstance(tool, llm.Toolset)]
if toolsets:
await asyncio.gather(
*(toolset.aclose() for toolset in toolsets),
return_exceptions=True,
)

if self._session_span:
self._session_span.end()
self._session_span = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,11 @@ def __init__(
width: int = 1280,
height: int = 720,
) -> None:
super().__init__(id="computer")
self._actions = actions
self._provider_tool = ComputerUse(
display_width_px=width,
display_height_px=height,
super().__init__(
id="computer",
tools=[ComputerUse(display_width_px=width, display_height_px=height)],
)

@property
def tools(self) -> list[llm.Tool]:
return [self._provider_tool]
self._actions = actions

async def execute(self, action: str, **kwargs: Any) -> list[dict[str, Any]]:
"""Dispatch an Anthropic computer_use action and return screenshot content."""
Expand Down Expand Up @@ -113,7 +108,8 @@ async def execute(self, action: str, **kwargs: Any) -> list[dict[str, Any]]:
return [{"type": "text", "text": "(no frame available yet)"}]
return _screenshot_content(frame)

def aclose(self) -> None:
async def aclose(self) -> None:
await super().aclose()
self._actions.aclose()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ async def aclose(self) -> None:
await self._agent_loop_task

if self._computer_tool:
self._computer_tool.aclose()
await self._computer_tool.aclose()

if self._session:
await self._session.aclose()
Expand Down
Loading