Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 176 additions & 10 deletions src/conductor/config/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@
from conductor.config.schema import WorkflowConfig


# Matches agent/group name references in output templates.
# Catches {{ name.output.field }}, {{ group.outputs.member }}, and {% if name.output %}.
# Note: this is intentionally broader than the agent_ref_pattern in _validate_output_references
# (which only matches {{ }} blocks with .output singular). This pattern also matches {% %} blocks
# and .outputs plural for path coverage analysis.
_OUTPUT_REF_PATTERN = re.compile(r"(?:\{\{|\{%)[^}%]*?(\w+)\.outputs?\b")

# DFS path cap: larger workflows may get partial coverage analysis
_MAX_ENUMERATED_PATHS = 100

# Pattern for input references:
# - agent.output(.field)?
# - parallel_group.outputs.agent(.field)?
Expand Down Expand Up @@ -48,10 +58,11 @@ def validate_workflow_config(config: WorkflowConfig) -> list[str]:
errors: list[str] = []
warnings: list[str] = []

# Build index of agent names and parallel group names
# Build index of all addressable node names
agent_names = {agent.name for agent in config.agents}
parallel_names = {pg.name for pg in config.parallel}
all_names = agent_names | parallel_names
for_each_names = {fe.name for fe in config.for_each}
all_names = agent_names | parallel_names | for_each_names

# Validate entry_point exists (already done by Pydantic, but good to have explicit)
if config.workflow.entry_point not in all_names:
Expand Down Expand Up @@ -110,10 +121,15 @@ def validate_workflow_config(config: WorkflowConfig) -> list[str]:

# Validate workflow output references
output_errors = _validate_output_references(
config.output, agent_names, set(config.workflow.input.keys())
config.output,
agent_names | parallel_names | for_each_names,
set(config.workflow.input.keys()),
)
errors.extend(output_errors)

# Check output templates against conditional execution paths (warnings only)
warnings.extend(_validate_output_path_coverage(config))

