Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions src/conductor/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,36 @@ class AgentDef(BaseModel):
workflow: ./research-pipeline.yaml
"""

input_mapping: dict[str, str] | None = None
"""Optional mapping of sub-workflow input names to Jinja2 expressions.

Each key is a sub-workflow input parameter name. Each value is a Jinja2
template expression evaluated against the parent workflow's context.

When present, the rendered values are passed as the sub-workflow's inputs
instead of forwarding the parent's workflow.input.* values.

Only valid for type='workflow' agents.

Example::

input_mapping:
work_item_id: "{{ task_manager.output.current_issue_id }}"
title: "{{ task_manager.output.current_issue_title }}"
"""

max_depth: int | None = Field(None, ge=1, le=10)
"""Per-agent sub-workflow depth limit.

Overrides the global MAX_SUBWORKFLOW_DEPTH (10) with a tighter bound.
Only valid for type='workflow' agents. Useful for self-referential
workflows to set an explicit recursion limit.

Example::

max_depth: 3 # Allow at most 3 levels of recursion
"""

max_session_seconds: float | None = Field(None, ge=1.0)
"""Maximum wall-clock duration for this agent's session in seconds.

Expand Down Expand Up @@ -529,6 +559,10 @@ def validate_agent_type(self) -> AgentDef:
raise ValueError("human_gate agents require 'options'")
if not self.prompt:
raise ValueError("human_gate agents require 'prompt'")
if self.input_mapping:
raise ValueError("human_gate agents cannot have 'input_mapping'")
if self.max_depth is not None:
raise ValueError("human_gate agents cannot have 'max_depth'")
elif self.type == "script":
if not self.command:
raise ValueError("script agents require 'command'")
Expand All @@ -555,6 +589,10 @@ def validate_agent_type(self) -> AgentDef:
raise ValueError("script agents cannot have 'max_agent_iterations'")
if self.retry is not None:
raise ValueError("script agents cannot have 'retry'")
if self.input_mapping:
raise ValueError("script agents cannot have 'input_mapping'")
if self.max_depth is not None:
raise ValueError("script agents cannot have 'max_depth'")
elif self.type == "workflow":
if not self.workflow:
raise ValueError("workflow agents require 'workflow' path")
Expand All @@ -578,6 +616,18 @@ def validate_agent_type(self) -> AgentDef:
raise ValueError("workflow agents cannot have 'max_agent_iterations'")
if self.retry is not None:
raise ValueError("workflow agents cannot have 'retry'")
else:
# Regular agent or human_gate — input_mapping is not valid
if self.input_mapping:
raise ValueError(
f"'{self.type or 'agent'}' agents cannot have 'input_mapping' "
"(only workflow agents support input_mapping)"
)
if self.max_depth is not None:
raise ValueError(
f"'{self.type or 'agent'}' agents cannot have 'max_depth' "
"(only workflow agents support max_depth)"
)
return self


Expand Down
7 changes: 1 addition & 6 deletions src/conductor/config/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,13 @@ def validate_workflow_config(config: WorkflowConfig) -> list[str]:
parallel_errors = _validate_parallel_groups(config)
errors.extend(parallel_errors)

# Validate for_each groups: reject script and workflow steps as inline agents
# Validate for_each groups: reject script steps as inline agents
for for_each_group in config.for_each:
if for_each_group.agent.type == "script":
errors.append(
f"For-each group '{for_each_group.name}' uses a script step as its "
"inline agent. Script steps cannot be used in for_each groups."
)
if for_each_group.agent.type == "workflow":
errors.append(
f"For-each group '{for_each_group.name}' uses a workflow step as its "
"inline agent. Workflow steps cannot be used in for_each groups."
)

