Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/conductor/cli/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ def validate_workflow(

try:
config = load_config(workflow_path)
return True, config
except ConductorError as e:
# Display structured error
display_validation_error(e, workflow_path, output_console)
Expand All @@ -56,6 +55,20 @@ def validate_workflow(
)
return False, None

# Semantic validation: cross-field references, template refs, etc.
try:
from conductor.config.validator import validate_workflow_config

warnings = validate_workflow_config(config, workflow_path=workflow_path)
if warnings:
for warning in warnings:
output_console.print(f" [yellow]⚠[/yellow] {warning}")
except ConductorError as e:
display_validation_error(e, workflow_path, output_console)
return False, None

return True, config


def display_validation_error(
error: ConductorError,
Expand Down
207 changes: 201 additions & 6 deletions src/conductor/config/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
from __future__ import annotations

import re
from pathlib import Path
from typing import TYPE_CHECKING

from conductor.exceptions import ConfigurationError

if TYPE_CHECKING:
from conductor.config.schema import WorkflowConfig
from conductor.config.schema import AgentDef, WorkflowConfig


# Matches agent/group name references in output templates.
Expand All @@ -23,6 +24,18 @@
# and .outputs plural for path coverage analysis.
_OUTPUT_REF_PATTERN = re.compile(r"(?:\{\{|\{%)[^}%]*?(\w+)\.outputs?\b")

# Matches agent/workflow template references in Jinja2 expressions:
# {{ agent_name.output.field }}
# {{ agent_name.output }}
# {% if agent_name.output.field %}
# Excludes built-in namespaces: workflow, context, item, _index, _key
_TEMPLATE_REF_PATTERN = re.compile(r"(?:\{\{|\{%)[^}%]*?\b(\w+)\.(?:output|outputs)\b")

# Matches workflow.input.X references in templates
_WORKFLOW_INPUT_REF_PATTERN = re.compile(r"(?:\{\{|\{%)[^}%]*?\bworkflow\.input\.(\w+)\b")

_BUILTIN_NAMES = frozenset({"workflow", "context", "item", "_index", "_key", "loop"})

# DFS path cap: larger workflows may get partial coverage analysis
_MAX_ENUMERATED_PATHS = 100

Expand All @@ -34,20 +47,25 @@
INPUT_REF_PATTERN = re.compile(
r"^(?:"
r"(?P<agent>[a-zA-Z_][a-zA-Z0-9_]*)\.output(?:\.(?P<field>[a-zA-Z_][a-zA-Z0-9_]*))?|"
r"(?P<parallel>[a-zA-Z_][a-zA-Z0-9_]*)\.outputs\.(?P<pg_agent>[a-zA-Z_][a-zA-Z0-9_]*)(?:\.(?P<pg_field>[a-zA-Z_][a-zA-Z0-9_]*))?|"
r"(?P<parallel>[a-zA-Z_][a-zA-Z0-9_]*)\.(?:outputs|errors)(?:\.(?P<pg_agent>[a-zA-Z_][a-zA-Z0-9_]*)(?:\.(?P<pg_field>[a-zA-Z_][a-zA-Z0-9_]*))?)?|"
r"workflow\.input\.(?P<input>[a-zA-Z_][a-zA-Z0-9_]*)"
r")(?P<optional>\?)?$"
)


def validate_workflow_config(config: WorkflowConfig) -> list[str]:
def validate_workflow_config(
config: WorkflowConfig,
workflow_path: Path | None = None,
) -> list[str]:
"""Perform comprehensive validation of a workflow configuration.

This function performs semantic validation beyond what Pydantic can check,
including cross-field references and consistency checks.
including cross-field references, consistency checks, and Jinja2 template
reference validation.

Args:
config: The WorkflowConfig to validate.
workflow_path: Optional path to the workflow file (for !file resolution).

Returns:
A list of warning messages (non-fatal issues).
Expand Down Expand Up @@ -97,6 +115,7 @@ def validate_workflow_config(config: WorkflowConfig) -> list[str]:
agent_names,
parallel_names,
set(config.workflow.input.keys()),
for_each_names,
)
errors.extend(input_errors)
warnings.extend(input_warnings)
Expand Down Expand Up @@ -135,6 +154,11 @@ def validate_workflow_config(config: WorkflowConfig) -> list[str]:
# Check output templates against conditional execution paths (warnings only)
warnings.extend(_validate_output_path_coverage(config))

# Validate Jinja2 template references across all agents
tmpl_errors, tmpl_warnings = _validate_template_references(config, workflow_path)
errors.extend(tmpl_errors)
warnings.extend(tmpl_warnings)

if errors:
raise ConfigurationError(
"Workflow configuration validation failed:\n - " + "\n - ".join(errors),
Expand Down Expand Up @@ -179,6 +203,7 @@ def _validate_input_references(
agent_names: set[str],
parallel_names: set[str],
workflow_inputs: set[str],
for_each_names: set[str] | None = None,
) -> tuple[list[str], list[str]]:
"""Validate input reference formats and targets.

Expand All @@ -188,12 +213,14 @@ def _validate_input_references(
agent_names: Set of valid agent names.
parallel_names: Set of valid parallel group names.
workflow_inputs: Set of valid workflow input parameter names.
for_each_names: Set of valid for-each group names.

Returns:
Tuple of (error messages, warning messages).
"""
errors: list[str] = []
warnings: list[str] = []
group_names = parallel_names | (for_each_names or set())

for input_ref in inputs:
match = INPUT_REF_PATTERN.match(input_ref)
Expand All @@ -220,9 +247,9 @@ def _validate_input_references(
f"Agent '{agent_name}' references unknown agent '{ref_agent}' in input"
)

# Check if referencing parallel group output
# Check if referencing parallel/for-each group output
ref_parallel = match.group("parallel")
if ref_parallel and ref_parallel not in parallel_names:
if ref_parallel and ref_parallel not in group_names:
is_optional = match.group("optional") == "?"
if is_optional:
warnings.append(
Expand Down Expand Up @@ -577,3 +604,171 @@ def _validate_output_path_coverage(config: WorkflowConfig) -> list[str]:
)

return warnings


def _collect_template_strings(
agent: AgentDef,
) -> list[tuple[str, str]]:
"""Collect all Jinja2 template strings from an agent definition.

Returns:
List of (source_label, template_string) tuples for error reporting.
"""

templates: list[tuple[str, str]] = []

if agent.prompt:
templates.append((f"agent '{agent.name}' prompt", agent.prompt))
if agent.system_prompt:
templates.append((f"agent '{agent.name}' system_prompt", agent.system_prompt))
if agent.command:
templates.append((f"agent '{agent.name}' command", agent.command))
for i, arg in enumerate(agent.args):
templates.append((f"agent '{agent.name}' args[{i}]", arg))
input_mapping: dict[str, str] | None = getattr(agent, "input_mapping", None)
if input_mapping:
for key, expr in input_mapping.items():
templates.append((f"agent '{agent.name}' input_mapping.{key}", expr))
if agent.working_dir:
templates.append((f"agent '{agent.name}' working_dir", agent.working_dir))

return templates


def _resolve_prompt_file(agent_name: str, prompt: str, workflow_path: Path | None) -> str | None:
"""If a prompt looks like it came from a !file tag, try to load the file content.

The !file tag is already resolved during YAML loading, so the prompt field
contains the file content. This function is for scanning prompts that may
have been loaded from files for additional template references.

Returns the prompt as-is (it's already resolved by the loader).
"""
return prompt if prompt else None


def _validate_template_references(
config: WorkflowConfig,
workflow_path: Path | None = None,
) -> tuple[list[str], list[str]]:
"""Validate Jinja2 template references across all agents.

Checks that:
- {{ X.output.Y }} references use valid agent names
- {{ workflow.input.X }} references use declared input names
- In explicit mode, agents reference only declared inputs

Args:
config: The WorkflowConfig to validate.
workflow_path: Optional path to the workflow file (for !file resolution).

Returns:
Tuple of (error messages, warning messages).
"""
errors: list[str] = []
warnings: list[str] = []

agent_names = {a.name for a in config.agents}
parallel_names = {pg.name for pg in config.parallel}
for_each_names = {fe.name for fe in config.for_each}
all_names = agent_names | parallel_names | for_each_names
workflow_input_names = set(config.workflow.input.keys())
is_explicit = config.workflow.context.mode == "explicit"

# Collect all agents including for-each inline agents
all_agents: list[tuple[AgentDef, set[str]]] = []
for agent in config.agents:
all_agents.append((agent, all_names))
for fe in config.for_each:
# For-each inline agents can reference the parent's agents
all_agents.append((fe.agent, all_names))

for agent, valid_names in all_agents:
templates = _collect_template_strings(agent)

# Extract declared input references for explicit mode checks
declared_agents: set[str] = set()
declared_workflow_inputs: set[str] = set()
has_full_workflow_input = False
for ref in agent.input:
match = INPUT_REF_PATTERN.match(ref.rstrip("?"))
if match:
ref_agent = match.group("agent")
if ref_agent:
declared_agents.add(ref_agent)
ref_parallel = match.group("parallel")
if ref_parallel:
declared_agents.add(ref_parallel)
ref_input = match.group("input")
if ref_input:
declared_workflow_inputs.add(ref_input)
# Check for bare "workflow.input" (all inputs)
clean_ref = ref.rstrip("?")
if clean_ref == "workflow.input":
has_full_workflow_input = True

for source, template in templates:
# Check agent/group output references
for match in _TEMPLATE_REF_PATTERN.finditer(template):
ref_name = match.group(1)
if ref_name in _BUILTIN_NAMES:
continue

if ref_name not in valid_names:
errors.append(
f"{source} references unknown agent '{ref_name}'. "
f"Available: {', '.join(sorted(valid_names))}"
)
elif (
is_explicit
and agent.type not in ("script", "workflow")
and ref_name not in declared_agents
):
# In explicit mode, LLM agents should declare their inputs
warnings.append(
f"{source} references '{ref_name}.output' but "
f"agent '{agent.name}' does not declare '{ref_name}.output' "
f"in its input: list (explicit context mode)"
)

# Check workflow.input references
for match in _WORKFLOW_INPUT_REF_PATTERN.finditer(template):
input_name = match.group(1)
if workflow_input_names and input_name not in workflow_input_names:
# Only error when inputs ARE declared — workflows without
# input: blocks may use workflow.input conditionally
errors.append(
f"{source} references unknown workflow input '{input_name}'. "
f"Declared inputs: {', '.join(sorted(workflow_input_names))}"
)
elif (
is_explicit
and agent.type not in ("script", "workflow")
and not has_full_workflow_input
and input_name not in declared_workflow_inputs
):
warnings.append(
f"{source} references 'workflow.input.{input_name}' but "
f"agent '{agent.name}' does not declare "
f"'workflow.input.{input_name}' in its input: list "
f"(explicit context mode)"
)

# Check workflow output templates
if config.output:
for field, template in config.output.items():
for match in _TEMPLATE_REF_PATTERN.finditer(template):
ref_name = match.group(1)
if ref_name not in _BUILTIN_NAMES and ref_name not in all_names:
errors.append(
f"Workflow output '{field}' references unknown agent '{ref_name}'"
)
for match in _WORKFLOW_INPUT_REF_PATTERN.finditer(template):
input_name = match.group(1)
if input_name not in workflow_input_names:
errors.append(
f"Workflow output '{field}' references unknown "
f"workflow input '{input_name}'"
)

return errors, warnings
Loading