Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,27 @@ cd conductor
make dev
```

### Windows

On Windows, use `uv` directly instead of `make`:

```powershell
uv sync --all-extras # instead of make dev
uv run pytest tests/ # instead of make test
uv run ruff check . # instead of make lint
uv run ruff format . # instead of make format
```

**Copilot CLI path:** Windows `subprocess` cannot resolve `.bat`/`.ps1` wrappers by name alone. If you see `[WinError 2] The system cannot find the file specified` when running workflows, set the full path to the Copilot CLI:

```powershell
# Find your copilot CLI
Get-Command copilot* | Format-Table Name, Source

# Set the path (use the .cmd variant from npm)
$env:COPILOT_CLI_PATH = "C:\Users\<you>\AppData\Roaming\npm\copilot.cmd"
```

### Common Commands

```bash
Expand Down
36 changes: 27 additions & 9 deletions examples/for-each-simple.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@
# This example demonstrates dynamic parallel execution with for-each groups.
# It shows:
# - Processing variable-length arrays with for-each
# - Optional workflow inputs with fallback behavior
# - Loop variables ({{ item }}, {{ _index }}, {{ _key }})
# - Index-based output aggregation
# - Batched parallel execution
# - Accessing for-each outputs in downstream agents
#
# Usage:
# # With explicit items:
# conductor run examples/for-each-simple.yaml --input items='["apple", "banana", "cherry"]'
#
# # Without items (agent generates random topics):
# conductor run examples/for-each-simple.yaml

workflow:
Expand All @@ -24,13 +29,20 @@ workflow:
limits:
max_iterations: 20

# Optional input: if provided, items are used directly; otherwise generated
input:
items:
type: string
required: false
description: JSON array of items to process (e.g. '["apple", "banana"]')

# For-each group definition
for_each:
- name: item_processors
type: for_each
description: Process each item found by the item_finder

source: item_finder.output.items # Reference to array in context
source: item_finder.output.topics # Reference to array in context
as: item # Loop variable name
max_concurrent: 3 # Process 3 items at a time
failure_mode: continue_on_error # Continue even if some items fail
Expand Down Expand Up @@ -59,19 +71,25 @@ for_each:
- to: aggregator

agents:
# Agent 1: Find items to process
# Agent 1: Resolve items to process (pass-through or generate)
- name: item_finder
description: Finds a list of items to process
description: Returns items to process — uses provided input or generates random topics
model: claude-sonnet-4.5
prompt: |
{% if workflow.input.items %}
The user provided the following items to process. Return them exactly
as your topics array without modification:
{{ workflow.input.items }}
{% else %}
Generate a list of 5 interesting programming concepts to analyze.

Return them as a simple array of strings.
Examples: "functional programming", "type systems", "concurrency"
{% endif %}
output:
items:
topics:
type: array
description: List of items (strings) to process
description: List of topics (strings) to process
routes:
- to: item_processors

Expand All @@ -85,9 +103,9 @@ agents:
prompt: |
Summarize the batch processing results:

Original Items ({{ item_finder.output.items | length }}):
{% for item in item_finder.output.items %}
{{ loop.index }}. {{ item }}
Original Items ({{ item_finder.output.topics | length }}):
{% for topic in item_finder.output.topics %}
{{ loop.index }}. {{ topic }}
{% endfor %}

Successfully Processed ({{ item_processors.outputs | length }}):
Expand Down Expand Up @@ -125,7 +143,7 @@ output:
summary: "{{ aggregator.output.summary }}"
insights: "{{ aggregator.output.insights | json }}"
success_rate: "{{ aggregator.output.success_rate }}"
total_items: "{{ item_finder.output.items | length }}"
total_items: "{{ item_finder.output.topics | length }}"
processed: "{{ item_processors.outputs | length }}"
failed: "{% if item_processors.errors is defined %}{{ item_processors.errors | length }}{% else %}0{% endif %}"

111 changes: 100 additions & 11 deletions src/conductor/engine/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1784,19 +1784,18 @@ def _find_for_each_group(self, name: str) -> ForEachDef | None:
def _resolve_array_reference(self, source: str) -> list[Any]:
"""Resolve a source reference to a runtime array from workflow context.

Navigates dotted path notation to extract an array from agent outputs.
Handles the same wrapping logic as build_for_agent (regular agents are
wrapped with {"output": ...}, parallel/for-each groups are stored directly).
Navigates dotted path notation to extract an array from agent outputs
or workflow inputs. Handles the same wrapping logic as build_for_agent
(regular agents are wrapped with {"output": ...}, parallel/for-each
groups are stored directly).

Example:
source = "finder.output.kpis"
1. Lookup agent_outputs["finder"]
2. Wrap with {"output": ...} if not a parallel/for-each group
3. Navigate to ["output"]["kpis"]
4. Return the array value
Supports two reference styles:
- Agent output: ``finder.output.kpis`` → agent_outputs["finder"]["output"]["kpis"]
- Workflow input: ``workflow.input.items`` → workflow_inputs["items"]

Args:
source: Dotted path reference (e.g., 'finder.output.kpis').
source: Dotted path reference (e.g., 'finder.output.kpis'
or 'workflow.input.items').

Returns:
The resolved array (list).
Expand All @@ -1809,9 +1808,16 @@ def _resolve_array_reference(self, source: str) -> list[Any]:
if len(parts) < 3:
raise ExecutionError(
f"Invalid source reference format: '{source}'",
suggestion="Source must have at least 3 parts (e.g., 'agent_name.output.field')",
suggestion=(
"Source must have at least 3 parts "
"(e.g., 'agent_name.output.field' or 'workflow.input.field')"
),
)

# Handle workflow.input.* references
if parts[0] == "workflow" and parts[1] == "input":
return self._resolve_workflow_input_array(source, parts[2:])

# First part is the agent name
agent_name = parts[0]

Expand Down Expand Up @@ -1882,6 +1888,89 @@ def _resolve_array_reference(self, source: str) -> list[Any]:

return current

def _resolve_workflow_input_array(self, source: str, field_parts: list[str]) -> list[Any]:
"""Resolve a workflow.input.* reference to a runtime array.

