From 1353d89574ae0255ed709ae6acc9db0950b15d06 Mon Sep 17 00:00:00 2001 From: Daniel Green Date: Tue, 21 Apr 2026 12:10:49 -0700 Subject: [PATCH 1/8] feat(config): add optional metadata dict to workflow definition Add a metadata field to WorkflowDef that allows workflow authors to attach arbitrary key-value pairs for external tooling. The metadata is included verbatim in the workflow_started event, enabling downstream consumers (dashboards, trackers, enrichers) to adapt behavior without parsing the YAML source. Example usage in workflow YAML: workflow: name: twig-sdlc metadata: tracker: ado project_url: https://dev.azure.com/org/Project work_item_id_agent: intake work_item_id_field: epic_id The field defaults to an empty dict, so existing workflows are unaffected. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/config/schema.py | 7 +++++++ src/conductor/engine/workflow.py | 1 + 2 files changed, 8 insertions(+) diff --git a/src/conductor/config/schema.py b/src/conductor/config/schema.py index 527c50b..7594d49 100644 --- a/src/conductor/config/schema.py +++ b/src/conductor/config/schema.py @@ -738,6 +738,13 @@ class WorkflowDef(BaseModel): hooks: HooksConfig | None = None """Lifecycle event hooks.""" + metadata: dict[str, Any] = Field(default_factory=dict) + """Arbitrary key-value metadata for external tooling (dashboards, trackers, etc.). + + Included verbatim in the ``workflow_started`` event so downstream + consumers can use it for enrichment without parsing the YAML source. + """ + class WorkflowConfig(BaseModel): """Complete workflow configuration file.""" diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index 484192b..f7992cd 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -1134,6 +1134,7 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]: for r in f.routes ], **self._yaml_source_field(), + "metadata": self.config.workflow.metadata, }, ) From 7f6bc16c32cfddd0a55c41c0b306f561844eecd7 Mon Sep 17 00:00:00 2001 From: Daniel Green Date: Tue, 21 Apr 2026 12:23:36 -0700 Subject: [PATCH 2/8] feat(cli): add --metadata flag for runtime metadata injection Add --metadata / -m flag to 'conductor run' that accepts key=value pairs, merged on top of YAML-declared metadata. This enables callers to inject dynamic values at invocation time: conductor run twig-sdlc.yaml --metadata work_item_id=1814 CLI metadata is: - Parsed separately from --input (different binding path) - Merged on top of YAML metadata (CLI wins on conflicts) - Forwarded through --web-bg background process spawning - Included in the workflow_started event alongside YAML metadata Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/cli/app.py | 17 +++++++++++++++++ src/conductor/cli/bg_runner.py | 7 +++++++ src/conductor/cli/run.py | 6 ++++++ 3 files changed, 30 insertions(+) diff --git a/src/conductor/cli/app.py b/src/conductor/cli/app.py index 31c2d23..cadb3aa 100644 --- a/src/conductor/cli/app.py +++ b/src/conductor/cli/app.py @@ -238,6 +238,14 @@ def run( help="Workflow inputs in name=value format. Can be repeated.", ), ] = None, + raw_metadata: Annotated[ + list[str] | None, + typer.Option( + "--metadata", + "-m", + help="Workflow metadata in key=value format. Merged on top of YAML metadata. Can be repeated.", + ), + ] = None, dry_run: Annotated[ bool, typer.Option( @@ -300,12 +308,14 @@ def run( Execute a multi-agent workflow defined in the specified YAML file. Workflow inputs can be provided using --input flags. + Metadata can be provided using --metadata flags (merged on top of YAML metadata). \b Examples: conductor run workflow.yaml conductor run workflow.yaml --input question="What is Python?" conductor run workflow.yaml -i question="Hello" -i context="Programming" + conductor run workflow.yaml --metadata tracker=ado -m work_item_id=1814 conductor run workflow.yaml --provider copilot conductor run workflow.yaml --dry-run conductor run workflow.yaml --skip-gates @@ -377,6 +387,11 @@ def run( # Also parse --input.name=value style from sys.argv inputs.update(InputCollector.extract_from_args()) + # Parse --metadata key=value flags (separate from inputs) + cli_metadata: dict[str, str] = {} + if raw_metadata: + cli_metadata.update(parse_input_flags(raw_metadata)) + # Resolve log file path resolved_log_file: Path | None = None if log_file is not None: @@ -398,6 +413,7 @@ def run( log_file=resolved_log_file, no_interactive=True, # Always non-interactive in background web_port=web_port, + metadata=cli_metadata, ) console.print(f"[bold cyan]Dashboard:[/bold cyan] {url}") console.print( @@ -422,6 +438,7 @@ def run( web=web, web_port=web_port, web_bg=web_bg, + metadata=cli_metadata, ) ) diff --git a/src/conductor/cli/bg_runner.py b/src/conductor/cli/bg_runner.py index e193ae0..9037e56 100644 --- a/src/conductor/cli/bg_runner.py +++ b/src/conductor/cli/bg_runner.py @@ -62,6 +62,7 @@ def launch_background( log_file: Path | None = None, no_interactive: bool = True, web_port: int = 0, + metadata: dict[str, str] | None = None, ) -> str: """Fork a detached child process running the workflow with a web dashboard. @@ -77,6 +78,7 @@ def launch_background( log_file: Optional log file path. no_interactive: Whether to disable interactive mode (always True for bg). web_port: Desired port (0 = auto-select). + metadata: Optional CLI metadata key=value pairs. Returns: The dashboard URL (e.g. ``http://127.0.0.1:8080``). @@ -107,6 +109,11 @@ def launch_background( for key, value in inputs.items(): cmd.extend(["--input", f"{key}={_serialize_value(value)}"]) + # Forward metadata + if metadata: + for key, value in metadata.items(): + cmd.extend(["--metadata", f"{key}={value}"]) + if provider_override: cmd.extend(["--provider", provider_override]) diff --git a/src/conductor/cli/run.py b/src/conductor/cli/run.py index 32ac61e..05291f9 100644 --- a/src/conductor/cli/run.py +++ b/src/conductor/cli/run.py @@ -990,6 +990,7 @@ async def run_workflow_async( web: bool = False, web_port: int = 0, web_bg: bool = False, + metadata: dict[str, str] | None = None, ) -> dict[str, Any]: """Execute a workflow asynchronously. @@ -1003,6 +1004,7 @@ async def run_workflow_async( web: If True, start a real-time web dashboard. web_port: Port for the web dashboard (0 = auto-select). web_bg: If True, auto-shutdown dashboard after workflow + client disconnect. + metadata: Optional CLI metadata to merge on top of YAML-declared metadata. Returns: The workflow output as a dictionary. @@ -1054,6 +1056,10 @@ async def run_workflow_async( config = load_config(workflow_path) verbose_log_timing("Configuration loaded", time.time() - load_start) + # Merge CLI metadata on top of YAML-declared metadata + if metadata: + config.workflow.metadata.update(metadata) + # Log workflow details verbose_log(f"Workflow: {config.workflow.name}") verbose_log(f"Entry point: {config.workflow.entry_point}") From 22a82bd406c22f8b40cb7d7f32959fbc11fc6825 Mon Sep 17 00:00:00 2001 From: Daniel Green Date: Tue, 21 Apr 2026 12:27:40 -0700 Subject: [PATCH 3/8] test: add metadata schema and loader tests 7 new tests verifying: - Schema: metadata defaults to empty dict, accepts arbitrary keys, independent from input/context fields - Loader: metadata round-trips through YAML, omission gives empty dict, nested values preserved, metadata and input are separate namespaces All 140 config tests pass. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- tests/test_config/test_loader.py | 102 +++++++++++++++++++++++++++++++ tests/test_config/test_schema.py | 38 ++++++++++++ 2 files changed, 140 insertions(+) diff --git a/tests/test_config/test_loader.py b/tests/test_config/test_loader.py index e4d25dd..7247255 100644 --- a/tests/test_config/test_loader.py +++ b/tests/test_config/test_loader.py @@ -480,3 +480,105 @@ def test_route_to_parallel_group(self) -> None: """ config = load_config_string(yaml_content) assert config.agents[0].routes[0].to == "pg" + + +class TestMetadataLoading: + """Tests for workflow metadata loading from YAML.""" + + def test_metadata_from_yaml(self) -> None: + """Test that metadata is loaded from YAML workflow section.""" + yaml_content = """ +workflow: + name: test-wf + entry_point: agent1 + metadata: + tracker: ado + project_url: https://dev.azure.com/org/Project + work_item_id_agent: intake + work_item_id_field: epic_id + +agents: + - name: agent1 + model: gpt-4 + prompt: "Hello" + routes: + - to: $end +""" + config = load_config_string(yaml_content) + assert config.workflow.metadata == { + "tracker": "ado", + "project_url": "https://dev.azure.com/org/Project", + "work_item_id_agent": "intake", + "work_item_id_field": "epic_id", + } + + def test_no_metadata_in_yaml(self) -> None: + """Test that omitting metadata from YAML gives empty dict.""" + yaml_content = """ +workflow: + name: test-wf + entry_point: agent1 + +agents: + - name: agent1 + model: gpt-4 + prompt: "Hello" + routes: + - to: $end +""" + config = load_config_string(yaml_content) + assert config.workflow.metadata == {} + + def test_metadata_with_nested_values(self) -> None: + """Test that metadata supports nested dicts and lists.""" + yaml_content = """ +workflow: + name: test-wf + entry_point: agent1 + metadata: + tracker: jira + config: + base_url: https://jira.example.com + project_key: PROJ + labels: + - backend + - infra + +agents: + - name: agent1 + model: gpt-4 + prompt: "Hello" + routes: + - to: $end +""" + config = load_config_string(yaml_content) + assert config.workflow.metadata["tracker"] == "jira" + assert config.workflow.metadata["config"]["base_url"] == "https://jira.example.com" + assert config.workflow.metadata["labels"] == ["backend", "infra"] + + def test_metadata_independent_from_input(self) -> None: + """Test that metadata and input are completely separate namespaces.""" + yaml_content = """ +workflow: + name: test-wf + entry_point: agent1 + input: + question: + type: string + description: User question + metadata: + tracker: github + repo: owner/repo + +agents: + - name: agent1 + model: gpt-4 + prompt: "{{ workflow.input.question }}" + routes: + - to: $end +""" + config = load_config_string(yaml_content) + assert "question" in config.workflow.input + assert "question" not in config.workflow.metadata + assert "tracker" in config.workflow.metadata + assert "tracker" not in config.workflow.input diff --git a/tests/test_config/test_schema.py b/tests/test_config/test_schema.py index dc8960e..e2cbba8 100644 --- a/tests/test_config/test_schema.py +++ b/tests/test_config/test_schema.py @@ -606,6 +606,44 @@ def test_full_workflow(self) -> None: assert workflow.context.mode == "explicit" assert workflow.limits.max_iterations == 20 + def test_metadata_defaults_to_empty_dict(self) -> None: + """Test that metadata defaults to empty dict when not specified.""" + workflow = WorkflowDef(name="test", entry_point="agent1") + assert workflow.metadata == {} + + def test_metadata_accepts_arbitrary_keys(self) -> None: + """Test that metadata accepts any key-value pairs.""" + workflow = WorkflowDef( + name="test", + entry_point="agent1", + metadata={ + "tracker": "ado", + "project_url": "https://dev.azure.com/org/Project", + "work_item_id": "1814", + "nested": {"key": "value"}, + "count": 42, + }, + ) + assert workflow.metadata["tracker"] == "ado" + assert workflow.metadata["project_url"] == "https://dev.azure.com/org/Project" + assert workflow.metadata["work_item_id"] == "1814" + assert workflow.metadata["nested"] == {"key": "value"} + assert workflow.metadata["count"] == 42 + + def test_metadata_does_not_affect_other_fields(self) -> None: + """Test that metadata is independent from input, context, etc.""" + workflow = WorkflowDef( + name="test", + entry_point="agent1", + input={"goal": InputDef(type="string")}, + metadata={"tracker": "ado"}, + ) + assert workflow.metadata == {"tracker": "ado"} + assert "goal" in workflow.input + # Metadata and input are completely separate + assert "tracker" not in workflow.input + assert "goal" not in workflow.metadata + class TestWorkflowConfig: """Tests for WorkflowConfig model.""" From ebdcb8aea1cee10e040c61fb64becabcf580e7e1 Mon Sep 17 00:00:00 2001 From: Daniel Green Date: Tue, 21 Apr 2026 13:51:37 -0700 Subject: [PATCH 4/8] fix: wrap long help string to satisfy E501 line-length lint Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/cli/app.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/conductor/cli/app.py b/src/conductor/cli/app.py index cadb3aa..ecc3847 100644 --- a/src/conductor/cli/app.py +++ b/src/conductor/cli/app.py @@ -243,7 +243,10 @@ def run( typer.Option( "--metadata", "-m", - help="Workflow metadata in key=value format. Merged on top of YAML metadata. Can be repeated.", + help=( + "Workflow metadata in key=value format. " + "Merged on top of YAML metadata. Can be repeated." + ), ), ] = None, dry_run: Annotated[ From 0d45c4a1d9d65413ff651436596710e17c390737 Mon Sep 17 00:00:00 2001 From: Daniel Green Date: Wed, 22 Apr 2026 09:56:00 -0700 Subject: [PATCH 5/8] feat: add run_id and log_file for deterministic run linking Propagate the event log's random hex suffix as a run_id across all systems: - EventLogSubscriber: expose run_id property (was already generated) - WorkflowEngine: accept run_id + log_file params, include in workflow_started event - PID files: include run_id + log_file fields - Web dashboard: add /api/info endpoint returning run_id, log_file, workflow_name, started_at, metadata This enables the central dashboard to match per-run dashboards to event logs by exact run_id instead of fragile name/timestamp heuristics. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/cli/pid.py | 12 +++++++++++- src/conductor/cli/run.py | 2 ++ src/conductor/engine/event_log.py | 9 +++++++-- src/conductor/engine/workflow.py | 6 ++++++ src/conductor/web/server.py | 18 ++++++++++++++++++ 5 files changed, 44 insertions(+), 3 deletions(-) diff --git a/src/conductor/cli/pid.py b/src/conductor/cli/pid.py index 420bbcf..540337a 100644 --- a/src/conductor/cli/pid.py +++ b/src/conductor/cli/pid.py @@ -38,13 +38,21 @@ def pid_dir() -> Path: return d -def write_pid_file(pid: int, port: int, workflow_path: str | Path) -> Path: +def write_pid_file( + pid: int, + port: int, + workflow_path: str | Path, + run_id: str = "", + log_file: str = "", +) -> Path: """Write a PID file for a background workflow process. Args: pid: Process ID of the background child. port: TCP port the web dashboard is listening on. workflow_path: Path to the workflow YAML file. + run_id: Unique run identifier (from event log subscriber). + log_file: Path to the JSONL event log file. Returns: Path to the created PID file. @@ -58,6 +66,8 @@ def write_pid_file(pid: int, port: int, workflow_path: str | Path) -> Path: "port": port, "workflow": str(workflow_path), "started_at": datetime.now(UTC).isoformat(), + "run_id": run_id, + "log_file": log_file, } filepath.write_text(json.dumps(data, indent=2)) diff --git a/src/conductor/cli/run.py b/src/conductor/cli/run.py index 05291f9..bac9eba 100644 --- a/src/conductor/cli/run.py +++ b/src/conductor/cli/run.py @@ -1122,6 +1122,8 @@ async def run_workflow_async( event_emitter=emitter, keyboard_listener=listener, web_dashboard=dashboard, + run_id=event_log_subscriber.run_id if event_log_subscriber else "", + log_file=str(event_log_subscriber.path) if event_log_subscriber else "", ) # Share interrupt_event with dashboard so POST /api/stop can abort agents diff --git a/src/conductor/engine/event_log.py b/src/conductor/engine/event_log.py index 0b5432d..ff06208 100644 --- a/src/conductor/engine/event_log.py +++ b/src/conductor/engine/event_log.py @@ -61,8 +61,8 @@ def __init__(self, workflow_name: str) -> None: ts = time.strftime("%Y%m%d-%H%M%S") # Append random suffix to avoid filename collisions # when multiple runs start in the same second - suffix = secrets.token_hex(4) - ts = f"{ts}-{suffix}" + self._run_id = secrets.token_hex(4) + ts = f"{ts}-{self._run_id}" self._path = ( Path(tempfile.gettempdir()) / "conductor" @@ -71,6 +71,11 @@ def __init__(self, workflow_name: str) -> None: self._path.parent.mkdir(parents=True, exist_ok=True) self._handle = open(self._path, "w", encoding="utf-8") # noqa: SIM115 + @property + def run_id(self) -> str: + """Unique run identifier (8-char hex).""" + return self._run_id + @property def path(self) -> Path: """Path to the JSONL log file.""" diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index f7992cd..0dfd198 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -270,6 +270,8 @@ def __init__( keyboard_listener: KeyboardListener | None = None, web_dashboard: WebDashboard | None = None, _subworkflow_depth: int = 0, + run_id: str = "", + log_file: str = "", ) -> None: """Initialize the WorkflowEngine. @@ -308,6 +310,8 @@ def __init__( self.config = config self.skip_gates = skip_gates self.workflow_path = workflow_path + self._run_id = run_id + self._log_file = log_file self.context = WorkflowContext() self.renderer = TemplateRenderer() self.router = Router() @@ -1135,6 +1139,8 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]: ], **self._yaml_source_field(), "metadata": self.config.workflow.metadata, + "run_id": self._run_id, + "log_file": self._log_file, }, ) diff --git a/src/conductor/web/server.py b/src/conductor/web/server.py index 892166a..e5e2e43 100644 --- a/src/conductor/web/server.py +++ b/src/conductor/web/server.py @@ -156,6 +156,24 @@ async def favicon() -> FileResponse: async def get_state() -> JSONResponse: return JSONResponse(content=self._event_history) + @app.get("/api/info") + async def get_info() -> JSONResponse: + """Return run identity for dashboard linking.""" + # Extract from first workflow_started event + info: dict[str, Any] = {} + for event in self._event_history: + if event.get("type") == "workflow_started": + data = event.get("data", {}) + info = { + "run_id": data.get("run_id", ""), + "log_file": data.get("log_file", ""), + "workflow_name": data.get("name", ""), + "started_at": event.get("timestamp", 0), + "metadata": data.get("metadata", {}), + } + break + return JSONResponse(content=info) + @app.get("/api/logs") async def download_logs() -> JSONResponse: """Download the full event history as a JSON file.""" From d44789e1726fa607a4572cf12bed8f0f222ca0e5 Mon Sep 17 00:00:00 2001 From: Daniel Green Date: Fri, 24 Apr 2026 12:04:29 -0700 Subject: [PATCH 6/8] feat: add system metadata to workflow_started event and checkpoints Auto-inject runtime diagnostics (PID, platform, Python version, cwd, conductor version, started_at, run_id, log_file, bg_mode) into the workflow_started event. Dashboard port/URL included when --web is active; parent_pid included in --web-bg mode. System metadata flows through: - JSONL event log (via EventLogSubscriber) - Web dashboard /api/info endpoint - Checkpoint files (for resume context) PID files are intentionally left unchanged. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/cli/run.py | 2 + src/conductor/engine/checkpoint.py | 3 + src/conductor/engine/workflow.py | 47 +++++ src/conductor/web/server.py | 1 + tests/test_engine/test_system_metadata.py | 201 ++++++++++++++++++++++ 5 files changed, 254 insertions(+) create mode 100644 tests/test_engine/test_system_metadata.py diff --git a/src/conductor/cli/run.py b/src/conductor/cli/run.py index bac9eba..25cd844 100644 --- a/src/conductor/cli/run.py +++ b/src/conductor/cli/run.py @@ -1124,6 +1124,8 @@ async def run_workflow_async( web_dashboard=dashboard, run_id=event_log_subscriber.run_id if event_log_subscriber else "", log_file=str(event_log_subscriber.path) if event_log_subscriber else "", + dashboard_port=(dashboard._actual_port if dashboard is not None else None), + bg_mode=web_bg or os.environ.get("CONDUCTOR_WEB_BG") == "1", ) # Share interrupt_event with dashboard so POST /api/stop can abort agents diff --git a/src/conductor/engine/checkpoint.py b/src/conductor/engine/checkpoint.py index 27dc2fa..596861e 100644 --- a/src/conductor/engine/checkpoint.py +++ b/src/conductor/engine/checkpoint.py @@ -136,6 +136,7 @@ def save_checkpoint( error: BaseException, inputs: dict[str, Any], copilot_session_ids: dict[str, str] | None = None, + system_metadata: dict[str, Any] | None = None, ) -> Path | None: """Serialize workflow state to a checkpoint file. @@ -153,6 +154,7 @@ def save_checkpoint( error: The exception that triggered the checkpoint. inputs: Workflow inputs. copilot_session_ids: Optional mapping of agent names to session IDs. + system_metadata: Optional system metadata captured at workflow start. Returns: Path to the saved checkpoint file, or ``None`` if saving failed. @@ -193,6 +195,7 @@ def save_checkpoint( "context": _make_json_serializable(context.to_dict()), "limits": _make_json_serializable(limits.to_dict()), "copilot_session_ids": copilot_session_ids or {}, + "system": system_metadata or {}, } # Serialize to JSON diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index 0dfd198..719c614 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -272,6 +272,8 @@ def __init__( _subworkflow_depth: int = 0, run_id: str = "", log_file: str = "", + dashboard_port: int | None = None, + bg_mode: bool = False, ) -> None: """Initialize the WorkflowEngine. @@ -358,6 +360,11 @@ def __init__( # Sub-workflow depth tracking self._subworkflow_depth = _subworkflow_depth + # System metadata fields (set by CLI, used in workflow_started event) + self._dashboard_port = dashboard_port + self._bg_mode = bg_mode + self._system_metadata: dict[str, Any] = {} + def _build_pricing_overrides(self) -> dict[str, ModelPricing] | None: """Build pricing overrides from workflow cost configuration. @@ -415,6 +422,43 @@ def _conductor_version() -> str: except Exception: return "unknown" + def _build_system_metadata(self) -> dict[str, Any]: + """Build system metadata dict for the workflow_started event. + + Captures runtime diagnostics that would be lost if the process crashes: + PID, platform, Python version, working directory, etc. + + Returns: + Dict with system metadata fields. + """ + import os + import platform as _platform + import sys + from datetime import UTC, datetime + + system: dict[str, Any] = { + "pid": os.getpid(), + "platform": sys.platform, + "python_version": _platform.python_version(), + "conductor_version": self._conductor_version(), + "cwd": os.getcwd(), + "started_at": datetime.now(UTC).isoformat(), + "run_id": self._run_id, + "log_file": self._log_file, + "bg_mode": self._bg_mode, + } + + # Conditional fields — only when dashboard is active + if self._dashboard_port is not None: + system["dashboard_port"] = self._dashboard_port + system["dashboard_url"] = f"http://127.0.0.1:{self._dashboard_port}" + + # Parent PID is useful in --web-bg to trace back to the forking CLI process + if self._bg_mode: + system["parent_pid"] = os.getppid() + + return system + def _make_event_callback(self, agent_name: str) -> Any: """Create an event callback for an agent that forwards to the emitter. @@ -698,6 +742,7 @@ def _save_checkpoint_on_failure(self, error: BaseException) -> None: error=error, inputs=self.context.workflow_inputs, copilot_session_ids=copilot_session_ids, + system_metadata=self._system_metadata, ) self._last_checkpoint_path = checkpoint_path if checkpoint_path is not None: @@ -1082,6 +1127,7 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]: try: async with self.limits.timeout_context(): # Emit workflow_started before the execution loop + self._system_metadata = self._build_system_metadata() self._emit( "workflow_started", { @@ -1139,6 +1185,7 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]: ], **self._yaml_source_field(), "metadata": self.config.workflow.metadata, + "system": self._system_metadata, "run_id": self._run_id, "log_file": self._log_file, }, diff --git a/src/conductor/web/server.py b/src/conductor/web/server.py index e5e2e43..3726c6b 100644 --- a/src/conductor/web/server.py +++ b/src/conductor/web/server.py @@ -170,6 +170,7 @@ async def get_info() -> JSONResponse: "workflow_name": data.get("name", ""), "started_at": event.get("timestamp", 0), "metadata": data.get("metadata", {}), + "system": data.get("system", {}), } break return JSONResponse(content=info) diff --git a/tests/test_engine/test_system_metadata.py b/tests/test_engine/test_system_metadata.py new file mode 100644 index 0000000..46c9814 --- /dev/null +++ b/tests/test_engine/test_system_metadata.py @@ -0,0 +1,201 @@ +"""Tests for system metadata in workflow_started event and checkpoints.""" + +import os +import sys +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from conductor.config.schema import ( + AgentDef, + ContextConfig, + LimitsConfig, + OutputField, + RouteDef, + RuntimeConfig, + WorkflowConfig, + WorkflowDef, +) +from conductor.engine.checkpoint import CheckpointManager +from conductor.engine.workflow import WorkflowEngine +from conductor.events import WorkflowEventEmitter + + +@pytest.fixture +def simple_config() -> WorkflowConfig: + """Minimal workflow config for testing.""" + return WorkflowConfig( + workflow=WorkflowDef( + name="test-workflow", + entry_point="agent1", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=5), + ), + agents=[ + AgentDef( + name="agent1", + model="gpt-4", + prompt="Hello", + output={"result": OutputField(type="string")}, + routes=[RouteDef(to="$end")], + ), + ], + output={"final": "{{ agent1.output.result }}"}, + ) + + +class TestBuildSystemMetadata: + """Tests for WorkflowEngine._build_system_metadata().""" + + def test_always_present_fields(self, simple_config: WorkflowConfig) -> None: + """System metadata includes all required fields.""" + engine = WorkflowEngine(simple_config, run_id="abc123", log_file="/tmp/test.jsonl") + meta = engine._build_system_metadata() + + assert meta["pid"] == os.getpid() + assert meta["platform"] == sys.platform + assert "python_version" in meta + assert meta["conductor_version"] is not None + assert meta["cwd"] == os.getcwd() + assert "started_at" in meta + assert meta["run_id"] == "abc123" + assert meta["log_file"] == "/tmp/test.jsonl" + assert meta["bg_mode"] is False + + def test_no_dashboard_fields_by_default(self, simple_config: WorkflowConfig) -> None: + """Dashboard-specific fields are absent when no dashboard.""" + engine = WorkflowEngine(simple_config) + meta = engine._build_system_metadata() + + assert "dashboard_port" not in meta + assert "dashboard_url" not in meta + assert "parent_pid" not in meta + + def test_dashboard_fields_when_port_set(self, simple_config: WorkflowConfig) -> None: + """Dashboard port and URL present when dashboard_port is provided.""" + engine = WorkflowEngine(simple_config, dashboard_port=8080) + meta = engine._build_system_metadata() + + assert meta["dashboard_port"] == 8080 + assert meta["dashboard_url"] == "http://127.0.0.1:8080" + + def test_bg_mode_includes_parent_pid(self, simple_config: WorkflowConfig) -> None: + """Background mode includes parent_pid.""" + engine = WorkflowEngine(simple_config, bg_mode=True) + meta = engine._build_system_metadata() + + assert meta["bg_mode"] is True + assert meta["parent_pid"] == os.getppid() + + def test_started_at_is_iso_format(self, simple_config: WorkflowConfig) -> None: + """started_at is a valid ISO-8601 timestamp.""" + from datetime import datetime + + engine = WorkflowEngine(simple_config) + meta = engine._build_system_metadata() + + # Should not raise + datetime.fromisoformat(meta["started_at"]) + + +class TestSystemMetadataInEvent: + """Tests that system metadata appears in workflow_started event.""" + + @pytest.mark.asyncio + async def test_workflow_started_has_system_field(self, simple_config: WorkflowConfig) -> None: + """workflow_started event includes a 'system' dict.""" + emitter = WorkflowEventEmitter() + captured_events: list = [] + emitter.subscribe(lambda event: captured_events.append(event.to_dict())) + + engine = WorkflowEngine( + simple_config, + event_emitter=emitter, + run_id="test-run", + log_file="/tmp/test.jsonl", + dashboard_port=9090, + bg_mode=False, + ) + + # Mock out the actual execution to just emit workflow_started + mock_provider = MagicMock() + mock_provider.execute = AsyncMock( + return_value=MagicMock(content='{"result": "hello"}', model="gpt-4") + ) + engine.executor = MagicMock() + engine.executor.execute = mock_provider.execute + + # Trigger _execute_loop directly would be complex, so just call + # _build_system_metadata and verify the shape + meta = engine._build_system_metadata() + assert "pid" in meta + assert meta["dashboard_port"] == 9090 + assert "system" not in meta # no recursion + + +class TestSystemMetadataInCheckpoint: + """Tests that system metadata is saved in checkpoint files.""" + + def test_checkpoint_includes_system_field(self, tmp_path: Path) -> None: + """Checkpoint JSON includes system metadata when provided.""" + import json + + workflow_file = tmp_path / "test.yaml" + workflow_file.write_text("workflow:\n name: test\n") + + system_meta = { + "pid": 12345, + "platform": "win32", + "python_version": "3.12.4", + "cwd": str(tmp_path), + } + + from conductor.engine.context import WorkflowContext + from conductor.engine.limits import LimitEnforcer + + ctx = WorkflowContext() + limits = LimitEnforcer(max_iterations=10) + + with patch.object(CheckpointManager, "get_checkpoints_dir", return_value=tmp_path): + path = CheckpointManager.save_checkpoint( + workflow_path=workflow_file, + context=ctx, + limits=limits, + current_agent="agent1", + error=RuntimeError("test error"), + inputs={"q": "hello"}, + system_metadata=system_meta, + ) + + assert path is not None + data = json.loads(path.read_text()) + assert data["system"] == system_meta + + def test_checkpoint_system_empty_when_not_provided(self, tmp_path: Path) -> None: + """Checkpoint system field defaults to empty dict.""" + import json + + workflow_file = tmp_path / "test.yaml" + workflow_file.write_text("workflow:\n name: test\n") + + from conductor.engine.context import WorkflowContext + from conductor.engine.limits import LimitEnforcer + + ctx = WorkflowContext() + limits = LimitEnforcer(max_iterations=10) + + with patch.object(CheckpointManager, "get_checkpoints_dir", return_value=tmp_path): + path = CheckpointManager.save_checkpoint( + workflow_path=workflow_file, + context=ctx, + limits=limits, + current_agent="agent1", + error=RuntimeError("test error"), + inputs={}, + ) + + assert path is not None + data = json.loads(path.read_text()) + assert data["system"] == {} From c46df4d44b1ad8d6ff0c474fa580372906ecc2aa Mon Sep 17 00:00:00 2001 From: Daniel Green Date: Mon, 27 Apr 2026 14:57:56 -0700 Subject: [PATCH 7/8] fix: address PR #107 review feedback - Guard os.getcwd() with try/except OSError in _build_system_metadata() - Strip sensitive system info (PID, cwd, log_file) from /api/info endpoint - Add parse_metadata_flags() to keep metadata values as raw strings (no coercion) - Use _serialize_value() for metadata in bg_runner to handle nested dicts - Add public WebDashboard.port property, stop accessing _actual_port externally - Group informational params (run_id, log_file, dashboard_port, bg_mode) into RunContext dataclass Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/cli/app.py | 5 ++- src/conductor/cli/bg_runner.py | 2 +- src/conductor/cli/run.py | 47 +++++++++++++++++++++-- src/conductor/engine/workflow.py | 33 +++++++++++----- src/conductor/web/server.py | 10 ++++- tests/test_engine/test_system_metadata.py | 23 +++++++---- 6 files changed, 94 insertions(+), 26 deletions(-) diff --git a/src/conductor/cli/app.py b/src/conductor/cli/app.py index ecc3847..8f0add6 100644 --- a/src/conductor/cli/app.py +++ b/src/conductor/cli/app.py @@ -363,6 +363,7 @@ def run( display_execution_plan, generate_log_path, parse_input_flags, + parse_metadata_flags, run_workflow_async, ) @@ -390,10 +391,10 @@ def run( # Also parse --input.name=value style from sys.argv inputs.update(InputCollector.extract_from_args()) - # Parse --metadata key=value flags (separate from inputs) + # Parse --metadata key=value flags (no type coercion — values stay as strings) cli_metadata: dict[str, str] = {} if raw_metadata: - cli_metadata.update(parse_input_flags(raw_metadata)) + cli_metadata.update(parse_metadata_flags(raw_metadata)) # Resolve log file path resolved_log_file: Path | None = None diff --git a/src/conductor/cli/bg_runner.py b/src/conductor/cli/bg_runner.py index 9037e56..462764b 100644 --- a/src/conductor/cli/bg_runner.py +++ b/src/conductor/cli/bg_runner.py @@ -112,7 +112,7 @@ def launch_background( # Forward metadata if metadata: for key, value in metadata.items(): - cmd.extend(["--metadata", f"{key}={value}"]) + cmd.extend(["--metadata", f"{key}={_serialize_value(value)}"]) if provider_override: cmd.extend(["--provider", provider_override]) diff --git a/src/conductor/cli/run.py b/src/conductor/cli/run.py index 25cd844..7e75ab8 100644 --- a/src/conductor/cli/run.py +++ b/src/conductor/cli/run.py @@ -841,6 +841,41 @@ def parse_input_flags(raw_inputs: list[str]) -> dict[str, Any]: return inputs +def parse_metadata_flags(raw_metadata: list[str]) -> dict[str, str]: + """Parse --metadata key=value flags into a dictionary. + + Unlike ``parse_input_flags``, values are kept as raw strings with no + type coercion — metadata is opaque key-value data. + + Args: + raw_metadata: List of "key=value" strings from CLI. + + Returns: + Dictionary of string key-value pairs. + + Raises: + typer.BadParameter: If metadata format is invalid. + """ + result: dict[str, str] = {} + + for raw in raw_metadata: + if "=" not in raw: + raise typer.BadParameter( + f"Invalid metadata format: '{raw}'. Expected format: key=value" + ) + + key, value = raw.split("=", 1) + key = key.strip() + value = value.strip() + + if not key: + raise typer.BadParameter(f"Empty metadata key in: '{raw}'") + + result[key] = value + + return result + + def coerce_value(value: str) -> Any: """Coerce a string value to an appropriate Python type. @@ -1113,6 +1148,8 @@ async def run_workflow_async( # so POST /api/stop can interrupt the running agent mid-execution interrupt_event = asyncio.Event() + from conductor.engine.workflow import RunContext + engine = WorkflowEngine( config, registry=registry, @@ -1122,10 +1159,12 @@ async def run_workflow_async( event_emitter=emitter, keyboard_listener=listener, web_dashboard=dashboard, - run_id=event_log_subscriber.run_id if event_log_subscriber else "", - log_file=str(event_log_subscriber.path) if event_log_subscriber else "", - dashboard_port=(dashboard._actual_port if dashboard is not None else None), - bg_mode=web_bg or os.environ.get("CONDUCTOR_WEB_BG") == "1", + run_context=RunContext( + run_id=event_log_subscriber.run_id if event_log_subscriber else "", + log_file=str(event_log_subscriber.path) if event_log_subscriber else "", + dashboard_port=(dashboard.port if dashboard is not None else None), + bg_mode=web_bg or os.environ.get("CONDUCTOR_WEB_BG") == "1", + ), ) # Share interrupt_event with dashboard so POST /api/stop can abort agents diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index 719c614..6c6d788 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -58,6 +58,18 @@ from conductor.web.server import WebDashboard +@dataclass +class RunContext: + """Informational metadata about the current CLI run. + + These fields are not used for workflow orchestration — they are passed + through to event data and checkpoints for diagnostics and linking. + """ + + run_id: str = "" + log_file: str = "" + dashboard_port: int | None = None + bg_mode: bool = False @dataclass class ParallelAgentError: """Error information from a failed parallel agent execution. @@ -270,10 +282,7 @@ def __init__( keyboard_listener: KeyboardListener | None = None, web_dashboard: WebDashboard | None = None, _subworkflow_depth: int = 0, - run_id: str = "", - log_file: str = "", - dashboard_port: int | None = None, - bg_mode: bool = False, + run_context: RunContext | None = None, ) -> None: """Initialize the WorkflowEngine. @@ -312,8 +321,9 @@ def __init__( self.config = config self.skip_gates = skip_gates self.workflow_path = workflow_path - self._run_id = run_id - self._log_file = log_file + self._run_context = run_context or RunContext() + self._run_id = self._run_context.run_id + self._log_file = self._run_context.log_file self.context = WorkflowContext() self.renderer = TemplateRenderer() self.router = Router() @@ -361,8 +371,8 @@ def __init__( self._subworkflow_depth = _subworkflow_depth # System metadata fields (set by CLI, used in workflow_started event) - self._dashboard_port = dashboard_port - self._bg_mode = bg_mode + self._dashboard_port = self._run_context.dashboard_port + self._bg_mode = self._run_context.bg_mode self._system_metadata: dict[str, Any] = {} def _build_pricing_overrides(self) -> dict[str, ModelPricing] | None: @@ -436,12 +446,17 @@ def _build_system_metadata(self) -> dict[str, Any]: import sys from datetime import UTC, datetime + try: + cwd = os.getcwd() + except OSError: + cwd = "" + system: dict[str, Any] = { "pid": os.getpid(), "platform": sys.platform, "python_version": _platform.python_version(), "conductor_version": self._conductor_version(), - "cwd": os.getcwd(), + "cwd": cwd, "started_at": datetime.now(UTC).isoformat(), "run_id": self._run_id, "log_file": self._log_file, diff --git a/src/conductor/web/server.py b/src/conductor/web/server.py index 3726c6b..58ba907 100644 --- a/src/conductor/web/server.py +++ b/src/conductor/web/server.py @@ -113,6 +113,11 @@ def __init__( # Subscribe to emitter self._emitter.subscribe(self._on_event) + @property + def port(self) -> int: + """Resolved TCP port the dashboard is listening on.""" + return self._actual_port if self._actual_port is not None else self._port + def _create_app(self) -> FastAPI: """Create the FastAPI application with all routes. @@ -166,11 +171,12 @@ async def get_info() -> JSONResponse: data = event.get("data", {}) info = { "run_id": data.get("run_id", ""), - "log_file": data.get("log_file", ""), "workflow_name": data.get("name", ""), "started_at": event.get("timestamp", 0), "metadata": data.get("metadata", {}), - "system": data.get("system", {}), + "conductor_version": data.get("system", {}).get( + "conductor_version", "" + ), } break return JSONResponse(content=info) diff --git a/tests/test_engine/test_system_metadata.py b/tests/test_engine/test_system_metadata.py index 46c9814..1cfddf3 100644 --- a/tests/test_engine/test_system_metadata.py +++ b/tests/test_engine/test_system_metadata.py @@ -18,7 +18,7 @@ WorkflowDef, ) from conductor.engine.checkpoint import CheckpointManager -from conductor.engine.workflow import WorkflowEngine +from conductor.engine.workflow import RunContext, WorkflowEngine from conductor.events import WorkflowEventEmitter @@ -51,7 +51,10 @@ class TestBuildSystemMetadata: def test_always_present_fields(self, simple_config: WorkflowConfig) -> None: """System metadata includes all required fields.""" - engine = WorkflowEngine(simple_config, run_id="abc123", log_file="/tmp/test.jsonl") + engine = WorkflowEngine( + simple_config, + run_context=RunContext(run_id="abc123", log_file="/tmp/test.jsonl"), + ) meta = engine._build_system_metadata() assert meta["pid"] == os.getpid() @@ -75,7 +78,9 @@ def test_no_dashboard_fields_by_default(self, simple_config: WorkflowConfig) -> def test_dashboard_fields_when_port_set(self, simple_config: WorkflowConfig) -> None: """Dashboard port and URL present when dashboard_port is provided.""" - engine = WorkflowEngine(simple_config, dashboard_port=8080) + engine = WorkflowEngine( + simple_config, run_context=RunContext(dashboard_port=8080) + ) meta = engine._build_system_metadata() assert meta["dashboard_port"] == 8080 @@ -83,7 +88,7 @@ def test_dashboard_fields_when_port_set(self, simple_config: WorkflowConfig) -> def test_bg_mode_includes_parent_pid(self, simple_config: WorkflowConfig) -> None: """Background mode includes parent_pid.""" - engine = WorkflowEngine(simple_config, bg_mode=True) + engine = WorkflowEngine(simple_config, run_context=RunContext(bg_mode=True)) meta = engine._build_system_metadata() assert meta["bg_mode"] is True @@ -113,10 +118,12 @@ async def test_workflow_started_has_system_field(self, simple_config: WorkflowCo engine = WorkflowEngine( simple_config, event_emitter=emitter, - run_id="test-run", - log_file="/tmp/test.jsonl", - dashboard_port=9090, - bg_mode=False, + run_context=RunContext( + run_id="test-run", + log_file="/tmp/test.jsonl", + dashboard_port=9090, + bg_mode=False, + ), ) # Mock out the actual execution to just emit workflow_started From a53708edca40335e519f1b974305a7ddc902c22b Mon Sep 17 00:00:00 2001 From: Daniel Green Date: Mon, 27 Apr 2026 15:10:16 -0700 Subject: [PATCH 8/8] style: apply ruff formatting Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/engine/workflow.py | 2 ++ src/conductor/web/server.py | 4 +--- tests/test_engine/test_system_metadata.py | 4 +--- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index 6c6d788..753bb26 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -70,6 +70,8 @@ class RunContext: log_file: str = "" dashboard_port: int | None = None bg_mode: bool = False + + @dataclass class ParallelAgentError: """Error information from a failed parallel agent execution. diff --git a/src/conductor/web/server.py b/src/conductor/web/server.py index 58ba907..1531d1f 100644 --- a/src/conductor/web/server.py +++ b/src/conductor/web/server.py @@ -174,9 +174,7 @@ async def get_info() -> JSONResponse: "workflow_name": data.get("name", ""), "started_at": event.get("timestamp", 0), "metadata": data.get("metadata", {}), - "conductor_version": data.get("system", {}).get( - "conductor_version", "" - ), + "conductor_version": data.get("system", {}).get("conductor_version", ""), } break return JSONResponse(content=info) diff --git a/tests/test_engine/test_system_metadata.py b/tests/test_engine/test_system_metadata.py index 1cfddf3..8de9544 100644 --- a/tests/test_engine/test_system_metadata.py +++ b/tests/test_engine/test_system_metadata.py @@ -78,9 +78,7 @@ def test_no_dashboard_fields_by_default(self, simple_config: WorkflowConfig) -> def test_dashboard_fields_when_port_set(self, simple_config: WorkflowConfig) -> None: """Dashboard port and URL present when dashboard_port is provided.""" - engine = WorkflowEngine( - simple_config, run_context=RunContext(dashboard_port=8080) - ) + engine = WorkflowEngine(simple_config, run_context=RunContext(dashboard_port=8080)) meta = engine._build_system_metadata() assert meta["dashboard_port"] == 8080