From 81302a1c0d57e5de7a4fa4589f4a252ead9871b6 Mon Sep 17 00:00:00 2001 From: Shannon Date: Wed, 4 Mar 2026 19:09:29 -0700 Subject: [PATCH 1/3] Fix template rendering error when dict keys shadow built-in methods Jinja2's default attribute resolution tries Python getattr() before dict key lookup. When a dict has a key named 'items' (or 'keys', 'values', etc.), getattr(d, 'items') returns the dict.items method instead of d['items'] (the stored value). Filters like '| length' then call len() on the method object, producing: Template rendering failed: object of type 'builtin_function_or_method' has no len() Fix: Subclass Jinja2 Environment with _DictSafeEnvironment that prefers dict key lookup over attribute access for dict objects. This resolves the collision for any field name that matches a dict method. Also rename the 'items' output field in for-each-simple.yaml to 'topics' to avoid the confusing collision in the example. --- examples/for-each-simple.yaml | 12 ++++++------ src/conductor/executor/template.py | 26 +++++++++++++++++++++++++- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/examples/for-each-simple.yaml b/examples/for-each-simple.yaml index 646f3be..aa5b7ed 100644 --- a/examples/for-each-simple.yaml +++ b/examples/for-each-simple.yaml @@ -30,7 +30,7 @@ for_each: type: for_each description: Process each item found by the item_finder - source: item_finder.output.items # Reference to array in context + source: item_finder.output.topics # Reference to array in context as: item # Loop variable name max_concurrent: 3 # Process 3 items at a time failure_mode: continue_on_error # Continue even if some items fail @@ -69,9 +69,9 @@ agents: Return them as a simple array of strings. Examples: "functional programming", "type systems", "concurrency" output: - items: + topics: type: array - description: List of items (strings) to process + description: List of topics (strings) to process routes: - to: item_processors @@ -85,8 +85,8 @@ agents: prompt: | Summarize the batch processing results: - Original Items ({{ item_finder.output.items | length }}): - {% for item in item_finder.output.items %} + Original Items ({{ item_finder.output.topics | length }}): + {% for item in item_finder.output.topics %} {{ loop.index }}. {{ item }} {% endfor %} @@ -125,7 +125,7 @@ output: summary: "{{ aggregator.output.summary }}" insights: "{{ aggregator.output.insights | json }}" success_rate: "{{ aggregator.output.success_rate }}" - total_items: "{{ item_finder.output.items | length }}" + total_items: "{{ item_finder.output.topics | length }}" processed: "{{ item_processors.outputs | length }}" failed: "{% if item_processors.errors is defined %}{{ item_processors.errors | length }}{% else %}0{% endif %}" diff --git a/src/conductor/executor/template.py b/src/conductor/executor/template.py index c45cdc0..e16f27f 100644 --- a/src/conductor/executor/template.py +++ b/src/conductor/executor/template.py @@ -15,6 +15,30 @@ from conductor.exceptions import TemplateError +class _DictSafeEnvironment(Environment): + """Jinja2 Environment that prefers dict key lookup over attribute access. + + The default Jinja2 ``getattr`` tries Python ``getattr()`` first, then + ``__getitem__``. This means ``some_dict.items`` resolves to the + ``dict.items`` **method** rather than ``some_dict["items"]``, which + breaks templates like ``{{ agent.output.items | length }}``. + + This subclass reverses the priority for ``dict`` objects so that key + lookup is attempted first — matching the intuitive expectation for + YAML-authored workflow templates. + """ + + def getattr(self, obj: Any, attribute: str) -> Any: # noqa: ANN401 + """Get *attribute* from *obj*, preferring dict keys for dicts.""" + if isinstance(obj, dict): + try: + return obj[attribute] + except KeyError: + pass + # Fall back to the default resolution (getattr → getitem → undefined) + return super().getattr(obj, attribute) + + class TemplateRenderer: """Jinja2-based template renderer for prompts and expressions. @@ -31,7 +55,7 @@ class TemplateRenderer: def __init__(self) -> None: """Initialize the template renderer with Jinja2 environment.""" - self.env = Environment( + self.env = _DictSafeEnvironment( loader=BaseLoader(), undefined=StrictUndefined, # Fail fast on missing variables autoescape=False, # No HTML escaping for prompts From d1650fdc0ee40805839834e696367862815d0538 Mon Sep 17 00:00:00 2001 From: Shannon Date: Thu, 5 Mar 2026 09:27:50 -0700 Subject: [PATCH 2/3] feat(engine): support workflow.input.* as for-each source Add _resolve_workflow_input_array to handle workflow.input.* references in for-each source fields, with automatic JSON string parsing for CLI --input values. Update for-each-simple.yaml to accept optional --input items array with fallback to agent-generated topics when not provided. Add 9 tests covering workflow.input source resolution. --- examples/for-each-simple.yaml | 26 +++++-- src/conductor/engine/workflow.py | 111 ++++++++++++++++++++++++++--- tests/test_engine/test_workflow.py | 100 ++++++++++++++++++++++++++ 3 files changed, 222 insertions(+), 15 deletions(-) diff --git a/examples/for-each-simple.yaml b/examples/for-each-simple.yaml index aa5b7ed..9937bbc 100644 --- a/examples/for-each-simple.yaml +++ b/examples/for-each-simple.yaml @@ -3,12 +3,17 @@ # This example demonstrates dynamic parallel execution with for-each groups. # It shows: # - Processing variable-length arrays with for-each +# - Optional workflow inputs with fallback behavior # - Loop variables ({{ item }}, {{ _index }}, {{ _key }}) # - Index-based output aggregation # - Batched parallel execution # - Accessing for-each outputs in downstream agents # # Usage: +# # With explicit items: +# conductor run examples/for-each-simple.yaml --input items='["apple", "banana", "cherry"]' +# +# # Without items (agent generates random topics): # conductor run examples/for-each-simple.yaml workflow: @@ -24,6 +29,13 @@ workflow: limits: max_iterations: 20 +# Optional input: if provided, items are used directly; otherwise generated +input: + items: + type: string + required: false + description: JSON array of items to process (e.g. '["apple", "banana"]') + # For-each group definition for_each: - name: item_processors @@ -59,15 +71,21 @@ for_each: - to: aggregator agents: - # Agent 1: Find items to process + # Agent 1: Resolve items to process (pass-through or generate) - name: item_finder - description: Finds a list of items to process + description: Returns items to process — uses provided input or generates random topics model: claude-sonnet-4.5 prompt: | + {% if workflow.input.items %} + The user provided the following items to process. Return them exactly + as your topics array without modification: + {{ workflow.input.items }} + {% else %} Generate a list of 5 interesting programming concepts to analyze. Return them as a simple array of strings. Examples: "functional programming", "type systems", "concurrency" + {% endif %} output: topics: type: array @@ -86,8 +104,8 @@ agents: Summarize the batch processing results: Original Items ({{ item_finder.output.topics | length }}): - {% for item in item_finder.output.topics %} - {{ loop.index }}. {{ item }} + {% for topic in item_finder.output.topics %} + {{ loop.index }}. {{ topic }} {% endfor %} Successfully Processed ({{ item_processors.outputs | length }}): diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index 86195e5..d523393 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -1784,19 +1784,18 @@ def _find_for_each_group(self, name: str) -> ForEachDef | None: def _resolve_array_reference(self, source: str) -> list[Any]: """Resolve a source reference to a runtime array from workflow context. - Navigates dotted path notation to extract an array from agent outputs. - Handles the same wrapping logic as build_for_agent (regular agents are - wrapped with {"output": ...}, parallel/for-each groups are stored directly). + Navigates dotted path notation to extract an array from agent outputs + or workflow inputs. Handles the same wrapping logic as build_for_agent + (regular agents are wrapped with {"output": ...}, parallel/for-each + groups are stored directly). - Example: - source = "finder.output.kpis" - 1. Lookup agent_outputs["finder"] - 2. Wrap with {"output": ...} if not a parallel/for-each group - 3. Navigate to ["output"]["kpis"] - 4. Return the array value + Supports two reference styles: + - Agent output: ``finder.output.kpis`` → agent_outputs["finder"]["output"]["kpis"] + - Workflow input: ``workflow.input.items`` → workflow_inputs["items"] Args: - source: Dotted path reference (e.g., 'finder.output.kpis'). + source: Dotted path reference (e.g., 'finder.output.kpis' + or 'workflow.input.items'). Returns: The resolved array (list). @@ -1809,9 +1808,16 @@ def _resolve_array_reference(self, source: str) -> list[Any]: if len(parts) < 3: raise ExecutionError( f"Invalid source reference format: '{source}'", - suggestion="Source must have at least 3 parts (e.g., 'agent_name.output.field')", + suggestion=( + "Source must have at least 3 parts " + "(e.g., 'agent_name.output.field' or 'workflow.input.field')" + ), ) + # Handle workflow.input.* references + if parts[0] == "workflow" and parts[1] == "input": + return self._resolve_workflow_input_array(source, parts[2:]) + # First part is the agent name agent_name = parts[0] @@ -1882,6 +1888,89 @@ def _resolve_array_reference(self, source: str) -> list[Any]: return current + def _resolve_workflow_input_array(self, source: str, field_parts: list[str]) -> list[Any]: + """Resolve a workflow.input.* reference to a runtime array. + + Navigates into ``self.context.workflow_inputs`` using the remaining + dotted path segments after ``workflow.input``. + + Args: + source: The full dotted source string (for error messages). + field_parts: Path segments after ``workflow.input`` + (e.g., ``["items"]`` for ``workflow.input.items``). + + Returns: + The resolved array (list). + + Raises: + ExecutionError: If the path doesn't exist or value is not an array. + """ + if not field_parts: + raise ExecutionError( + f"Invalid source reference: '{source}'", + suggestion="workflow.input references need a field name " + "(e.g., 'workflow.input.items')", + ) + + current: Any = self.context.workflow_inputs + path_traversed = ["workflow", "input"] + + for part in field_parts: + path_traversed.append(part) + + if not isinstance(current, dict): + parent_path = ".".join(path_traversed[:-1]) + raise ExecutionError( + f"Cannot navigate to '{part}' in source '{source}': " + f"'{parent_path}' is not a dictionary (type: {type(current).__name__})", + suggestion=f"Check that '{parent_path}' returns a dictionary structure", + ) + + if part not in current: + parent_path = ".".join(path_traversed[:-1]) + available_keys = list(current.keys()) if isinstance(current, dict) else [] + raise ExecutionError( + f"Field '{part}' not found in '{parent_path}' for source '{source}'", + suggestion=( + f"Available keys: {available_keys}" + if available_keys + else "Check the workflow input parameters" + ), + ) + + current = current[part] + + # Handle JSON string inputs (CLI passes arrays as strings) + if isinstance(current, str): + import json as _json + + try: + parsed = _json.loads(current) + except (ValueError, TypeError): + raise ExecutionError( + f"Source '{source}' resolved to a string that is not valid JSON: " + f"{current!r}", + suggestion="Ensure the input is a JSON array string " + "(e.g., --input items='[\"a\", \"b\"]')", + ) + if not isinstance(parsed, list): + raise ExecutionError( + f"Source '{source}' parsed from JSON string but got " + f"{type(parsed).__name__}, expected array", + suggestion="Ensure the input is a JSON array " + "(e.g., --input items='[\"a\", \"b\"]')", + ) + return parsed + + if not isinstance(current, (list, tuple)): + raise ExecutionError( + f"Source '{source}' resolved to {type(current).__name__}, " + f"expected list or tuple", + suggestion=f"Ensure '{source}' contains an array value", + ) + + return list(current) + def _inject_loop_variables( self, context: dict[str, Any], diff --git a/tests/test_engine/test_workflow.py b/tests/test_engine/test_workflow.py index 7c32cde..bbaeb66 100644 --- a/tests/test_engine/test_workflow.py +++ b/tests/test_engine/test_workflow.py @@ -1619,6 +1619,106 @@ def test_resolve_array_reference_accepts_tuple( assert len(result) == 3 assert result == ("item1", "item2", "item3") + def test_resolve_workflow_input_array(self, workflow_engine_with_context: WorkflowEngine): + """Test resolving an array from workflow.input.*.""" + workflow_engine_with_context.context.set_workflow_inputs( + {"items": ["apple", "banana", "cherry"]} + ) + + result = workflow_engine_with_context._resolve_array_reference("workflow.input.items") + + assert isinstance(result, list) + assert result == ["apple", "banana", "cherry"] + + def test_resolve_workflow_input_json_string(self, workflow_engine_with_context: WorkflowEngine): + """Test resolving a JSON string array from workflow.input.* (CLI passes strings).""" + workflow_engine_with_context.context.set_workflow_inputs( + {"items": '["apple", "banana", "cherry"]'} + ) + + result = workflow_engine_with_context._resolve_array_reference("workflow.input.items") + + assert isinstance(result, list) + assert result == ["apple", "banana", "cherry"] + + def test_resolve_workflow_input_nested(self, workflow_engine_with_context: WorkflowEngine): + """Test resolving a nested path from workflow.input.*.""" + workflow_engine_with_context.context.set_workflow_inputs( + {"config": {"tasks": ["task1", "task2"]}} + ) + + result = workflow_engine_with_context._resolve_array_reference( + "workflow.input.config.tasks" + ) + + assert isinstance(result, list) + assert result == ["task1", "task2"] + + def test_resolve_workflow_input_missing_field( + self, workflow_engine_with_context: WorkflowEngine + ): + """Test error when workflow input field doesn't exist.""" + workflow_engine_with_context.context.set_workflow_inputs({"other": "value"}) + + with pytest.raises(ExecutionError) as exc_info: + workflow_engine_with_context._resolve_array_reference("workflow.input.items") + + assert "Field 'items' not found" in str(exc_info.value) + assert "Available keys:" in str(exc_info.value.suggestion) + + def test_resolve_workflow_input_not_array(self, workflow_engine_with_context: WorkflowEngine): + """Test error when workflow input value is not an array.""" + workflow_engine_with_context.context.set_workflow_inputs({"items": 42}) + + with pytest.raises(ExecutionError) as exc_info: + workflow_engine_with_context._resolve_array_reference("workflow.input.items") + + assert "resolved to int" in str(exc_info.value) + + def test_resolve_workflow_input_invalid_json_string( + self, workflow_engine_with_context: WorkflowEngine + ): + """Test error when workflow input string is not valid JSON.""" + workflow_engine_with_context.context.set_workflow_inputs({"items": "not json at all"}) + + with pytest.raises(ExecutionError) as exc_info: + workflow_engine_with_context._resolve_array_reference("workflow.input.items") + + assert "not valid JSON" in str(exc_info.value) + + def test_resolve_workflow_input_json_string_not_array( + self, workflow_engine_with_context: WorkflowEngine + ): + """Test error when workflow input JSON string parses to non-array.""" + workflow_engine_with_context.context.set_workflow_inputs( + {"items": '{"key": "value"}'} + ) + + with pytest.raises(ExecutionError) as exc_info: + workflow_engine_with_context._resolve_array_reference("workflow.input.items") + + assert "parsed from JSON string but got dict" in str(exc_info.value) + + def test_resolve_workflow_input_empty_array( + self, workflow_engine_with_context: WorkflowEngine + ): + """Test resolving an empty array from workflow.input.*.""" + workflow_engine_with_context.context.set_workflow_inputs({"items": []}) + + result = workflow_engine_with_context._resolve_array_reference("workflow.input.items") + + assert isinstance(result, list) + assert len(result) == 0 + + def test_resolve_workflow_input_no_field_parts( + self, workflow_engine_with_context: WorkflowEngine + ): + """Test error when workflow.input has no field name.""" + # This would be caught by the len(parts) < 3 check since + # "workflow.input" only has 2 parts, but test the boundary. + with pytest.raises(ExecutionError): + workflow_engine_with_context._resolve_array_reference("workflow.input") + class TestInjectLoopVariables: """Tests for _inject_loop_variables() method.""" From 588187934263af1edaf1b37e4fcba9c4ae8621d9 Mon Sep 17 00:00:00 2001 From: Shannon Date: Thu, 5 Mar 2026 09:51:23 -0700 Subject: [PATCH 3/3] Update readme --- README.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/README.md b/README.md index ff6fa89..ed600e8 100644 --- a/README.md +++ b/README.md @@ -218,6 +218,27 @@ cd conductor make dev ``` +### Windows + +On Windows, use `uv` directly instead of `make`: + +```powershell +uv sync --all-extras # instead of make dev +uv run pytest tests/ # instead of make test +uv run ruff check . # instead of make lint +uv run ruff format . # instead of make format +``` + +**Copilot CLI path:** Windows `subprocess` cannot resolve `.bat`/`.ps1` wrappers by name alone. If you see `[WinError 2] The system cannot find the file specified` when running workflows, set the full path to the Copilot CLI: + +```powershell +# Find your copilot CLI +Get-Command copilot* | Format-Table Name, Source + +# Set the path (use the .cmd variant from npm) +$env:COPILOT_CLI_PATH = "C:\Users\\AppData\Roaming\npm\copilot.cmd" +``` + ### Common Commands ```bash