diff --git a/src/conductor/config/schema.py b/src/conductor/config/schema.py index 527c50b..da05b48 100644 --- a/src/conductor/config/schema.py +++ b/src/conductor/config/schema.py @@ -475,6 +475,24 @@ class AgentDef(BaseModel): workflow: ./research-pipeline.yaml """ + input_mapping: dict[str, str] | None = None + """Optional mapping of sub-workflow input names to Jinja2 expressions. + + Each key is a sub-workflow input parameter name. Each value is a Jinja2 + template expression evaluated against the parent workflow's context. + + When present, the rendered values are passed as the sub-workflow's inputs + instead of forwarding the parent's workflow.input.* values. + + Only valid for type='workflow' agents. + + Example:: + + input_mapping: + work_item_id: "{{ task_manager.output.current_issue_id }}" + title: "{{ task_manager.output.current_issue_title }}" + """ + max_session_seconds: float | None = Field(None, ge=1.0) """Maximum wall-clock duration for this agent's session in seconds. @@ -529,6 +547,8 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("human_gate agents require 'options'") if not self.prompt: raise ValueError("human_gate agents require 'prompt'") + if self.input_mapping: + raise ValueError("human_gate agents cannot have 'input_mapping'") elif self.type == "script": if not self.command: raise ValueError("script agents require 'command'") @@ -555,6 +575,8 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("script agents cannot have 'max_agent_iterations'") if self.retry is not None: raise ValueError("script agents cannot have 'retry'") + if self.input_mapping: + raise ValueError("script agents cannot have 'input_mapping'") elif self.type == "workflow": if not self.workflow: raise ValueError("workflow agents require 'workflow' path") @@ -578,6 +600,13 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("workflow agents cannot have 'max_agent_iterations'") if self.retry is not None: raise ValueError("workflow agents cannot have 'retry'") + else: + # Regular agent or human_gate — input_mapping is not valid + if self.input_mapping: + raise ValueError( + f"'{self.type or 'agent'}' agents cannot have 'input_mapping' " + "(only workflow agents support input_mapping)" + ) return self diff --git a/src/conductor/config/validator.py b/src/conductor/config/validator.py index 4581064..f4a0c94 100644 --- a/src/conductor/config/validator.py +++ b/src/conductor/config/validator.py @@ -111,18 +111,13 @@ def validate_workflow_config(config: WorkflowConfig) -> list[str]: parallel_errors = _validate_parallel_groups(config) errors.extend(parallel_errors) - # Validate for_each groups: reject script and workflow steps as inline agents + # Validate for_each groups: reject script steps as inline agents for for_each_group in config.for_each: if for_each_group.agent.type == "script": errors.append( f"For-each group '{for_each_group.name}' uses a script step as its " "inline agent. Script steps cannot be used in for_each groups." ) - if for_each_group.agent.type == "workflow": - errors.append( - f"For-each group '{for_each_group.name}' uses a workflow step as its " - "inline agent. Workflow steps cannot be used in for_each groups." - ) # Validate workflow output references output_errors = _validate_output_references( diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index 484192b..44f1db4 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -483,16 +483,20 @@ async def _execute_subworkflow( self, agent: AgentDef, context: dict[str, Any], + *, + sub_inputs: dict[str, Any] | None = None, ) -> dict[str, Any]: """Execute a sub-workflow as a black-box step. Loads the referenced workflow YAML, creates a child WorkflowEngine, - and runs it with the parent agent's context as input. The sub-workflow's - final output is returned as the agent's output. + and runs it. When ``sub_inputs`` are provided (e.g. from for_each with + pre-rendered input_mapping), they are used directly; otherwise inputs + are derived from ``context`` via input_mapping or forwarded workflow.input. Args: agent: Workflow agent definition with ``workflow`` path. context: Workflow context for template rendering (used as sub-workflow input). + sub_inputs: Optional pre-built inputs that bypass input_mapping rendering. Returns: The sub-workflow's final output dict. @@ -544,12 +548,31 @@ async def _execute_subworkflow( suggestion="Check the sub-workflow YAML for syntax or validation errors.", ) from exc - # Build sub-workflow inputs from the parent context - # Extract workflow.input.* values from the parent context - workflow_ctx = context.get("workflow", {}) - sub_inputs: dict[str, Any] = ( - dict(workflow_ctx.get("input", {})) if isinstance(workflow_ctx, dict) else {} - ) + # Build sub-workflow inputs — use pre-built inputs if provided, + # otherwise render from input_mapping or forward workflow.input. + if sub_inputs is None: + if agent.input_mapping is not None: + # Dynamic inputs: render each Jinja2 expression against parent context. + # Values are always passed as strings — use {{ expr | json }} in the + # template if structured data is needed. + renderer = TemplateRenderer() + sub_inputs = {} + for key, template_expr in agent.input_mapping.items(): + try: + sub_inputs[key] = renderer.render(template_expr, context) + except Exception as e: + raise ExecutionError( + f"Failed to render input_mapping key '{key}' for agent " + f"'{agent.name}': {e}", + suggestion=f"Check that the expression '{template_expr}' " + "references valid context variables.", + ) from e + else: + # Default: forward parent's workflow.input.* values + workflow_ctx = context.get("workflow", {}) + sub_inputs = ( + dict(workflow_ctx.get("input", {})) if isinstance(workflow_ctx, dict) else {} + ) # Create child engine inheriting provider/registry but with deeper depth child_engine = WorkflowEngine( @@ -2587,7 +2610,52 @@ async def execute_single_item(item: Any, index: int, key: str) -> tuple[str, Any key if for_each_group.key_by else None, ) - # Execute agent with injected context (get executor for multi-provider) + # Execute agent — sub-workflow or regular + if for_each_group.agent.type == "workflow": + # Build sub-workflow inputs from input_mapping with loop vars + sub_inputs: dict[str, Any] | None = None + if for_each_group.agent.input_mapping is not None: + renderer = TemplateRenderer() + sub_inputs = {} + for k, tmpl in for_each_group.agent.input_mapping.items(): + try: + sub_inputs[k] = renderer.render(tmpl, agent_context) + except Exception as e: + raise ExecutionError( + f"Failed to render input_mapping key '{k}' for " + f"for_each agent '{for_each_group.name}': {e}", + suggestion=f"Check that the expression '{tmpl}' " + "references valid context variables.", + ) from e + + # Execute sub-workflow + self._emit( + "subworkflow_started", + { + "agent_name": for_each_group.name, + "item_key": key, + "workflow": for_each_group.agent.workflow, + }, + ) + output_content = await self._execute_subworkflow( + for_each_group.agent, agent_context, sub_inputs=sub_inputs + ) + _item_elapsed = _time.time() - _item_start + + self._emit( + "for_each_item_completed", + { + "group_name": for_each_group.name, + "item_key": key, + "elapsed": _item_elapsed, + "tokens": 0, + "cost_usd": 0.0, + "output": output_content, + }, + ) + return (key, output_content) + + # Regular agent execution executor = await self._get_executor_for_agent(for_each_group.agent) # Item-scoped event callback that tags all streaming events with item_key diff --git a/tests/test_config/test_workflow_type_schema.py b/tests/test_config/test_workflow_type_schema.py index 995c21c..a1ad66d 100644 --- a/tests/test_config/test_workflow_type_schema.py +++ b/tests/test_config/test_workflow_type_schema.py @@ -201,8 +201,8 @@ def test_workflow_in_parallel_group_raises(self) -> None: class TestWorkflowInForEach: """Tests for workflow agents in for_each groups.""" - def test_workflow_in_for_each_raises(self) -> None: - """Test that workflow step in for_each inline agent raises ConfigurationError.""" + def test_workflow_in_for_each_validates(self) -> None: + """Test that workflow step in for_each inline agent validates successfully.""" config = WorkflowConfig( workflow=WorkflowDef( name="test", @@ -227,8 +227,9 @@ def test_workflow_in_for_each_raises(self) -> None: ), ], ) - with pytest.raises(ConfigurationError, match="Workflow steps cannot be used in for_each"): - validate_workflow_config(config) + # Should not raise — workflow agents are now allowed in for_each + warnings = validate_workflow_config(config) + assert isinstance(warnings, list) class TestWorkflowWorkflowConfig: @@ -284,3 +285,58 @@ def test_workflow_with_routes_to_agents(self) -> None: # Should not raise warnings = validate_workflow_config(config) assert isinstance(warnings, list) + + +class TestInputMapping: + """Tests for input_mapping on workflow agents.""" + + def test_valid_input_mapping(self) -> None: + """Test that input_mapping is accepted on workflow agents.""" + agent = AgentDef( + name="sub_wf", + type="workflow", + workflow="./sub.yaml", + input_mapping={ + "work_item_id": "{{ intake.output.epic_id }}", + "title": "{{ intake.output.epic_title }}", + }, + ) + assert agent.input_mapping is not None + assert len(agent.input_mapping) == 2 + + def test_workflow_without_input_mapping(self) -> None: + """Test that workflow agents work without input_mapping (backward compat).""" + agent = AgentDef(name="sub_wf", type="workflow", workflow="./sub.yaml") + assert agent.input_mapping is None + + def test_input_mapping_on_regular_agent_raises(self) -> None: + """Test that input_mapping on a regular agent raises ValidationError.""" + with pytest.raises(ValidationError, match="input_mapping"): + AgentDef( + name="regular", + prompt="do something", + input_mapping={"key": "{{ value }}"}, + ) + + def test_input_mapping_on_human_gate_raises(self) -> None: + """Test that input_mapping on a human_gate raises ValidationError.""" + with pytest.raises(ValidationError, match="input_mapping"): + AgentDef( + name="gate", + type="human_gate", + prompt="Choose", + options=[ + GateOption(label="Yes", value="yes", route="next"), + ], + input_mapping={"key": "{{ value }}"}, + ) + + def test_input_mapping_on_script_raises(self) -> None: + """Test that input_mapping on a script agent raises ValidationError.""" + with pytest.raises(ValidationError, match="input_mapping"): + AgentDef( + name="script", + type="script", + command="echo hi", + input_mapping={"key": "{{ value }}"}, + ) diff --git a/uv.lock b/uv.lock index ac918e3..44d02b6 100644 --- a/uv.lock +++ b/uv.lock @@ -150,7 +150,7 @@ wheels = [ [[package]] name = "conductor-cli" -version = "0.1.8" +version = "0.1.9" source = { editable = "." } dependencies = [ { name = "anthropic" },