From b26e838e10c90d9c7ba143b7cfa81bc1a9a648d5 Mon Sep 17 00:00:00 2001 From: cheliu Date: Wed, 25 Jun 2025 14:04:12 -0700 Subject: [PATCH 1/3] fix: Append metadata and context id when processing TaskStatusUpdateEvent, and add more error logs 1. Fix the bug that during TaskManager.save_task_event, metadata from the updated task event is lost 2. During EventQueue processing, if any of the processing asyncio task failed with an unexpected exception, this exception is silently caught but never exposed to user or logged in the console. Add more error logs to make errors visible during debugging. --- src/a2a/server/events/event_consumer.py | 4 ++++ .../default_request_handler.py | 4 +++- src/a2a/server/tasks/task_manager.py | 7 +++++- tests/server/tasks/test_task_manager.py | 23 +++++++++++++++++++ 4 files changed, 36 insertions(+), 2 deletions(-) diff --git a/src/a2a/server/events/event_consumer.py b/src/a2a/server/events/event_consumer.py index 56f4c66f..a73f6023 100644 --- a/src/a2a/server/events/event_consumer.py +++ b/src/a2a/server/events/event_consumer.py @@ -138,6 +138,10 @@ async def consume_all(self) -> AsyncGenerator[Event]: # python 3.12 and get a queue empty error on an open queue if self.queue.is_closed(): break + except Exception as e: + logger.debug(f'Stopping event consumption due to exception: {e}') + self._exception = e + continue def agent_task_callback(self, agent_task: asyncio.Task[None]) -> None: """Callback to handle exceptions from the agent's execution task. diff --git a/src/a2a/server/request_handlers/default_request_handler.py b/src/a2a/server/request_handlers/default_request_handler.py index f98d06c0..857a056e 100644 --- a/src/a2a/server/request_handlers/default_request_handler.py +++ b/src/a2a/server/request_handlers/default_request_handler.py @@ -248,7 +248,9 @@ async def on_message_send( raise ServerError( InternalError(message='Task ID mismatch in agent response') ) - + except Exception as e: + logger.error(f'Agent execution failed. Error: {e}') + raise e finally: if interrupted: # TODO: Track this disconnected cleanup task. diff --git a/src/a2a/server/tasks/task_manager.py b/src/a2a/server/tasks/task_manager.py index ca42b69b..6af03afc 100644 --- a/src/a2a/server/tasks/task_manager.py +++ b/src/a2a/server/tasks/task_manager.py @@ -130,7 +130,12 @@ async def save_task_event( task.history = [task.status.message] else: task.history.append(task.status.message) - + if event.contextId: + task.contextId = event.contextId + if event.metadata: + if not task.metadata: + task.metadata = {} + task.metadata.update(event.metadata) task.status = event.status else: logger.debug('Appending artifact to task %s', task.id) diff --git a/tests/server/tasks/test_task_manager.py b/tests/server/tasks/test_task_manager.py index 365baf2b..5ff6ddde 100644 --- a/tests/server/tasks/test_task_manager.py +++ b/tests/server/tasks/test_task_manager.py @@ -127,6 +127,29 @@ async def test_save_task_event_artifact_update( updated_task.artifacts = [new_artifact] mock_task_store.save.assert_called_once_with(updated_task) +@pytest.mark.asyncio +async def test_save_task_event_metadata_contextid_update( + task_manager: TaskManager, mock_task_store: AsyncMock +) -> None: + """Test saving an metadata and context update for an existing task.""" + initial_task = Task(**MINIMAL_TASK) + mock_task_store.get.return_value = initial_task + new_metadata = {"meta_key_test": "meta_value_test"} + new_context_id = 'new_context_id' + + event = TaskStatusUpdateEvent( + taskId=MINIMAL_TASK['id'], + contextId=new_context_id, + metadata=new_metadata, + status=TaskStatus(state=TaskState.working), + final=False, + ) + await task_manager.save_task_event(event) + + updated_task = mock_task_store.save.call_args.args[0] + assert updated_task.metadata == new_metadata + assert updated_task.contextId == new_context_id + @pytest.mark.asyncio async def test_ensure_task_existing( From 0ba9d745f5d1816582aa236c7af66174c07acb1a Mon Sep 17 00:00:00 2001 From: cheliu Date: Wed, 25 Jun 2025 15:47:46 -0700 Subject: [PATCH 2/3] fix: Resolve code assist comments --- src/a2a/server/events/event_consumer.py | 2 +- src/a2a/server/request_handlers/default_request_handler.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/a2a/server/events/event_consumer.py b/src/a2a/server/events/event_consumer.py index a73f6023..b6b05517 100644 --- a/src/a2a/server/events/event_consumer.py +++ b/src/a2a/server/events/event_consumer.py @@ -139,7 +139,7 @@ async def consume_all(self) -> AsyncGenerator[Event]: if self.queue.is_closed(): break except Exception as e: - logger.debug(f'Stopping event consumption due to exception: {e}') + logger.error(f'Stopping event consumption due to exception: {e}') self._exception = e continue diff --git a/src/a2a/server/request_handlers/default_request_handler.py b/src/a2a/server/request_handlers/default_request_handler.py index 857a056e..4ef889ad 100644 --- a/src/a2a/server/request_handlers/default_request_handler.py +++ b/src/a2a/server/request_handlers/default_request_handler.py @@ -250,7 +250,7 @@ async def on_message_send( ) except Exception as e: logger.error(f'Agent execution failed. Error: {e}') - raise e + raise finally: if interrupted: # TODO: Track this disconnected cleanup task. From 06699bab35cb466e65bd28a1d751e614028fb192 Mon Sep 17 00:00:00 2001 From: cheliu Date: Wed, 25 Jun 2025 16:49:28 -0700 Subject: [PATCH 3/3] Remove context_id support as this is currently being discussed --- src/a2a/server/tasks/task_manager.py | 2 -- tests/server/tasks/test_task_manager.py | 8 +++----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/a2a/server/tasks/task_manager.py b/src/a2a/server/tasks/task_manager.py index 6af03afc..27e67e1a 100644 --- a/src/a2a/server/tasks/task_manager.py +++ b/src/a2a/server/tasks/task_manager.py @@ -130,8 +130,6 @@ async def save_task_event( task.history = [task.status.message] else: task.history.append(task.status.message) - if event.contextId: - task.contextId = event.contextId if event.metadata: if not task.metadata: task.metadata = {} diff --git a/tests/server/tasks/test_task_manager.py b/tests/server/tasks/test_task_manager.py index 5ff6ddde..a99ad701 100644 --- a/tests/server/tasks/test_task_manager.py +++ b/tests/server/tasks/test_task_manager.py @@ -128,18 +128,17 @@ async def test_save_task_event_artifact_update( mock_task_store.save.assert_called_once_with(updated_task) @pytest.mark.asyncio -async def test_save_task_event_metadata_contextid_update( +async def test_save_task_event_metadata_update( task_manager: TaskManager, mock_task_store: AsyncMock ) -> None: - """Test saving an metadata and context update for an existing task.""" + """Test saving an updated metadata for an existing task.""" initial_task = Task(**MINIMAL_TASK) mock_task_store.get.return_value = initial_task new_metadata = {"meta_key_test": "meta_value_test"} - new_context_id = 'new_context_id' event = TaskStatusUpdateEvent( taskId=MINIMAL_TASK['id'], - contextId=new_context_id, + contextId=MINIMAL_TASK['contextId'], metadata=new_metadata, status=TaskStatus(state=TaskState.working), final=False, @@ -148,7 +147,6 @@ async def test_save_task_event_metadata_contextid_update( updated_task = mock_task_store.save.call_args.args[0] assert updated_task.metadata == new_metadata - assert updated_task.contextId == new_context_id @pytest.mark.asyncio