From 625f1b84beb60d359c7cf097a3125611d54ae17f Mon Sep 17 00:00:00 2001 From: Jason Robert Date: Tue, 17 Mar 2026 09:43:46 -0400 Subject: [PATCH 1/3] fix(validator): warn when output templates reference conditionally-skipped agents Add static path analysis to the validator that enumerates all execution paths from entry_point to $end and checks whether output template references are guaranteed to have run. Emits warnings (not errors) for references to agents/groups that may be skipped due to conditional routing, with an example path and a {% if ... is defined %} suggestion. Also fixes a pre-existing gap where for-each group names were missing from the validator's all_names set and _validate_output_references, causing false errors when referencing for-each groups in routes or output templates. Closes #6 Co-Authored-By: Claude Opus 4.6 --- src/conductor/config/validator.py | 163 ++++++++++++++++- tests/test_config/test_validator.py | 260 ++++++++++++++++++++++++++++ 2 files changed, 418 insertions(+), 5 deletions(-) diff --git a/src/conductor/config/validator.py b/src/conductor/config/validator.py index 8db2261..bfcad99 100644 --- a/src/conductor/config/validator.py +++ b/src/conductor/config/validator.py @@ -16,6 +16,10 @@ from conductor.config.schema import WorkflowConfig +# Matches agent/group name references in output templates +# Catches {{ name.output.field }}, {{ group.outputs.member }}, and {% if name.output %} +_OUTPUT_REF_PATTERN = re.compile(r"(?:\{\{|\{%)[^}%]*?(\w+)\.outputs?\b") + # Pattern for input references: # - agent.output(.field)? # - parallel_group.outputs.agent(.field)? @@ -51,7 +55,8 @@ def validate_workflow_config(config: WorkflowConfig) -> list[str]: # Build index of agent names and parallel group names agent_names = {agent.name for agent in config.agents} parallel_names = {pg.name for pg in config.parallel} - all_names = agent_names | parallel_names + for_each_names = {fe.name for fe in config.for_each} + all_names = agent_names | parallel_names | for_each_names # Validate entry_point exists (already done by Pydantic, but good to have explicit) if config.workflow.entry_point not in all_names: @@ -110,10 +115,15 @@ def validate_workflow_config(config: WorkflowConfig) -> list[str]: # Validate workflow output references output_errors = _validate_output_references( - config.output, agent_names, set(config.workflow.input.keys()) + config.output, + agent_names | parallel_names | for_each_names, + set(config.workflow.input.keys()), ) errors.extend(output_errors) + # Check output templates against conditional execution paths (warnings only) + warnings.extend(_validate_output_path_coverage(config)) + if errors: raise ConfigurationError( "Workflow configuration validation failed:\n - " + "\n - ".join(errors), @@ -263,7 +273,7 @@ def _validate_tool_references( def _validate_output_references( output: dict[str, str], - agent_names: set[str], + valid_names: set[str], workflow_inputs: set[str], ) -> list[str]: """Validate that output template references are valid. @@ -273,7 +283,7 @@ def _validate_output_references( Args: output: Dict of output field names to template expressions. - agent_names: Set of valid agent names. + valid_names: Set of valid agent, parallel group, and for-each group names. workflow_inputs: Set of valid workflow input parameter names. Returns: @@ -289,7 +299,7 @@ def _validate_output_references( for field, template in output.items(): matches = agent_ref_pattern.findall(template) for ref in matches: - if ref not in agent_names and ref not in ("workflow", "context"): + if ref not in valid_names and ref not in ("workflow", "context"): errors.append(f"Workflow output '{field}' references unknown agent '{ref}'") return errors @@ -399,3 +409,146 @@ def _validate_parallel_groups(config: WorkflowConfig) -> list[str]: ) return errors + + +def _build_routing_graph(config: WorkflowConfig) -> dict[str, list[tuple[str, bool]]]: + """Build adjacency list from workflow config for path analysis. + + Args: + config: The WorkflowConfig to analyze. + + Returns: + Dict mapping node names to list of (target, is_conditional) tuples. + """ + graph: dict[str, list[tuple[str, bool]]] = {} + for agent in config.agents: + edges: list[tuple[str, bool]] = [] + if agent.routes: + for route in agent.routes: + edges.append((route.to, route.when is not None)) + elif agent.type == "human_gate" and agent.options: + for option in agent.options: + edges.append((option.route, True)) + graph[agent.name] = edges + for pg in config.parallel: + graph[pg.name] = [(r.to, r.when is not None) for r in pg.routes] + for fe in config.for_each: + graph[fe.name] = [(r.to, r.when is not None) for r in fe.routes] + return graph + + +def _enumerate_paths_to_end( + start: str, + graph: dict[str, list[tuple[str, bool]]], + max_depth: int = 50, +) -> list[list[str]]: + """Enumerate all paths from start to $end via DFS. + + Args: + start: Entry point node name. + graph: Adjacency list from _build_routing_graph. + max_depth: Maximum path depth (prevents infinite exploration). + + Returns: + List of paths, where each path is a list of node names. + """ + MAX_PATHS = 100 + paths: list[list[str]] = [] + + def dfs(current: str, path: list[str], visited: set[str]) -> None: + if len(paths) >= MAX_PATHS or len(path) > max_depth: + return + if current == "$end": + paths.append(list(path)) + return + if current not in graph or current in visited: + return + visited.add(current) + path.append(current) + for target, _ in graph[current]: + dfs(target, path, visited) + path.pop() + visited.discard(current) + + dfs(start, [], set()) + return paths + + +def _extract_output_template_refs(output: dict[str, str]) -> set[str]: + """Extract agent/group names referenced in output templates. + + Args: + output: Dict of output field names to template expressions. + + Returns: + Set of referenced agent/group names (excluding 'workflow' and 'context'). + """ + refs: set[str] = set() + for template in output.values(): + for match in _OUTPUT_REF_PATTERN.finditer(template): + name = match.group(1) + if name not in ("workflow", "context"): + refs.add(name) + return refs + + +def _name_on_path(name: str, path: list[str], config: WorkflowConfig) -> bool: + """Check if an agent/group name appears on a given execution path. + + Checks both direct presence and membership in a parallel group on the path. + + Args: + name: Agent or group name to check. + path: List of node names representing an execution path. + config: The WorkflowConfig for parallel group membership lookup. + + Returns: + True if the name is on the path (directly or via parallel group). + """ + if name in path: + return True + return any(pg.name in path and name in pg.agents for pg in config.parallel) + + +def _validate_output_path_coverage(config: WorkflowConfig) -> list[str]: + """Validate that output template references are reachable on all paths. + + Emits warnings (not errors) for output template references to agents/groups + that don't appear on every possible execution path from entry_point to $end. + + Args: + config: The WorkflowConfig to validate. + + Returns: + List of warning messages. + """ + if not config.output: + return [] + + graph = _build_routing_graph(config) + max_depth = config.workflow.limits.max_iterations + paths = _enumerate_paths_to_end(config.workflow.entry_point, graph, max_depth) + + if not paths: + return [] + + refs = _extract_output_template_refs(config.output) + if not refs: + return [] + + warnings: list[str] = [] + for ref in sorted(refs): + missing_paths = [p for p in paths if not _name_on_path(ref, p, config)] + if missing_paths: + # Pick the shortest example path for the warning message + missing_paths.sort(key=len) + example = missing_paths[0] + path_str = " \u2192 ".join(example + ["$end"]) + warnings.append( + f"Output template references '{ref}' which may not run on all paths. " + f"Example path where it is skipped: {path_str}. " + f"Consider wrapping with {{% if {ref} is defined %}} to handle " + f"cases where this agent/group does not execute." + ) + + return warnings diff --git a/tests/test_config/test_validator.py b/tests/test_config/test_validator.py index 81f9249..8829cc0 100644 --- a/tests/test_config/test_validator.py +++ b/tests/test_config/test_validator.py @@ -6,8 +6,10 @@ from conductor.config.schema import ( AgentDef, + ForEachDef, GateOption, InputDef, + ParallelGroup, RouteDef, WorkflowConfig, WorkflowDef, @@ -345,3 +347,261 @@ def test_output_reference_to_unknown_agent(self) -> None: with pytest.raises(ConfigurationError) as exc_info: validate_workflow_config(config) assert "unknown_agent" in str(exc_info.value) + + +class TestOutputPathCoverage: + """Tests for output template path coverage validation.""" + + def test_no_warning_linear_workflow(self) -> None: + """Linear A→B→$end with output refs to both produces no warnings.""" + config = WorkflowConfig( + workflow=WorkflowDef(name="test", entry_point="agent_a"), + agents=[ + AgentDef( + name="agent_a", + model="gpt-4", + prompt="A", + routes=[RouteDef(to="agent_b")], + ), + AgentDef( + name="agent_b", + model="gpt-4", + prompt="B", + routes=[RouteDef(to="$end")], + ), + ], + output={ + "a_result": "{{ agent_a.output }}", + "b_result": "{{ agent_b.output }}", + }, + ) + warnings = validate_workflow_config(config) + assert not warnings + + def test_warning_conditionally_skipped_agent(self) -> None: + """Evaluator routes to deployer OR $end; output refs deployer → warning.""" + config = WorkflowConfig( + workflow=WorkflowDef(name="test", entry_point="evaluator"), + agents=[ + AgentDef( + name="evaluator", + model="gpt-4", + prompt="Evaluate", + routes=[ + RouteDef(to="deployer", when="{{ evaluator.output.approved }}"), + RouteDef(to="$end"), + ], + ), + AgentDef( + name="deployer", + model="gpt-4", + prompt="Deploy", + routes=[RouteDef(to="$end")], + ), + ], + output={"summary": "{{ deployer.output.summary }}"}, + ) + warnings = validate_workflow_config(config) + assert any("deployer" in w for w in warnings) + + def test_no_warning_agent_on_all_branches(self) -> None: + """Router routes to A on both branches; output refs A → no warning.""" + config = WorkflowConfig( + workflow=WorkflowDef(name="test", entry_point="router"), + agents=[ + AgentDef( + name="router", + model="gpt-4", + prompt="Route", + routes=[ + RouteDef(to="agent_a", when="{{ router.output.fast }}"), + RouteDef(to="agent_a"), + ], + ), + AgentDef( + name="agent_a", + model="gpt-4", + prompt="Do work", + routes=[RouteDef(to="$end")], + ), + ], + output={"result": "{{ agent_a.output }}"}, + ) + warnings = validate_workflow_config(config) + assert not warnings + + def test_parallel_group_member_available(self) -> None: + """Entry→pg(a,b)→$end, output refs member 'a' via pg → no warning.""" + config = WorkflowConfig( + workflow=WorkflowDef(name="test", entry_point="entry"), + agents=[ + AgentDef(name="entry", model="gpt-4", prompt="Go", routes=[RouteDef(to="pg")]), + AgentDef(name="agent_a", model="gpt-4", prompt="A"), + AgentDef(name="agent_b", model="gpt-4", prompt="B"), + ], + parallel=[ + ParallelGroup( + name="pg", + agents=["agent_a", "agent_b"], + routes=[RouteDef(to="$end")], + ), + ], + output={"a_out": "{{ agent_a.output }}"}, + ) + warnings = validate_workflow_config(config) + assert not warnings + + def test_warning_skippable_parallel_group(self) -> None: + """Router→(pg→$end OR $end), output refs pg member agent → warning.""" + config = WorkflowConfig( + workflow=WorkflowDef(name="test", entry_point="router"), + agents=[ + AgentDef( + name="router", + model="gpt-4", + prompt="Route", + routes=[ + RouteDef(to="pg", when="{{ router.output.needs_parallel }}"), + RouteDef(to="$end"), + ], + ), + AgentDef(name="agent_a", model="gpt-4", prompt="A"), + AgentDef(name="agent_b", model="gpt-4", prompt="B"), + ], + parallel=[ + ParallelGroup( + name="pg", + agents=["agent_a", "agent_b"], + routes=[RouteDef(to="$end")], + ), + ], + output={"a_out": "{{ agent_a.output }}"}, + ) + warnings = validate_workflow_config(config) + assert any("agent_a" in w for w in warnings) + + def test_human_gate_conditional_paths(self) -> None: + """Gate(opt1→agent_a, opt2→$end), output refs agent_a → warning.""" + config = WorkflowConfig( + workflow=WorkflowDef(name="test", entry_point="gate"), + agents=[ + AgentDef( + name="gate", + type="human_gate", + prompt="Choose:", + options=[ + GateOption(label="Approve", value="yes", route="agent_a"), + GateOption(label="Reject", value="no", route="$end"), + ], + ), + AgentDef( + name="agent_a", + model="gpt-4", + prompt="Do work", + routes=[RouteDef(to="$end")], + ), + ], + output={"result": "{{ agent_a.output }}"}, + ) + warnings = validate_workflow_config(config) + assert any("agent_a" in w for w in warnings) + + def test_no_warning_when_no_output_section(self) -> None: + """No output dict → no warnings.""" + config = WorkflowConfig( + workflow=WorkflowDef(name="test", entry_point="agent1"), + agents=[ + AgentDef( + name="agent1", + model="gpt-4", + prompt="Hello", + routes=[RouteDef(to="$end")], + ), + ], + ) + warnings = validate_workflow_config(config) + assert not warnings + + def test_loop_does_not_crash(self) -> None: + """A→B→(A OR $end), output refs A and B → no crash, no warnings.""" + config = WorkflowConfig( + workflow=WorkflowDef(name="test", entry_point="agent_a"), + agents=[ + AgentDef( + name="agent_a", + model="gpt-4", + prompt="A", + routes=[RouteDef(to="agent_b")], + ), + AgentDef( + name="agent_b", + model="gpt-4", + prompt="B", + routes=[ + RouteDef(to="agent_a", when="{{ agent_b.output.retry }}"), + RouteDef(to="$end"), + ], + ), + ], + output={ + "a_result": "{{ agent_a.output }}", + "b_result": "{{ agent_b.output }}", + }, + ) + warnings = validate_workflow_config(config) + assert not warnings + + def test_warning_message_format(self) -> None: + """Warning contains path string and {% if suggestion.""" + config = WorkflowConfig( + workflow=WorkflowDef(name="test", entry_point="evaluator"), + agents=[ + AgentDef( + name="evaluator", + model="gpt-4", + prompt="Evaluate", + routes=[ + RouteDef(to="deployer", when="{{ evaluator.output.approved }}"), + RouteDef(to="$end"), + ], + ), + AgentDef( + name="deployer", + model="gpt-4", + prompt="Deploy", + routes=[RouteDef(to="$end")], + ), + ], + output={"summary": "{{ deployer.output.summary }}"}, + ) + warnings = validate_workflow_config(config) + assert len(warnings) == 1 + warning = warnings[0] + assert "deployer" in warning + assert "evaluator" in warning + assert "$end" in warning + assert "{% if deployer is defined %}" in warning + + def test_for_each_on_every_path(self) -> None: + """Entry→fe→$end, output refs fe.outputs → no warning. + + Uses a for-each group on the only path to $end, so the reference + should not produce a path coverage warning. + """ + config = WorkflowConfig( + workflow=WorkflowDef(name="test", entry_point="analyzers"), + agents=[], + for_each=[ + ForEachDef( + name="analyzers", + type="for_each", + source="workflow.input.items", + **{"as": "item"}, + agent=AgentDef(name="analyzer", model="gpt-4", prompt="Analyze {{ item }}"), + routes=[RouteDef(to="$end")], + ), + ], + output={"results": "{{ analyzers.outputs }}"}, + ) + warnings = validate_workflow_config(config) + assert not warnings From 24ede1ba297e641c0203e11b2b2db6cc4e5a0b75 Mon Sep 17 00:00:00 2001 From: Jason Robert Date: Tue, 17 Mar 2026 10:44:07 -0400 Subject: [PATCH 2/3] chore: sync uv.lock with version bump --- uv.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uv.lock b/uv.lock index 8f149e0..be910de 100644 --- a/uv.lock +++ b/uv.lock @@ -150,7 +150,7 @@ wheels = [ [[package]] name = "conductor-cli" -version = "0.1.0" +version = "0.1.1" source = { editable = "." } dependencies = [ { name = "anthropic" }, From f15a8d545e26a92a40a7bf5f074fdd5fdf550dd5 Mon Sep 17 00:00:00 2001 From: Jason Robert Date: Tue, 17 Mar 2026 12:16:09 -0400 Subject: [PATCH 3/3] fix(validator): address PR review findings for output path coverage - Fix max_depth using max(max_iterations, node_count) so linear workflows longer than max_iterations are still fully analyzed - Fix _validate_parallel_groups to include for-each names in route target validation (latent bug exposed by review) - Promote MAX_PATHS to module-level _MAX_ENUMERATED_PATHS constant with documented best-effort semantics in docstring - Update stale comments and docstrings to reflect for-each group support throughout the validator - Add cross-reference comment linking the two output ref regex patterns and explaining their intentional scope difference - Document _name_on_path for-each inline agent limitation Co-Authored-By: Claude Opus 4.6 --- src/conductor/config/validator.py | 37 +++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/src/conductor/config/validator.py b/src/conductor/config/validator.py index bfcad99..8bd0c63 100644 --- a/src/conductor/config/validator.py +++ b/src/conductor/config/validator.py @@ -16,10 +16,16 @@ from conductor.config.schema import WorkflowConfig -# Matches agent/group name references in output templates -# Catches {{ name.output.field }}, {{ group.outputs.member }}, and {% if name.output %} +# Matches agent/group name references in output templates. +# Catches {{ name.output.field }}, {{ group.outputs.member }}, and {% if name.output %}. +# Note: this is intentionally broader than the agent_ref_pattern in _validate_output_references +# (which only matches {{ }} blocks with .output singular). This pattern also matches {% %} blocks +# and .outputs plural for path coverage analysis. _OUTPUT_REF_PATTERN = re.compile(r"(?:\{\{|\{%)[^}%]*?(\w+)\.outputs?\b") +# DFS path cap: larger workflows may get partial coverage analysis +_MAX_ENUMERATED_PATHS = 100 + # Pattern for input references: # - agent.output(.field)? # - parallel_group.outputs.agent(.field)? @@ -52,7 +58,7 @@ def validate_workflow_config(config: WorkflowConfig) -> list[str]: errors: list[str] = [] warnings: list[str] = [] - # Build index of agent names and parallel group names + # Build index of all addressable node names agent_names = {agent.name for agent in config.agents} parallel_names = {pg.name for pg in config.parallel} for_each_names = {fe.name for fe in config.for_each} @@ -143,7 +149,7 @@ def _validate_agent_routes( Args: agent_name: Name of the agent whose routes are being validated. routes: List of RouteDef objects. - valid_targets: Set of valid target names (agents and parallel groups). + valid_targets: Set of valid target names (agents, parallel groups, and for-each groups). Returns: List of error messages. @@ -153,8 +159,9 @@ def _validate_agent_routes( for i, route in enumerate(routes): if route.to != "$end" and route.to not in valid_targets: errors.append( - f"Agent '{agent_name}' route {i} targets unknown agent or " - f"parallel group '{route.to}'. Use '$end' to terminate or one of: " + f"Agent '{agent_name}' route {i} targets unknown agent, " + f"parallel group, or for-each group '{route.to}'. " + f"Use '$end' to terminate or one of: " f"{', '.join(sorted(valid_targets))}" ) @@ -373,7 +380,8 @@ def _validate_parallel_groups(config: WorkflowConfig) -> list[str]: ) # PE-6.2: Validate parallel group route targets - all_names = agent_names | parallel_names + for_each_names = {fe.name for fe in config.for_each} + all_names = agent_names | parallel_names | for_each_names route_errors = _validate_agent_routes(pg.name, pg.routes, all_names) errors.extend(route_errors) @@ -442,7 +450,7 @@ def _enumerate_paths_to_end( graph: dict[str, list[tuple[str, bool]]], max_depth: int = 50, ) -> list[list[str]]: - """Enumerate all paths from start to $end via DFS. + """Enumerate paths from start to $end via DFS, up to _MAX_ENUMERATED_PATHS. Args: start: Entry point node name. @@ -450,13 +458,15 @@ def _enumerate_paths_to_end( max_depth: Maximum path depth (prevents infinite exploration). Returns: - List of paths, where each path is a list of node names. + List of paths (up to _MAX_ENUMERATED_PATHS), where each path is a list + of node names. If the graph has more paths than the cap, returns the + first ones found. Callers should treat results as best-effort for + highly branchy workflows. """ - MAX_PATHS = 100 paths: list[list[str]] = [] def dfs(current: str, path: list[str], visited: set[str]) -> None: - if len(paths) >= MAX_PATHS or len(path) > max_depth: + if len(paths) >= _MAX_ENUMERATED_PATHS or len(path) > max_depth: return if current == "$end": paths.append(list(path)) @@ -496,6 +506,8 @@ def _name_on_path(name: str, path: list[str], config: WorkflowConfig) -> bool: """Check if an agent/group name appears on a given execution path. Checks both direct presence and membership in a parallel group on the path. + Note: for-each inline agents are not checked here because users reference + the group name (e.g., analyzers.outputs), not the inner agent name directly. Args: name: Agent or group name to check. @@ -526,7 +538,8 @@ def _validate_output_path_coverage(config: WorkflowConfig) -> list[str]: return [] graph = _build_routing_graph(config) - max_depth = config.workflow.limits.max_iterations + node_count = len(config.agents) + len(config.parallel) + len(config.for_each) + max_depth = max(config.workflow.limits.max_iterations, node_count) paths = _enumerate_paths_to_end(config.workflow.entry_point, graph, max_depth) if not paths: