Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/conductor/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,17 @@ def run(
help="Workflow inputs in name=value format. Can be repeated.",
),
] = None,
raw_metadata: Annotated[
list[str] | None,
typer.Option(
"--metadata",
"-m",
help=(
"Workflow metadata in key=value format. "
"Merged on top of YAML metadata. Can be repeated."
),
),
] = None,
dry_run: Annotated[
bool,
typer.Option(
Expand Down Expand Up @@ -300,12 +311,14 @@ def run(

Execute a multi-agent workflow defined in the specified YAML file.
Workflow inputs can be provided using --input flags.
Metadata can be provided using --metadata flags (merged on top of YAML metadata).

\b
Examples:
conductor run workflow.yaml
conductor run workflow.yaml --input question="What is Python?"
conductor run workflow.yaml -i question="Hello" -i context="Programming"
conductor run workflow.yaml --metadata tracker=ado -m work_item_id=1814
conductor run workflow.yaml --provider copilot
conductor run workflow.yaml --dry-run
conductor run workflow.yaml --skip-gates
Expand Down Expand Up @@ -350,6 +363,7 @@ def run(
display_execution_plan,
generate_log_path,
parse_input_flags,
parse_metadata_flags,
run_workflow_async,
)

Expand Down Expand Up @@ -377,6 +391,11 @@ def run(
# Also parse --input.name=value style from sys.argv
inputs.update(InputCollector.extract_from_args())

# Parse --metadata key=value flags (no type coercion — values stay as strings)
cli_metadata: dict[str, str] = {}
if raw_metadata:
cli_metadata.update(parse_metadata_flags(raw_metadata))

# Resolve log file path
resolved_log_file: Path | None = None
if log_file is not None:
Expand All @@ -398,6 +417,7 @@ def run(
log_file=resolved_log_file,
no_interactive=True, # Always non-interactive in background
web_port=web_port,
metadata=cli_metadata,
)
console.print(f"[bold cyan]Dashboard:[/bold cyan] {url}")
console.print(
Expand All @@ -422,6 +442,7 @@ def run(
web=web,
web_port=web_port,
web_bg=web_bg,
metadata=cli_metadata,
)
)

Expand Down
7 changes: 7 additions & 0 deletions src/conductor/cli/bg_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def launch_background(
log_file: Path | None = None,
no_interactive: bool = True,
web_port: int = 0,
metadata: dict[str, str] | None = None,
) -> str:
"""Fork a detached child process running the workflow with a web dashboard.

Expand All @@ -77,6 +78,7 @@ def launch_background(
log_file: Optional log file path.
no_interactive: Whether to disable interactive mode (always True for bg).
web_port: Desired port (0 = auto-select).
metadata: Optional CLI metadata key=value pairs.

Returns:
The dashboard URL (e.g. ``http://127.0.0.1:8080``).
Expand Down Expand Up @@ -107,6 +109,11 @@ def launch_background(
for key, value in inputs.items():
cmd.extend(["--input", f"{key}={_serialize_value(value)}"])

# Forward metadata
if metadata:
for key, value in metadata.items():
cmd.extend(["--metadata", f"{key}={_serialize_value(value)}"])

if provider_override:
cmd.extend(["--provider", provider_override])

Expand Down
12 changes: 11 additions & 1 deletion src/conductor/cli/pid.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,21 @@ def pid_dir() -> Path:
return d


def write_pid_file(pid: int, port: int, workflow_path: str | Path) -> Path:
def write_pid_file(
pid: int,
port: int,
workflow_path: str | Path,
run_id: str = "",
log_file: str = "",
) -> Path:
"""Write a PID file for a background workflow process.

Args:
pid: Process ID of the background child.
port: TCP port the web dashboard is listening on.
workflow_path: Path to the workflow YAML file.
run_id: Unique run identifier (from event log subscriber).
log_file: Path to the JSONL event log file.

Returns:
Path to the created PID file.
Expand All @@ -58,6 +66,8 @@ def write_pid_file(pid: int, port: int, workflow_path: str | Path) -> Path:
"port": port,
"workflow": str(workflow_path),
"started_at": datetime.now(UTC).isoformat(),
"run_id": run_id,
"log_file": log_file,
}

filepath.write_text(json.dumps(data, indent=2))
Expand Down
49 changes: 49 additions & 0 deletions src/conductor/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,41 @@ def parse_input_flags(raw_inputs: list[str]) -> dict[str, Any]:
return inputs


def parse_metadata_flags(raw_metadata: list[str]) -> dict[str, str]:
    """Parse --metadata key=value flags into a dictionary.

    Unlike ``parse_input_flags``, values are kept as raw strings with no
    type coercion — metadata is opaque key-value data. Repeated keys are
    last-wins; only the first ``=`` separates key from value.

    Args:
        raw_metadata: List of "key=value" strings from CLI.

    Returns:
        Dictionary of string key-value pairs.

    Raises:
        typer.BadParameter: If metadata format is invalid.
    """
    parsed: dict[str, str] = {}

    for entry in raw_metadata:
        # partition splits on the first '=' only; an empty separator
        # means no '=' was present at all.
        key, sep, value = entry.partition("=")
        if not sep:
            raise typer.BadParameter(
                f"Invalid metadata format: '{entry}'. Expected format: key=value"
            )

        key = key.strip()
        if not key:
            raise typer.BadParameter(f"Empty metadata key in: '{entry}'")

        parsed[key] = value.strip()

    return parsed


def coerce_value(value: str) -> Any:
"""Coerce a string value to an appropriate Python type.

Expand Down Expand Up @@ -990,6 +1025,7 @@ async def run_workflow_async(
web: bool = False,
web_port: int = 0,
web_bg: bool = False,
metadata: dict[str, str] | None = None,
) -> dict[str, Any]:
"""Execute a workflow asynchronously.

Expand All @@ -1003,6 +1039,7 @@ async def run_workflow_async(
web: If True, start a real-time web dashboard.
web_port: Port for the web dashboard (0 = auto-select).
web_bg: If True, auto-shutdown dashboard after workflow + client disconnect.
metadata: Optional CLI metadata to merge on top of YAML-declared metadata.

Returns:
The workflow output as a dictionary.
Expand Down Expand Up @@ -1054,6 +1091,10 @@ async def run_workflow_async(
config = load_config(workflow_path)
verbose_log_timing("Configuration loaded", time.time() - load_start)

# Merge CLI metadata on top of YAML-declared metadata
if metadata:
config.workflow.metadata.update(metadata)

# Log workflow details
verbose_log(f"Workflow: {config.workflow.name}")
verbose_log(f"Entry point: {config.workflow.entry_point}")
Expand Down Expand Up @@ -1107,6 +1148,8 @@ async def run_workflow_async(
# so POST /api/stop can interrupt the running agent mid-execution
interrupt_event = asyncio.Event()

from conductor.engine.workflow import RunContext

engine = WorkflowEngine(
config,
registry=registry,
Expand All @@ -1116,6 +1159,12 @@ async def run_workflow_async(
event_emitter=emitter,
keyboard_listener=listener,
web_dashboard=dashboard,
run_context=RunContext(
run_id=event_log_subscriber.run_id if event_log_subscriber else "",
log_file=str(event_log_subscriber.path) if event_log_subscriber else "",
dashboard_port=(dashboard.port if dashboard is not None else None),
bg_mode=web_bg or os.environ.get("CONDUCTOR_WEB_BG") == "1",
),
)

# Share interrupt_event with dashboard so POST /api/stop can abort agents
Expand Down
7 changes: 7 additions & 0 deletions src/conductor/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,13 @@ class WorkflowDef(BaseModel):
hooks: HooksConfig | None = None
"""Lifecycle event hooks."""

metadata: dict[str, Any] = Field(default_factory=dict)
"""Arbitrary key-value metadata for external tooling (dashboards, trackers, etc.).

Included verbatim in the ``workflow_started`` event so downstream
consumers can use it for enrichment without parsing the YAML source.
"""


class WorkflowConfig(BaseModel):
"""Complete workflow configuration file."""
Expand Down
3 changes: 3 additions & 0 deletions src/conductor/engine/checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ def save_checkpoint(
error: BaseException,
inputs: dict[str, Any],
copilot_session_ids: dict[str, str] | None = None,
system_metadata: dict[str, Any] | None = None,
) -> Path | None:
"""Serialize workflow state to a checkpoint file.

Expand All @@ -153,6 +154,7 @@ def save_checkpoint(
error: The exception that triggered the checkpoint.
inputs: Workflow inputs.
copilot_session_ids: Optional mapping of agent names to session IDs.
system_metadata: Optional system metadata captured at workflow start.

Returns:
Path to the saved checkpoint file, or ``None`` if saving failed.
Expand Down Expand Up @@ -193,6 +195,7 @@ def save_checkpoint(
"context": _make_json_serializable(context.to_dict()),
"limits": _make_json_serializable(limits.to_dict()),
"copilot_session_ids": copilot_session_ids or {},
"system": system_metadata or {},
}

# Serialize to JSON
Expand Down
9 changes: 7 additions & 2 deletions src/conductor/engine/event_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ def __init__(self, workflow_name: str) -> None:
ts = time.strftime("%Y%m%d-%H%M%S")
# Append random suffix to avoid filename collisions
# when multiple runs start in the same second
suffix = secrets.token_hex(4)
ts = f"{ts}-{suffix}"
self._run_id = secrets.token_hex(4)
ts = f"{ts}-{self._run_id}"
self._path = (
Path(tempfile.gettempdir())
/ "conductor"
Expand All @@ -71,6 +71,11 @@ def __init__(self, workflow_name: str) -> None:
self._path.parent.mkdir(parents=True, exist_ok=True)
self._handle = open(self._path, "w", encoding="utf-8") # noqa: SIM115

@property
def run_id(self) -> str:
    """Unique run identifier (8-char hex from ``secrets.token_hex(4)``).

    The same value is embedded in the log filename's timestamp suffix,
    so a run can be correlated with its JSONL file on disk.
    """
    return self._run_id

@property
def path(self) -> Path:
    """Path to the JSONL log file (under ``<tempdir>/conductor/``)."""
    return self._path
Loading
Loading