# Validate workflow output references
output_errors = _validate_output_references(
Expand Down
103 changes: 85 additions & 18 deletions src/conductor/engine/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,16 +483,20 @@ async def _execute_subworkflow(
self,
agent: AgentDef,
context: dict[str, Any],
*,
sub_inputs: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Execute a sub-workflow as a black-box step.

Loads the referenced workflow YAML, creates a child WorkflowEngine,
and runs it with the parent agent's context as input. The sub-workflow's
final output is returned as the agent's output.
and runs it. When ``sub_inputs`` are provided (e.g. from for_each with
pre-rendered input_mapping), they are used directly; otherwise inputs
are derived from ``context`` via input_mapping or forwarded workflow.input.

Args:
agent: Workflow agent definition with ``workflow`` path.
context: Workflow context for template rendering (used as sub-workflow input).
sub_inputs: Optional pre-built inputs that bypass input_mapping rendering.

Returns:
The sub-workflow's final output dict.
Expand All @@ -510,6 +514,14 @@ async def _execute_subworkflow(
suggestion=("Check for circular sub-workflow references or reduce nesting depth."),
)

# Per-agent depth limit (stricter than global MAX_SUBWORKFLOW_DEPTH)
if agent.max_depth is not None and self._subworkflow_depth >= agent.max_depth:
raise ExecutionError(
f"Agent '{agent.name}' max_depth ({agent.max_depth}) exceeded "
f"at depth {self._subworkflow_depth}.",
suggestion="Increase max_depth or restructure to reduce nesting.",
)

assert agent.workflow is not None # noqa: S101

# Resolve sub-workflow path relative to parent workflow file
Expand All @@ -526,15 +538,6 @@ async def _execute_subworkflow(
suggestion="Check that the 'workflow' path is correct and the file exists.",
)

# Detect circular references via file path
current_path = Path(self.workflow_path).resolve() if self.workflow_path else None
if current_path is not None and sub_path == current_path:
raise ExecutionError(
f"Circular sub-workflow reference: agent '{agent.name}' "
f"references its own workflow file '{agent.workflow}'.",
suggestion="A workflow cannot reference itself as a sub-workflow.",
)

try:
sub_config = load_config(sub_path)
except Exception as exc:
Expand All @@ -544,12 +547,31 @@ async def _execute_subworkflow(
suggestion="Check the sub-workflow YAML for syntax or validation errors.",
) from exc

# Build sub-workflow inputs from the parent context
# Extract workflow.input.* values from the parent context
workflow_ctx = context.get("workflow", {})
sub_inputs: dict[str, Any] = (
dict(workflow_ctx.get("input", {})) if isinstance(workflow_ctx, dict) else {}
)
# Build sub-workflow inputs — use pre-built inputs if provided,
# otherwise render from input_mapping or forward workflow.input.
if sub_inputs is None:
if agent.input_mapping is not None:
# Dynamic inputs: render each Jinja2 expression against parent context.
# Values are always passed as strings — use {{ expr | json }} filter
# if structured data is needed.
renderer = TemplateRenderer()
sub_inputs = {}
for key, template_expr in agent.input_mapping.items():
try:
sub_inputs[key] = renderer.render(template_expr, context)
except Exception as e:
raise ExecutionError(
f"Failed to render input_mapping key '{key}' for agent "
f"'{agent.name}': {e}",
suggestion=f"Check that the expression '{template_expr}' "
"references valid context variables.",
) from e
else:
# Default: forward parent's workflow.input.* values
workflow_ctx = context.get("workflow", {})
sub_inputs = (
dict(workflow_ctx.get("input", {})) if isinstance(workflow_ctx, dict) else {}
)

# Create child engine inheriting provider/registry but with deeper depth
child_engine = WorkflowEngine(
Expand Down Expand Up @@ -2587,7 +2609,52 @@ async def execute_single_item(item: Any, index: int, key: str) -> tuple[str, Any
key if for_each_group.key_by else None,
)

# Execute agent with injected context (get executor for multi-provider)
# Execute agent — sub-workflow or regular
if for_each_group.agent.type == "workflow":
# Build sub-workflow inputs from input_mapping with loop vars
sub_inputs: dict[str, Any] | None = None
if for_each_group.agent.input_mapping is not None:
renderer = TemplateRenderer()
sub_inputs = {}
for k, tmpl in for_each_group.agent.input_mapping.items():
try:
sub_inputs[k] = renderer.render(tmpl, agent_context)
except Exception as e:
raise ExecutionError(
f"Failed to render input_mapping key '{k}' for "
f"for_each agent '{for_each_group.name}': {e}",
suggestion=f"Check that the expression '{tmpl}' "
"references valid context variables.",
) from e

# Execute sub-workflow
self._emit(
"subworkflow_started",
{
"agent_name": for_each_group.name,
"item_key": key,
"workflow": for_each_group.agent.workflow,
},
)
output_content = await self._execute_subworkflow(
for_each_group.agent, agent_context, sub_inputs=sub_inputs
)
_item_elapsed = _time.time() - _item_start

self._emit(
"for_each_item_completed",
{
"group_name": for_each_group.name,
"item_key": key,
"elapsed": _item_elapsed,
"tokens": 0,
"cost_usd": 0.0,
"output": output_content,
},
)
return (key, output_content)

# Regular agent execution
executor = await self._get_executor_for_agent(for_each_group.agent)

# Item-scoped event callback that tags all streaming events with item_key
Expand Down
64 changes: 60 additions & 4 deletions tests/test_config/test_workflow_type_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ def test_workflow_in_parallel_group_raises(self) -> None:
class TestWorkflowInForEach:
"""Tests for workflow agents in for_each groups."""

def test_workflow_in_for_each_raises(self) -> None:
"""Test that workflow step in for_each inline agent raises ConfigurationError."""
def test_workflow_in_for_each_validates(self) -> None:
"""Test that workflow step in for_each inline agent validates successfully."""
config = WorkflowConfig(
workflow=WorkflowDef(
name="test",
Expand All @@ -227,8 +227,9 @@ def test_workflow_in_for_each_raises(self) -> None:
),
],
)
with pytest.raises(ConfigurationError, match="Workflow steps cannot be used in for_each"):
validate_workflow_config(config)
# Should not raise — workflow agents are now allowed in for_each
warnings = validate_workflow_config(config)
assert isinstance(warnings, list)


class TestWorkflowWorkflowConfig:
Expand Down Expand Up @@ -284,3 +285,58 @@ def test_workflow_with_routes_to_agents(self) -> None:
# Should not raise
warnings = validate_workflow_config(config)
assert isinstance(warnings, list)


class TestInputMapping:
"""Tests for input_mapping on workflow agents."""

def test_valid_input_mapping(self) -> None:
"""Test that input_mapping is accepted on workflow agents."""
agent = AgentDef(
name="sub_wf",
type="workflow",
workflow="./sub.yaml",
input_mapping={
"work_item_id": "{{ intake.output.epic_id }}",
"title": "{{ intake.output.epic_title }}",
},
)
assert agent.input_mapping is not None
assert len(agent.input_mapping) == 2

def test_workflow_without_input_mapping(self) -> None:
"""Test that workflow agents work without input_mapping (backward compat)."""
agent = AgentDef(name="sub_wf", type="workflow", workflow="./sub.yaml")
assert agent.input_mapping is None

def test_input_mapping_on_regular_agent_raises(self) -> None:
"""Test that input_mapping on a regular agent raises ValidationError."""
with pytest.raises(ValidationError, match="input_mapping"):
AgentDef(
name="regular",
prompt="do something",
input_mapping={"key": "{{ value }}"},
)

def test_input_mapping_on_human_gate_raises(self) -> None:
"""Test that input_mapping on a human_gate raises ValidationError."""
with pytest.raises(ValidationError, match="input_mapping"):
AgentDef(
name="gate",
type="human_gate",
prompt="Choose",
options=[
GateOption(label="Yes", value="yes", route="next"),
],
input_mapping={"key": "{{ value }}"},
)

def test_input_mapping_on_script_raises(self) -> None:
"""Test that input_mapping on a script agent raises ValidationError."""
with pytest.raises(ValidationError, match="input_mapping"):
AgentDef(
name="script",
type="script",
command="echo hi",
input_mapping={"key": "{{ value }}"},
)
Loading
Loading