diff --git a/src/a2a/server/events/event_consumer.py b/src/a2a/server/events/event_consumer.py index 56f4c66f..b6b05517 100644 --- a/src/a2a/server/events/event_consumer.py +++ b/src/a2a/server/events/event_consumer.py @@ -138,6 +138,10 @@ async def consume_all(self) -> AsyncGenerator[Event]: # python 3.12 and get a queue empty error on an open queue if self.queue.is_closed(): break + except Exception as e: + logger.error(f'Stopping event consumption due to exception: {e}') + self._exception = e + continue def agent_task_callback(self, agent_task: asyncio.Task[None]) -> None: """Callback to handle exceptions from the agent's execution task. diff --git a/src/a2a/server/request_handlers/default_request_handler.py b/src/a2a/server/request_handlers/default_request_handler.py index f98d06c0..4ef889ad 100644 --- a/src/a2a/server/request_handlers/default_request_handler.py +++ b/src/a2a/server/request_handlers/default_request_handler.py @@ -248,7 +248,9 @@ async def on_message_send( raise ServerError( InternalError(message='Task ID mismatch in agent response') ) - + except Exception as e: + logger.error(f'Agent execution failed. Error: {e}') + raise finally: if interrupted: # TODO: Track this disconnected cleanup task. diff --git a/src/a2a/server/tasks/task_manager.py b/src/a2a/server/tasks/task_manager.py index ca42b69b..27e67e1a 100644 --- a/src/a2a/server/tasks/task_manager.py +++ b/src/a2a/server/tasks/task_manager.py @@ -130,7 +130,10 @@ async def save_task_event( task.history = [task.status.message] else: task.history.append(task.status.message) - + if event.metadata: + if not task.metadata: + task.metadata = {} + task.metadata.update(event.metadata) task.status = event.status else: logger.debug('Appending artifact to task %s', task.id) diff --git a/tests/server/tasks/test_task_manager.py b/tests/server/tasks/test_task_manager.py index 365baf2b..a99ad701 100644 --- a/tests/server/tasks/test_task_manager.py +++ b/tests/server/tasks/test_task_manager.py @@ -127,6 +127,27 @@ async def test_save_task_event_artifact_update( updated_task.artifacts = [new_artifact] mock_task_store.save.assert_called_once_with(updated_task) +@pytest.mark.asyncio +async def test_save_task_event_metadata_update( + task_manager: TaskManager, mock_task_store: AsyncMock +) -> None: + """Test saving an updated metadata for an existing task.""" + initial_task = Task(**MINIMAL_TASK) + mock_task_store.get.return_value = initial_task + new_metadata = {"meta_key_test": "meta_value_test"} + + event = TaskStatusUpdateEvent( + taskId=MINIMAL_TASK['id'], + contextId=MINIMAL_TASK['contextId'], + metadata=new_metadata, + status=TaskStatus(state=TaskState.working), + final=False, + ) + await task_manager.save_task_event(event) + + updated_task = mock_task_store.save.call_args.args[0] + assert updated_task.metadata == new_metadata + @pytest.mark.asyncio async def test_ensure_task_existing(