if errors:
raise ConfigurationError(
"Workflow configuration validation failed:\n - " + "\n - ".join(errors),
Expand All @@ -133,7 +149,7 @@ def _validate_agent_routes(
Args:
agent_name: Name of the agent whose routes are being validated.
routes: List of RouteDef objects.
valid_targets: Set of valid target names (agents and parallel groups).
valid_targets: Set of valid target names (agents, parallel groups, and for-each groups).

Returns:
List of error messages.
Expand All @@ -143,8 +159,9 @@ def _validate_agent_routes(
for i, route in enumerate(routes):
if route.to != "$end" and route.to not in valid_targets:
errors.append(
f"Agent '{agent_name}' route {i} targets unknown agent or "
f"parallel group '{route.to}'. Use '$end' to terminate or one of: "
f"Agent '{agent_name}' route {i} targets unknown agent, "
f"parallel group, or for-each group '{route.to}'. "
f"Use '$end' to terminate or one of: "
f"{', '.join(sorted(valid_targets))}"
)

Expand Down Expand Up @@ -263,7 +280,7 @@ def _validate_tool_references(

def _validate_output_references(
output: dict[str, str],
agent_names: set[str],
valid_names: set[str],
workflow_inputs: set[str],
) -> list[str]:
"""Validate that output template references are valid.
Expand All @@ -273,7 +290,7 @@ def _validate_output_references(

Args:
output: Dict of output field names to template expressions.
agent_names: Set of valid agent names.
valid_names: Set of valid agent, parallel group, and for-each group names.
workflow_inputs: Set of valid workflow input parameter names.

Returns:
Expand All @@ -289,7 +306,7 @@ def _validate_output_references(
for field, template in output.items():
matches = agent_ref_pattern.findall(template)
for ref in matches:
if ref not in agent_names and ref not in ("workflow", "context"):
if ref not in valid_names and ref not in ("workflow", "context"):
errors.append(f"Workflow output '{field}' references unknown agent '{ref}'")

return errors
Expand Down Expand Up @@ -363,7 +380,8 @@ def _validate_parallel_groups(config: WorkflowConfig) -> list[str]:
)

# PE-6.2: Validate parallel group route targets
all_names = agent_names | parallel_names
for_each_names = {fe.name for fe in config.for_each}
all_names = agent_names | parallel_names | for_each_names
route_errors = _validate_agent_routes(pg.name, pg.routes, all_names)
errors.extend(route_errors)

Expand Down Expand Up @@ -399,3 +417,151 @@ def _validate_parallel_groups(config: WorkflowConfig) -> list[str]:
)

return errors


def _build_routing_graph(config: WorkflowConfig) -> dict[str, list[tuple[str, bool]]]:
"""Build adjacency list from workflow config for path analysis.

Args:
config: The WorkflowConfig to analyze.

Returns:
Dict mapping node names to list of (target, is_conditional) tuples.
"""
graph: dict[str, list[tuple[str, bool]]] = {}
for agent in config.agents:
edges: list[tuple[str, bool]] = []
if agent.routes:
for route in agent.routes:
edges.append((route.to, route.when is not None))
elif agent.type == "human_gate" and agent.options:
for option in agent.options:
edges.append((option.route, True))
graph[agent.name] = edges
for pg in config.parallel:
graph[pg.name] = [(r.to, r.when is not None) for r in pg.routes]
for fe in config.for_each:
graph[fe.name] = [(r.to, r.when is not None) for r in fe.routes]
return graph


def _enumerate_paths_to_end(
    start: str,
    graph: dict[str, list[tuple[str, bool]]],
    max_depth: int = 50,
) -> list[list[str]]:
    """Enumerate paths from start to $end via DFS, up to _MAX_ENUMERATED_PATHS.

    Args:
        start: Entry point node name.
        graph: Adjacency list from _build_routing_graph.
        max_depth: Maximum path depth (prevents infinite exploration).

    Returns:
        List of paths (up to _MAX_ENUMERATED_PATHS), where each path is a list
        of node names. If the graph has more paths than the cap, returns the
        first ones found. Callers should treat results as best-effort for
        highly branchy workflows.
    """
    found: list[list[str]] = []

    def walk(node: str, trail: list[str], on_trail: set[str]) -> None:
        # Stop expanding once the global cap is hit or the trail is too deep.
        if len(found) >= _MAX_ENUMERATED_PATHS:
            return
        if len(trail) > max_depth:
            return
        if node == "$end":
            found.append(trail.copy())
            return
        # Revisiting a node already on this trail would loop; unknown nodes
        # (not in the graph) are dead ends.
        if node in on_trail or node not in graph:
            return
        on_trail.add(node)
        trail.append(node)
        for successor, _is_conditional in graph[node]:
            walk(successor, trail, on_trail)
        # Backtrack so sibling branches see a clean trail.
        trail.pop()
        on_trail.discard(node)

    walk(start, [], set())
    return found


def _extract_output_template_refs(output: dict[str, str]) -> set[str]:
    """Extract agent/group names referenced in output templates.

    Args:
        output: Dict of output field names to template expressions.

    Returns:
        Set of referenced agent/group names (excluding 'workflow' and 'context').
    """
    # 'workflow' and 'context' are built-in namespaces, not node references.
    reserved = ("workflow", "context")
    return {
        match.group(1)
        for template in output.values()
        for match in _OUTPUT_REF_PATTERN.finditer(template)
        if match.group(1) not in reserved
    }


def _name_on_path(name: str, path: list[str], config: WorkflowConfig) -> bool:
"""Check if an agent/group name appears on a given execution path.

Checks both direct presence and membership in a parallel group on the path.
Note: for-each inline agents are not checked here because users reference
the group name (e.g., analyzers.outputs), not the inner agent name directly.

Args:
name: Agent or group name to check.
path: List of node names representing an execution path.
config: The WorkflowConfig for parallel group membership lookup.

Returns:
True if the name is on the path (directly or via parallel group).
"""
if name in path:
return True
return any(pg.name in path and name in pg.agents for pg in config.parallel)


def _validate_output_path_coverage(config: WorkflowConfig) -> list[str]:
    """Validate that output template references are reachable on all paths.

    Emits warnings (not errors) for output template references to agents/groups
    that don't appear on every possible execution path from entry_point to $end.

    Args:
        config: The WorkflowConfig to validate.

    Returns:
        List of warning messages.
    """
    if not config.output:
        return []

    routing = _build_routing_graph(config)
    total_nodes = len(config.agents) + len(config.parallel) + len(config.for_each)
    # Depth limit: whichever is larger, the runtime iteration cap or node count.
    depth_limit = max(config.workflow.limits.max_iterations, total_nodes)
    all_paths = _enumerate_paths_to_end(config.workflow.entry_point, routing, depth_limit)
    if not all_paths:
        return []

    referenced = _extract_output_template_refs(config.output)
    if not referenced:
        return []

    messages: list[str] = []
    for ref in sorted(referenced):
        skipped = [path for path in all_paths if not _name_on_path(ref, path, config)]
        if not skipped:
            continue
        # Use the shortest skipping path as the illustrative example.
        example = min(skipped, key=len)
        rendered = " \u2192 ".join([*example, "$end"])
        messages.append(
            f"Output template references '{ref}' which may not run on all paths. "
            f"Example path where it is skipped: {rendered}. "
            f"Consider wrapping with {{% if {ref} is defined %}} to handle "
            f"cases where this agent/group does not execute."
        )

    return messages
Loading
Loading