diff --git a/src/conductor/config/schema.py b/src/conductor/config/schema.py index 527c50b..7f73d84 100644 --- a/src/conductor/config/schema.py +++ b/src/conductor/config/schema.py @@ -475,6 +475,24 @@ class AgentDef(BaseModel): workflow: ./research-pipeline.yaml """ + input_mapping: dict[str, str] | None = None + """Optional mapping of sub-workflow input names to Jinja2 expressions. + + Each key is a sub-workflow input parameter name. Each value is a Jinja2 + template expression evaluated against the parent workflow's context. + + When present, the rendered values are passed as the sub-workflow's inputs + instead of forwarding the parent's workflow.input.* values. + + Only valid for type='workflow' agents. + + Example:: + + input_mapping: + work_item_id: "{{ task_manager.output.current_issue_id }}" + title: "{{ task_manager.output.current_issue_title }}" + """ + max_session_seconds: float | None = Field(None, ge=1.0) """Maximum wall-clock duration for this agent's session in seconds. @@ -529,6 +547,8 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("human_gate agents require 'options'") if not self.prompt: raise ValueError("human_gate agents require 'prompt'") + if self.input_mapping is not None: + raise ValueError("human_gate agents cannot have 'input_mapping'") elif self.type == "script": if not self.command: raise ValueError("script agents require 'command'") @@ -555,6 +575,8 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("script agents cannot have 'max_agent_iterations'") if self.retry is not None: raise ValueError("script agents cannot have 'retry'") + if self.input_mapping is not None: + raise ValueError("script agents cannot have 'input_mapping'") elif self.type == "workflow": if not self.workflow: raise ValueError("workflow agents require 'workflow' path") @@ -578,6 +600,13 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("workflow agents cannot have 'max_agent_iterations'") if self.retry is not None: raise 
ValueError("workflow agents cannot have 'retry'") + else: + # Regular agent (default type) — input_mapping is not valid. + # (human_gate and script agents are rejected in their own branches above.) + if self.input_mapping is not None: + raise ValueError( + f"'{self.type or 'agent'}' agents cannot have 'input_mapping' " + "(only workflow agents support input_mapping)" + ) return self diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index 484192b..be9f70b 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -545,11 +545,27 @@ async def _execute_subworkflow( ) from exc # Build sub-workflow inputs from the parent context - # Extract workflow.input.* values from the parent context - workflow_ctx = context.get("workflow", {}) - sub_inputs: dict[str, Any] = ( - dict(workflow_ctx.get("input", {})) if isinstance(workflow_ctx, dict) else {} - ) + sub_inputs: dict[str, Any] + if agent.input_mapping is not None: + # Dynamic inputs: render each Jinja2 expression against parent context + renderer = TemplateRenderer() + sub_inputs = {} + for key, template_expr in agent.input_mapping.items(): + try: + rendered = renderer.render(template_expr, context) + except Exception as e: + raise ExecutionError( + f"Failed to render input_mapping key '{key}' for agent '{agent.name}': {e}", + suggestion=f"Check that the expression '{template_expr}' " + "references valid context variables.", + ) from e + sub_inputs[key] = rendered + else: + # Default: forward parent's workflow.input.* values + workflow_ctx = context.get("workflow", {}) + sub_inputs = ( + dict(workflow_ctx.get("input", {})) if isinstance(workflow_ctx, dict) else {} + ) # Create child engine inheriting provider/registry but with deeper depth child_engine = WorkflowEngine( diff --git a/tests/test_config/test_workflow_type_schema.py b/tests/test_config/test_workflow_type_schema.py index 995c21c..e5036fb 100644 --- a/tests/test_config/test_workflow_type_schema.py +++ b/tests/test_config/test_workflow_type_schema.py @@ -284,3 +284,58 @@ def 
test_workflow_with_routes_to_agents(self) -> None: # Should not raise warnings = validate_workflow_config(config) assert isinstance(warnings, list) + + +class TestInputMapping: + """Tests for input_mapping on workflow agents.""" + + def test_valid_input_mapping(self) -> None: + """Test that input_mapping is accepted on workflow agents.""" + agent = AgentDef( + name="sub_wf", + type="workflow", + workflow="./sub.yaml", + input_mapping={ + "work_item_id": "{{ intake.output.epic_id }}", + "title": "{{ intake.output.epic_title }}", + }, + ) + assert agent.input_mapping is not None + assert len(agent.input_mapping) == 2 + + def test_workflow_without_input_mapping(self) -> None: + """Test that workflow agents work without input_mapping (backward compat).""" + agent = AgentDef(name="sub_wf", type="workflow", workflow="./sub.yaml") + assert agent.input_mapping is None + + def test_input_mapping_on_regular_agent_raises(self) -> None: + """Test that input_mapping on a regular agent raises ValidationError.""" + with pytest.raises(ValidationError, match="input_mapping"): + AgentDef( + name="regular", + prompt="do something", + input_mapping={"key": "{{ value }}"}, + ) + + def test_input_mapping_on_human_gate_raises(self) -> None: + """Test that input_mapping on a human_gate raises ValidationError.""" + with pytest.raises(ValidationError, match="input_mapping"): + AgentDef( + name="gate", + type="human_gate", + prompt="Choose", + options=[ + GateOption(label="Yes", value="yes", route="next"), + ], + input_mapping={"key": "{{ value }}"}, + ) + + def test_input_mapping_on_script_raises(self) -> None: + """Test that input_mapping on a script agent raises ValidationError.""" + with pytest.raises(ValidationError, match="input_mapping"): + AgentDef( + name="script", + type="script", + command="echo hi", + input_mapping={"key": "{{ value }}"}, + ) diff --git a/tests/test_engine/test_subworkflow.py b/tests/test_engine/test_subworkflow.py index 6ae8c13..f5bf59a 100644 --- 
a/tests/test_engine/test_subworkflow.py +++ b/tests/test_engine/test_subworkflow.py @@ -525,3 +525,418 @@ def mock_handler(agent, prompt, context): await engine.run({}) assert engine.limits.current_iteration == 1 + + +class TestSubWorkflowInputMapping: + """Tests for input_mapping on sub-workflow agents.""" + + @pytest.mark.asyncio + async def test_input_mapping_renders_expressions(self, tmp_workflow_dir: Path) -> None: + """Test that input_mapping Jinja2 expressions are rendered and passed as strings.""" + _write_yaml( + tmp_workflow_dir / "sub.yaml", + """\ + workflow: + name: sub-wf + entry_point: inner + runtime: + provider: copilot + input: + item_id: + type: string + required: true + title: + type: string + required: true + limits: + max_iterations: 5 + agents: + - name: inner + prompt: "Work on {{ workflow.input.item_id }}: {{ workflow.input.title }}" + routes: + - to: "$end" + output: + result: "{{ inner.output.result }}" + """, + ) + + parent_path = tmp_workflow_dir / "parent.yaml" + parent_path.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="setup", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="setup", + prompt="Setup", + routes=[RouteDef(to="sub_wf")], + ), + AgentDef( + name="sub_wf", + type="workflow", + workflow="sub.yaml", + input_mapping={ + "item_id": "{{ setup.output.id }}", + "title": "{{ setup.output.name }}", + }, + routes=[RouteDef(to="$end")], + ), + ], + output={"result": "{{ sub_wf.output.result }}"}, + ) + + received_prompts: list[str] = [] + + def mock_handler(agent, prompt, context): + received_prompts.append(prompt) + if agent.name == "setup": + return {"id": "42", "name": "Fix the bug"} + return {"result": "done"} + + provider = CopilotProvider(mock_handler=mock_handler) + engine = WorkflowEngine(config, provider, workflow_path=parent_path) + 
result = await engine.run({}) + + assert result["result"] == "done" + # The inner agent should have received the mapped values + assert any("42" in p and "Fix the bug" in p for p in received_prompts) + + @pytest.mark.asyncio + async def test_input_mapping_values_are_strings(self, tmp_workflow_dir: Path) -> None: + """Test that input_mapping passes values as strings (no json.loads coercion). + + The rendered template values are always strings when entering the child + workflow. Output template rendering may coerce them further, so we verify + via the prompt the child agent actually receives. + """ + _write_yaml( + tmp_workflow_dir / "sub.yaml", + """\ + workflow: + name: sub-wf + entry_point: inner + runtime: + provider: copilot + input: + count: + type: string + required: true + flag: + type: string + required: true + limits: + max_iterations: 5 + agents: + - name: inner + prompt: "Count={{ workflow.input.count }} Flag={{ workflow.input.flag }}" + routes: + - to: "$end" + output: + result: "{{ inner.output.result }}" + """, + ) + + parent_path = tmp_workflow_dir / "parent.yaml" + parent_path.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="setup", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="setup", + prompt="Setup", + routes=[RouteDef(to="sub_wf")], + ), + AgentDef( + name="sub_wf", + type="workflow", + workflow="sub.yaml", + input_mapping={ + "count": "{{ setup.output.num }}", + "flag": "{{ setup.output.active }}", + }, + routes=[RouteDef(to="$end")], + ), + ], + output={"result": "{{ sub_wf.output.result }}"}, + ) + + received_prompts: list[str] = [] + + def mock_handler(agent, prompt, context): + received_prompts.append(prompt) + if agent.name == "setup": + return {"num": "42", "active": "true"} + return {"result": "ok"} + + provider = 
CopilotProvider(mock_handler=mock_handler) + engine = WorkflowEngine(config, provider, workflow_path=parent_path) + await engine.run({}) + + # The child's inner agent should see the string values rendered into the prompt + inner_prompt = [p for p in received_prompts if "Count=" in p][0] + assert "Count=42" in inner_prompt + assert "Flag=true" in inner_prompt + + @pytest.mark.asyncio + async def test_no_input_mapping_forwards_parent_inputs(self, tmp_workflow_dir: Path) -> None: + """Test backward compat: no input_mapping forwards parent workflow.input.*.""" + _write_yaml( + tmp_workflow_dir / "sub.yaml", + """\ + workflow: + name: sub-wf + entry_point: inner + runtime: + provider: copilot + input: + topic: + type: string + required: false + default: "default" + limits: + max_iterations: 5 + agents: + - name: inner + prompt: "Work on {{ workflow.input.topic }}" + routes: + - to: "$end" + output: + topic: "{{ workflow.input.topic }}" + """, + ) + + parent_path = tmp_workflow_dir / "parent.yaml" + parent_path.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="sub_wf", + type="workflow", + workflow="sub.yaml", + # No input_mapping — should forward parent's workflow.input.* + routes=[RouteDef(to="$end")], + ), + ], + output={"topic": "{{ sub_wf.output.topic }}"}, + ) + + def mock_handler(agent, prompt, context): + return {"result": "ok"} + + provider = CopilotProvider(mock_handler=mock_handler) + engine = WorkflowEngine(config, provider, workflow_path=parent_path) + result = await engine.run({"topic": "Python"}) + + # Parent's workflow.input.topic should be forwarded to child + assert result["topic"] == "Python" + + @pytest.mark.asyncio + async def test_empty_input_mapping_passes_nothing(self, tmp_workflow_dir: Path) -> None: + 
"""Test that input_mapping: {} means 'pass no inputs' (not default forwarding).""" + _write_yaml( + tmp_workflow_dir / "sub.yaml", + """\ + workflow: + name: sub-wf + entry_point: inner + runtime: + provider: copilot + input: + topic: + type: string + required: false + default: "fallback" + limits: + max_iterations: 5 + agents: + - name: inner + prompt: "Work on {{ workflow.input.topic }}" + routes: + - to: "$end" + output: + topic: "{{ workflow.input.topic }}" + """, + ) + + parent_path = tmp_workflow_dir / "parent.yaml" + parent_path.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="sub_wf", + type="workflow", + workflow="sub.yaml", + input_mapping={}, # Explicitly empty — pass nothing + routes=[RouteDef(to="$end")], + ), + ], + output={"topic": "{{ sub_wf.output.topic }}"}, + ) + + def mock_handler(agent, prompt, context): + return {"result": "ok"} + + provider = CopilotProvider(mock_handler=mock_handler) + engine = WorkflowEngine(config, provider, workflow_path=parent_path) + result = await engine.run({"topic": "Python"}) + + # Empty input_mapping = no inputs passed, child should use its default + assert result["topic"] == "fallback" + + @pytest.mark.asyncio + async def test_input_mapping_error_includes_key_name(self, tmp_workflow_dir: Path) -> None: + """Test that template errors include the failing key name.""" + _write_yaml( + tmp_workflow_dir / "sub.yaml", + """\ + workflow: + name: sub-wf + entry_point: inner + runtime: + provider: copilot + input: + value: + type: string + required: true + limits: + max_iterations: 5 + agents: + - name: inner + prompt: "Use {{ workflow.input.value }}" + routes: + - to: "$end" + output: + result: "done" + """, + ) + + parent_path = tmp_workflow_dir / "parent.yaml" + 
parent_path.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="sub_wf", + type="workflow", + workflow="sub.yaml", + input_mapping={ + "value": "{{ nonexistent_agent.output.missing }}", + }, + routes=[RouteDef(to="$end")], + ), + ], + ) + + mock_provider = MagicMock() + engine = WorkflowEngine(config, mock_provider, workflow_path=parent_path) + + with pytest.raises(ExecutionError, match="input_mapping key 'value'"): + await engine.run({}) + + @pytest.mark.asyncio + async def test_no_parent_context_leaks_to_child(self, tmp_workflow_dir: Path) -> None: + """Test that parent agent outputs are NOT injected into child context.""" + _write_yaml( + tmp_workflow_dir / "sub.yaml", + """\ + workflow: + name: sub-wf + entry_point: inner + runtime: + provider: copilot + input: + data: + type: string + required: true + limits: + max_iterations: 5 + agents: + - name: inner + prompt: "Use {{ workflow.input.data }}" + routes: + - to: "$end" + output: + result: "{{ inner.output.result }}" + """, + ) + + parent_path = tmp_workflow_dir / "parent.yaml" + parent_path.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="setup", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="setup", + prompt="Setup", + routes=[RouteDef(to="sub_wf")], + ), + AgentDef( + name="sub_wf", + type="workflow", + workflow="sub.yaml", + input_mapping={"data": "{{ setup.output.value }}"}, + routes=[RouteDef(to="$end")], + ), + ], + output={"result": "{{ sub_wf.output.result }}"}, + ) + + child_contexts: list[dict] = [] + + def mock_handler(agent, prompt, context): + if agent.name == 
"setup": + return {"value": "hello"} + # Capture what the child's inner agent can see + child_contexts.append(dict(context)) + return {"result": "done"} + + provider = CopilotProvider(mock_handler=mock_handler) + engine = WorkflowEngine(config, provider, workflow_path=parent_path) + await engine.run({}) + + # Parent's "setup" agent should NOT appear in child's context + assert len(child_contexts) == 1 + assert "setup" not in child_contexts[0] diff --git a/uv.lock b/uv.lock index ac918e3..44d02b6 100644 --- a/uv.lock +++ b/uv.lock @@ -150,7 +150,7 @@ wheels = [ [[package]] name = "conductor-cli" -version = "0.1.8" +version = "0.1.9" source = { editable = "." } dependencies = [ { name = "anthropic" },