From 46bce279499fb3477f0e61588cc9cd7390a00960 Mon Sep 17 00:00:00 2001 From: DmitriyK Date: Wed, 18 Mar 2026 16:59:08 -0700 Subject: [PATCH 1/2] Agents graph: Count and show agent executions per agent --- src/conductor/engine/limits.py | 25 +++ src/conductor/engine/workflow.py | 16 +- .../test_agent_iteration_counter.py | 154 ++++++++++++++++++ 3 files changed, 193 insertions(+), 2 deletions(-) create mode 100644 tests/test_engine/test_agent_iteration_counter.py diff --git a/src/conductor/engine/limits.py b/src/conductor/engine/limits.py index 65c7cca..a0baec5 100644 --- a/src/conductor/engine/limits.py +++ b/src/conductor/engine/limits.py @@ -249,6 +249,31 @@ def get_elapsed_time(self) -> float: return 0.0 return time.monotonic() - self.start_time + def get_agent_execution_count(self, agent_name: str) -> int: + """Get the number of times a specific agent has been executed. + + Counts occurrences of agent_name in execution_history to determine + how many times this agent has run. This is used for per-agent + iteration tracking in the web dashboard. + + Args: + agent_name: Name of the agent to count. + + Returns: + Number of times the agent has been executed (0 if never executed). + + Example: + >>> enforcer = LimitEnforcer() + >>> enforcer.execution_history = ["agent1", "agent2", "agent1", "agent1"] + >>> enforcer.get_agent_execution_count("agent1") + 3 + >>> enforcer.get_agent_execution_count("agent2") + 1 + >>> enforcer.get_agent_execution_count("agent3") + 0 + """ + return self.execution_history.count(agent_name) + def get_remaining_timeout(self) -> float | None: """Get the remaining time before timeout. diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index d8eb438..1195794 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -1239,11 +1239,17 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]: iteration = self.limits.current_iteration + 1 _verbose_log_agent_start(current_agent_name, iteration) + # Count how many times this specific agent has been executed + # (for per-agent iteration tracking in the web dashboard) + agent_execution_count = ( + self.limits.get_agent_execution_count(agent.name) + 1 + ) + self._emit( "agent_started", { "agent_name": agent.name, - "iteration": iteration, + "iteration": agent_execution_count, "agent_type": agent.type or "agent", }, ) @@ -1330,11 +1336,17 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]: ) _script_start = _time.time() + # Count how many times this specific script has been executed + # (for per-agent iteration tracking in the web dashboard) + script_execution_count = ( + self.limits.get_agent_execution_count(agent.name) + 1 + ) + self._emit( "script_started", { "agent_name": agent.name, - "iteration": self.limits.current_iteration + 1, + "iteration": script_execution_count, }, ) diff --git a/tests/test_engine/test_agent_iteration_counter.py b/tests/test_engine/test_agent_iteration_counter.py new file mode 100644 index 0000000..75f25fc --- /dev/null +++ b/tests/test_engine/test_agent_iteration_counter.py @@ -0,0 +1,154 @@ +"""Tests for per-agent iteration counter in workflow events. + +This module tests that the iteration counter sent in agent_started events +is agent-specific (counts how many times each agent has run), not a global +workflow iteration counter. +""" + +import pytest + +from conductor.config.schema import AgentDef, RouteDef, WorkflowConfig, WorkflowDef +from conductor.engine.workflow import WorkflowEngine +from conductor.events import WorkflowEvent, WorkflowEventEmitter +from conductor.providers.factory import create_provider + + +class EventCollector(WorkflowEventEmitter): + """Simple event emitter that collects events for testing.""" + + def __init__(self): + self.events: list[WorkflowEvent] = [] + + def emit(self, event: WorkflowEvent) -> None: + """Store event in the list.""" + self.events.append(event) + + def get_agent_started_events(self, agent_name: str) -> list[WorkflowEvent]: + """Get all agent_started events for a specific agent.""" + return [ + e + for e in self.events + if e.type == "agent_started" and e.data.get("agent_name") == agent_name + ] + + +class TestPerAgentIterationCounter: + """Test that iteration counters are per-agent, not global.""" + + @pytest.mark.asyncio + async def test_single_agent_loop_counter(self): + """Test that a single looping agent gets correct iteration counts.""" + # Create a workflow where one agent loops back to itself + config = WorkflowConfig( + workflow=WorkflowDef( + name="single-loop", + entry_point="looper", + ), + agents=[ + AgentDef( + name="looper", + type="agent", + model="gpt-4", + prompt="Loop iteration {{ context.iteration }}", + routes=[ + RouteDef(to="looper", when="{{ context.iteration < 3 }}"), + RouteDef(to="$end"), + ], + ) + ], + output={"result": "done"}, + ) + + provider = await create_provider("claude", validate=False) + collector = EventCollector() + + try: + engine = WorkflowEngine( + config, + provider=provider, + event_emitter=collector, + skip_gates=True, + ) + + await engine.run({}) + + # Get agent_started events for 'looper' + looper_events = collector.get_agent_started_events("looper") + + # Loop runs while context.iteration < 3 (iterations 0, 1, 2) + assert len(looper_events) == 3 + + # Check iteration counts - should be [1, 2, 3] + # (first execution is iteration 1, second is 2, third is 3) + iterations = [e.data["iteration"] for e in looper_events] + assert iterations == [1, 2, 3] + + finally: + await provider.close() + + @pytest.mark.asyncio + async def test_multi_agent_loop_counter(self): + """Test that each agent in a loop gets independent iteration counts.""" + # Create a workflow where two agents loop back and forth + config = WorkflowConfig( + workflow=WorkflowDef( + name="multi-loop", + entry_point="agent_a", + ), + agents=[ + AgentDef( + name="agent_a", + type="agent", + model="gpt-4", + prompt="Agent A iteration {{ context.iteration }}", + routes=[ + RouteDef(to="agent_b"), + ], + ), + AgentDef( + name="agent_b", + type="agent", + model="gpt-4", + prompt="Agent B iteration {{ context.iteration }}", + routes=[ + RouteDef(to="agent_a", when="{{ context.iteration < 5 }}"), + RouteDef(to="$end"), + ], + ), + ], + output={"result": "done"}, + ) + + provider = await create_provider("claude", validate=False) + collector = EventCollector() + + try: + engine = WorkflowEngine( + config, + provider=provider, + event_emitter=collector, + skip_gates=True, + ) + + await engine.run({}) + + # Get agent_started events for each agent + agent_a_events = collector.get_agent_started_events("agent_a") + agent_b_events = collector.get_agent_started_events("agent_b") + + # agent_a runs 3 times (iterations 0, 2, 4) + assert len(agent_a_events) == 3 + # agent_b runs 3 times (iterations 1, 3, 5) + assert len(agent_b_events) == 3 + + # Check that each agent's iteration counter is independent + # agent_a should have iteration counts [1, 2, 3] + agent_a_iterations = [e.data["iteration"] for e in agent_a_events] + assert agent_a_iterations == [1, 2, 3] + + # agent_b should have iteration counts [1, 2, 3] + agent_b_iterations = [e.data["iteration"] for e in agent_b_events] + assert agent_b_iterations == [1, 2, 3] + + finally: + await provider.close() From 2b7a64b137e8123efed1269f08f9ff9f748b7a62 Mon Sep 17 00:00:00 2001 From: DmitriyK Date: Thu, 19 Mar 2026 11:57:47 -0700 Subject: [PATCH 2/2] Update the unit tests to remove external dependencies --- .../test_agent_iteration_counter.py | 88 +++++++++---------- 1 file changed, 41 insertions(+), 47 deletions(-) diff --git a/tests/test_engine/test_agent_iteration_counter.py b/tests/test_engine/test_agent_iteration_counter.py index 75f25fc..ae0983a 100644 --- a/tests/test_engine/test_agent_iteration_counter.py +++ b/tests/test_engine/test_agent_iteration_counter.py @@ -10,7 +10,7 @@ from conductor.config.schema import AgentDef, RouteDef, WorkflowConfig, WorkflowDef from conductor.engine.workflow import WorkflowEngine from conductor.events import WorkflowEvent, WorkflowEventEmitter -from conductor.providers.factory import create_provider +from conductor.providers.copilot import CopilotProvider class EventCollector(WorkflowEventEmitter): @@ -59,32 +59,29 @@ async def test_single_agent_loop_counter(self): output={"result": "done"}, ) - provider = await create_provider("claude", validate=False) + # Use mock provider to avoid external dependencies + provider = CopilotProvider(mock_handler=lambda a, p, c: {"result": "ok"}) collector = EventCollector() - try: - engine = WorkflowEngine( - config, - provider=provider, - event_emitter=collector, - skip_gates=True, - ) - - await engine.run({}) + engine = WorkflowEngine( + config, + provider=provider, + event_emitter=collector, + skip_gates=True, + ) - # Get agent_started events for 'looper' - looper_events = collector.get_agent_started_events("looper") + await engine.run({}) - # Loop runs while context.iteration < 3 (iterations 0, 1, 2) - assert len(looper_events) == 3 + # Get agent_started events for 'looper' + looper_events = collector.get_agent_started_events("looper") - # Check iteration counts - should be [1, 2, 3] - # (first execution is iteration 1, second is 2, third is 3) - iterations = [e.data["iteration"] for e in looper_events] - assert iterations == [1, 2, 3] + # Loop runs while context.iteration < 3 (iterations 0, 1, 2) + assert len(looper_events) == 3 - finally: - await provider.close() + # Check iteration counts - should be [1, 2, 3] + # (first execution is iteration 1, second is 2, third is 3) + iterations = [e.data["iteration"] for e in looper_events] + assert iterations == [1, 2, 3] @pytest.mark.asyncio async def test_multi_agent_loop_counter(self): @@ -119,36 +116,33 @@ async def test_multi_agent_loop_counter(self): output={"result": "done"}, ) - provider = await create_provider("claude", validate=False) + # Use mock provider to avoid external dependencies + provider = CopilotProvider(mock_handler=lambda a, p, c: {"result": "ok"}) collector = EventCollector() - try: - engine = WorkflowEngine( - config, - provider=provider, - event_emitter=collector, - skip_gates=True, - ) - - await engine.run({}) + engine = WorkflowEngine( + config, + provider=provider, + event_emitter=collector, + skip_gates=True, + ) - # Get agent_started events for each agent - agent_a_events = collector.get_agent_started_events("agent_a") - agent_b_events = collector.get_agent_started_events("agent_b") + await engine.run({}) - # agent_a runs 3 times (iterations 0, 2, 4) - assert len(agent_a_events) == 3 - # agent_b runs 3 times (iterations 1, 3, 5) - assert len(agent_b_events) == 3 + # Get agent_started events for each agent + agent_a_events = collector.get_agent_started_events("agent_a") + agent_b_events = collector.get_agent_started_events("agent_b") - # Check that each agent's iteration counter is independent - # agent_a should have iteration counts [1, 2, 3] - agent_a_iterations = [e.data["iteration"] for e in agent_a_events] - assert agent_a_iterations == [1, 2, 3] + # agent_a runs 3 times (iterations 0, 2, 4) + assert len(agent_a_events) == 3 + # agent_b runs 3 times (iterations 1, 3, 5) + assert len(agent_b_events) == 3 - # agent_b should have iteration counts [1, 2, 3] - agent_b_iterations = [e.data["iteration"] for e in agent_b_events] - assert agent_b_iterations == [1, 2, 3] + # Check that each agent's iteration counter is independent + # agent_a should have iteration counts [1, 2, 3] + agent_a_iterations = [e.data["iteration"] for e in agent_a_events] + assert agent_a_iterations == [1, 2, 3] - finally: - await provider.close() + # agent_b should have iteration counts [1, 2, 3] + agent_b_iterations = [e.data["iteration"] for e in agent_b_events] + assert agent_b_iterations == [1, 2, 3]