From 4fb799865525b0701aabf07d438e4031d5a9bac3 Mon Sep 17 00:00:00 2001 From: Daniel Green Date: Mon, 20 Apr 2026 14:39:30 -0700 Subject: [PATCH 1/4] feat(composition): dynamic sub-workflow inputs via input_mapping (#101) Add input_mapping field to AgentDef for type='workflow' agents. When present, each value is a Jinja2 expression rendered against the parent context to build sub-workflow inputs. When absent, existing behavior (forwarding parent's workflow.input.*) is preserved. - Schema: Add input_mapping to AgentDef with validation for workflow-only - Engine: Render input_mapping templates in _execute_subworkflow() - Tests: Schema validation for all agent types - Experimental workflows: test-input-mapping parent/child pair Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/config/schema.py | 29 ++++++++++ src/conductor/engine/workflow.py | 23 ++++++-- .../test_config/test_workflow_type_schema.py | 55 +++++++++++++++++++ uv.lock | 2 +- 4 files changed, 103 insertions(+), 6 deletions(-) diff --git a/src/conductor/config/schema.py b/src/conductor/config/schema.py index 527c50b..da05b48 100644 --- a/src/conductor/config/schema.py +++ b/src/conductor/config/schema.py @@ -475,6 +475,24 @@ class AgentDef(BaseModel): workflow: ./research-pipeline.yaml """ + input_mapping: dict[str, str] | None = None + """Optional mapping of sub-workflow input names to Jinja2 expressions. + + Each key is a sub-workflow input parameter name. Each value is a Jinja2 + template expression evaluated against the parent workflow's context. + + When present, the rendered values are passed as the sub-workflow's inputs + instead of forwarding the parent's workflow.input.* values. + + Only valid for type='workflow' agents. + + Example:: + + input_mapping: + work_item_id: "{{ task_manager.output.current_issue_id }}" + title: "{{ task_manager.output.current_issue_title }}" + """ + max_session_seconds: float | None = Field(None, ge=1.0) """Maximum wall-clock duration for this agent's session in seconds. @@ -529,6 +547,8 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("human_gate agents require 'options'") if not self.prompt: raise ValueError("human_gate agents require 'prompt'") + if self.input_mapping: + raise ValueError("human_gate agents cannot have 'input_mapping'") elif self.type == "script": if not self.command: raise ValueError("script agents require 'command'") @@ -555,6 +575,8 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("script agents cannot have 'max_agent_iterations'") if self.retry is not None: raise ValueError("script agents cannot have 'retry'") + if self.input_mapping: + raise ValueError("script agents cannot have 'input_mapping'") elif self.type == "workflow": if not self.workflow: raise ValueError("workflow agents require 'workflow' path") @@ -578,6 +600,13 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("workflow agents cannot have 'max_agent_iterations'") if self.retry is not None: raise ValueError("workflow agents cannot have 'retry'") + else: + # Regular agent or human_gate — input_mapping is not valid + if self.input_mapping: + raise ValueError( + f"'{self.type or 'agent'}' agents cannot have 'input_mapping' " + "(only workflow agents support input_mapping)" + ) return self diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index 484192b..9dd9bc9 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -545,11 +545,24 @@ async def _execute_subworkflow( ) from exc # Build sub-workflow inputs from the parent context - # Extract workflow.input.* values from the parent context - workflow_ctx = context.get("workflow", {}) - sub_inputs: dict[str, Any] = ( - dict(workflow_ctx.get("input", {})) if isinstance(workflow_ctx, dict) else {} - ) + sub_inputs: dict[str, Any] + if agent.input_mapping: + # Dynamic inputs: render each Jinja2 expression against parent context + renderer = TemplateRenderer() + sub_inputs = {} + for key, template_expr in agent.input_mapping.items(): + rendered = renderer.render(template_expr, context) + # Attempt to parse rendered values as JSON for non-string types + try: + sub_inputs[key] = json.loads(rendered) + except (json.JSONDecodeError, ValueError): + sub_inputs[key] = rendered + else: + # Default: forward parent's workflow.input.* values + workflow_ctx = context.get("workflow", {}) + sub_inputs = ( + dict(workflow_ctx.get("input", {})) if isinstance(workflow_ctx, dict) else {} + ) # Create child engine inheriting provider/registry but with deeper depth child_engine = WorkflowEngine( diff --git a/tests/test_config/test_workflow_type_schema.py b/tests/test_config/test_workflow_type_schema.py index 995c21c..e5036fb 100644 --- a/tests/test_config/test_workflow_type_schema.py +++ b/tests/test_config/test_workflow_type_schema.py @@ -284,3 +284,58 @@ def test_workflow_with_routes_to_agents(self) -> None: # Should not raise warnings = validate_workflow_config(config) assert isinstance(warnings, list) + + +class TestInputMapping: + """Tests for input_mapping on workflow agents.""" + + def test_valid_input_mapping(self) -> None: + """Test that input_mapping is accepted on workflow agents.""" + agent = AgentDef( + name="sub_wf", + type="workflow", + workflow="./sub.yaml", + input_mapping={ + "work_item_id": "{{ intake.output.epic_id }}", + "title": "{{ intake.output.epic_title }}", + }, + ) + assert agent.input_mapping is not None + assert len(agent.input_mapping) == 2 + + def test_workflow_without_input_mapping(self) -> None: + """Test that workflow agents work without input_mapping (backward compat).""" + agent = AgentDef(name="sub_wf", type="workflow", workflow="./sub.yaml") + assert agent.input_mapping is None + + def test_input_mapping_on_regular_agent_raises(self) -> None: + """Test that input_mapping on a regular agent raises ValidationError.""" + with pytest.raises(ValidationError, match="input_mapping"): + AgentDef( + name="regular", + prompt="do something", + input_mapping={"key": "{{ value }}"}, + ) + + def test_input_mapping_on_human_gate_raises(self) -> None: + """Test that input_mapping on a human_gate raises ValidationError.""" + with pytest.raises(ValidationError, match="input_mapping"): + AgentDef( + name="gate", + type="human_gate", + prompt="Choose", + options=[ + GateOption(label="Yes", value="yes", route="next"), + ], + input_mapping={"key": "{{ value }}"}, + ) + + def test_input_mapping_on_script_raises(self) -> None: + """Test that input_mapping on a script agent raises ValidationError.""" + with pytest.raises(ValidationError, match="input_mapping"): + AgentDef( + name="script", + type="script", + command="echo hi", + input_mapping={"key": "{{ value }}"}, + ) diff --git a/uv.lock b/uv.lock index ac918e3..44d02b6 100644 --- a/uv.lock +++ b/uv.lock @@ -150,7 +150,7 @@ wheels = [ [[package]] name = "conductor-cli" -version = "0.1.8" +version = "0.1.9" source = { editable = "." } dependencies = [ { name = "anthropic" }, From 793e55a94a6c7b5bca6b00feec03eabd92d46783 Mon Sep 17 00:00:00 2001 From: Daniel Green Date: Mon, 20 Apr 2026 14:42:13 -0700 Subject: [PATCH 2/4] feat(composition): allow sub-workflows in for_each groups (#102) Remove validator restriction blocking type='workflow' in for_each groups. Wire execute_single_item() to call _execute_subworkflow_with_inputs() for workflow agents, rendering input_mapping with loop variables in scope. - Validator: Remove workflow rejection in for_each validation - Engine: Add workflow branch in execute_single_item(), new helper _execute_subworkflow_with_inputs() for pre-built inputs - Tests: Update test_workflow_in_for_each to validate (not reject) - Experimental workflows: test-for-each-workflow parent/child pair Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/config/validator.py | 7 +- src/conductor/engine/workflow.py | 124 +++++++++++++++++- .../test_config/test_workflow_type_schema.py | 9 +- 3 files changed, 129 insertions(+), 11 deletions(-) diff --git a/src/conductor/config/validator.py b/src/conductor/config/validator.py index 4581064..f4a0c94 100644 --- a/src/conductor/config/validator.py +++ b/src/conductor/config/validator.py @@ -111,18 +111,13 @@ def validate_workflow_config(config: WorkflowConfig) -> list[str]: parallel_errors = _validate_parallel_groups(config) errors.extend(parallel_errors) - # Validate for_each groups: reject script and workflow steps as inline agents + # Validate for_each groups: reject script steps as inline agents for for_each_group in config.for_each: if for_each_group.agent.type == "script": errors.append( f"For-each group '{for_each_group.name}' uses a script step as its " "inline agent. Script steps cannot be used in for_each groups." ) - if for_each_group.agent.type == "workflow": - errors.append( - f"For-each group '{for_each_group.name}' uses a workflow step as its " - "inline agent. Workflow steps cannot be used in for_each groups." - ) # Validate workflow output references output_errors = _validate_output_references( diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index 9dd9bc9..a6d79f3 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -580,6 +580,81 @@ async def _execute_subworkflow( return await child_engine.run(sub_inputs) + async def _execute_subworkflow_with_inputs( + self, + agent: AgentDef, + sub_inputs: dict[str, Any], + ) -> dict[str, Any]: + """Execute a sub-workflow with pre-built inputs. + + Like _execute_subworkflow but accepts explicit inputs instead of + extracting them from context. Used by for_each groups where + input_mapping has already been rendered with loop variables. + + Args: + agent: Workflow agent definition with ``workflow`` path. + sub_inputs: Pre-built input dict for the sub-workflow. + + Returns: + The sub-workflow's final output dict. + """ + from conductor.config.loader import load_config + + if self._subworkflow_depth >= MAX_SUBWORKFLOW_DEPTH: + raise ExecutionError( + f"Sub-workflow depth limit exceeded ({MAX_SUBWORKFLOW_DEPTH}). " + f"Agent '{agent.name}' cannot invoke sub-workflow '{agent.workflow}'.", + suggestion="Check for circular sub-workflow references or reduce nesting depth.", + ) + + assert agent.workflow is not None # noqa: S101 + + if self.workflow_path is not None: + base_dir = Path(self.workflow_path).resolve().parent + else: + base_dir = Path.cwd() + + sub_path = (base_dir / agent.workflow).resolve() + + if not sub_path.exists(): + raise ExecutionError( + f"Sub-workflow file not found: {sub_path} (referenced by agent '{agent.name}')", + suggestion="Check that the 'workflow' path is correct and the file exists.", + ) + + # Detect circular references via file path + current_path = Path(self.workflow_path).resolve() if self.workflow_path else None + if current_path is not None and sub_path == current_path: + raise ExecutionError( + f"Circular sub-workflow reference: agent '{agent.name}' " + f"references its own workflow file '{agent.workflow}'.", + suggestion="A workflow cannot reference itself as a sub-workflow.", + ) + + try: + sub_config = load_config(sub_path) + except Exception as exc: + raise ExecutionError( + f"Failed to load sub-workflow '{sub_path}' " + f"(referenced by agent '{agent.name}'): {exc}", + suggestion="Check the sub-workflow YAML for syntax or validation errors.", + ) from exc + + child_engine = WorkflowEngine( + config=sub_config, + provider=self._single_provider, + registry=self._registry, + skip_gates=self.skip_gates, + workflow_path=sub_path, + interrupt_event=self._interrupt_event, + event_emitter=self._event_emitter, + keyboard_listener=self._keyboard_listener, + web_dashboard=self._web_dashboard, + _subworkflow_depth=self._subworkflow_depth + 1, + ) + + return await child_engine.run(sub_inputs) + def _get_context_window_for_agent(self, agent: AgentDef) -> int | None: """Return the context window size for an agent's model.""" from conductor.engine.pricing import get_pricing @@ -2600,7 +2675,54 @@ async def execute_single_item(item: Any, index: int, key: str) -> tuple[str, Any key if for_each_group.key_by else None, ) - # Execute agent with injected context (get executor for multi-provider) + # Execute agent — sub-workflow or regular + if for_each_group.agent.type == "workflow": + # Build sub-workflow inputs from input_mapping with loop vars + if for_each_group.agent.input_mapping: + renderer = TemplateRenderer() + sub_inputs: dict[str, Any] = {} + for k, tmpl in for_each_group.agent.input_mapping.items(): + rendered = renderer.render(tmpl, agent_context) + try: + sub_inputs[k] = json.loads(rendered) + except (json.JSONDecodeError, ValueError): + sub_inputs[k] = rendered + else: + wf_ctx = agent_context.get("workflow", {}) + sub_inputs = ( + dict(wf_ctx.get("input", {})) + if isinstance(wf_ctx, dict) + else {} + ) + + # Execute sub-workflow + self._emit( + "subworkflow_started", + { + "agent_name": for_each_group.name, + "item_key": key, + "workflow": for_each_group.agent.workflow, + }, + ) + output_content = await self._execute_subworkflow_with_inputs( + for_each_group.agent, sub_inputs + ) + _item_elapsed = _time.time() - _item_start + + self._emit( + "for_each_item_completed", + { + "group_name": for_each_group.name, + "item_key": key, + "elapsed": _item_elapsed, + "tokens": 0, + "cost_usd": 0.0, + "output": output_content, + }, + ) + return (key, output_content) + + # Regular agent execution executor = await self._get_executor_for_agent(for_each_group.agent) # Item-scoped event callback that tags all streaming events with item_key diff --git a/tests/test_config/test_workflow_type_schema.py b/tests/test_config/test_workflow_type_schema.py index e5036fb..a1ad66d 100644 --- a/tests/test_config/test_workflow_type_schema.py +++ b/tests/test_config/test_workflow_type_schema.py @@ -201,8 +201,8 @@ def test_workflow_in_parallel_group_raises(self) -> None: class TestWorkflowInForEach: """Tests for workflow agents in for_each groups.""" - def test_workflow_in_for_each_raises(self) -> None: - """Test that workflow step in for_each inline agent raises ConfigurationError.""" + def test_workflow_in_for_each_validates(self) -> None: + """Test that workflow step in for_each inline agent validates successfully.""" config = WorkflowConfig( workflow=WorkflowDef( name="test", @@ -227,8 +227,9 @@ def test_workflow_in_for_each_raises(self) -> None: ), ], ) - with pytest.raises(ConfigurationError, match="Workflow steps cannot be used in for_each"): - validate_workflow_config(config) + # Should not raise — workflow agents are now allowed in for_each + warnings = validate_workflow_config(config) + assert isinstance(warnings, list) class TestWorkflowWorkflowConfig: From 81e5720fed6fecfc77c610a9474fbeb571f69335 Mon Sep 17 00:00:00 2001 From: Daniel Green Date: Tue, 21 Apr 2026 14:15:39 -0700 Subject: [PATCH 3/4] style: ruff format workflow.py Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/engine/workflow.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index a6d79f3..5c4891b 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -2690,9 +2690,7 @@ async def execute_single_item(item: Any, index: int, key: str) -> tuple[str, Any else: wf_ctx = agent_context.get("workflow", {}) sub_inputs = ( - dict(wf_ctx.get("input", {})) - if isinstance(wf_ctx, dict) - else {} + dict(wf_ctx.get("input", {})) if isinstance(wf_ctx, dict) else {} ) # Execute sub-workflow From eff4776a422389867ac354f6a87ea25f2afb2de0 Mon Sep 17 00:00:00 2001 From: Daniel Green Date: Tue, 28 Apr 2026 09:53:24 -0700 Subject: [PATCH 4/4] fix(composition): address PR review feedback on input_mapping - Remove json.loads coercion from input_mapping rendering (values stay as strings; use {{ expr | json }} filter for structured data) - Add error context identifying which input_mapping key failed to render - Fix falsy check: use 'is not None' instead of truthiness for input_mapping - Deduplicate _execute_subworkflow_with_inputs into _execute_subworkflow with an optional sub_inputs parameter Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/engine/workflow.py | 153 +++++++++---------------------- 1 file changed, 44 insertions(+), 109 deletions(-) diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index 5c4891b..44f1db4 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -483,16 +483,20 @@ async def _execute_subworkflow( self, agent: AgentDef, context: dict[str, Any], + *, + sub_inputs: dict[str, Any] | None = None, ) -> dict[str, Any]: """Execute a sub-workflow as a black-box step. Loads the referenced workflow YAML, creates a child WorkflowEngine, - and runs it with the parent agent's context as input. The sub-workflow's - final output is returned as the agent's output. + and runs it. When ``sub_inputs`` are provided (e.g. from for_each with + pre-rendered input_mapping), they are used directly; otherwise inputs + are derived from ``context`` via input_mapping or forwarded workflow.input. Args: agent: Workflow agent definition with ``workflow`` path. context: Workflow context for template rendering (used as sub-workflow input). + sub_inputs: Optional pre-built inputs that bypass input_mapping rendering. Returns: The sub-workflow's final output dict. @@ -544,25 +548,31 @@ async def _execute_subworkflow( suggestion="Check the sub-workflow YAML for syntax or validation errors.", ) from exc - # Build sub-workflow inputs from the parent context - sub_inputs: dict[str, Any] - if agent.input_mapping: - # Dynamic inputs: render each Jinja2 expression against parent context - renderer = TemplateRenderer() - sub_inputs = {} - for key, template_expr in agent.input_mapping.items(): - rendered = renderer.render(template_expr, context) - # Attempt to parse rendered values as JSON for non-string types - try: - sub_inputs[key] = json.loads(rendered) - except (json.JSONDecodeError, ValueError): - sub_inputs[key] = rendered - else: - # Default: forward parent's workflow.input.* values - workflow_ctx = context.get("workflow", {}) - sub_inputs = ( - dict(workflow_ctx.get("input", {})) if isinstance(workflow_ctx, dict) else {} - ) + # Build sub-workflow inputs — use pre-built inputs if provided, + # otherwise render from input_mapping or forward workflow.input. + if sub_inputs is None: + if agent.input_mapping is not None: + # Dynamic inputs: render each Jinja2 expression against parent context. + # Values are always passed as strings — use {{ expr | json }} in the + # template if structured data is needed. + renderer = TemplateRenderer() + sub_inputs = {} + for key, template_expr in agent.input_mapping.items(): + try: + sub_inputs[key] = renderer.render(template_expr, context) + except Exception as e: + raise ExecutionError( + f"Failed to render input_mapping key '{key}' for agent " + f"'{agent.name}': {e}", + suggestion=f"Check that the expression '{template_expr}' " + "references valid context variables.", + ) from e + else: + # Default: forward parent's workflow.input.* values + workflow_ctx = context.get("workflow", {}) + sub_inputs = ( + dict(workflow_ctx.get("input", {})) if isinstance(workflow_ctx, dict) else {} + ) # Create child engine inheriting provider/registry but with deeper depth child_engine = WorkflowEngine( @@ -580,81 +590,6 @@ async def _execute_subworkflow( return await child_engine.run(sub_inputs) - async def _execute_subworkflow_with_inputs( - self, - agent: AgentDef, - sub_inputs: dict[str, Any], - ) -> dict[str, Any]: - """Execute a sub-workflow with pre-built inputs. - - Like _execute_subworkflow but accepts explicit inputs instead of - extracting them from context. Used by for_each groups where - input_mapping has already been rendered with loop variables. - - Args: - agent: Workflow agent definition with ``workflow`` path. - sub_inputs: Pre-built input dict for the sub-workflow. - - Returns: - The sub-workflow's final output dict. - """ - from conductor.config.loader import load_config - - if self._subworkflow_depth >= MAX_SUBWORKFLOW_DEPTH: - raise ExecutionError( - f"Sub-workflow depth limit exceeded ({MAX_SUBWORKFLOW_DEPTH}). " - f"Agent '{agent.name}' cannot invoke sub-workflow '{agent.workflow}'.", - suggestion="Check for circular sub-workflow references or reduce nesting depth.", - ) - - assert agent.workflow is not None # noqa: S101 - - if self.workflow_path is not None: - base_dir = Path(self.workflow_path).resolve().parent - else: - base_dir = Path.cwd() - - sub_path = (base_dir / agent.workflow).resolve() - - if not sub_path.exists(): - raise ExecutionError( - f"Sub-workflow file not found: {sub_path} (referenced by agent '{agent.name}')", - suggestion="Check that the 'workflow' path is correct and the file exists.", - ) - - # Detect circular references via file path - current_path = Path(self.workflow_path).resolve() if self.workflow_path else None - if current_path is not None and sub_path == current_path: - raise ExecutionError( - f"Circular sub-workflow reference: agent '{agent.name}' " - f"references its own workflow file '{agent.workflow}'.", - suggestion="A workflow cannot reference itself as a sub-workflow.", - ) - - try: - sub_config = load_config(sub_path) - except Exception as exc: - raise ExecutionError( - f"Failed to load sub-workflow '{sub_path}' " - f"(referenced by agent '{agent.name}'): {exc}", - suggestion="Check the sub-workflow YAML for syntax or validation errors.", - ) from exc - - child_engine = WorkflowEngine( - config=sub_config, - provider=self._single_provider, - registry=self._registry, - skip_gates=self.skip_gates, - workflow_path=sub_path, - interrupt_event=self._interrupt_event, - event_emitter=self._event_emitter, - keyboard_listener=self._keyboard_listener, - web_dashboard=self._web_dashboard, - _subworkflow_depth=self._subworkflow_depth + 1, - ) - - return await child_engine.run(sub_inputs) - def _get_context_window_for_agent(self, agent: AgentDef) -> int | None: """Return the context window size for an agent's model.""" from conductor.engine.pricing import get_pricing @@ -2678,20 +2613,20 @@ async def execute_single_item(item: Any, index: int, key: str) -> tuple[str, Any # Execute agent — sub-workflow or regular if for_each_group.agent.type == "workflow": # Build sub-workflow inputs from input_mapping with loop vars - if for_each_group.agent.input_mapping: + sub_inputs: dict[str, Any] | None = None + if for_each_group.agent.input_mapping is not None: renderer = TemplateRenderer() - sub_inputs: dict[str, Any] = {} + sub_inputs = {} for k, tmpl in for_each_group.agent.input_mapping.items(): - rendered = renderer.render(tmpl, agent_context) try: - sub_inputs[k] = json.loads(rendered) - except (json.JSONDecodeError, ValueError): - sub_inputs[k] = rendered - else: - wf_ctx = agent_context.get("workflow", {}) - sub_inputs = ( - dict(wf_ctx.get("input", {})) if isinstance(wf_ctx, dict) else {} - ) + sub_inputs[k] = renderer.render(tmpl, agent_context) + except Exception as e: + raise ExecutionError( + f"Failed to render input_mapping key '{k}' for " + f"for_each agent '{for_each_group.name}': {e}", + suggestion=f"Check that the expression '{tmpl}' " + "references valid context variables.", + ) from e # Execute sub-workflow self._emit( @@ -2702,8 +2637,8 @@ async def execute_single_item(item: Any, index: int, key: str) -> tuple[str, Any "workflow": for_each_group.agent.workflow, }, ) - output_content = await self._execute_subworkflow_with_inputs( - for_each_group.agent, sub_inputs + output_content = await self._execute_subworkflow( + for_each_group.agent, agent_context, sub_inputs=sub_inputs ) _item_elapsed = _time.time() - _item_start