diff --git a/src/conductor/cli/validate.py b/src/conductor/cli/validate.py index 8409cd8..64a6a49 100644 --- a/src/conductor/cli/validate.py +++ b/src/conductor/cli/validate.py @@ -40,7 +40,6 @@ def validate_workflow( try: config = load_config(workflow_path) - return True, config except ConductorError as e: # Display structured error display_validation_error(e, workflow_path, output_console) @@ -56,6 +55,20 @@ def validate_workflow( ) return False, None + # Semantic validation: cross-field references, template refs, etc. + try: + from conductor.config.validator import validate_workflow_config + + warnings = validate_workflow_config(config, workflow_path=workflow_path) + if warnings: + for warning in warnings: + output_console.print(f" [yellow]⚠[/yellow] {warning}") + except ConductorError as e: + display_validation_error(e, workflow_path, output_console) + return False, None + + return True, config + def display_validation_error( error: ConductorError, diff --git a/src/conductor/config/validator.py b/src/conductor/config/validator.py index 4581064..dc46d73 100644 --- a/src/conductor/config/validator.py +++ b/src/conductor/config/validator.py @@ -8,12 +8,13 @@ from __future__ import annotations import re +from pathlib import Path from typing import TYPE_CHECKING from conductor.exceptions import ConfigurationError if TYPE_CHECKING: - from conductor.config.schema import WorkflowConfig + from conductor.config.schema import AgentDef, WorkflowConfig # Matches agent/group name references in output templates. @@ -23,6 +24,18 @@ # and .outputs plural for path coverage analysis. _OUTPUT_REF_PATTERN = re.compile(r"(?:\{\{|\{%)[^}%]*?(\w+)\.outputs?\b") +# Matches agent/workflow template references in Jinja2 expressions: +# {{ agent_name.output.field }} +# {{ agent_name.output }} +# {% if agent_name.output.field %} +# Excludes built-in namespaces: workflow, context, item, _index, _key +_TEMPLATE_REF_PATTERN = re.compile(r"(?:\{\{|\{%)[^}%]*?\b(\w+)\.(?:output|outputs)\b") + +# Matches workflow.input.X references in templates +_WORKFLOW_INPUT_REF_PATTERN = re.compile(r"(?:\{\{|\{%)[^}%]*?\bworkflow\.input\.(\w+)\b") + +_BUILTIN_NAMES = frozenset({"workflow", "context", "item", "_index", "_key", "loop"}) + # DFS path cap: larger workflows may get partial coverage analysis _MAX_ENUMERATED_PATHS = 100 @@ -34,20 +47,25 @@ INPUT_REF_PATTERN = re.compile( r"^(?:" r"(?P[a-zA-Z_][a-zA-Z0-9_]*)\.output(?:\.(?P[a-zA-Z_][a-zA-Z0-9_]*))?|" - r"(?P[a-zA-Z_][a-zA-Z0-9_]*)\.outputs\.(?P[a-zA-Z_][a-zA-Z0-9_]*)(?:\.(?P[a-zA-Z_][a-zA-Z0-9_]*))?|" + r"(?P[a-zA-Z_][a-zA-Z0-9_]*)\.(?:outputs|errors)(?:\.(?P[a-zA-Z_][a-zA-Z0-9_]*)(?:\.(?P[a-zA-Z_][a-zA-Z0-9_]*))?)?|" r"workflow\.input\.(?P[a-zA-Z_][a-zA-Z0-9_]*)" r")(?P\?)?$" ) -def validate_workflow_config(config: WorkflowConfig) -> list[str]: +def validate_workflow_config( + config: WorkflowConfig, + workflow_path: Path | None = None, +) -> list[str]: """Perform comprehensive validation of a workflow configuration. This function performs semantic validation beyond what Pydantic can check, - including cross-field references and consistency checks. + including cross-field references, consistency checks, and Jinja2 template + reference validation. Args: config: The WorkflowConfig to validate. + workflow_path: Optional path to the workflow file (for !file resolution). Returns: A list of warning messages (non-fatal issues). @@ -97,6 +115,7 @@ def validate_workflow_config(config: WorkflowConfig) -> list[str]: agent_names, parallel_names, set(config.workflow.input.keys()), + for_each_names, ) errors.extend(input_errors) warnings.extend(input_warnings) @@ -135,6 +154,11 @@ def validate_workflow_config(config: WorkflowConfig) -> list[str]: # Check output templates against conditional execution paths (warnings only) warnings.extend(_validate_output_path_coverage(config)) + # Validate Jinja2 template references across all agents + tmpl_errors, tmpl_warnings = _validate_template_references(config, workflow_path) + errors.extend(tmpl_errors) + warnings.extend(tmpl_warnings) + if errors: raise ConfigurationError( "Workflow configuration validation failed:\n - " + "\n - ".join(errors), @@ -179,6 +203,7 @@ def _validate_input_references( agent_names: set[str], parallel_names: set[str], workflow_inputs: set[str], + for_each_names: set[str] | None = None, ) -> tuple[list[str], list[str]]: """Validate input reference formats and targets. @@ -188,12 +213,14 @@ def _validate_input_references( agent_names: Set of valid agent names. parallel_names: Set of valid parallel group names. workflow_inputs: Set of valid workflow input parameter names. + for_each_names: Set of valid for-each group names. Returns: Tuple of (error messages, warning messages). """ errors: list[str] = [] warnings: list[str] = [] + group_names = parallel_names | (for_each_names or set()) for input_ref in inputs: match = INPUT_REF_PATTERN.match(input_ref) @@ -220,9 +247,9 @@ def _validate_input_references( f"Agent '{agent_name}' references unknown agent '{ref_agent}' in input" ) - # Check if referencing parallel group output + # Check if referencing parallel/for-each group output ref_parallel = match.group("parallel") - if ref_parallel and ref_parallel not in parallel_names: + if ref_parallel and ref_parallel not in group_names: is_optional = match.group("optional") == "?" if is_optional: warnings.append( @@ -577,3 +604,171 @@ def _validate_output_path_coverage(config: WorkflowConfig) -> list[str]: ) return warnings + + +def _collect_template_strings( + agent: AgentDef, +) -> list[tuple[str, str]]: + """Collect all Jinja2 template strings from an agent definition. + + Returns: + List of (source_label, template_string) tuples for error reporting. + """ + + templates: list[tuple[str, str]] = [] + + if agent.prompt: + templates.append((f"agent '{agent.name}' prompt", agent.prompt)) + if agent.system_prompt: + templates.append((f"agent '{agent.name}' system_prompt", agent.system_prompt)) + if agent.command: + templates.append((f"agent '{agent.name}' command", agent.command)) + for i, arg in enumerate(agent.args): + templates.append((f"agent '{agent.name}' args[{i}]", arg)) + input_mapping: dict[str, str] | None = getattr(agent, "input_mapping", None) + if input_mapping: + for key, expr in input_mapping.items(): + templates.append((f"agent '{agent.name}' input_mapping.{key}", expr)) + if agent.working_dir: + templates.append((f"agent '{agent.name}' working_dir", agent.working_dir)) + + return templates + + +def _resolve_prompt_file(agent_name: str, prompt: str, workflow_path: Path | None) -> str | None: + """If a prompt looks like it came from a !file tag, try to load the file content. + + The !file tag is already resolved during YAML loading, so the prompt field + contains the file content. This function is for scanning prompts that may + have been loaded from files for additional template references. + + Returns the prompt as-is (it's already resolved by the loader). + """ + return prompt if prompt else None + + +def _validate_template_references( + config: WorkflowConfig, + workflow_path: Path | None = None, +) -> tuple[list[str], list[str]]: + """Validate Jinja2 template references across all agents. + + Checks that: + - {{ X.output.Y }} references use valid agent names + - {{ workflow.input.X }} references use declared input names + - In explicit mode, agents reference only declared inputs + + Args: + config: The WorkflowConfig to validate. + workflow_path: Optional path to the workflow file (for !file resolution). + + Returns: + Tuple of (error messages, warning messages). + """ + errors: list[str] = [] + warnings: list[str] = [] + + agent_names = {a.name for a in config.agents} + parallel_names = {pg.name for pg in config.parallel} + for_each_names = {fe.name for fe in config.for_each} + all_names = agent_names | parallel_names | for_each_names + workflow_input_names = set(config.workflow.input.keys()) + is_explicit = config.workflow.context.mode == "explicit" + + # Collect all agents including for-each inline agents + all_agents: list[tuple[AgentDef, set[str]]] = [] + for agent in config.agents: + all_agents.append((agent, all_names)) + for fe in config.for_each: + # For-each inline agents can reference the parent's agents + all_agents.append((fe.agent, all_names)) + + for agent, valid_names in all_agents: + templates = _collect_template_strings(agent) + + # Extract declared input references for explicit mode checks + declared_agents: set[str] = set() + declared_workflow_inputs: set[str] = set() + has_full_workflow_input = False + for ref in agent.input: + match = INPUT_REF_PATTERN.match(ref.rstrip("?")) + if match: + ref_agent = match.group("agent") + if ref_agent: + declared_agents.add(ref_agent) + ref_parallel = match.group("parallel") + if ref_parallel: + declared_agents.add(ref_parallel) + ref_input = match.group("input") + if ref_input: + declared_workflow_inputs.add(ref_input) + # Check for bare "workflow.input" (all inputs) + clean_ref = ref.rstrip("?") + if clean_ref == "workflow.input": + has_full_workflow_input = True + + for source, template in templates: + # Check agent/group output references + for match in _TEMPLATE_REF_PATTERN.finditer(template): + ref_name = match.group(1) + if ref_name in _BUILTIN_NAMES: + continue + + if ref_name not in valid_names: + errors.append( + f"{source} references unknown agent '{ref_name}'. " + f"Available: {', '.join(sorted(valid_names))}" + ) + elif ( + is_explicit + and agent.type not in ("script", "workflow") + and ref_name not in declared_agents + ): + # In explicit mode, LLM agents should declare their inputs + warnings.append( + f"{source} references '{ref_name}.output' but " + f"agent '{agent.name}' does not declare '{ref_name}.output' " + f"in its input: list (explicit context mode)" + ) + + # Check workflow.input references + for match in _WORKFLOW_INPUT_REF_PATTERN.finditer(template): + input_name = match.group(1) + if workflow_input_names and input_name not in workflow_input_names: + # Only error when inputs ARE declared — workflows without + # input: blocks may use workflow.input conditionally + errors.append( + f"{source} references unknown workflow input '{input_name}'. " + f"Declared inputs: {', '.join(sorted(workflow_input_names))}" + ) + elif ( + is_explicit + and agent.type not in ("script", "workflow") + and not has_full_workflow_input + and input_name not in declared_workflow_inputs + ): + warnings.append( + f"{source} references 'workflow.input.{input_name}' but " + f"agent '{agent.name}' does not declare " + f"'workflow.input.{input_name}' in its input: list " + f"(explicit context mode)" + ) + + # Check workflow output templates + if config.output: + for field, template in config.output.items(): + for match in _TEMPLATE_REF_PATTERN.finditer(template): + ref_name = match.group(1) + if ref_name not in _BUILTIN_NAMES and ref_name not in all_names: + errors.append( + f"Workflow output '{field}' references unknown agent '{ref_name}'" + ) + for match in _WORKFLOW_INPUT_REF_PATTERN.finditer(template): + input_name = match.group(1) + if input_name not in workflow_input_names: + errors.append( + f"Workflow output '{field}' references unknown " + f"workflow input '{input_name}'" + ) + + return errors, warnings