diff --git a/src/conductor/config/schema.py b/src/conductor/config/schema.py index 527c50b..fdb3a67 100644 --- a/src/conductor/config/schema.py +++ b/src/conductor/config/schema.py @@ -475,6 +475,36 @@ class AgentDef(BaseModel): workflow: ./research-pipeline.yaml """ + input_mapping: dict[str, str] | None = None + """Optional mapping of sub-workflow input names to Jinja2 expressions. + + Each key is a sub-workflow input parameter name. Each value is a Jinja2 + template expression evaluated against the parent workflow's context. + + When present, the rendered values are passed as the sub-workflow's inputs + instead of forwarding the parent's workflow.input.* values. + + Only valid for type='workflow' agents. + + Example:: + + input_mapping: + work_item_id: "{{ task_manager.output.current_issue_id }}" + title: "{{ task_manager.output.current_issue_title }}" + """ + + max_depth: int | None = Field(None, ge=1, le=10) + """Per-agent sub-workflow depth limit. + + Overrides the global MAX_SUBWORKFLOW_DEPTH (10) with a tighter bound. + Only valid for type='workflow' agents. Useful for self-referential + workflows to set an explicit recursion limit. + + Example:: + + max_depth: 3 # Allow at most 3 levels of recursion + """ + max_session_seconds: float | None = Field(None, ge=1.0) """Maximum wall-clock duration for this agent's session in seconds. @@ -529,6 +559,10 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("human_gate agents require 'options'") if not self.prompt: raise ValueError("human_gate agents require 'prompt'") + if self.input_mapping: + raise ValueError("human_gate agents cannot have 'input_mapping'") + if self.max_depth is not None: + raise ValueError("human_gate agents cannot have 'max_depth'") elif self.type == "script": if not self.command: raise ValueError("script agents require 'command'") @@ -555,6 +589,10 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("script agents cannot have 'max_agent_iterations'") if self.retry is not None: raise ValueError("script agents cannot have 'retry'") + if self.input_mapping: + raise ValueError("script agents cannot have 'input_mapping'") + if self.max_depth is not None: + raise ValueError("script agents cannot have 'max_depth'") elif self.type == "workflow": if not self.workflow: raise ValueError("workflow agents require 'workflow' path") @@ -578,6 +616,18 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("workflow agents cannot have 'max_agent_iterations'") if self.retry is not None: raise ValueError("workflow agents cannot have 'retry'") + else: + # Regular agent or human_gate — input_mapping is not valid + if self.input_mapping: + raise ValueError( + f"'{self.type or 'agent'}' agents cannot have 'input_mapping' " + "(only workflow agents support input_mapping)" + ) + if self.max_depth is not None: + raise ValueError( + f"'{self.type or 'agent'}' agents cannot have 'max_depth' " + "(only workflow agents support max_depth)" + ) return self diff --git a/src/conductor/config/validator.py b/src/conductor/config/validator.py index 4581064..f4a0c94 100644 --- a/src/conductor/config/validator.py +++ b/src/conductor/config/validator.py @@ -111,18 +111,13 @@ def validate_workflow_config(config: WorkflowConfig) -> list[str]: parallel_errors = _validate_parallel_groups(config) errors.extend(parallel_errors) - # Validate for_each groups: reject script and workflow steps as inline agents + # Validate for_each groups: reject script steps as inline agents for for_each_group in config.for_each: if for_each_group.agent.type == "script": errors.append( f"For-each group '{for_each_group.name}' uses a script step as its " "inline agent. Script steps cannot be used in for_each groups." ) - if for_each_group.agent.type == "workflow": - errors.append( - f"For-each group '{for_each_group.name}' uses a workflow step as its " - "inline agent. Workflow steps cannot be used in for_each groups." - ) # Validate workflow output references output_errors = _validate_output_references( diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index 484192b..157c5ee 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -483,16 +483,20 @@ async def _execute_subworkflow( self, agent: AgentDef, context: dict[str, Any], + *, + sub_inputs: dict[str, Any] | None = None, ) -> dict[str, Any]: """Execute a sub-workflow as a black-box step. Loads the referenced workflow YAML, creates a child WorkflowEngine, - and runs it with the parent agent's context as input. The sub-workflow's - final output is returned as the agent's output. + and runs it. When ``sub_inputs`` are provided (e.g. from for_each with + pre-rendered input_mapping), they are used directly; otherwise inputs + are derived from ``context`` via input_mapping or forwarded workflow.input. Args: agent: Workflow agent definition with ``workflow`` path. context: Workflow context for template rendering (used as sub-workflow input). + sub_inputs: Optional pre-built inputs that bypass input_mapping rendering. Returns: The sub-workflow's final output dict. @@ -510,6 +514,14 @@ async def _execute_subworkflow( suggestion=("Check for circular sub-workflow references or reduce nesting depth."), ) + # Per-agent depth limit (stricter than global MAX_SUBWORKFLOW_DEPTH) + if agent.max_depth is not None and self._subworkflow_depth >= agent.max_depth: + raise ExecutionError( + f"Agent '{agent.name}' max_depth ({agent.max_depth}) exceeded " + f"at depth {self._subworkflow_depth}.", + suggestion="Increase max_depth or restructure to reduce nesting.", + ) + assert agent.workflow is not None # noqa: S101 # Resolve sub-workflow path relative to parent workflow file @@ -526,15 +538,6 @@ async def _execute_subworkflow( suggestion="Check that the 'workflow' path is correct and the file exists.", ) - # Detect circular references via file path - current_path = Path(self.workflow_path).resolve() if self.workflow_path else None - if current_path is not None and sub_path == current_path: - raise ExecutionError( - f"Circular sub-workflow reference: agent '{agent.name}' " - f"references its own workflow file '{agent.workflow}'.", - suggestion="A workflow cannot reference itself as a sub-workflow.", - ) - try: sub_config = load_config(sub_path) except Exception as exc: @@ -544,12 +547,31 @@ async def _execute_subworkflow( suggestion="Check the sub-workflow YAML for syntax or validation errors.", ) from exc - # Build sub-workflow inputs from the parent context - # Extract workflow.input.* values from the parent context - workflow_ctx = context.get("workflow", {}) - sub_inputs: dict[str, Any] = ( - dict(workflow_ctx.get("input", {})) if isinstance(workflow_ctx, dict) else {} - ) + # Build sub-workflow inputs — use pre-built inputs if provided, + # otherwise render from input_mapping or forward workflow.input. + if sub_inputs is None: + if agent.input_mapping is not None: + # Dynamic inputs: render each Jinja2 expression against parent context. + # Values are always passed as strings — use {{ expr | json }} filter + # if structured data is needed. + renderer = TemplateRenderer() + sub_inputs = {} + for key, template_expr in agent.input_mapping.items(): + try: + sub_inputs[key] = renderer.render(template_expr, context) + except Exception as e: + raise ExecutionError( + f"Failed to render input_mapping key '{key}' for agent " + f"'{agent.name}': {e}", + suggestion=f"Check that the expression '{template_expr}' " + "references valid context variables.", + ) from e + else: + # Default: forward parent's workflow.input.* values + workflow_ctx = context.get("workflow", {}) + sub_inputs = ( + dict(workflow_ctx.get("input", {})) if isinstance(workflow_ctx, dict) else {} + ) # Create child engine inheriting provider/registry but with deeper depth child_engine = WorkflowEngine( @@ -2587,7 +2609,52 @@ async def execute_single_item(item: Any, index: int, key: str) -> tuple[str, Any key if for_each_group.key_by else None, ) - # Execute agent with injected context (get executor for multi-provider) + # Execute agent — sub-workflow or regular + if for_each_group.agent.type == "workflow": + # Build sub-workflow inputs from input_mapping with loop vars + sub_inputs: dict[str, Any] | None = None + if for_each_group.agent.input_mapping is not None: + renderer = TemplateRenderer() + sub_inputs = {} + for k, tmpl in for_each_group.agent.input_mapping.items(): + try: + sub_inputs[k] = renderer.render(tmpl, agent_context) + except Exception as e: + raise ExecutionError( + f"Failed to render input_mapping key '{k}' for " + f"for_each agent '{for_each_group.name}': {e}", + suggestion=f"Check that the expression '{tmpl}' " + "references valid context variables.", + ) from e + + # Execute sub-workflow + self._emit( + "subworkflow_started", + { + "agent_name": for_each_group.name, + "item_key": key, + "workflow": for_each_group.agent.workflow, + }, + ) + output_content = await self._execute_subworkflow( + for_each_group.agent, agent_context, sub_inputs=sub_inputs + ) + _item_elapsed = _time.time() - _item_start + + self._emit( + "for_each_item_completed", + { + "group_name": for_each_group.name, + "item_key": key, + "elapsed": _item_elapsed, + "tokens": 0, + "cost_usd": 0.0, + "output": output_content, + }, + ) + return (key, output_content) + + # Regular agent execution executor = await self._get_executor_for_agent(for_each_group.agent) # Item-scoped event callback that tags all streaming events with item_key diff --git a/tests/test_config/test_workflow_type_schema.py b/tests/test_config/test_workflow_type_schema.py index 995c21c..a1ad66d 100644 --- a/tests/test_config/test_workflow_type_schema.py +++ b/tests/test_config/test_workflow_type_schema.py @@ -201,8 +201,8 @@ def test_workflow_in_parallel_group_raises(self) -> None: class TestWorkflowInForEach: """Tests for workflow agents in for_each groups.""" - def test_workflow_in_for_each_raises(self) -> None: - """Test that workflow step in for_each inline agent raises ConfigurationError.""" + def test_workflow_in_for_each_validates(self) -> None: + """Test that workflow step in for_each inline agent validates successfully.""" config = WorkflowConfig( workflow=WorkflowDef( name="test", @@ -227,8 +227,9 @@ def test_workflow_in_for_each_raises(self) -> None: ), ], ) - with pytest.raises(ConfigurationError, match="Workflow steps cannot be used in for_each"): - validate_workflow_config(config) + # Should not raise — workflow agents are now allowed in for_each + warnings = validate_workflow_config(config) + assert isinstance(warnings, list) class TestWorkflowWorkflowConfig: @@ -284,3 +285,58 @@ def test_workflow_with_routes_to_agents(self) -> None: # Should not raise warnings = validate_workflow_config(config) assert isinstance(warnings, list) + + +class TestInputMapping: + """Tests for input_mapping on workflow agents.""" + + def test_valid_input_mapping(self) -> None: + """Test that input_mapping is accepted on workflow agents.""" + agent = AgentDef( + name="sub_wf", + type="workflow", + workflow="./sub.yaml", + input_mapping={ + "work_item_id": "{{ intake.output.epic_id }}", + "title": "{{ intake.output.epic_title }}", + }, + ) + assert agent.input_mapping is not None + assert len(agent.input_mapping) == 2 + + def test_workflow_without_input_mapping(self) -> None: + """Test that workflow agents work without input_mapping (backward compat).""" + agent = AgentDef(name="sub_wf", type="workflow", workflow="./sub.yaml") + assert agent.input_mapping is None + + def test_input_mapping_on_regular_agent_raises(self) -> None: + """Test that input_mapping on a regular agent raises ValidationError.""" + with pytest.raises(ValidationError, match="input_mapping"): + AgentDef( + name="regular", + prompt="do something", + input_mapping={"key": "{{ value }}"}, + ) + + def test_input_mapping_on_human_gate_raises(self) -> None: + """Test that input_mapping on a human_gate raises ValidationError.""" + with pytest.raises(ValidationError, match="input_mapping"): + AgentDef( + name="gate", + type="human_gate", + prompt="Choose", + options=[ + GateOption(label="Yes", value="yes", route="next"), + ], + input_mapping={"key": "{{ value }}"}, + ) + + def test_input_mapping_on_script_raises(self) -> None: + """Test that input_mapping on a script agent raises ValidationError.""" + with pytest.raises(ValidationError, match="input_mapping"): + AgentDef( + name="script", + type="script", + command="echo hi", + input_mapping={"key": "{{ value }}"}, + ) diff --git a/tests/test_engine/test_subworkflow.py b/tests/test_engine/test_subworkflow.py index 6ae8c13..51625e6 100644 --- a/tests/test_engine/test_subworkflow.py +++ b/tests/test_engine/test_subworkflow.py @@ -268,10 +268,79 @@ async def test_subworkflow_file_not_found(self, tmp_workflow_dir: Path) -> None: await engine.run({}) @pytest.mark.asyncio - async def test_self_referencing_workflow(self, tmp_workflow_dir: Path) -> None: - """Test that a workflow referencing itself raises ExecutionError.""" + async def test_self_referencing_workflow_hits_depth_limit(self, tmp_workflow_dir: Path) -> None: + """Test that a self-referencing workflow is allowed but bounded by depth limit.""" + # Write a real self-referencing workflow YAML parent_path = tmp_workflow_dir / "parent.yaml" - parent_path.write_text("dummy", encoding="utf-8") + _write_yaml( + parent_path, + """\ + workflow: + name: self-ref + entry_point: sub_wf + runtime: + provider: copilot + limits: + max_iterations: 50 + agents: + - name: sub_wf + type: workflow + workflow: parent.yaml + routes: + - to: "$end" + output: {} + """, + ) + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="sub_wf", + type="workflow", + workflow="parent.yaml", + routes=[RouteDef(to="$end")], + ), + ], + ) + + mock_provider = MagicMock() + engine = WorkflowEngine(config, mock_provider, workflow_path=parent_path) + + # Self-reference is now allowed but will hit depth limit + with pytest.raises(ExecutionError, match="depth limit exceeded"): + await engine.run({}) + + @pytest.mark.asyncio + async def test_max_depth_per_agent(self, tmp_workflow_dir: Path) -> None: + """Test that per-agent max_depth is enforced before global limit.""" + parent_path = tmp_workflow_dir / "parent.yaml" + _write_yaml( + parent_path, + """\ + workflow: + name: self-ref + entry_point: sub_wf + runtime: + provider: copilot + limits: + max_iterations: 50 + agents: + - name: sub_wf + type: workflow + workflow: parent.yaml + max_depth: 2 + routes: + - to: "$end" + output: {} + """, + ) config = WorkflowConfig( workflow=WorkflowDef( @@ -286,6 +355,7 @@ async def test_self_referencing_workflow(self, tmp_workflow_dir: Path) -> None: name="sub_wf", type="workflow", workflow="parent.yaml", + max_depth=2, routes=[RouteDef(to="$end")], ), ], @@ -294,7 +364,7 @@ async def test_self_referencing_workflow(self, tmp_workflow_dir: Path) -> None: mock_provider = MagicMock() engine = WorkflowEngine(config, mock_provider, workflow_path=parent_path) - with pytest.raises(ExecutionError, match="Circular sub-workflow reference"): + with pytest.raises(ExecutionError, match="max_depth.*exceeded"): await engine.run({}) diff --git a/uv.lock b/uv.lock index ac918e3..44d02b6 100644 --- a/uv.lock +++ b/uv.lock @@ -150,7 +150,7 @@ wheels = [ [[package]] name = "conductor-cli" -version = "0.1.8" +version = "0.1.9" source = { editable = "." } dependencies = [ { name = "anthropic" },