From 46d1fdaf2c72cbae71ad43a78aa0ada12b2b916c Mon Sep 17 00:00:00 2001 From: Guillaume Aquilina Date: Fri, 7 Feb 2025 09:37:04 -0500 Subject: [PATCH 1/3] feat: add test with complex payload --- tests/integration/conftest.py | 40 ++- .../integration/stream_agent_builder_test.py | 337 ++++++++++++++++++ 2 files changed, 363 insertions(+), 14 deletions(-) create mode 100644 tests/integration/stream_agent_builder_test.py diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index ac2cb40..7f16193 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -51,16 +51,30 @@ def mock_response( status_code=status_code, ) - def mock_stream(self, task_id: str = "city-to-capital"): + def mock_stream( + self, + task_id: str = "city-to-capital", + outputs: Optional[list[dict[str, Any]]] = None, + run_id: str = "1", + metadata: Optional[dict[str, Any]] = None, + ): + outputs = outputs or [ + {"capital": ""}, + {"capital": "Tok"}, + {"capital": "Tokyo"}, + ] + if metadata is None: + metadata = {"cost_usd": 0.01, "duration_seconds": 10.1} + + payloads = [{"id": run_id, "task_output": o} for o in outputs] + + final_payload = {**payloads[-1], **metadata} + payloads.append(final_payload) + streams = [f"data: {json.dumps(p)}\n\n".encode() for p in payloads] + self.httpx_mock.add_response( url=f"https://run.workflowai.dev/v1/_/agents/{task_id}/schemas/1/run", - stream=IteratorStream( - [ - b'data: {"id":"1","task_output":{"capital":""}}\n\n', - b'data: {"id":"1","task_output":{"capital":"Tok"}}\n\ndata: {"id":"1","task_output":{"capital":"Tokyo"}}\n\n', # noqa: E501 - b'data: {"id":"1","task_output":{"capital":"Tokyo"},"cost_usd":0.01,"duration_seconds":10.1}\n\n', - ], - ), + stream=IteratorStream(streams), ) def check_request( @@ -70,15 +84,13 @@ def check_request( task_input: Optional[dict[str, Any]] = None, **matchers: Any, ): + if not matchers: + matchers = {"url": f"https://run.workflowai.dev/v1/_/agents/{task_id}/schemas/1/run"} request 
= self.httpx_mock.get_request(**matchers) assert request is not None - assert request.url == f"https://run.workflowai.dev/v1/_/agents/{task_id}/schemas/1/run" body = json.loads(request.content) - assert body == { - "task_input": task_input or {"city": "Hello"}, - "version": version, - "stream": False, - } + assert body["task_input"] == task_input or {"city": "Hello"} + assert body["version"] == version assert request.headers["Authorization"] == "Bearer test" assert request.headers["Content-Type"] == "application/json" assert request.headers["x-workflowai-source"] == "sdk" diff --git a/tests/integration/stream_agent_builder_test.py b/tests/integration/stream_agent_builder_test.py new file mode 100644 index 0000000..345f587 --- /dev/null +++ b/tests/integration/stream_agent_builder_test.py @@ -0,0 +1,337 @@ +from collections.abc import AsyncIterator +from enum import Enum +from typing import Any, Literal, Optional, Union + +from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType] +from typing_extensions import TypeAlias + +import workflowai +from tests.integration.conftest import IntTestClient +from workflowai.core.domain.model import Model + + +class ChatMessage(BaseModel): + role: Literal["USER", "ASSISTANT"] = Field( + description="The role of the message sender", + examples=["USER", "ASSISTANT"], + ) + content: str = Field( + description="The content of the message", + examples=[ + "Thank you for your help!", + "What is the weather forecast for tomorrow?", + ], + ) + + +class UserChatMessage(ChatMessage): + role: Literal["USER", "ASSISTANT"] = "USER" + + +class AssistantChatMessage(ChatMessage): + role: Literal["USER", "ASSISTANT"] = "ASSISTANT" + + +class Product(BaseModel): + name: Optional[str] = None + features: Optional[list[str]] = None + description: Optional[str] = None + target_users: Optional[list[str]] = None + + +class AgentSchemaJson(BaseModel): + agent_name: str = Field(description="The name of the agent in Title Case", 
serialization_alias="task_name") + input_json_schema: Optional[dict[str, Any]] = Field( + default=None, + description="The JSON schema of the agent input", + ) + output_json_schema: Optional[dict[str, Any]] = Field( + default=None, + description="The JSON schema of the agent output", + ) + + +InputFieldType: TypeAlias = Optional[ + Union["InputGenericFieldConfig", "EnumFieldConfig", "InputArrayFieldConfig", "InputObjectFieldConfig"] +] + +OutputFieldType: TypeAlias = Optional[ + Union[ + "OutputGenericFieldConfig", + "OutputStringFieldConfig", + "EnumFieldConfig", + "OutputArrayFieldConfig", + "OutputObjectFieldConfig", + ] +] +InputItemType: TypeAlias = Optional[Union["EnumFieldConfig", "InputObjectFieldConfig", "InputGenericFieldConfig"]] +OutputItemType: TypeAlias = Optional[ + Union["OutputStringFieldConfig", "EnumFieldConfig", "OutputObjectFieldConfig", "OutputGenericFieldConfig"] +] + + +class InputSchemaFieldType(Enum): + STRING = "string" + NUMBER = "number" + BOOLEAN = "boolean" + AUDIO_FILE = "audio_file" + IMAGE_FILE = "image_file" + DOCUMENT_FILE = "document_file" # Include various text formats, pdfs and images + DATE = "date" + DATETIME = "datetime" + TIMEZONE = "timezone" + URL = "url" + EMAIL = "email" + HTML = "html" + + +class OutputSchemaFieldType(Enum): + NUMBER = "number" + BOOLEAN = "boolean" + DATE = "date" + DATETIME = "datetime" + DATETIME_LOCAL = "datetime_local" + TIMEZONE = "timezone" + URL = "url" + EMAIL = "email" + HTML = "html" + + +class BaseFieldConfig(BaseModel): + name: Optional[str] = Field( + default=None, + description="The name of the field, must be filled when the field is an object field", + ) + description: Optional[str] = Field(default=None, description="The description of the field") + + +class InputGenericFieldConfig(BaseFieldConfig): + type: Optional[InputSchemaFieldType] = Field(default=None, description="The type of the field") + + +class OutputStringFieldConfig(BaseFieldConfig): + type: Literal["string"] = "string" + 
examples: Optional[list[str]] = Field(default=None, description="The examples of the field") + + +class EnumFieldConfig(BaseFieldConfig): + type: Literal["enum"] = "enum" + values: Optional[list[str]] = Field(default=None, description="The possible values of the enum") + + +class InputObjectFieldConfig(BaseFieldConfig): + type: Literal["object"] = "object" + fields: list[InputFieldType] = Field(description="The fields of the object", default_factory=list) + + +class InputArrayFieldConfig(BaseFieldConfig): + type: Literal["array"] = "array" + item_type: InputItemType = Field(default=None, description="The type of the items in the array") + + +class OutputGenericFieldConfig(BaseFieldConfig): + type: OutputSchemaFieldType = Field(default=None, description="The type of the field") + + +class OutputObjectFieldConfig(BaseFieldConfig): + type: Literal["object"] = "object" + fields: list[OutputFieldType] = Field(description="The fields of the object", default_factory=list) + + +class OutputArrayFieldConfig(BaseFieldConfig): + type: Literal["array"] = "array" + item_type: OutputItemType = Field(default=None, description="The type of the items in the array") + + +class AgentBuilderInput(BaseModel): + previous_messages: list[ChatMessage] = Field( + description="List of previous messages exchanged between the user and the assistant", + ) + new_message: ChatMessage = Field( + description="The new message received from the user, based on which the routing decision is made", + ) + existing_agent_schema: Optional[AgentSchemaJson] = Field( + default=None, + description="The previous agent schema, to update, if any", + ) + available_tools_description: Optional[str] = Field( + default=None, + description="The description of the available tools", + ) + + class UserContent(BaseModel): + company_name: Optional[str] = None + company_description: Optional[str] = None + company_locations: Optional[list[str]] = None + company_industries: Optional[list[str]] = None + company_products: 
Optional[list[Product]] = None + current_agents: Optional[list[str]] = Field( + default=None, + description="The list of existing agents for the company", + ) + + user_context: Optional[UserContent] = Field( + default=None, + description="The context of the user, to inform the decision about the new agents schema", + ) + + +class AgentSchemaField(BaseModel): + agent_name: str = Field(description="The name of the agent in Title Case", default="") + input_schema: Optional[InputObjectFieldConfig] = Field(description="The schema of the agent input", default=None) + output_schema: Optional[OutputObjectFieldConfig] = Field(description="The schema of the agent output", default=None) + + +class AgentBuilderOutput(BaseModel): + answer_to_user: str = Field(description="The answer to the user, after processing of the 'new_message'", default="") + + new_agent_schema: Optional[AgentSchemaField] = Field( + description="The new agent schema, if any, after processing of the 'new_message'", + default=None, + ) + + +@workflowai.agent(id="chattaskschemageneration", model=Model.CLAUDE_3_5_SONNET_LATEST) +def stream_agent_builder(_: AgentBuilderInput) -> AsyncIterator[AgentBuilderOutput]: + """bla""" + ... 
+ + +async def test_input_payload(test_client: IntTestClient): + builder_input = AgentBuilderInput( + previous_messages=[ + ChatMessage(role="USER", content="Hello"), + ChatMessage(role="ASSISTANT", content="Hello, how can I help you today?"), + ], + new_message=ChatMessage( + role="USER", + content="I want to create an agent that extracts the main colors from an image", + ), + existing_agent_schema=AgentSchemaJson( + agent_name="extract_colors", + input_json_schema={ + "type": "object", + "properties": { + "image_url": { + "type": "string", + "description": "The URL of the image to extract colors from", + }, + }, + }, + output_json_schema={ + "type": "object", + "properties": { + "colors": { + "type": "array", + "items": {"type": "string"}, + "description": "The main colors of the image", + }, + }, + }, + ), + available_tools_description="You can use the following tools to help you create the agent schema", + ) + + test_client.mock_register(task_id="chattaskschemageneration") + test_client.mock_stream( + task_id="chattaskschemageneration", + outputs=[ + {"answer_to_user": "a"}, + {"answer_to_user": "hello"}, + {"answer_to_user": "hello", "new_agent_schema": {"agent_name": "extract_colors"}}, + { + "answer_to_user": "hello", + "new_agent_schema": { + "agent_name": "extract_colors", + "input_schema": { + "type": "object", + "properties": { + "image_url": { + "type": "string", + }, + }, + }, + }, + }, + { + "answer_to_user": "hello", + "new_agent_schema": { + "agent_name": "extract_colors", + "input_schema": { + "type": "object", + "properties": { + "image_url": { + "type": "string", + }, + }, + }, + "output_schema": { + "type": "object", + "properties": { + "colors": { + "type": "array", + }, + }, + }, + }, + }, + ], + ) + + chunks = [c async for c in stream_agent_builder(builder_input)] + assert len(chunks) == 6 + + expected_version = { + "instructions": "bla", + "model": "claude-3-5-sonnet-latest", + } + + expected_task_input = { + "city": "Hello", + 
"available_tools_description": "You can use the following tools to help you create the agent schema", + "existing_agent_schema": { + "input_json_schema": { + "properties": { + "image_url": { + "description": "The URL of the image to extract colors from", + "type": "string", + }, + }, + "type": "object", + }, + "output_json_schema": { + "properties": { + "colors": { + "description": "The main colors of the image", + "items": { + "type": "string", + }, + "type": "array", + }, + }, + "type": "object", + }, + "task_name": "extract_colors", + }, + "new_message": { + "content": "I want to create an agent that extracts the main colors from an image", + "role": "USER", + }, + "previous_messages": [ + { + "content": "Hello", + "role": "USER", + }, + { + "content": "Hello, how can I help you today?", + "role": "ASSISTANT", + }, + ], + "user_context": None, + } + + test_client.check_request( + task_id="chattaskschemageneration", + task_input=expected_task_input, + version=expected_version, + ) From 12b7d1e3e71d87066383f3b3e654fa236c623843 Mon Sep 17 00:00:00 2001 From: Guillaume Aquilina Date: Fri, 7 Feb 2025 10:28:14 -0500 Subject: [PATCH 2/3] feat: sanitize json schemas with aliases --- tests/integration/conftest.py | 27 +++++- tests/integration/run_test.py | 4 - .../integration/stream_agent_builder_test.py | 13 +++ workflowai/core/client/agent.py | 11 ++- workflowai/core/client/agent_test.py | 85 +++++++++++++++++++ 5 files changed, 133 insertions(+), 7 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 7f16193..eb6361a 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1,5 +1,6 @@ import json -from typing import Any, Optional +from collections.abc import Callable +from typing import Any, Optional, Union from unittest.mock import patch import pytest @@ -77,6 +78,30 @@ def mock_stream( stream=IteratorStream(streams), ) + def check_register( + self, + task_id: str = "city-to-capital", + input_schema: 
Optional[Union[dict[str, Any], Callable[[dict[str, Any]], None]]] = None, + output_schema: Optional[Union[dict[str, Any], Callable[[dict[str, Any]], None]]] = None, + ): + request = self.httpx_mock.get_request(url=self.REGISTER_URL) + assert request is not None + assert request.headers["Authorization"] == "Bearer test" + assert request.headers["Content-Type"] == "application/json" + assert request.headers["x-workflowai-source"] == "sdk" + assert request.headers["x-workflowai-language"] == "python" + + body = json.loads(request.content) + assert body["id"] == task_id + if callable(input_schema): + input_schema(body["input_schema"]) + else: + assert body["input_schema"] == (input_schema or {"city": {"type": "string"}})  # default if falsy + if callable(output_schema): + output_schema(body["output_schema"]) + else: + assert body["output_schema"] == (output_schema or {"capital": {"type": "string"}})  # default if falsy + def check_request( self, version: Any = "production", diff --git a/tests/integration/run_test.py b/tests/integration/run_test.py index b6e6f1d..44683a4 100644 --- a/tests/integration/run_test.py +++ b/tests/integration/run_test.py @@ -112,27 +112,23 @@ async def city_to_capital(task_input: CityToCapitalTaskInput) -> CityToCapitalTa "input_schema": { "properties": { "city": { - "title": "City", "type": "string", }, }, "required": [ "city", ], - "title": "CityToCapitalTaskInput", "type": "object", }, "output_schema": { "properties": { "capital": { - "title": "Capital", "type": "string", }, }, "required": [ "capital", ], - "title": "CityToCapitalTaskOutput", "type": "object", }, } diff --git a/tests/integration/stream_agent_builder_test.py b/tests/integration/stream_agent_builder_test.py index 345f587..9aaccad 100644 --- a/tests/integration/stream_agent_builder_test.py +++ b/tests/integration/stream_agent_builder_test.py @@ -335,3 +335,16 @@ async def test_input_payload(test_client: IntTestClient): task_input=expected_task_input, version=expected_version, ) + + def _check_input_schema(x: dict[str,
Any]): + # Checking the alias + assert x["$defs"]["AgentSchemaJson"]["properties"]["task_name"] == { + "type": "string", + "description": "The name of the agent in Title Case", + } + + test_client.check_register( + task_id="chattaskschemageneration", + input_schema=_check_input_schema, + output_schema=lambda _: None, + ) diff --git a/workflowai/core/client/agent.py b/workflowai/core/client/agent.py index ddc4fc6..c4dab8c 100644 --- a/workflowai/core/client/agent.py +++ b/workflowai/core/client/agent.py @@ -28,6 +28,7 @@ from workflowai.core.domain.tool_call import ToolCallRequest, ToolCallResult from workflowai.core.domain.version_properties import VersionProperties from workflowai.core.domain.version_reference import VersionReference +from workflowai.core.utils._schema_generator import JsonSchemaGenerator class Agent(Generic[AgentInput, AgentOutput]): @@ -149,8 +150,14 @@ async def register(self): "/v1/_/agents", CreateAgentRequest( id=self.agent_id, - input_schema=self.input_cls.model_json_schema(), - output_schema=self.output_cls.model_json_schema(), + input_schema=self.input_cls.model_json_schema( + mode="serialization", + schema_generator=JsonSchemaGenerator, + ), + output_schema=self.output_cls.model_json_schema( + mode="validation", + schema_generator=JsonSchemaGenerator, + ), ), returns=CreateAgentResponse, ) diff --git a/workflowai/core/client/agent_test.py b/workflowai/core/client/agent_test.py index ec4d881..7c80ffc 100644 --- a/workflowai/core/client/agent_test.py +++ b/workflowai/core/client/agent_test.py @@ -3,6 +3,7 @@ import httpx import pytest +from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType] from pytest_httpx import HTTPXMock, IteratorStream from tests.models.hello_task import ( @@ -256,3 +257,87 @@ async def test_auto_register(self, httpx_mock: HTTPXMock, agent_no_schema: Agent assert reqs[0].url == "http://localhost:8000/v1/_/agents" assert reqs[1].url == "http://localhost:8000/v1/_/agents/123/schemas/2/run" 
assert reqs[2].url == "http://localhost:8000/v1/_/agents/123/schemas/2/run" + + register_body = json.loads(reqs[0].content) + assert register_body["input_schema"] == { + "type": "object", + "properties": { + "name": {"type": "string"}, + }, + "required": ["name"], + } + assert register_body["output_schema"] == { + "type": "object", + "properties": { + "message": {"type": "string"}, + }, + "required": ["message"], + } + + async def test_with_alias(self, httpx_mock: HTTPXMock, api_client: APIClient): + class AliasInput(BaseModel): + name: str = Field(alias="name_alias") + aliased_ser: str = Field(serialization_alias="aliased_ser_alias") + aliased_val: str = Field(validation_alias="aliased_val_alias") + + class AliasOutput(BaseModel): + message: str = Field(alias="message_alias") + aliased_ser: str = Field(serialization_alias="aliased_ser_alias") + aliased_val: str = Field(validation_alias="aliased_val_alias") + + agent = Agent(agent_id="123", input_cls=AliasInput, output_cls=AliasOutput, api=api_client) + + httpx_mock.add_response(url="http://localhost:8000/v1/_/agents", json={"id": "123", "schema_id": 2}) + + httpx_mock.add_response( + url="http://localhost:8000/v1/_/agents/123/schemas/2/run", + json={ + "id": "1", + # task output should be compatible with the output schema below + "task_output": { + "message_alias": "1", + "aliased_ser": "2", + "aliased_val_alias": "3", + }, + }, + ) + + out2 = await agent.run( + # Using model validate instead of constructing directly, since pyright does not + # Understand asymmetric aliases + AliasInput.model_validate({"name_alias": "1", "aliased_ser": "2", "aliased_val_alias": "3"}), + ) + assert out2.output.message == "1" + assert out2.output.aliased_ser == "2" + assert out2.output.aliased_val == "3" + + register_req = httpx_mock.get_request(url="http://localhost:8000/v1/_/agents") + assert register_req + register_body = json.loads(register_req.content) + assert register_body["input_schema"] == { + "type": "object", + 
"properties": { + "name_alias": {"type": "string"}, + "aliased_ser_alias": {"type": "string"}, + "aliased_val": {"type": "string"}, + }, + "required": ["name_alias", "aliased_ser_alias", "aliased_val"], + } + assert register_body["output_schema"] == { + "type": "object", + "properties": { + "message_alias": {"type": "string"}, + "aliased_ser": {"type": "string"}, + "aliased_val_alias": {"type": "string"}, + }, + "required": ["message_alias", "aliased_ser", "aliased_val_alias"], + } + + run_req = httpx_mock.get_request(url="http://localhost:8000/v1/_/agents/123/schemas/2/run") + assert run_req + # Task input should be compatible with the input schema + assert json.loads(run_req.content)["task_input"] == { + "name_alias": "1", + "aliased_ser_alias": "2", + "aliased_val": "3", + } From 723930ba7ed156c74d34380e6cbd3f42bb1afdfc Mon Sep 17 00:00:00 2001 From: Guillaume Aquilina Date: Fri, 7 Feb 2025 13:47:13 -0500 Subject: [PATCH 3/3] fix: sanitize agent serialization --- pyproject.toml | 2 +- workflowai/core/_common_types.py | 6 ++++++ workflowai/core/client/_models.py | 2 ++ workflowai/core/client/agent.py | 1 + workflowai/core/client/client_test.py | 1 + workflowai/core/domain/run.py | 6 +----- 6 files changed, 12 insertions(+), 6 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index dd69710..1ecac8d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "workflowai" -version = "0.6.0.dev6" +version = "0.6.0.dev7" description = "" authors = ["Guillaume Aquilina "] readme = "README.md" diff --git a/workflowai/core/_common_types.py b/workflowai/core/_common_types.py index af3f595..1d3032d 100644 --- a/workflowai/core/_common_types.py +++ b/workflowai/core/_common_types.py @@ -1,4 +1,5 @@ from typing import ( + Annotated, Any, Generic, Optional, @@ -33,4 +34,9 @@ class BaseRunParams(TypedDict): class RunParams(BaseRunParams, Generic[AgentOutput]): + id: Annotated[ + NotRequired[str], + "A user defined ID for the run. 
The ID must be a UUID7, ordered by creation time. " + "If not provided, a UUID7 will be assigned by the server", + ] validator: NotRequired[OutputValidator["AgentOutput"]] diff --git a/workflowai/core/client/_models.py b/workflowai/core/client/_models.py index 06fce4a..2f4e429 100644 --- a/workflowai/core/client/_models.py +++ b/workflowai/core/client/_models.py @@ -19,6 +19,8 @@ class RunRequest(BaseModel): + id: Optional[str] = Field(default=None, description="A client defined ID. Must be a UUID7") + task_input: dict[str, Any] version: Union[str, int, dict[str, Any]] diff --git a/workflowai/core/client/agent.py b/workflowai/core/client/agent.py index c4dab8c..f61d1db 100644 --- a/workflowai/core/client/agent.py +++ b/workflowai/core/client/agent.py @@ -100,6 +100,7 @@ async def _prepare_run(self, task_input: AgentInput, stream: bool, **kwargs: Unp version = self._sanitize_version(kwargs.get("version")) request = RunRequest( + id=kwargs.get("id"), task_input=task_input.model_dump(by_alias=True), version=version, stream=stream, diff --git a/workflowai/core/client/client_test.py b/workflowai/core/client/client_test.py index 9f81c16..f152628 100644 --- a/workflowai/core/client/client_test.py +++ b/workflowai/core/client/client_test.py @@ -39,6 +39,7 @@ async def test_run_output_only(self, workflowai: WorkflowAI, mock_run_fn: Mock): async def fn(task_input: HelloTaskInput) -> HelloTaskOutput: ...
mock_run_fn.return_value = Run( + id="1", output=HelloTaskOutput(message="hello"), agent_id="123", schema_id=1, diff --git a/workflowai/core/domain/run.py b/workflowai/core/domain/run.py index 3b2281a..1db9aed 100644 --- a/workflowai/core/domain/run.py +++ b/workflowai/core/domain/run.py @@ -1,4 +1,3 @@ -import uuid from collections.abc import Iterable from typing import Any, Generic, Optional, Protocol @@ -21,10 +20,7 @@ class Run(BaseModel, Generic[AgentOutput]): been evaluated """ - id: str = Field( - default_factory=lambda: str(uuid.uuid4()), - description="The unique identifier of the run. This is a UUIDv7.", - ) + id: str agent_id: str schema_id: int output: AgentOutput