diff --git a/src/conductor/cli/app.py b/src/conductor/cli/app.py index 31c2d23..8f0add6 100644 --- a/src/conductor/cli/app.py +++ b/src/conductor/cli/app.py @@ -238,6 +238,17 @@ def run( help="Workflow inputs in name=value format. Can be repeated.", ), ] = None, + raw_metadata: Annotated[ + list[str] | None, + typer.Option( + "--metadata", + "-m", + help=( + "Workflow metadata in key=value format. " + "Merged on top of YAML metadata. Can be repeated." + ), + ), + ] = None, dry_run: Annotated[ bool, typer.Option( @@ -300,12 +311,14 @@ def run( Execute a multi-agent workflow defined in the specified YAML file. Workflow inputs can be provided using --input flags. + Metadata can be provided using --metadata flags (merged on top of YAML metadata). \b Examples: conductor run workflow.yaml conductor run workflow.yaml --input question="What is Python?" conductor run workflow.yaml -i question="Hello" -i context="Programming" + conductor run workflow.yaml --metadata tracker=ado -m work_item_id=1814 conductor run workflow.yaml --provider copilot conductor run workflow.yaml --dry-run conductor run workflow.yaml --skip-gates @@ -350,6 +363,7 @@ def run( display_execution_plan, generate_log_path, parse_input_flags, + parse_metadata_flags, run_workflow_async, ) @@ -377,6 +391,11 @@ def run( # Also parse --input.name=value style from sys.argv inputs.update(InputCollector.extract_from_args()) + # Parse --metadata key=value flags (no type coercion — values stay as strings) + cli_metadata: dict[str, str] = {} + if raw_metadata: + cli_metadata.update(parse_metadata_flags(raw_metadata)) + # Resolve log file path resolved_log_file: Path | None = None if log_file is not None: @@ -398,6 +417,7 @@ def run( log_file=resolved_log_file, no_interactive=True, # Always non-interactive in background web_port=web_port, + metadata=cli_metadata, ) console.print(f"[bold cyan]Dashboard:[/bold cyan] {url}") console.print( @@ -422,6 +442,7 @@ def run( web=web, web_port=web_port, web_bg=web_bg, + metadata=cli_metadata, ) ) diff --git a/src/conductor/cli/bg_runner.py b/src/conductor/cli/bg_runner.py index e193ae0..462764b 100644 --- a/src/conductor/cli/bg_runner.py +++ b/src/conductor/cli/bg_runner.py @@ -62,6 +62,7 @@ def launch_background( log_file: Path | None = None, no_interactive: bool = True, web_port: int = 0, + metadata: dict[str, str] | None = None, ) -> str: """Fork a detached child process running the workflow with a web dashboard. @@ -77,6 +78,7 @@ def launch_background( log_file: Optional log file path. no_interactive: Whether to disable interactive mode (always True for bg). web_port: Desired port (0 = auto-select). + metadata: Optional CLI metadata key=value pairs. Returns: The dashboard URL (e.g. ``http://127.0.0.1:8080``). @@ -107,6 +109,11 @@ def launch_background( for key, value in inputs.items(): cmd.extend(["--input", f"{key}={_serialize_value(value)}"]) + # Forward metadata + if metadata: + for key, value in metadata.items(): + cmd.extend(["--metadata", f"{key}={_serialize_value(value)}"]) + if provider_override: cmd.extend(["--provider", provider_override]) diff --git a/src/conductor/cli/pid.py b/src/conductor/cli/pid.py index 420bbcf..540337a 100644 --- a/src/conductor/cli/pid.py +++ b/src/conductor/cli/pid.py @@ -38,13 +38,21 @@ def pid_dir() -> Path: return d -def write_pid_file(pid: int, port: int, workflow_path: str | Path) -> Path: +def write_pid_file( + pid: int, + port: int, + workflow_path: str | Path, + run_id: str = "", + log_file: str = "", +) -> Path: """Write a PID file for a background workflow process. Args: pid: Process ID of the background child. port: TCP port the web dashboard is listening on. workflow_path: Path to the workflow YAML file. + run_id: Unique run identifier (from event log subscriber). + log_file: Path to the JSONL event log file. Returns: Path to the created PID file. @@ -58,6 +66,8 @@ def write_pid_file(pid: int, port: int, workflow_path: str | Path) -> Path: "port": port, "workflow": str(workflow_path), "started_at": datetime.now(UTC).isoformat(), + "run_id": run_id, + "log_file": log_file, } filepath.write_text(json.dumps(data, indent=2)) diff --git a/src/conductor/cli/run.py b/src/conductor/cli/run.py index 32ac61e..7e75ab8 100644 --- a/src/conductor/cli/run.py +++ b/src/conductor/cli/run.py @@ -841,6 +841,41 @@ def parse_input_flags(raw_inputs: list[str]) -> dict[str, Any]: return inputs +def parse_metadata_flags(raw_metadata: list[str]) -> dict[str, str]: + """Parse --metadata key=value flags into a dictionary. + + Unlike ``parse_input_flags``, values are kept as raw strings with no + type coercion — metadata is opaque key-value data. + + Args: + raw_metadata: List of "key=value" strings from CLI. + + Returns: + Dictionary of string key-value pairs. + + Raises: + typer.BadParameter: If metadata format is invalid. + """ + result: dict[str, str] = {} + + for raw in raw_metadata: + if "=" not in raw: + raise typer.BadParameter( + f"Invalid metadata format: '{raw}'. Expected format: key=value" + ) + + key, value = raw.split("=", 1) + key = key.strip() + value = value.strip() + + if not key: + raise typer.BadParameter(f"Empty metadata key in: '{raw}'") + + result[key] = value + + return result + + def coerce_value(value: str) -> Any: """Coerce a string value to an appropriate Python type. @@ -990,6 +1025,7 @@ async def run_workflow_async( web: bool = False, web_port: int = 0, web_bg: bool = False, + metadata: dict[str, str] | None = None, ) -> dict[str, Any]: """Execute a workflow asynchronously. @@ -1003,6 +1039,7 @@ async def run_workflow_async( web: If True, start a real-time web dashboard. web_port: Port for the web dashboard (0 = auto-select). web_bg: If True, auto-shutdown dashboard after workflow + client disconnect. + metadata: Optional CLI metadata to merge on top of YAML-declared metadata. Returns: The workflow output as a dictionary. @@ -1054,6 +1091,10 @@ async def run_workflow_async( config = load_config(workflow_path) verbose_log_timing("Configuration loaded", time.time() - load_start) + # Merge CLI metadata on top of YAML-declared metadata + if metadata: + config.workflow.metadata.update(metadata) + # Log workflow details verbose_log(f"Workflow: {config.workflow.name}") verbose_log(f"Entry point: {config.workflow.entry_point}") @@ -1107,6 +1148,8 @@ async def run_workflow_async( # so POST /api/stop can interrupt the running agent mid-execution interrupt_event = asyncio.Event() + from conductor.engine.workflow import RunContext + engine = WorkflowEngine( config, registry=registry, @@ -1116,6 +1159,12 @@ async def run_workflow_async( event_emitter=emitter, keyboard_listener=listener, web_dashboard=dashboard, + run_context=RunContext( + run_id=event_log_subscriber.run_id if event_log_subscriber else "", + log_file=str(event_log_subscriber.path) if event_log_subscriber else "", + dashboard_port=(dashboard.port if dashboard is not None else None), + bg_mode=web_bg or os.environ.get("CONDUCTOR_WEB_BG") == "1", + ), ) # Share interrupt_event with dashboard so POST /api/stop can abort agents diff --git a/src/conductor/config/schema.py b/src/conductor/config/schema.py index 527c50b..7594d49 100644 --- a/src/conductor/config/schema.py +++ b/src/conductor/config/schema.py @@ -738,6 +738,13 @@ class WorkflowDef(BaseModel): hooks: HooksConfig | None = None """Lifecycle event hooks.""" + metadata: dict[str, Any] = Field(default_factory=dict) + """Arbitrary key-value metadata for external tooling (dashboards, trackers, etc.). + + Included verbatim in the ``workflow_started`` event so downstream + consumers can use it for enrichment without parsing the YAML source. + """ + class WorkflowConfig(BaseModel): """Complete workflow configuration file.""" diff --git a/src/conductor/engine/checkpoint.py b/src/conductor/engine/checkpoint.py index 27dc2fa..596861e 100644 --- a/src/conductor/engine/checkpoint.py +++ b/src/conductor/engine/checkpoint.py @@ -136,6 +136,7 @@ def save_checkpoint( error: BaseException, inputs: dict[str, Any], copilot_session_ids: dict[str, str] | None = None, + system_metadata: dict[str, Any] | None = None, ) -> Path | None: """Serialize workflow state to a checkpoint file. @@ -153,6 +154,7 @@ def save_checkpoint( error: The exception that triggered the checkpoint. inputs: Workflow inputs. copilot_session_ids: Optional mapping of agent names to session IDs. + system_metadata: Optional system metadata captured at workflow start. Returns: Path to the saved checkpoint file, or ``None`` if saving failed. @@ -193,6 +195,7 @@ def save_checkpoint( "context": _make_json_serializable(context.to_dict()), "limits": _make_json_serializable(limits.to_dict()), "copilot_session_ids": copilot_session_ids or {}, + "system": system_metadata or {}, } # Serialize to JSON diff --git a/src/conductor/engine/event_log.py b/src/conductor/engine/event_log.py index 0b5432d..ff06208 100644 --- a/src/conductor/engine/event_log.py +++ b/src/conductor/engine/event_log.py @@ -61,8 +61,8 @@ def __init__(self, workflow_name: str) -> None: ts = time.strftime("%Y%m%d-%H%M%S") # Append random suffix to avoid filename collisions # when multiple runs start in the same second - suffix = secrets.token_hex(4) - ts = f"{ts}-{suffix}" + self._run_id = secrets.token_hex(4) + ts = f"{ts}-{self._run_id}" self._path = ( Path(tempfile.gettempdir()) / "conductor" @@ -71,6 +71,11 @@ def __init__(self, workflow_name: str) -> None: self._path.parent.mkdir(parents=True, exist_ok=True) self._handle = open(self._path, "w", encoding="utf-8") # noqa: SIM115 + @property + def run_id(self) -> str: + """Unique run identifier (8-char hex).""" + return self._run_id + @property def path(self) -> Path: """Path to the JSONL log file.""" diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index 484192b..753bb26 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -58,6 +58,20 @@ from conductor.web.server import WebDashboard +@dataclass +class RunContext: + """Informational metadata about the current CLI run. + + These fields are not used for workflow orchestration — they are passed + through to event data and checkpoints for diagnostics and linking. + """ + + run_id: str = "" + log_file: str = "" + dashboard_port: int | None = None + bg_mode: bool = False + + @dataclass class ParallelAgentError: """Error information from a failed parallel agent execution. @@ -270,6 +284,7 @@ def __init__( keyboard_listener: KeyboardListener | None = None, web_dashboard: WebDashboard | None = None, _subworkflow_depth: int = 0, + run_context: RunContext | None = None, ) -> None: """Initialize the WorkflowEngine. @@ -308,6 +323,9 @@ def __init__( self.config = config self.skip_gates = skip_gates self.workflow_path = workflow_path + self._run_context = run_context or RunContext() + self._run_id = self._run_context.run_id + self._log_file = self._run_context.log_file self.context = WorkflowContext() self.renderer = TemplateRenderer() self.router = Router() @@ -354,6 +372,11 @@ def __init__( # Sub-workflow depth tracking self._subworkflow_depth = _subworkflow_depth + # System metadata fields (set by CLI, used in workflow_started event) + self._dashboard_port = self._run_context.dashboard_port + self._bg_mode = self._run_context.bg_mode + self._system_metadata: dict[str, Any] = {} + def _build_pricing_overrides(self) -> dict[str, ModelPricing] | None: """Build pricing overrides from workflow cost configuration. @@ -411,6 +434,48 @@ def _conductor_version() -> str: except Exception: return "unknown" + def _build_system_metadata(self) -> dict[str, Any]: + """Build system metadata dict for the workflow_started event. + + Captures runtime diagnostics that would be lost if the process crashes: + PID, platform, Python version, working directory, etc. + + Returns: + Dict with system metadata fields. + """ + import os + import platform as _platform + import sys + from datetime import UTC, datetime + + try: + cwd = os.getcwd() + except OSError: + cwd = "" + + system: dict[str, Any] = { + "pid": os.getpid(), + "platform": sys.platform, + "python_version": _platform.python_version(), + "conductor_version": self._conductor_version(), + "cwd": cwd, + "started_at": datetime.now(UTC).isoformat(), + "run_id": self._run_id, + "log_file": self._log_file, + "bg_mode": self._bg_mode, + } + + # Conditional fields — only when dashboard is active + if self._dashboard_port is not None: + system["dashboard_port"] = self._dashboard_port + system["dashboard_url"] = f"http://127.0.0.1:{self._dashboard_port}" + + # Parent PID is useful in --web-bg to trace back to the forking CLI process + if self._bg_mode: + system["parent_pid"] = os.getppid() + + return system + def _make_event_callback(self, agent_name: str) -> Any: """Create an event callback for an agent that forwards to the emitter. @@ -694,6 +759,7 @@ def _save_checkpoint_on_failure(self, error: BaseException) -> None: error=error, inputs=self.context.workflow_inputs, copilot_session_ids=copilot_session_ids, + system_metadata=self._system_metadata, ) self._last_checkpoint_path = checkpoint_path if checkpoint_path is not None: @@ -1078,6 +1144,7 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]: try: async with self.limits.timeout_context(): # Emit workflow_started before the execution loop + self._system_metadata = self._build_system_metadata() self._emit( "workflow_started", { @@ -1134,6 +1201,10 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]: for r in f.routes ], **self._yaml_source_field(), + "metadata": self.config.workflow.metadata, + "system": self._system_metadata, + "run_id": self._run_id, + "log_file": self._log_file, }, ) diff --git a/src/conductor/web/server.py b/src/conductor/web/server.py index 892166a..1531d1f 100644 --- a/src/conductor/web/server.py +++ b/src/conductor/web/server.py @@ -113,6 +113,11 @@ def __init__( # Subscribe to emitter self._emitter.subscribe(self._on_event) + @property + def port(self) -> int: + """Resolved TCP port the dashboard is listening on.""" + return self._actual_port if self._actual_port is not None else self._port + def _create_app(self) -> FastAPI: """Create the FastAPI application with all routes. @@ -156,6 +161,24 @@ async def favicon() -> FileResponse: async def get_state() -> JSONResponse: return JSONResponse(content=self._event_history) + @app.get("/api/info") + async def get_info() -> JSONResponse: + """Return run identity for dashboard linking.""" + # Extract from first workflow_started event + info: dict[str, Any] = {} + for event in self._event_history: + if event.get("type") == "workflow_started": + data = event.get("data", {}) + info = { + "run_id": data.get("run_id", ""), + "workflow_name": data.get("name", ""), + "started_at": event.get("timestamp", 0), + "metadata": data.get("metadata", {}), + "conductor_version": data.get("system", {}).get("conductor_version", ""), + } + break + return JSONResponse(content=info) + @app.get("/api/logs") async def download_logs() -> JSONResponse: """Download the full event history as a JSON file.""" diff --git a/tests/test_config/test_loader.py b/tests/test_config/test_loader.py index e4d25dd..7247255 100644 --- a/tests/test_config/test_loader.py +++ b/tests/test_config/test_loader.py @@ -480,3 +480,105 @@ def test_route_to_parallel_group(self) -> None: """ config = load_config_string(yaml_content) assert config.agents[0].routes[0].to == "pg" + + +class TestMetadataLoading: + """Tests for workflow metadata loading from YAML.""" + + def test_metadata_from_yaml(self) -> None: + """Test that metadata is loaded from YAML workflow section.""" + yaml_content = """ +workflow: + name: test-wf + entry_point: agent1 + metadata: + tracker: ado + project_url: https://dev.azure.com/org/Project + work_item_id_agent: intake + work_item_id_field: epic_id + +agents: + - name: agent1 + model: gpt-4 + prompt: "Hello" + routes: + - to: $end +""" + config = load_config_string(yaml_content) + assert config.workflow.metadata == { + "tracker": "ado", + "project_url": "https://dev.azure.com/org/Project", + "work_item_id_agent": "intake", + "work_item_id_field": "epic_id", + } + + def test_no_metadata_in_yaml(self) -> None: + """Test that omitting metadata from YAML gives empty dict.""" + yaml_content = """ +workflow: + name: test-wf + entry_point: agent1 + +agents: + - name: agent1 + model: gpt-4 + prompt: "Hello" + routes: + - to: $end +""" + config = load_config_string(yaml_content) + assert config.workflow.metadata == {} + + def test_metadata_with_nested_values(self) -> None: + """Test that metadata supports nested dicts and lists.""" + yaml_content = """ +workflow: + name: test-wf + entry_point: agent1 + metadata: + tracker: jira + config: + base_url: https://jira.example.com + project_key: PROJ + labels: + - backend + - infra + +agents: + - name: agent1 + model: gpt-4 + prompt: "Hello" + routes: + - to: $end +""" + config = load_config_string(yaml_content) + assert config.workflow.metadata["tracker"] == "jira" + assert config.workflow.metadata["config"]["base_url"] == "https://jira.example.com" + assert config.workflow.metadata["labels"] == ["backend", "infra"] + + def test_metadata_independent_from_input(self) -> None: + """Test that metadata and input are completely separate namespaces.""" + yaml_content = """ +workflow: + name: test-wf + entry_point: agent1 + input: + question: + type: string + description: User question + metadata: + tracker: github + repo: owner/repo + +agents: + - name: agent1 + model: gpt-4 + prompt: "{{ workflow.input.question }}" + routes: + - to: $end +""" + config = load_config_string(yaml_content) + assert "question" in config.workflow.input + assert "question" not in config.workflow.metadata + assert "tracker" in config.workflow.metadata + assert "tracker" not in config.workflow.input diff --git a/tests/test_config/test_schema.py b/tests/test_config/test_schema.py index dc8960e..e2cbba8 100644 --- a/tests/test_config/test_schema.py +++ b/tests/test_config/test_schema.py @@ -606,6 +606,44 @@ def test_full_workflow(self) -> None: assert workflow.context.mode == "explicit" assert workflow.limits.max_iterations == 20 + def test_metadata_defaults_to_empty_dict(self) -> None: + """Test that metadata defaults to empty dict when not specified.""" + workflow = WorkflowDef(name="test", entry_point="agent1") + assert workflow.metadata == {} + + def test_metadata_accepts_arbitrary_keys(self) -> None: + """Test that metadata accepts any key-value pairs.""" + workflow = WorkflowDef( + name="test", + entry_point="agent1", + metadata={ + "tracker": "ado", + "project_url": "https://dev.azure.com/org/Project", + "work_item_id": "1814", + "nested": {"key": "value"}, + "count": 42, + }, + ) + assert workflow.metadata["tracker"] == "ado" + assert workflow.metadata["project_url"] == "https://dev.azure.com/org/Project" + assert workflow.metadata["work_item_id"] == "1814" + assert workflow.metadata["nested"] == {"key": "value"} + assert workflow.metadata["count"] == 42 + + def test_metadata_does_not_affect_other_fields(self) -> None: + """Test that metadata is independent from input, context, etc.""" + workflow = WorkflowDef( + name="test", + entry_point="agent1", + input={"goal": InputDef(type="string")}, + metadata={"tracker": "ado"}, + ) + assert workflow.metadata == {"tracker": "ado"} + assert "goal" in workflow.input + # Metadata and input are completely separate + assert "tracker" not in workflow.input + assert "goal" not in workflow.metadata + class TestWorkflowConfig: """Tests for WorkflowConfig model.""" diff --git a/tests/test_engine/test_system_metadata.py b/tests/test_engine/test_system_metadata.py new file mode 100644 index 0000000..8de9544 --- /dev/null +++ b/tests/test_engine/test_system_metadata.py @@ -0,0 +1,206 @@ +"""Tests for system metadata in workflow_started event and checkpoints.""" + +import os +import sys +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from conductor.config.schema import ( + AgentDef, + ContextConfig, + LimitsConfig, + OutputField, + RouteDef, + RuntimeConfig, + WorkflowConfig, + WorkflowDef, +) +from conductor.engine.checkpoint import CheckpointManager +from conductor.engine.workflow import RunContext, WorkflowEngine +from conductor.events import WorkflowEventEmitter + + +@pytest.fixture +def simple_config() -> WorkflowConfig: + """Minimal workflow config for testing.""" + return WorkflowConfig( + workflow=WorkflowDef( + name="test-workflow", + entry_point="agent1", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=5), + ), + agents=[ + AgentDef( + name="agent1", + model="gpt-4", + prompt="Hello", + output={"result": OutputField(type="string")}, + routes=[RouteDef(to="$end")], + ), + ], + output={"final": "{{ agent1.output.result }}"}, + ) + + +class TestBuildSystemMetadata: + """Tests for WorkflowEngine._build_system_metadata().""" + + def test_always_present_fields(self, simple_config: WorkflowConfig) -> None: + """System metadata includes all required fields.""" + engine = WorkflowEngine( + simple_config, + run_context=RunContext(run_id="abc123", log_file="/tmp/test.jsonl"), + ) + meta = engine._build_system_metadata() + + assert meta["pid"] == os.getpid() + assert meta["platform"] == sys.platform + assert "python_version" in meta + assert meta["conductor_version"] is not None + assert meta["cwd"] == os.getcwd() + assert "started_at" in meta + assert meta["run_id"] == "abc123" + assert meta["log_file"] == "/tmp/test.jsonl" + assert meta["bg_mode"] is False + + def test_no_dashboard_fields_by_default(self, simple_config: WorkflowConfig) -> None: + """Dashboard-specific fields are absent when no dashboard.""" + engine = WorkflowEngine(simple_config) + meta = engine._build_system_metadata() + + assert "dashboard_port" not in meta + assert "dashboard_url" not in meta + assert "parent_pid" not in meta + + def test_dashboard_fields_when_port_set(self, simple_config: WorkflowConfig) -> None: + """Dashboard port and URL present when dashboard_port is provided.""" + engine = WorkflowEngine(simple_config, run_context=RunContext(dashboard_port=8080)) + meta = engine._build_system_metadata() + + assert meta["dashboard_port"] == 8080 + assert meta["dashboard_url"] == "http://127.0.0.1:8080" + + def test_bg_mode_includes_parent_pid(self, simple_config: WorkflowConfig) -> None: + """Background mode includes parent_pid.""" + engine = WorkflowEngine(simple_config, run_context=RunContext(bg_mode=True)) + meta = engine._build_system_metadata() + + assert meta["bg_mode"] is True + assert meta["parent_pid"] == os.getppid() + + def test_started_at_is_iso_format(self, simple_config: WorkflowConfig) -> None: + """started_at is a valid ISO-8601 timestamp.""" + from datetime import datetime + + engine = WorkflowEngine(simple_config) + meta = engine._build_system_metadata() + + # Should not raise + datetime.fromisoformat(meta["started_at"]) + + +class TestSystemMetadataInEvent: + """Tests that system metadata appears in workflow_started event.""" + + @pytest.mark.asyncio + async def test_workflow_started_has_system_field(self, simple_config: WorkflowConfig) -> None: + """workflow_started event includes a 'system' dict.""" + emitter = WorkflowEventEmitter() + captured_events: list = [] + emitter.subscribe(lambda event: captured_events.append(event.to_dict())) + + engine = WorkflowEngine( + simple_config, + event_emitter=emitter, + run_context=RunContext( + run_id="test-run", + log_file="/tmp/test.jsonl", + dashboard_port=9090, + bg_mode=False, + ), + ) + + # Mock out the actual execution to just emit workflow_started + mock_provider = MagicMock() + mock_provider.execute = AsyncMock( + return_value=MagicMock(content='{"result": "hello"}', model="gpt-4") + ) + engine.executor = MagicMock() + engine.executor.execute = mock_provider.execute + + # Trigger _execute_loop directly would be complex, so just call + # _build_system_metadata and verify the shape + meta = engine._build_system_metadata() + assert "pid" in meta + assert meta["dashboard_port"] == 9090 + assert "system" not in meta # no recursion + + +class TestSystemMetadataInCheckpoint: + """Tests that system metadata is saved in checkpoint files.""" + + def test_checkpoint_includes_system_field(self, tmp_path: Path) -> None: + """Checkpoint JSON includes system metadata when provided.""" + import json + + workflow_file = tmp_path / "test.yaml" + workflow_file.write_text("workflow:\n name: test\n") + + system_meta = { + "pid": 12345, + "platform": "win32", + "python_version": "3.12.4", + "cwd": str(tmp_path), + } + + from conductor.engine.context import WorkflowContext + from conductor.engine.limits import LimitEnforcer + + ctx = WorkflowContext() + limits = LimitEnforcer(max_iterations=10) + + with patch.object(CheckpointManager, "get_checkpoints_dir", return_value=tmp_path): + path = CheckpointManager.save_checkpoint( + workflow_path=workflow_file, + context=ctx, + limits=limits, + current_agent="agent1", + error=RuntimeError("test error"), + inputs={"q": "hello"}, + system_metadata=system_meta, + ) + + assert path is not None + data = json.loads(path.read_text()) + assert data["system"] == system_meta + + def test_checkpoint_system_empty_when_not_provided(self, tmp_path: Path) -> None: + """Checkpoint system field defaults to empty dict.""" + import json + + workflow_file = tmp_path / "test.yaml" + workflow_file.write_text("workflow:\n name: test\n") + + from conductor.engine.context import WorkflowContext + from conductor.engine.limits import LimitEnforcer + + ctx = WorkflowContext() + limits = LimitEnforcer(max_iterations=10) + + with patch.object(CheckpointManager, "get_checkpoints_dir", return_value=tmp_path): + path = CheckpointManager.save_checkpoint( + workflow_path=workflow_file, + context=ctx, + limits=limits, + current_agent="agent1", + error=RuntimeError("test error"), + inputs={}, + ) + + assert path is not None + data = json.loads(path.read_text()) + assert data["system"] == {}