diff --git a/python/samples/concepts/README.md b/python/samples/concepts/README.md index 7832d6717a9b..105c0e94b636 100644 --- a/python/samples/concepts/README.md +++ b/python/samples/concepts/README.md @@ -4,6 +4,7 @@ This section contains code snippets that demonstrate the usage of Semantic Kerne | Features | Description | | -------- | ----------- | +| Agents | Creating and using agents in Semantic Kernel | | AutoFunctionCalling | Using `Auto Function Calling` to allow function call capable models to invoke Kernel Functions automatically | | ChatCompletion | Using [`ChatCompletion`](https://github.com/microsoft/semantic-kernel/blob/main/python/semantic_kernel/connectors/ai/chat_completion_client_base.py) messaging capable service with models | | Filtering | Creating and using Filters | diff --git a/python/samples/concepts/agents/README.md b/python/samples/concepts/agents/README.md new file mode 100644 index 000000000000..46a69a539633 --- /dev/null +++ b/python/samples/concepts/agents/README.md @@ -0,0 +1,30 @@ +# Semantic Kernel Agents - Getting Started + +This project contains a step by step guide to get started with _Semantic Kernel Agents_ in Python. + + +#### PyPI: +- For the use of agents, the minimum allowed Semantic Kernel pypi version is 1.3 # TODO Update + +#### Source +- [Semantic Kernel Agent Framework](../../../semantic_kernel/agents/) + +## Examples + +The getting started with agents examples include: + +Example|Description +---|--- +[step1_agent](../agents/step1_agent.py)|How to create and use an agent. +[step2_plugins](../agents/step2_plugins.py)|How to associate plugins with an agent. + +## Configuring the Kernel + +Similar to the Semantic Kernel Python concept samples, it is necessary to configure the secrets +and keys used by the kernel. See the follow "Configuring the Kernel" [guide](../README.md#configuring-the-kernel) for +more information. + +## Running Concept Samples + +Concept samples can be run in an IDE or via the command line. After setting up the required api key +for your AI connector, the samples run without any extra command line arguments. \ No newline at end of file diff --git a/python/samples/concepts/agents/step1_agent.py b/python/samples/concepts/agents/step1_agent.py new file mode 100644 index 000000000000..08e6fdeda8f0 --- /dev/null +++ b/python/samples/concepts/agents/step1_agent.py @@ -0,0 +1,67 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +from functools import reduce + +from semantic_kernel.agents.chat_completion_agent import ChatCompletionAgent +from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion +from semantic_kernel.contents.chat_history import ChatHistory +from semantic_kernel.contents.utils.author_role import AuthorRole +from semantic_kernel.kernel import Kernel + +################################################################### +# The following sample demonstrates how to create a simple, # +# non-group agent that repeats the user message in the voice # +# of a pirate and then ends with a parrot sound. # +################################################################### + +# To toggle streaming or non-streaming mode, change the following boolean +streaming = True + +# Define the agent name and instructions +PARROT_NAME = "Parrot" +PARROT_INSTRUCTIONS = "Repeat the user message in the voice of a pirate and then end with a parrot sound." + + +async def invoke_agent(agent: ChatCompletionAgent, input: str, chat: ChatHistory): + """Invoke the agent with the user input.""" + chat.add_user_message(input) + + print(f"# {AuthorRole.USER}: '{input}'") + + if streaming: + contents = [] + content_name = "" + async for content in agent.invoke_stream(chat): + content_name = content.name + contents.append(content) + streaming_chat_message = reduce(lambda first, second: first + second, contents) + print(f"# {content.role} - {content_name or '*'}: '{streaming_chat_message}'") + chat.add_message(content) + else: + async for content in agent.invoke(chat): + print(f"# {content.role} - {content.name or '*'}: '{content.content}'") + chat.add_message(content) + + +async def main(): + # Create the instance of the Kernel + kernel = Kernel() + + # Add the OpenAIChatCompletion AI Service to the Kernel + kernel.add_service(AzureChatCompletion(service_id="agent")) + + # Create the agent + agent = ChatCompletionAgent(service_id="agent", kernel=kernel, name=PARROT_NAME, instructions=PARROT_INSTRUCTIONS) + + # Define the chat history + chat = ChatHistory() + + # Respond to user input + await invoke_agent(agent, "Fortune favors the bold.", chat) + await invoke_agent(agent, "I came, I saw, I conquered.", chat) + await invoke_agent(agent, "Practice makes perfect.", chat) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/concepts/agents/step2_plugins.py b/python/samples/concepts/agents/step2_plugins.py new file mode 100644 index 000000000000..46111da6100a --- /dev/null +++ b/python/samples/concepts/agents/step2_plugins.py @@ -0,0 +1,99 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +from typing import Annotated + +from semantic_kernel.agents.chat_completion_agent import ChatCompletionAgent +from semantic_kernel.connectors.ai.function_choice_behavior import FunctionChoiceBehavior +from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion +from semantic_kernel.contents.chat_history import ChatHistory +from semantic_kernel.contents.utils.author_role import AuthorRole +from semantic_kernel.functions.kernel_function_decorator import kernel_function +from semantic_kernel.kernel import Kernel + +################################################################### +# The following sample demonstrates how to create a simple, # +# non-group agent that utilizes plugins defined as part of # +# the Kernel. # +################################################################### + +# This sample allows for a streaming response verus a non-streaming response +streaming = True + +# Define the agent name and instructions +HOST_NAME = "Host" +HOST_INSTRUCTIONS = "Answer questions about the menu." + + +# Define a sample plugin for the sample +class MenuPlugin: + """A sample Menu Plugin used for the concept sample.""" + + @kernel_function(description="Provides a list of specials from the menu.") + def get_specials(self) -> Annotated[str, "Returns the specials from the menu."]: + return """ + Special Soup: Clam Chowder + Special Salad: Cobb Salad + Special Drink: Chai Tea + """ + + @kernel_function(description="Provides the price of the requested menu item.") + def get_item_price( + self, menu_item: Annotated[str, "The name of the menu item."] + ) -> Annotated[str, "Returns the price of the menu item."]: + return "$9.99" + + +# A helper method to invoke the agent with the user input +async def invoke_agent(agent: ChatCompletionAgent, input: str, chat: ChatHistory) -> None: + """Invoke the agent with the user input.""" + chat.add_user_message(input) + + print(f"# {AuthorRole.USER}: '{input}'") + + if streaming: + contents = [] + content_name = "" + async for content in agent.invoke_stream(chat): + content_name = content.name + contents.append(content) + message_content = "".join([content.content for content in contents]) + print(f"# {content.role} - {content_name or '*'}: '{message_content}'") + chat.add_assistant_message(message_content) + else: + async for content in agent.invoke(chat): + print(f"# {content.role} - {content.name or '*'}: '{content.content}'") + chat.add_message(content) + + +async def main(): + # Create the instance of the Kernel + kernel = Kernel() + + # Add the OpenAIChatCompletion AI Service to the Kernel + service_id = "agent" + kernel.add_service(AzureChatCompletion(service_id=service_id)) + + settings = kernel.get_prompt_execution_settings_from_service_id(service_id=service_id) + # Configure the function choice behavior to auto invoke kernel functions + settings.function_choice_behavior = FunctionChoiceBehavior.Auto() + + kernel.add_plugin(plugin=MenuPlugin(), plugin_name="menu") + + # Create the agent + agent = ChatCompletionAgent( + service_id="agent", kernel=kernel, name=HOST_NAME, instructions=HOST_INSTRUCTIONS, execution_settings=settings + ) + + # Define the chat history + chat = ChatHistory() + + # Respond to user input + await invoke_agent(agent, "Hello", chat) + await invoke_agent(agent, "What is the special soup?", chat) + await invoke_agent(agent, "What is the special drink?", chat) + await invoke_agent(agent, "Thank you", chat) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/semantic_kernel/agents/__init__.py b/python/semantic_kernel/agents/__init__.py new file mode 100644 index 000000000000..376202f33570 --- /dev/null +++ b/python/semantic_kernel/agents/__init__.py @@ -0,0 +1,7 @@ +# Copyright (c) Microsoft. All rights reserved. + +from semantic_kernel.agents.chat_completion_agent import ChatCompletionAgent + +__all__ = [ + "ChatCompletionAgent", +] diff --git a/python/semantic_kernel/agents/agent.py b/python/semantic_kernel/agents/agent.py new file mode 100644 index 000000000000..73ffcba0240e --- /dev/null +++ b/python/semantic_kernel/agents/agent.py @@ -0,0 +1,57 @@ +# Copyright (c) Microsoft. All rights reserved. + +import uuid +from abc import ABC +from typing import ClassVar + +from pydantic import Field + +from semantic_kernel.agents.agent_channel import AgentChannel +from semantic_kernel.kernel import Kernel +from semantic_kernel.kernel_pydantic import KernelBaseModel +from semantic_kernel.utils.experimental_decorator import experimental_class + + +@experimental_class +class Agent(ABC, KernelBaseModel): + """Base abstraction for all Semantic Kernel agents. + + An agent instance may participate in one or more conversations. + A conversation may include one or more agents. + In addition to identity and descriptive meta-data, an Agent + must define its communication protocol, or AgentChannel. + + Attributes: + name: The name of the agent (optional). + description: The description of the agent (optional). + id: The unique identifier of the agent (optional). If no id is provided, + a new UUID will be generated. + instructions: The instructions for the agent (optional + """ + + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + description: str | None = None + name: str | None = None + instructions: str | None = None + kernel: Kernel = Field(default_factory=Kernel) + channel_type: ClassVar[type[AgentChannel] | None] = None + + def get_channel_keys(self) -> list[str]: + """Get the channel keys. + + Returns: + A list of channel keys. + """ + if not self.channel_type: + raise NotImplementedError("Unable to get channel keys. Channel type not configured.") + return [self.channel_type.__name__] + + def create_channel(self) -> AgentChannel: + """Create a channel. + + Returns: + An instance of AgentChannel. + """ + if not self.channel_type: + raise NotImplementedError("Unable to create channel. Channel type not configured.") + return self.channel_type() diff --git a/python/semantic_kernel/agents/agent_channel.py b/python/semantic_kernel/agents/agent_channel.py new file mode 100644 index 000000000000..ea834950e88e --- /dev/null +++ b/python/semantic_kernel/agents/agent_channel.py @@ -0,0 +1,59 @@ +# Copyright (c) Microsoft. All rights reserved. + +from abc import ABC, abstractmethod +from collections.abc import AsyncIterable +from typing import TYPE_CHECKING + +from semantic_kernel.utils.experimental_decorator import experimental_class + +if TYPE_CHECKING: + from semantic_kernel.agents.agent import Agent + from semantic_kernel.contents.chat_message_content import ChatMessageContent + + +@experimental_class +class AgentChannel(ABC): + """Defines the communication protocol for a particular Agent type. + + An agent provides it own AgentChannel via CreateChannel. + """ + + @abstractmethod + async def receive( + self, + history: list["ChatMessageContent"], + ) -> None: + """Receive the conversation messages. + + Used when joining a conversation and also during each agent interaction. + + Args: + history: The history of messages in the conversation. + """ + ... + + @abstractmethod + def invoke( + self, + agent: "Agent", + ) -> AsyncIterable["ChatMessageContent"]: + """Perform a discrete incremental interaction between a single Agent and AgentChat. + + Args: + agent: The agent to interact with. + + Returns: + An async iterable of ChatMessageContent. + """ + ... + + @abstractmethod + def get_history( + self, + ) -> AsyncIterable["ChatMessageContent"]: + """Retrieve the message history specific to this channel. + + Returns: + An async iterable of ChatMessageContent. + """ + ... diff --git a/python/semantic_kernel/agents/chat_completion_agent.py b/python/semantic_kernel/agents/chat_completion_agent.py new file mode 100644 index 000000000000..44cf48f94722 --- /dev/null +++ b/python/semantic_kernel/agents/chat_completion_agent.py @@ -0,0 +1,196 @@ +# Copyright (c) Microsoft. All rights reserved. + +import logging +from collections.abc import AsyncGenerator, AsyncIterable +from typing import TYPE_CHECKING, Any, ClassVar + +from semantic_kernel.agents.agent import Agent +from semantic_kernel.agents.agent_channel import AgentChannel +from semantic_kernel.agents.chat_history_channel import ChatHistoryChannel +from semantic_kernel.connectors.ai.chat_completion_client_base import ChatCompletionClientBase +from semantic_kernel.connectors.ai.prompt_execution_settings import PromptExecutionSettings +from semantic_kernel.const import DEFAULT_SERVICE_NAME +from semantic_kernel.contents.chat_history import ChatHistory +from semantic_kernel.contents.chat_message_content import ChatMessageContent +from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent +from semantic_kernel.contents.utils.author_role import AuthorRole +from semantic_kernel.exceptions import KernelServiceNotFoundError +from semantic_kernel.utils.experimental_decorator import experimental_class + +if TYPE_CHECKING: + from semantic_kernel.kernel import Kernel + +logger: logging.Logger = logging.getLogger(__name__) + + +@experimental_class +class ChatCompletionAgent(Agent): + """A KernelAgent specialization based on ChatCompletionClientBase. + + Note: enable `function_choice_behavior` on the PromptExecutionSettings to enable function + choice behavior which allows the kernel to utilize plugins and functions registered in + the kernel. + """ + + service_id: str + execution_settings: PromptExecutionSettings | None = None + channel_type: ClassVar[type[AgentChannel]] = ChatHistoryChannel + + def __init__( + self, + service_id: str | None = None, + kernel: "Kernel | None" = None, + name: str | None = None, + id: str | None = None, + description: str | None = None, + instructions: str | None = None, + execution_settings: PromptExecutionSettings | None = None, + ) -> None: + """Initialize a new instance of ChatCompletionAgent. + + Args: + service_id: The service id for the chat completion service. (optional) If not provided, + the default service name `default` will be used. + kernel: The kernel instance. (optional) + name: The name of the agent. (optional) + id: The unique identifier for the agent. (optional) If not provided, + a unique GUID will be generated. + description: The description of the agent. (optional) + instructions: The instructions for the agent. (optional) + execution_settings: The execution settings for the agent. (optional) + """ + if not service_id: + service_id = DEFAULT_SERVICE_NAME + + args: dict[str, Any] = { + "service_id": service_id, + "name": name, + "description": description, + "instructions": instructions, + "execution_settings": execution_settings, + } + if id is not None: + args["id"] = id + if kernel is not None: + args["kernel"] = kernel + super().__init__(**args) + + async def invoke(self, history: ChatHistory) -> AsyncIterable[ChatMessageContent]: + """Invoke the chat history handler. + + Args: + kernel: The kernel instance. + history: The chat history. + + Returns: + An async iterable of ChatMessageContent. + """ + # Get the chat completion service + chat_completion_service = self.kernel.get_service(service_id=self.service_id, type=ChatCompletionClientBase) + + if not chat_completion_service: + raise KernelServiceNotFoundError(f"Chat completion service not found with service_id: {self.service_id}") + + assert isinstance(chat_completion_service, ChatCompletionClientBase) # nosec + + settings = ( + self.execution_settings + or self.kernel.get_prompt_execution_settings_from_service_id(self.service_id) + or chat_completion_service.instantiate_prompt_execution_settings( + service_id=self.service_id, extension_data={"ai_model_id": chat_completion_service.ai_model_id} + ) + ) + + chat = self._setup_agent_chat_history(history) + + message_count = len(chat) + + logger.debug(f"[{type(self).__name__}] Invoking {type(chat_completion_service).__name__}.") + + messages = await chat_completion_service.get_chat_message_contents( + chat_history=chat, + settings=settings, + kernel=self.kernel, + ) + + logger.info( + f"[{type(self).__name__}] Invoked {type(chat_completion_service).__name__} " + f"with message count: {message_count}." + ) + + # Capture mutated messages related function calling / tools + for message_index in range(message_count, len(chat)): + message = chat[message_index] + message.name = self.name + history.add_message(message) + + for message in messages: + message.name = self.name + yield message + + async def invoke_stream(self, history: ChatHistory) -> AsyncIterable[StreamingChatMessageContent]: + """Invoke the chat history handler in streaming mode. + + Args: + kernel: The kernel instance. + history: The chat history. + + Returns: + An async generator of StreamingChatMessageContent. + """ + # Get the chat completion service + chat_completion_service = self.kernel.get_service(service_id=self.service_id, type=ChatCompletionClientBase) + + if not chat_completion_service: + raise KernelServiceNotFoundError(f"Chat completion service not found with service_id: {self.service_id}") + + assert isinstance(chat_completion_service, ChatCompletionClientBase) # nosec + + settings = ( + self.execution_settings + or self.kernel.get_prompt_execution_settings_from_service_id(self.service_id) + or chat_completion_service.instantiate_prompt_execution_settings( + service_id=self.service_id, extension_data={"ai_model_id": chat_completion_service.ai_model_id} + ) + ) + + chat = self._setup_agent_chat_history(history) + + message_count = len(chat) + + logger.debug(f"[{type(self).__name__}] Invoking {type(chat_completion_service).__name__}.") + + messages: AsyncGenerator[list[StreamingChatMessageContent], Any] = ( + chat_completion_service.get_streaming_chat_message_contents( + chat_history=chat, + settings=settings, + kernel=self.kernel, + ) + ) + + logger.info( + f"[{type(self).__name__}] Invoked {type(chat_completion_service).__name__} " + f"with message count: {message_count}." + ) + + async for message_list in messages: + for message in message_list: + message.name = self.name + yield message + + # Capture mutated messages related function calling / tools + for message_index in range(message_count, len(chat)): + message = chat[message_index] # type: ignore + message.name = self.name + history.add_message(message) + + def _setup_agent_chat_history(self, history: ChatHistory) -> ChatHistory: + """Setup the agent chat history.""" + chat = [] + + if self.instructions is not None: + chat.append(ChatMessageContent(role=AuthorRole.SYSTEM, content=self.instructions, name=self.name)) + + chat.extend(history.messages if history.messages else []) + + return ChatHistory(messages=chat) diff --git a/python/semantic_kernel/agents/chat_history_channel.py b/python/semantic_kernel/agents/chat_history_channel.py new file mode 100644 index 000000000000..dc4a1b231b1d --- /dev/null +++ b/python/semantic_kernel/agents/chat_history_channel.py @@ -0,0 +1,92 @@ +# Copyright (c) Microsoft. All rights reserved. + +import sys +from collections.abc import AsyncIterable + +if sys.version_info >= (3, 12): + from typing import override # pragma: no cover +else: + from typing_extensions import override # pragma: no cover + +from abc import abstractmethod +from typing import TYPE_CHECKING, Protocol, runtime_checkable + +from semantic_kernel.agents.agent import Agent +from semantic_kernel.agents.agent_channel import AgentChannel +from semantic_kernel.contents import ChatMessageContent +from semantic_kernel.contents.chat_history import ChatHistory +from semantic_kernel.exceptions import ServiceInvalidTypeError +from semantic_kernel.utils.experimental_decorator import experimental_class + +if TYPE_CHECKING: + from semantic_kernel.contents.chat_history import ChatHistory + from semantic_kernel.contents.chat_message_content import ChatMessageContent + from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent + + +@experimental_class +@runtime_checkable +class ChatHistoryAgentProtocol(Protocol): + """Contract for an agent that utilizes a ChatHistoryChannel.""" + + @abstractmethod + def invoke(self, history: "ChatHistory") -> AsyncIterable["ChatMessageContent"]: + """Invoke the chat history agent protocol.""" + ... + + @abstractmethod + def invoke_stream(self, history: "ChatHistory") -> AsyncIterable["StreamingChatMessageContent"]: + """Invoke the chat history agent protocol in streaming mode.""" + ... + + +@experimental_class +class ChatHistoryChannel(AgentChannel, ChatHistory): + """An AgentChannel specialization for that acts upon a ChatHistoryHandler.""" + + @override + async def invoke( + self, + agent: Agent, + ) -> AsyncIterable[ChatMessageContent]: + """Perform a discrete incremental interaction between a single Agent and AgentChat. + + Args: + agent: The agent to interact with. + + Returns: + An async iterable of ChatMessageContent. + """ + if not isinstance(agent, ChatHistoryAgentProtocol): + id = getattr(agent, "id", "") + raise ServiceInvalidTypeError( + f"Invalid channel binding for agent with id: `{id}` with name: ({type(agent).__name__})" + ) + + async for message in agent.invoke(self): + self.messages.append(message) + yield message + + @override + async def receive( + self, + history: list[ChatMessageContent], + ) -> None: + """Receive the conversation messages. + + Args: + history: The history of messages in the conversation. + """ + self.messages.extend(history) + + @override + async def get_history( # type: ignore + self, + ) -> AsyncIterable[ChatMessageContent]: + """Retrieve the message history specific to this channel. + + Returns: + An async iterable of ChatMessageContent. + """ + for message in reversed(self.messages): + yield message diff --git a/python/tests/samples/test_concepts.py b/python/tests/samples/test_concepts.py index 166366319bca..32a505926eb0 100644 --- a/python/tests/samples/test_concepts.py +++ b/python/tests/samples/test_concepts.py @@ -5,6 +5,8 @@ import pytest from pytest import mark, param +from samples.concepts.agents.step1_agent import main as step1_agent +from samples.concepts.agents.step2_plugins import main as step2_plugins from samples.concepts.auto_function_calling.azure_python_code_interpreter_function_calling import ( main as azure_python_code_interpreter_function_calling, ) @@ -95,6 +97,8 @@ param(custom_service_selector, [], id="custom_service_selector"), param(function_defined_in_json_prompt, ["What is 3+3?", "exit"], id="function_defined_in_json_prompt"), param(function_defined_in_yaml_prompt, ["What is 3+3?", "exit"], id="function_defined_in_yaml_prompt"), + param(step1_agent, [], id="step1_agent"), + param(step2_plugins, [], id="step2_agent_plugins"), param( ollama_chat_completion, ["Why is the sky blue?", "exit"], diff --git a/python/tests/unit/agents/test_agent.py b/python/tests/unit/agents/test_agent.py new file mode 100644 index 000000000000..6094b649e1e7 --- /dev/null +++ b/python/tests/unit/agents/test_agent.py @@ -0,0 +1,64 @@ +# Copyright (c) Microsoft. All rights reserved. + +import uuid +from unittest.mock import AsyncMock + +import pytest + +from semantic_kernel.agents.agent import Agent +from semantic_kernel.agents.agent_channel import AgentChannel + + +class MockAgent(Agent): + """A mock agent for testing purposes.""" + + def __init__(self, name: str = "Test Agent", description: str = "A test agent", id: str = None): + args = { + "name": name, + "description": description, + } + if id is not None: + args["id"] = id + super().__init__(**args) + + def get_channel_keys(self) -> list[str]: + return ["key1", "key2"] + + async def create_channel(self) -> AgentChannel: + return AsyncMock(spec=AgentChannel) + + +@pytest.mark.asyncio +async def test_agent_initialization(): + name = "Test Agent" + description = "A test agent" + id_value = str(uuid.uuid4()) + + agent = MockAgent(name=name, description=description, id=id_value) + + assert agent.name == name + assert agent.description == description + assert agent.id == id_value + + +@pytest.mark.asyncio +async def test_agent_default_id(): + agent = MockAgent() + + assert agent.id is not None + assert isinstance(uuid.UUID(agent.id), uuid.UUID) + + +def test_get_channel_keys(): + agent = MockAgent() + keys = agent.get_channel_keys() + + assert keys == ["key1", "key2"] + + +@pytest.mark.asyncio +async def test_create_channel(): + agent = MockAgent() + channel = await agent.create_channel() + + assert isinstance(channel, AgentChannel) diff --git a/python/tests/unit/agents/test_agent_channel.py b/python/tests/unit/agents/test_agent_channel.py new file mode 100644 index 000000000000..20b61d956686 --- /dev/null +++ b/python/tests/unit/agents/test_agent_channel.py @@ -0,0 +1,64 @@ +# Copyright (c) Microsoft. All rights reserved. + +from collections.abc import AsyncIterable +from unittest.mock import AsyncMock + +import pytest + +from semantic_kernel.agents.agent import Agent +from semantic_kernel.agents.agent_channel import AgentChannel +from semantic_kernel.contents.chat_message_content import ChatMessageContent +from semantic_kernel.contents.utils.author_role import AuthorRole + + +class MockAgentChannel(AgentChannel): + async def receive(self, history: list[ChatMessageContent]) -> None: + pass + + async def invoke(self, agent: "Agent") -> AsyncIterable[ChatMessageContent]: + yield ChatMessageContent(role=AuthorRole.SYSTEM, content="test message") + + async def get_history(self) -> AsyncIterable[ChatMessageContent]: + yield ChatMessageContent(role=AuthorRole.SYSTEM, content="test history message") + + +@pytest.mark.asyncio +async def test_receive(): + mock_channel = AsyncMock(spec=MockAgentChannel) + + history = [ + ChatMessageContent(role=AuthorRole.SYSTEM, content="test message 1"), + ChatMessageContent(role=AuthorRole.USER, content="test message 2"), + ] + + await mock_channel.receive(history) + mock_channel.receive.assert_called_once_with(history) + + +@pytest.mark.asyncio +async def test_invoke(): + mock_channel = AsyncMock(spec=MockAgentChannel) + agent = AsyncMock() + + async def async_generator(): + yield ChatMessageContent(role=AuthorRole.SYSTEM, content="test message") + + mock_channel.invoke.return_value = async_generator() + + async for message in mock_channel.invoke(agent): + assert message.content == "test message" + mock_channel.invoke.assert_called_once_with(agent) + + +@pytest.mark.asyncio +async def test_get_history(): + mock_channel = AsyncMock(spec=MockAgentChannel) + + async def async_generator(): + yield ChatMessageContent(role=AuthorRole.SYSTEM, content="test history message") + + mock_channel.get_history.return_value = async_generator() + + async for message in mock_channel.get_history(): + assert message.content == "test history message" + mock_channel.get_history.assert_called_once() diff --git a/python/tests/unit/agents/test_chat_completion_agent.py b/python/tests/unit/agents/test_chat_completion_agent.py new file mode 100644 index 000000000000..7b40176cbfd1 --- /dev/null +++ b/python/tests/unit/agents/test_chat_completion_agent.py @@ -0,0 +1,213 @@ +# Copyright (c) Microsoft. All rights reserved. + +from unittest.mock import AsyncMock, create_autospec, patch + +import pytest + +from semantic_kernel.agents.chat_completion_agent import ChatCompletionAgent +from semantic_kernel.agents.chat_history_channel import ChatHistoryChannel +from semantic_kernel.connectors.ai.chat_completion_client_base import ChatCompletionClientBase +from semantic_kernel.contents.chat_history import ChatHistory +from semantic_kernel.contents.chat_message_content import ChatMessageContent +from semantic_kernel.contents.utils.author_role import AuthorRole +from semantic_kernel.exceptions import KernelServiceNotFoundError +from semantic_kernel.kernel import Kernel + + +@pytest.fixture +def mock_streaming_chat_completion_response() -> AsyncMock: + """A fixture that returns a mock response for a streaming chat completion response.""" + + async def mock_response(chat_history, settings, kernel): + content1 = ChatMessageContent(role=AuthorRole.SYSTEM, content="Processed Message 1") + content2 = ChatMessageContent(role=AuthorRole.TOOL, content="Processed Message 2") + chat_history.messages.append(content1) + chat_history.messages.append(content2) + yield [content1] + yield [content2] + + return mock_response + + +@pytest.mark.asyncio +async def test_initialization(): + agent = ChatCompletionAgent( + service_id="test_service", + name="Test Agent", + id="test_id", + description="Test Description", + instructions="Test Instructions", + ) + + assert agent.service_id == "test_service" + assert agent.name == "Test Agent" + assert agent.id == "test_id" + assert agent.description == "Test Description" + assert agent.instructions == "Test Instructions" + + +@pytest.mark.asyncio +async def test_initialization_no_service_id(): + agent = ChatCompletionAgent( + name="Test Agent", + id="test_id", + description="Test Description", + instructions="Test Instructions", + ) + + assert agent.service_id == "default" + assert agent.kernel is not None + assert agent.name == "Test Agent" + assert agent.id == "test_id" + assert agent.description == "Test Description" + assert agent.instructions == "Test Instructions" + + +@pytest.mark.asyncio +async def test_initialization_with_kernel(kernel: Kernel): + agent = ChatCompletionAgent( + kernel=kernel, + name="Test Agent", + id="test_id", + description="Test Description", + instructions="Test Instructions", + ) + + assert agent.service_id == "default" + assert kernel == agent.kernel + assert agent.name == "Test Agent" + assert agent.id == "test_id" + assert agent.description == "Test Description" + assert agent.instructions == "Test Instructions" + + +@pytest.mark.asyncio +async def test_invoke(): + kernel = create_autospec(Kernel) + kernel.get_service.return_value = create_autospec(ChatCompletionClientBase) + kernel.get_service.return_value.get_chat_message_contents = AsyncMock( + return_value=[ChatMessageContent(role=AuthorRole.SYSTEM, content="Processed Message")] + ) + agent = ChatCompletionAgent( + kernel=kernel, service_id="test_service", name="Test Agent", instructions="Test Instructions" + ) + + history = ChatHistory(messages=[ChatMessageContent(role=AuthorRole.USER, content="Initial Message")]) + + messages = [message async for message in agent.invoke(history)] + + assert len(messages) == 1 + assert messages[0].content == "Processed Message" + + +@pytest.mark.asyncio +async def test_invoke_tool_call_added(): + kernel = create_autospec(Kernel) + chat_completion_service = create_autospec(ChatCompletionClientBase) + kernel.get_service.return_value = chat_completion_service + agent = ChatCompletionAgent(kernel=kernel, service_id="test_service", name="Test Agent") + + history = ChatHistory(messages=[ChatMessageContent(role=AuthorRole.USER, content="Initial Message")]) + + async def mock_get_chat_message_contents(chat_history, settings, kernel): + new_messages = [ + ChatMessageContent(role=AuthorRole.ASSISTANT, content="Processed Message 1"), + ChatMessageContent(role=AuthorRole.TOOL, content="Processed Message 2"), + ] + chat_history.messages.extend(new_messages) + return new_messages + + chat_completion_service.get_chat_message_contents = AsyncMock(side_effect=mock_get_chat_message_contents) + + messages = [message async for message in agent.invoke(history)] + + assert len(messages) == 2 + assert messages[0].content == "Processed Message 1" + assert messages[1].content == "Processed Message 2" + + assert len(history.messages) == 3 + assert history.messages[1].content == "Processed Message 1" + assert history.messages[2].content == "Processed Message 2" + assert history.messages[1].name == "Test Agent" + assert history.messages[2].name == "Test Agent" + + +@pytest.mark.asyncio +async def test_invoke_no_service_throws(): + kernel = create_autospec(Kernel) + kernel.get_service.return_value = None + agent = ChatCompletionAgent(kernel=kernel, service_id="test_service", name="Test Agent") + + history = ChatHistory(messages=[ChatMessageContent(role=AuthorRole.USER, content="Initial Message")]) + + with pytest.raises(KernelServiceNotFoundError): + async for _ in agent.invoke(history): + pass + + +@pytest.mark.asyncio +async def test_invoke_stream(): + kernel = create_autospec(Kernel) + kernel.get_service.return_value = create_autospec(ChatCompletionClientBase) + + agent = ChatCompletionAgent(kernel=kernel, service_id="test_service", name="Test Agent") + + history = ChatHistory(messages=[ChatMessageContent(role=AuthorRole.USER, content="Initial Message")]) + + with patch( + "semantic_kernel.connectors.ai.chat_completion_client_base.ChatCompletionClientBase.get_streaming_chat_message_contents", + return_value=AsyncMock(), + ) as mock: + mock.return_value.__aiter__.return_value = [ + [ChatMessageContent(role=AuthorRole.USER, content="Initial Message")] + ] + + async for message in agent.invoke_stream(history): + assert message.role == AuthorRole.USER + assert message.content == "Initial Message" + + +@pytest.mark.asyncio +async def test_invoke_stream_tool_call_added(mock_streaming_chat_completion_response): + kernel = create_autospec(Kernel) + chat_completion_service = create_autospec(ChatCompletionClientBase) + kernel.get_service.return_value = chat_completion_service + agent = ChatCompletionAgent(kernel=kernel, service_id="test_service", name="Test Agent") + + history = ChatHistory(messages=[ChatMessageContent(role=AuthorRole.USER, content="Initial Message")]) + + chat_completion_service.get_streaming_chat_message_contents = mock_streaming_chat_completion_response + + async for message in agent.invoke_stream(history): + print(f"Message role: {message.role}, content: {message.content}") + assert message.role in [AuthorRole.SYSTEM, AuthorRole.TOOL] + assert message.content in ["Processed Message 1", "Processed Message 2"] + + assert len(history.messages) == 3 + + +@pytest.mark.asyncio +async def test_invoke_stream_no_service_throws(): + kernel = create_autospec(Kernel) + kernel.get_service.return_value = None + agent = ChatCompletionAgent(kernel=kernel, service_id="test_service", name="Test Agent") + + history = ChatHistory(messages=[ChatMessageContent(role=AuthorRole.USER, content="Initial Message")]) + + with pytest.raises(KernelServiceNotFoundError): + async for _ in agent.invoke_stream(history): + pass + + +def test_get_channel_keys(): + agent = ChatCompletionAgent() + keys = agent.get_channel_keys() + + assert keys == [ChatHistoryChannel.__name__] + + +def test_create_channel(): + agent = ChatCompletionAgent() + channel = agent.create_channel() + + assert isinstance(channel, ChatHistoryChannel) diff --git a/python/tests/unit/agents/test_chat_history_channel.py b/python/tests/unit/agents/test_chat_history_channel.py new file mode 100644 index 000000000000..b3160cb91ebf --- /dev/null +++ b/python/tests/unit/agents/test_chat_history_channel.py @@ -0,0 +1,93 @@ +# Copyright (c) Microsoft. All rights reserved. + +from collections.abc import AsyncIterable + +import pytest + +from semantic_kernel.agents.chat_history_channel import ChatHistoryAgentProtocol, ChatHistoryChannel +from semantic_kernel.contents.chat_message_content import ChatMessageContent +from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent +from semantic_kernel.contents.utils.author_role import AuthorRole +from semantic_kernel.exceptions import ServiceInvalidTypeError + + +class MockChatHistoryHandler: + """Mock agent to test chat history handling""" + + async def invoke(self, history: list[ChatMessageContent]) -> AsyncIterable[ChatMessageContent]: + for message in history: + yield ChatMessageContent(role=AuthorRole.SYSTEM, content=f"Processed: {message.content}") + + async def invoke_stream(self, history: list[ChatMessageContent]) -> AsyncIterable["StreamingChatMessageContent"]: + pass + + +class MockNonChatHistoryHandler: + """Mock agent to test incorrect instance handling.""" + + id: str = "mock_non_chat_history_handler" + + +ChatHistoryAgentProtocol.register(MockChatHistoryHandler) + + +@pytest.mark.asyncio +async def test_invoke(): + channel = ChatHistoryChannel() + agent = MockChatHistoryHandler() + + initial_message = ChatMessageContent(role=AuthorRole.USER, content="Initial message") + channel.messages.append(initial_message) + + received_messages = [] + async for message in channel.invoke(agent): + received_messages.append(message) + break # only process one message for the test + + assert len(received_messages) == 1 + assert "Processed: Initial message" in received_messages[0].content + + +@pytest.mark.asyncio +async def test_invoke_incorrect_instance_throws(): + channel = ChatHistoryChannel() + agent = MockNonChatHistoryHandler() + + with pytest.raises(ServiceInvalidTypeError): + async for _ in channel.invoke(agent): + pass + + +@pytest.mark.asyncio +async def test_receive(): + channel = ChatHistoryChannel() + history = [ + ChatMessageContent(role=AuthorRole.SYSTEM, content="test message 1"), + ChatMessageContent(role=AuthorRole.USER, content="test message 2"), + ] + + await channel.receive(history) + + assert len(channel.messages) == 2 + assert channel.messages[0].content == "test message 1" + assert channel.messages[0].role == AuthorRole.SYSTEM + assert channel.messages[1].content == "test message 2" + assert channel.messages[1].role == AuthorRole.USER + + +@pytest.mark.asyncio +async def test_get_history(): + channel = ChatHistoryChannel() + history = [ + ChatMessageContent(role=AuthorRole.SYSTEM, content="test message 1"), + ChatMessageContent(role=AuthorRole.USER, content="test message 2"), + ] + channel.messages.extend(history) + + messages = [message async for message in channel.get_history()] + + assert len(messages) == 2 + assert messages[0].content == "test message 2" + assert messages[0].role == AuthorRole.USER + assert messages[1].content == "test message 1" + assert messages[1].role == AuthorRole.SYSTEM