Navigates into ``self.context.workflow_inputs`` using the remaining
dotted path segments after ``workflow.input``.

Args:
source: The full dotted source string (for error messages).
field_parts: Path segments after ``workflow.input``
(e.g., ``["items"]`` for ``workflow.input.items``).

Returns:
The resolved array (list).

Raises:
ExecutionError: If the path doesn't exist or value is not an array.
"""
if not field_parts:
raise ExecutionError(
f"Invalid source reference: '{source}'",
suggestion="workflow.input references need a field name "
"(e.g., 'workflow.input.items')",
)

current: Any = self.context.workflow_inputs
path_traversed = ["workflow", "input"]

for part in field_parts:
path_traversed.append(part)

if not isinstance(current, dict):
parent_path = ".".join(path_traversed[:-1])
raise ExecutionError(
f"Cannot navigate to '{part}' in source '{source}': "
f"'{parent_path}' is not a dictionary (type: {type(current).__name__})",
suggestion=f"Check that '{parent_path}' returns a dictionary structure",
)

if part not in current:
parent_path = ".".join(path_traversed[:-1])
available_keys = list(current.keys()) if isinstance(current, dict) else []
raise ExecutionError(
f"Field '{part}' not found in '{parent_path}' for source '{source}'",
suggestion=(
f"Available keys: {available_keys}"
if available_keys
else "Check the workflow input parameters"
),
)

current = current[part]

# Handle JSON string inputs (CLI passes arrays as strings)
if isinstance(current, str):
import json as _json

try:
parsed = _json.loads(current)
except (ValueError, TypeError):
raise ExecutionError(
f"Source '{source}' resolved to a string that is not valid JSON: "
f"{current!r}",
suggestion="Ensure the input is a JSON array string "
"(e.g., --input items='[\"a\", \"b\"]')",
)
if not isinstance(parsed, list):
raise ExecutionError(
f"Source '{source}' parsed from JSON string but got "
f"{type(parsed).__name__}, expected array",
suggestion="Ensure the input is a JSON array "
"(e.g., --input items='[\"a\", \"b\"]')",
)
return parsed

if not isinstance(current, (list, tuple)):
raise ExecutionError(
f"Source '{source}' resolved to {type(current).__name__}, "
f"expected list or tuple",
suggestion=f"Ensure '{source}' contains an array value",
)

return list(current)

def _inject_loop_variables(
self,
context: dict[str, Any],
Expand Down
26 changes: 25 additions & 1 deletion src/conductor/executor/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,30 @@
from conductor.exceptions import TemplateError


class _DictSafeEnvironment(Environment):
"""Jinja2 Environment that prefers dict key lookup over attribute access.

The default Jinja2 ``getattr`` tries Python ``getattr()`` first, then
``__getitem__``. This means ``some_dict.items`` resolves to the
``dict.items`` **method** rather than ``some_dict["items"]``, which
breaks templates like ``{{ agent.output.items | length }}``.

This subclass reverses the priority for ``dict`` objects so that key
lookup is attempted first — matching the intuitive expectation for
YAML-authored workflow templates.
"""

def getattr(self, obj: Any, attribute: str) -> Any: # noqa: ANN401
"""Get *attribute* from *obj*, preferring dict keys for dicts."""
if isinstance(obj, dict):
try:
return obj[attribute]
except KeyError:
pass
# Fall back to the default resolution (getattr → getitem → undefined)
return super().getattr(obj, attribute)


class TemplateRenderer:
"""Jinja2-based template renderer for prompts and expressions.

Expand All @@ -31,7 +55,7 @@ class TemplateRenderer:

def __init__(self) -> None:
"""Initialize the template renderer with Jinja2 environment."""
self.env = Environment(
self.env = _DictSafeEnvironment(
loader=BaseLoader(),
undefined=StrictUndefined, # Fail fast on missing variables
autoescape=False, # No HTML escaping for prompts
Expand Down
Loading