From 4fb799865525b0701aabf07d438e4031d5a9bac3 Mon Sep 17 00:00:00 2001 From: Daniel Green Date: Mon, 20 Apr 2026 14:39:30 -0700 Subject: [PATCH 1/5] feat(composition): dynamic sub-workflow inputs via input_mapping (#101) Add input_mapping field to AgentDef for type='workflow' agents. When present, each value is a Jinja2 expression rendered against the parent context to build sub-workflow inputs. When absent, existing behavior (forwarding parent's workflow.input.*) is preserved. - Schema: Add input_mapping to AgentDef with validation for workflow-only - Engine: Render input_mapping templates in _execute_subworkflow() - Tests: Schema validation for all agent types - Experimental workflows: test-input-mapping parent/child pair Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/config/schema.py | 29 ++++++++++ src/conductor/engine/workflow.py | 23 ++++++-- .../test_config/test_workflow_type_schema.py | 55 +++++++++++++++++++ uv.lock | 2 +- 4 files changed, 103 insertions(+), 6 deletions(-) diff --git a/src/conductor/config/schema.py b/src/conductor/config/schema.py index 527c50b..da05b48 100644 --- a/src/conductor/config/schema.py +++ b/src/conductor/config/schema.py @@ -475,6 +475,24 @@ class AgentDef(BaseModel): workflow: ./research-pipeline.yaml """ + input_mapping: dict[str, str] | None = None + """Optional mapping of sub-workflow input names to Jinja2 expressions. + + Each key is a sub-workflow input parameter name. Each value is a Jinja2 + template expression evaluated against the parent workflow's context. + + When present, the rendered values are passed as the sub-workflow's inputs + instead of forwarding the parent's workflow.input.* values. + + Only valid for type='workflow' agents. + + Example:: + + input_mapping: + work_item_id: "{{ task_manager.output.current_issue_id }}" + title: "{{ task_manager.output.current_issue_title }}" + """ + max_session_seconds: float | None = Field(None, ge=1.0) """Maximum wall-clock duration for this agent's session in seconds. @@ -529,6 +547,8 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("human_gate agents require 'options'") if not self.prompt: raise ValueError("human_gate agents require 'prompt'") + if self.input_mapping: + raise ValueError("human_gate agents cannot have 'input_mapping'") elif self.type == "script": if not self.command: raise ValueError("script agents require 'command'") @@ -555,6 +575,8 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("script agents cannot have 'max_agent_iterations'") if self.retry is not None: raise ValueError("script agents cannot have 'retry'") + if self.input_mapping: + raise ValueError("script agents cannot have 'input_mapping'") elif self.type == "workflow": if not self.workflow: raise ValueError("workflow agents require 'workflow' path") @@ -578,6 +600,13 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("workflow agents cannot have 'max_agent_iterations'") if self.retry is not None: raise ValueError("workflow agents cannot have 'retry'") + else: + # Regular agent or human_gate — input_mapping is not valid + if self.input_mapping: + raise ValueError( + f"'{self.type or 'agent'}' agents cannot have 'input_mapping' " + "(only workflow agents support input_mapping)" + ) return self diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index 484192b..9dd9bc9 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -545,11 +545,24 @@ async def _execute_subworkflow( ) from exc # Build sub-workflow inputs from the parent context - # Extract workflow.input.* values from the parent context - workflow_ctx = context.get("workflow", {}) - sub_inputs: dict[str, Any] = ( - dict(workflow_ctx.get("input", {})) if isinstance(workflow_ctx, dict) else {} - ) + sub_inputs: dict[str, Any] + if agent.input_mapping: + # Dynamic inputs: render each Jinja2 expression against parent context + renderer = TemplateRenderer() + sub_inputs = {} + for key, template_expr in agent.input_mapping.items(): + rendered = renderer.render(template_expr, context) + # Attempt to parse rendered values as JSON for non-string types + try: + sub_inputs[key] = json.loads(rendered) + except (json.JSONDecodeError, ValueError): + sub_inputs[key] = rendered + else: + # Default: forward parent's workflow.input.* values + workflow_ctx = context.get("workflow", {}) + sub_inputs = ( + dict(workflow_ctx.get("input", {})) if isinstance(workflow_ctx, dict) else {} + ) # Create child engine inheriting provider/registry but with deeper depth child_engine = WorkflowEngine( diff --git a/tests/test_config/test_workflow_type_schema.py b/tests/test_config/test_workflow_type_schema.py index 995c21c..e5036fb 100644 --- a/tests/test_config/test_workflow_type_schema.py +++ b/tests/test_config/test_workflow_type_schema.py @@ -284,3 +284,58 @@ def test_workflow_with_routes_to_agents(self) -> None: # Should not raise warnings = validate_workflow_config(config) assert isinstance(warnings, list) + + +class TestInputMapping: + """Tests for input_mapping on workflow agents.""" + + def test_valid_input_mapping(self) -> None: + """Test that input_mapping is accepted on workflow agents.""" + agent = AgentDef( + name="sub_wf", + type="workflow", + workflow="./sub.yaml", + input_mapping={ + "work_item_id": "{{ intake.output.epic_id }}", + "title": "{{ intake.output.epic_title }}", + }, + ) + assert agent.input_mapping is not None + assert len(agent.input_mapping) == 2 + + def test_workflow_without_input_mapping(self) -> None: + """Test that workflow agents work without input_mapping (backward compat).""" + agent = AgentDef(name="sub_wf", type="workflow", workflow="./sub.yaml") + assert agent.input_mapping is None + + def test_input_mapping_on_regular_agent_raises(self) -> None: + """Test that input_mapping on a regular agent raises ValidationError.""" + with pytest.raises(ValidationError, match="input_mapping"): + AgentDef( + name="regular", + prompt="do something", + input_mapping={"key": "{{ value }}"}, + ) + + def test_input_mapping_on_human_gate_raises(self) -> None: + """Test that input_mapping on a human_gate raises ValidationError.""" + with pytest.raises(ValidationError, match="input_mapping"): + AgentDef( + name="gate", + type="human_gate", + prompt="Choose", + options=[ + GateOption(label="Yes", value="yes", route="next"), + ], + input_mapping={"key": "{{ value }}"}, + ) + + def test_input_mapping_on_script_raises(self) -> None: + """Test that input_mapping on a script agent raises ValidationError.""" + with pytest.raises(ValidationError, match="input_mapping"): + AgentDef( + name="script", + type="script", + command="echo hi", + input_mapping={"key": "{{ value }}"}, + ) diff --git a/uv.lock b/uv.lock index ac918e3..44d02b6 100644 --- a/uv.lock +++ b/uv.lock @@ -150,7 +150,7 @@ wheels = [ [[package]] name = "conductor-cli" -version = "0.1.8" +version = "0.1.9" source = { editable = "." } dependencies = [ { name = "anthropic" }, From 793e55a94a6c7b5bca6b00feec03eabd92d46783 Mon Sep 17 00:00:00 2001 From: Daniel Green Date: Mon, 20 Apr 2026 14:42:13 -0700 Subject: [PATCH 2/5] feat(composition): allow sub-workflows in for_each groups (#102) Remove validator restriction blocking type='workflow' in for_each groups. Wire execute_single_item() to call _execute_subworkflow_with_inputs() for workflow agents, rendering input_mapping with loop variables in scope. - Validator: Remove workflow rejection in for_each validation - Engine: Add workflow branch in execute_single_item(), new helper _execute_subworkflow_with_inputs() for pre-built inputs - Tests: Update test_workflow_in_for_each to validate (not reject) - Experimental workflows: test-for-each-workflow parent/child pair Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/config/validator.py | 7 +- src/conductor/engine/workflow.py | 124 +++++++++++++++++- .../test_config/test_workflow_type_schema.py | 9 +- 3 files changed, 129 insertions(+), 11 deletions(-) diff --git a/src/conductor/config/validator.py b/src/conductor/config/validator.py index 4581064..f4a0c94 100644 --- a/src/conductor/config/validator.py +++ b/src/conductor/config/validator.py @@ -111,18 +111,13 @@ def validate_workflow_config(config: WorkflowConfig) -> list[str]: parallel_errors = _validate_parallel_groups(config) errors.extend(parallel_errors) - # Validate for_each groups: reject script and workflow steps as inline agents + # Validate for_each groups: reject script steps as inline agents for for_each_group in config.for_each: if for_each_group.agent.type == "script": errors.append( f"For-each group '{for_each_group.name}' uses a script step as its " "inline agent. Script steps cannot be used in for_each groups." ) - if for_each_group.agent.type == "workflow": - errors.append( - f"For-each group '{for_each_group.name}' uses a workflow step as its " - "inline agent. Workflow steps cannot be used in for_each groups." - ) # Validate workflow output references output_errors = _validate_output_references( diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index 9dd9bc9..a6d79f3 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -580,6 +580,81 @@ async def _execute_subworkflow( return await child_engine.run(sub_inputs) + async def _execute_subworkflow_with_inputs( + self, + agent: AgentDef, + sub_inputs: dict[str, Any], + ) -> dict[str, Any]: + """Execute a sub-workflow with pre-built inputs. + + Like _execute_subworkflow but accepts explicit inputs instead of + extracting them from context. Used by for_each groups where + input_mapping has already been rendered with loop variables. + + Args: + agent: Workflow agent definition with ``workflow`` path. + sub_inputs: Pre-built input dict for the sub-workflow. + + Returns: + The sub-workflow's final output dict. + """ + from conductor.config.loader import load_config + + if self._subworkflow_depth >= MAX_SUBWORKFLOW_DEPTH: + raise ExecutionError( + f"Sub-workflow depth limit exceeded ({MAX_SUBWORKFLOW_DEPTH}). " + f"Agent '{agent.name}' cannot invoke sub-workflow '{agent.workflow}'.", + suggestion="Check for circular sub-workflow references or reduce nesting depth.", + ) + + assert agent.workflow is not None # noqa: S101 + + if self.workflow_path is not None: + base_dir = Path(self.workflow_path).resolve().parent + else: + base_dir = Path.cwd() + + sub_path = (base_dir / agent.workflow).resolve() + + if not sub_path.exists(): + raise ExecutionError( + f"Sub-workflow file not found: {sub_path} (referenced by agent '{agent.name}')", + suggestion="Check that the 'workflow' path is correct and the file exists.", + ) + + # Detect circular references via file path + current_path = Path(self.workflow_path).resolve() if self.workflow_path else None + if current_path is not None and sub_path == current_path: + raise ExecutionError( + f"Circular sub-workflow reference: agent '{agent.name}' " + f"references its own workflow file '{agent.workflow}'.", + suggestion="A workflow cannot reference itself as a sub-workflow.", + ) + + try: + sub_config = load_config(sub_path) + except Exception as exc: + raise ExecutionError( + f"Failed to load sub-workflow '{sub_path}' " + f"(referenced by agent '{agent.name}'): {exc}", + suggestion="Check the sub-workflow YAML for syntax or validation errors.", + ) from exc + + child_engine = WorkflowEngine( + config=sub_config, + provider=self._single_provider, + registry=self._registry, + skip_gates=self.skip_gates, + workflow_path=sub_path, + interrupt_event=self._interrupt_event, + event_emitter=self._event_emitter, + keyboard_listener=self._keyboard_listener, + web_dashboard=self._web_dashboard, + _subworkflow_depth=self._subworkflow_depth + 1, + ) + + return await child_engine.run(sub_inputs) + def _get_context_window_for_agent(self, agent: AgentDef) -> int | None: """Return the context window size for an agent's model.""" from conductor.engine.pricing import get_pricing @@ -2600,7 +2675,54 @@ async def execute_single_item(item: Any, index: int, key: str) -> tuple[str, Any key if for_each_group.key_by else None, ) - # Execute agent with injected context (get executor for multi-provider) + # Execute agent — sub-workflow or regular + if for_each_group.agent.type == "workflow": + # Build sub-workflow inputs from input_mapping with loop vars + if for_each_group.agent.input_mapping: + renderer = TemplateRenderer() + sub_inputs: dict[str, Any] = {} + for k, tmpl in for_each_group.agent.input_mapping.items(): + rendered = renderer.render(tmpl, agent_context) + try: + sub_inputs[k] = json.loads(rendered) + except (json.JSONDecodeError, ValueError): + sub_inputs[k] = rendered + else: + wf_ctx = agent_context.get("workflow", {}) + sub_inputs = ( + dict(wf_ctx.get("input", {})) + if isinstance(wf_ctx, dict) + else {} + ) + + # Execute sub-workflow + self._emit( + "subworkflow_started", + { + "agent_name": for_each_group.name, + "item_key": key, + "workflow": for_each_group.agent.workflow, + }, + ) + output_content = await self._execute_subworkflow_with_inputs( + for_each_group.agent, sub_inputs + ) + _item_elapsed = _time.time() - _item_start + + self._emit( + "for_each_item_completed", + { + "group_name": for_each_group.name, + "item_key": key, + "elapsed": _item_elapsed, + "tokens": 0, + "cost_usd": 0.0, + "output": output_content, + }, + ) + return (key, output_content) + + # Regular agent execution executor = await self._get_executor_for_agent(for_each_group.agent) # Item-scoped event callback that tags all streaming events with item_key diff --git a/tests/test_config/test_workflow_type_schema.py b/tests/test_config/test_workflow_type_schema.py index e5036fb..a1ad66d 100644 --- a/tests/test_config/test_workflow_type_schema.py +++ b/tests/test_config/test_workflow_type_schema.py @@ -201,8 +201,8 @@ def test_workflow_in_parallel_group_raises(self) -> None: class TestWorkflowInForEach: """Tests for workflow agents in for_each groups.""" - def test_workflow_in_for_each_raises(self) -> None: - """Test that workflow step in for_each inline agent raises ConfigurationError.""" + def test_workflow_in_for_each_validates(self) -> None: + """Test that workflow step in for_each inline agent validates successfully.""" config = WorkflowConfig( workflow=WorkflowDef( name="test", @@ -227,8 +227,9 @@ def test_workflow_in_for_each_raises(self) -> None: ), ], ) - with pytest.raises(ConfigurationError, match="Workflow steps cannot be used in for_each"): - validate_workflow_config(config) + # Should not raise — workflow agents are now allowed in for_each + warnings = validate_workflow_config(config) + assert isinstance(warnings, list) class TestWorkflowWorkflowConfig: From 0e3e6a0ce9f2eb91e8a2c94a9ee826fcc1a5f4b4 Mon Sep 17 00:00:00 2001 From: Daniel Green Date: Mon, 20 Apr 2026 14:45:36 -0700 Subject: [PATCH 3/5] feat(composition): allow self-referential sub-workflows with depth tracking (#103) Remove the circular reference path check that blocked workflows from referencing themselves. The existing MAX_SUBWORKFLOW_DEPTH=10 already prevents infinite recursion. Add optional per-agent max_depth field for tighter author-controlled bounds. - Engine: Remove self-reference path equality check in both _execute_subworkflow() and _execute_subworkflow_with_inputs() - Engine: Add per-agent max_depth enforcement alongside global limit - Schema: Add max_depth field to AgentDef with validation - Tests: Replace circular reference test with depth-limit tests - Experimental: test-recursive.yaml self-referential countdown Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/config/schema.py | 21 ++++++++ src/conductor/engine/workflow.py | 34 ++++++------ tests/test_engine/test_subworkflow.py | 78 +++++++++++++++++++++++++-- 3 files changed, 111 insertions(+), 22 deletions(-) diff --git a/src/conductor/config/schema.py b/src/conductor/config/schema.py index da05b48..fdb3a67 100644 --- a/src/conductor/config/schema.py +++ b/src/conductor/config/schema.py @@ -493,6 +493,18 @@ class AgentDef(BaseModel): title: "{{ task_manager.output.current_issue_title }}" """ + max_depth: int | None = Field(None, ge=1, le=10) + """Per-agent sub-workflow depth limit. + + Overrides the global MAX_SUBWORKFLOW_DEPTH (10) with a tighter bound. + Only valid for type='workflow' agents. Useful for self-referential + workflows to set an explicit recursion limit. + + Example:: + + max_depth: 3 # Allow at most 3 levels of recursion + """ + max_session_seconds: float | None = Field(None, ge=1.0) """Maximum wall-clock duration for this agent's session in seconds. @@ -549,6 +561,8 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("human_gate agents require 'prompt'") if self.input_mapping: raise ValueError("human_gate agents cannot have 'input_mapping'") + if self.max_depth is not None: + raise ValueError("human_gate agents cannot have 'max_depth'") elif self.type == "script": if not self.command: raise ValueError("script agents require 'command'") @@ -577,6 +591,8 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("script agents cannot have 'retry'") if self.input_mapping: raise ValueError("script agents cannot have 'input_mapping'") + if self.max_depth is not None: + raise ValueError("script agents cannot have 'max_depth'") elif self.type == "workflow": if not self.workflow: raise ValueError("workflow agents require 'workflow' path") @@ -607,6 +623,11 @@ def validate_agent_type(self) -> AgentDef: f"'{self.type or 'agent'}' agents cannot have 'input_mapping' " "(only workflow agents support input_mapping)" ) + if self.max_depth is not None: + raise ValueError( + f"'{self.type or 'agent'}' agents cannot have 'max_depth' " + "(only workflow agents support max_depth)" + ) return self diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index a6d79f3..a6e34cb 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -510,6 +510,14 @@ async def _execute_subworkflow( suggestion=("Check for circular sub-workflow references or reduce nesting depth."), ) + # Per-agent depth limit (stricter than global MAX_SUBWORKFLOW_DEPTH) + if agent.max_depth is not None and self._subworkflow_depth >= agent.max_depth: + raise ExecutionError( + f"Agent '{agent.name}' max_depth ({agent.max_depth}) exceeded " + f"at depth {self._subworkflow_depth}.", + suggestion="Increase max_depth or restructure to reduce nesting.", + ) + assert agent.workflow is not None # noqa: S101 # Resolve sub-workflow path relative to parent workflow file @@ -526,15 +534,6 @@ async def _execute_subworkflow( suggestion="Check that the 'workflow' path is correct and the file exists.", ) - # Detect circular references via file path - current_path = Path(self.workflow_path).resolve() if self.workflow_path else None - if current_path is not None and sub_path == current_path: - raise ExecutionError( - f"Circular sub-workflow reference: agent '{agent.name}' " - f"references its own workflow file '{agent.workflow}'.", - suggestion="A workflow cannot reference itself as a sub-workflow.", - ) - try: sub_config = load_config(sub_path) except Exception as exc: @@ -607,6 +606,14 @@ async def _execute_subworkflow_with_inputs( suggestion="Check for circular sub-workflow references or reduce nesting depth.", ) + # Per-agent depth limit (stricter than global MAX_SUBWORKFLOW_DEPTH) + if agent.max_depth is not None and self._subworkflow_depth >= agent.max_depth: + raise ExecutionError( + f"Agent '{agent.name}' max_depth ({agent.max_depth}) exceeded " + f"at depth {self._subworkflow_depth}.", + suggestion="Increase max_depth or restructure to reduce nesting.", + ) + assert agent.workflow is not None # noqa: S101 if self.workflow_path is not None: @@ -622,15 +629,6 @@ async def _execute_subworkflow_with_inputs( suggestion="Check that the 'workflow' path is correct and the file exists.", ) - # Detect circular references via file path - current_path = Path(self.workflow_path).resolve() if self.workflow_path else None - if current_path is not None and sub_path == current_path: - raise ExecutionError( - f"Circular sub-workflow reference: agent '{agent.name}' " - f"references its own workflow file '{agent.workflow}'.", - suggestion="A workflow cannot reference itself as a sub-workflow.", - ) - try: sub_config = load_config(sub_path) except Exception as exc: diff --git a/tests/test_engine/test_subworkflow.py b/tests/test_engine/test_subworkflow.py index 6ae8c13..51625e6 100644 --- a/tests/test_engine/test_subworkflow.py +++ b/tests/test_engine/test_subworkflow.py @@ -268,10 +268,79 @@ async def test_subworkflow_file_not_found(self, tmp_workflow_dir: Path) -> None: await engine.run({}) @pytest.mark.asyncio - async def test_self_referencing_workflow(self, tmp_workflow_dir: Path) -> None: - """Test that a workflow referencing itself raises ExecutionError.""" + async def test_self_referencing_workflow_hits_depth_limit(self, tmp_workflow_dir: Path) -> None: + """Test that a self-referencing workflow is allowed but bounded by depth limit.""" + # Write a real self-referencing workflow YAML parent_path = tmp_workflow_dir / "parent.yaml" - parent_path.write_text("dummy", encoding="utf-8") + _write_yaml( + parent_path, + """\ + workflow: + name: self-ref + entry_point: sub_wf + runtime: + provider: copilot + limits: + max_iterations: 50 + agents: + - name: sub_wf + type: workflow + workflow: parent.yaml + routes: + - to: "$end" + output: {} + """, + ) + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="sub_wf", + type="workflow", + workflow="parent.yaml", + routes=[RouteDef(to="$end")], + ), + ], + ) + + mock_provider = MagicMock() + engine = WorkflowEngine(config, mock_provider, workflow_path=parent_path) + + # Self-reference is now allowed but will hit depth limit + with pytest.raises(ExecutionError, match="depth limit exceeded"): + await engine.run({}) + + @pytest.mark.asyncio + async def test_max_depth_per_agent(self, tmp_workflow_dir: Path) -> None: + """Test that per-agent max_depth is enforced before global limit.""" + parent_path = tmp_workflow_dir / "parent.yaml" + _write_yaml( + parent_path, + """\ + workflow: + name: self-ref + entry_point: sub_wf + runtime: + provider: copilot + limits: + max_iterations: 50 + agents: + - name: sub_wf + type: workflow + workflow: parent.yaml + max_depth: 2 + routes: + - to: "$end" + output: {} + """, + ) config = WorkflowConfig( workflow=WorkflowDef( @@ -286,6 +355,7 @@ async def test_self_referencing_workflow(self, tmp_workflow_dir: Path) -> None: name="sub_wf", type="workflow", workflow="parent.yaml", + max_depth=2, routes=[RouteDef(to="$end")], ), ], @@ -294,7 +364,7 @@ async def test_self_referencing_workflow(self, tmp_workflow_dir: Path) -> None: mock_provider = MagicMock() engine = WorkflowEngine(config, mock_provider, workflow_path=parent_path) - with pytest.raises(ExecutionError, match="Circular sub-workflow reference"): + with pytest.raises(ExecutionError, match="max_depth.*exceeded"): await engine.run({}) From c96a05f27994b159788d35bf516e03910754ac37 Mon Sep 17 00:00:00 2001 From: Daniel Green Date: Tue, 21 Apr 2026 14:15:51 -0700 Subject: [PATCH 4/5] style: ruff format workflow.py Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/engine/workflow.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index a6e34cb..16444e8 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -2688,9 +2688,7 @@ async def execute_single_item(item: Any, index: int, key: str) -> tuple[str, Any else: wf_ctx = agent_context.get("workflow", {}) sub_inputs = ( - dict(wf_ctx.get("input", {})) - if isinstance(wf_ctx, dict) - else {} + dict(wf_ctx.get("input", {})) if isinstance(wf_ctx, dict) else {} ) # Execute sub-workflow From 7b57808752dc4c783e079b9983cd5e9d76070a56 Mon Sep 17 00:00:00 2001 From: Daniel Green Date: Tue, 28 Apr 2026 09:55:16 -0700 Subject: [PATCH 5/5] fix(composition): address PR review feedback on input_mapping - Remove json.loads coercion from input_mapping rendering (values stay as strings; use {{ expr | json }} filter for structured data) - Add error context identifying which input_mapping key failed to render - Fix falsy check: use 'is not None' instead of truthiness for input_mapping - Deduplicate _execute_subworkflow_with_inputs into _execute_subworkflow with an optional sub_inputs parameter Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/engine/workflow.py | 152 +++++++++---------------------- 1 file changed, 44 insertions(+), 108 deletions(-) diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index 16444e8..157c5ee 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -483,16 +483,20 @@ async def _execute_subworkflow( self, agent: AgentDef, context: dict[str, Any], + *, + sub_inputs: dict[str, Any] | None = None, ) -> dict[str, Any]: """Execute a sub-workflow as a black-box step. Loads the referenced workflow YAML, creates a child WorkflowEngine, - and runs it with the parent agent's context as input. The sub-workflow's - final output is returned as the agent's output. + and runs it. When ``sub_inputs`` are provided (e.g. from for_each with + pre-rendered input_mapping), they are used directly; otherwise inputs + are derived from ``context`` via input_mapping or forwarded workflow.input. Args: agent: Workflow agent definition with ``workflow`` path. context: Workflow context for template rendering (used as sub-workflow input). + sub_inputs: Optional pre-built inputs that bypass input_mapping rendering. Returns: The sub-workflow's final output dict. @@ -543,25 +547,31 @@ async def _execute_subworkflow( suggestion="Check the sub-workflow YAML for syntax or validation errors.", ) from exc - # Build sub-workflow inputs from the parent context - sub_inputs: dict[str, Any] - if agent.input_mapping: - # Dynamic inputs: render each Jinja2 expression against parent context - renderer = TemplateRenderer() - sub_inputs = {} - for key, template_expr in agent.input_mapping.items(): - rendered = renderer.render(template_expr, context) - # Attempt to parse rendered values as JSON for non-string types - try: - sub_inputs[key] = json.loads(rendered) - except (json.JSONDecodeError, ValueError): - sub_inputs[key] = rendered - else: - # Default: forward parent's workflow.input.* values - workflow_ctx = context.get("workflow", {}) - sub_inputs = ( - dict(workflow_ctx.get("input", {})) if isinstance(workflow_ctx, dict) else {} - ) + # Build sub-workflow inputs — use pre-built inputs if provided, + # otherwise render from input_mapping or forward workflow.input. + if sub_inputs is None: + if agent.input_mapping is not None: + # Dynamic inputs: render each Jinja2 expression against parent context. + # Values are always passed as strings — use {{ expr | json }} filter + # if structured data is needed. + renderer = TemplateRenderer() + sub_inputs = {} + for key, template_expr in agent.input_mapping.items(): + try: + sub_inputs[key] = renderer.render(template_expr, context) + except Exception as e: + raise ExecutionError( + f"Failed to render input_mapping key '{key}' for agent " + f"'{agent.name}': {e}", + suggestion=f"Check that the expression '{template_expr}' " + "references valid context variables.", + ) from e + else: + # Default: forward parent's workflow.input.* values + workflow_ctx = context.get("workflow", {}) + sub_inputs = ( + dict(workflow_ctx.get("input", {})) if isinstance(workflow_ctx, dict) else {} + ) # Create child engine inheriting provider/registry but with deeper depth child_engine = WorkflowEngine( @@ -579,80 +589,6 @@ async def _execute_subworkflow( return await child_engine.run(sub_inputs) - async def _execute_subworkflow_with_inputs( - self, - agent: AgentDef, - sub_inputs: dict[str, Any], - ) -> dict[str, Any]: - """Execute a sub-workflow with pre-built inputs. - - Like _execute_subworkflow but accepts explicit inputs instead of - extracting them from context. Used by for_each groups where - input_mapping has already been rendered with loop variables. - - Args: - agent: Workflow agent definition with ``workflow`` path. - sub_inputs: Pre-built input dict for the sub-workflow. - - Returns: - The sub-workflow's final output dict. - """ - from conductor.config.loader import load_config - - if self._subworkflow_depth >= MAX_SUBWORKFLOW_DEPTH: - raise ExecutionError( - f"Sub-workflow depth limit exceeded ({MAX_SUBWORKFLOW_DEPTH}). " - f"Agent '{agent.name}' cannot invoke sub-workflow '{agent.workflow}'.", - suggestion="Check for circular sub-workflow references or reduce nesting depth.", - ) - - # Per-agent depth limit (stricter than global MAX_SUBWORKFLOW_DEPTH) - if agent.max_depth is not None and self._subworkflow_depth >= agent.max_depth: - raise ExecutionError( - f"Agent '{agent.name}' max_depth ({agent.max_depth}) exceeded " - f"at depth {self._subworkflow_depth}.", - suggestion="Increase max_depth or restructure to reduce nesting.", - ) - - assert agent.workflow is not None # noqa: S101 - - if self.workflow_path is not None: - base_dir = Path(self.workflow_path).resolve().parent - else: - base_dir = Path.cwd() - - sub_path = (base_dir / agent.workflow).resolve() - - if not sub_path.exists(): - raise ExecutionError( - f"Sub-workflow file not found: {sub_path} (referenced by agent '{agent.name}')", - suggestion="Check that the 'workflow' path is correct and the file exists.", - ) - - try: - sub_config = load_config(sub_path) - except Exception as exc: - raise ExecutionError( - f"Failed to load sub-workflow '{sub_path}' " - f"(referenced by agent '{agent.name}'): {exc}", - suggestion="Check the sub-workflow YAML for syntax or validation errors.", - ) from exc - - child_engine = WorkflowEngine( - config=sub_config, - provider=self._single_provider, - registry=self._registry, - skip_gates=self.skip_gates, - workflow_path=sub_path, - interrupt_event=self._interrupt_event, - event_emitter=self._event_emitter, - keyboard_listener=self._keyboard_listener, - web_dashboard=self._web_dashboard, - _subworkflow_depth=self._subworkflow_depth + 1, - ) - - return await child_engine.run(sub_inputs) - def _get_context_window_for_agent(self, agent: AgentDef) -> int | None: """Return the context window size for an agent's model.""" from conductor.engine.pricing import get_pricing @@ -2676,20 +2612,20 @@ async def execute_single_item(item: Any, index: int, key: str) -> tuple[str, Any # Execute agent — sub-workflow or regular if for_each_group.agent.type == "workflow": # Build sub-workflow inputs from input_mapping with loop vars - if for_each_group.agent.input_mapping: + sub_inputs: dict[str, Any] | None = None + if for_each_group.agent.input_mapping is not None: renderer = TemplateRenderer() - sub_inputs: dict[str, Any] = {} + sub_inputs = {} for k, tmpl in for_each_group.agent.input_mapping.items(): - rendered = renderer.render(tmpl, agent_context) try: - sub_inputs[k] = json.loads(rendered) - except (json.JSONDecodeError, ValueError): - sub_inputs[k] = rendered - else: - wf_ctx = agent_context.get("workflow", {}) - sub_inputs = ( - dict(wf_ctx.get("input", {})) if isinstance(wf_ctx, dict) else {} - ) + sub_inputs[k] = renderer.render(tmpl, agent_context) + except Exception as e: + raise ExecutionError( + f"Failed to render input_mapping key '{k}' for " + f"for_each agent '{for_each_group.name}': {e}", + suggestion=f"Check that the expression '{tmpl}' " + "references valid context variables.", + ) from e # Execute sub-workflow self._emit( @@ -2700,8 +2636,8 @@ async def execute_single_item(item: Any, index: int, key: str) -> tuple[str, Any "workflow": for_each_group.agent.workflow, }, ) - output_content = await self._execute_subworkflow_with_inputs( - for_each_group.agent, sub_inputs + output_content = await self._execute_subworkflow( + for_each_group.agent, agent_context, sub_inputs=sub_inputs ) _item_elapsed = _time.time() - _item_start