Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions src/conductor/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,69 @@ def checkpoints(
output_console.print(f"\n[dim]Total: {len(checkpoint_list)} checkpoint(s)[/dim]")


@app.command()
def replay(
log_file: Annotated[
Path,
typer.Argument(
help="Path to a JSON or JSONL event log file.",
exists=True,
readable=True,
),
],
web_port: Annotated[
int,
typer.Option(
"--web-port",
help="Port for the replay dashboard (0 = auto-select).",
),
] = 0,
) -> None:
"""Replay a recorded workflow from a JSON/JSONL event log.

Opens the web dashboard in replay mode with a timeline slider
for scrubbing through the workflow history.

The log file can be:
- A JSON array downloaded from the dashboard (GET /api/logs)
- A JSONL file written by the EventLogSubscriber

Example:
conductor replay conductor-logs.json
conductor replay /tmp/conductor/conductor-my-workflow-20260101-120000.events.jsonl
"""
import asyncio

async def _run_replay() -> None:
from conductor.web.replay import ReplayDashboard

try:
dashboard = ReplayDashboard(
log_file.resolve(),
host="127.0.0.1",
port=web_port,
)
except ValueError as exc:
print_error(exc)
raise typer.Exit(1) from exc

await dashboard.start()
console.print(f"\n[bold green]▶ Replay dashboard:[/] {dashboard.url}\n")
console.print("[dim]Press Ctrl+C to exit[/dim]\n")

try:
await asyncio.Event().wait()
except asyncio.CancelledError:
pass
finally:
await dashboard.stop()

try:
asyncio.run(_run_replay())
except KeyboardInterrupt:
console.print("\n[dim]Replay stopped.[/dim]")


@app.command()
def stop(
port: Annotated[
Expand Down
21 changes: 21 additions & 0 deletions src/conductor/engine/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,25 @@ def _emit(self, event_type: str, data: dict[str, Any]) -> None:
event = WorkflowEvent(type=event_type, timestamp=_time.time(), data=data)
self._event_emitter.emit(event)

def _yaml_source_field(self) -> dict[str, str]:
"""Return ``{"yaml_source": <text>}`` if the workflow file is readable."""
if self.workflow_path is None:
return {}
try:
return {"yaml_source": Path(self.workflow_path).read_text(encoding="utf-8")}
except (OSError, ValueError):
return {}

@staticmethod
def _conductor_version() -> str:
"""Return the installed conductor-cli version."""
try:
from conductor import __version__

return __version__
except Exception:
return "unknown"

def _make_event_callback(self, agent_name: str) -> Any:
"""Create an event callback for an agent that forwards to the emitter.

Expand Down Expand Up @@ -959,6 +978,7 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]:
"workflow_started",
{
"name": self.config.workflow.name,
"version": self._conductor_version(),
"entry_point": self.config.workflow.entry_point,
"agents": [
{
Expand Down Expand Up @@ -1009,6 +1029,7 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]:
for f in self.config.for_each
for r in f.routes
],
**self._yaml_source_field(),
},
)

Expand Down
31 changes: 28 additions & 3 deletions src/conductor/web/frontend/src/App.tsx
Original file line number Diff line number Diff line change
@@ -1,16 +1,38 @@
import { useEffect } from 'react';
import { useState, useEffect } from 'react';
import { Header } from '@/components/layout/Header';
import { StatusBar } from '@/components/layout/StatusBar';
import { ReplayBar } from '@/components/layout/ReplayBar';
import { ResizableLayout } from '@/components/layout/ResizableLayout';
import { useWebSocket } from '@/hooks/use-websocket';
import { useReplay } from '@/hooks/use-replay';
import { useWorkflowStore } from '@/stores/workflow-store';

export default function App() {
function LiveMode() {
useWebSocket();
return null;
}

function ReplayMode() {
useReplay();
return null;
}

export default function App() {
const [isReplayMode, setIsReplayMode] = useState<boolean | null>(null);
const replayMode = useWorkflowStore((s) => s.replayMode);
const selectNode = useWorkflowStore((s) => s.selectNode);
const workflowName = useWorkflowStore((s) => s.workflowName);

// Detect replay mode on mount
useEffect(() => {
fetch('/api/replay/info')
.then((r) => {
if (r.ok) setIsReplayMode(true);
else setIsReplayMode(false);
})
.catch(() => setIsReplayMode(false));
}, []);

// Update document title
useEffect(() => {
document.title = workflowName ? `Conductor — ${workflowName}` : 'Conductor Dashboard';
Expand All @@ -27,11 +49,14 @@ export default function App() {
return () => window.removeEventListener('keydown', handleKeyDown);
}, [selectNode]);

if (isReplayMode === null) return null;

return (
<div className="h-full flex flex-col bg-[var(--bg)]">
{isReplayMode ? <ReplayMode /> : <LiveMode />}
<Header />
<ResizableLayout />
<StatusBar />
{replayMode ? <ReplayBar /> : <StatusBar />}
</div>
);
}
12 changes: 11 additions & 1 deletion src/conductor/web/frontend/src/components/graph/AgentNode.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,21 @@ export const AgentNode = memo(function AgentNode({ data, id, selected }: NodePro
/** Hook that returns a live-ticking elapsed string while status is 'running'. */
function useLiveElapsed(id: string, status: NodeStatus): string {
const startedAt = useWorkflowStore((s) => s.nodes[id]?.startedAt);
const replayMode = useWorkflowStore((s) => s.replayMode);
const lastEventTime = useWorkflowStore((s) => s.lastEventTime);
const [display, setDisplay] = useState('0.0s');
const rafRef = useRef<ReturnType<typeof setInterval> | null>(null);

useEffect(() => {
if (status === 'running') {
if (replayMode) {
// In replay mode, use event timestamps instead of wall clock
if (rafRef.current) clearInterval(rafRef.current);
const origin = startedAt ?? (lastEventTime ?? 0);
const now = lastEventTime ?? origin;
setDisplay(formatElapsed(now - origin));
return;
}
const origin = startedAt != null ? startedAt * 1000 : Date.now();
const tick = () => {
const sec = (Date.now() - origin) / 1000;
Expand All @@ -151,7 +161,7 @@ function useLiveElapsed(id: string, status: NodeStatus): string {
} else {
if (rafRef.current) clearInterval(rafRef.current);
}
}, [status, startedAt]);
}, [status, startedAt, replayMode, lastEventTime]);

return display;
}
Expand Down
12 changes: 11 additions & 1 deletion src/conductor/web/frontend/src/components/graph/ScriptNode.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,21 @@ export const ScriptNode = memo(function ScriptNode({ data, id, selected }: NodeP

function useLiveElapsed(id: string, status: NodeStatus): string {
const startedAt = useWorkflowStore((s) => s.nodes[id]?.startedAt);
const replayMode = useWorkflowStore((s) => s.replayMode);
const lastEventTime = useWorkflowStore((s) => s.lastEventTime);
const [display, setDisplay] = useState('0.0s');
const rafRef = useRef<ReturnType<typeof setInterval> | null>(null);

useEffect(() => {
if (status === 'running') {
if (replayMode) {
// In replay mode, use event timestamps instead of wall clock
if (rafRef.current) clearInterval(rafRef.current);
const origin = startedAt ?? (lastEventTime ?? 0);
const now = lastEventTime ?? origin;
setDisplay(formatElapsed(now - origin));
return;
}
const origin = startedAt != null ? startedAt * 1000 : Date.now();
const tick = () => {
const sec = (Date.now() - origin) / 1000;
Expand All @@ -108,7 +118,7 @@ function useLiveElapsed(id: string, status: NodeStatus): string {
} else {
if (rafRef.current) clearInterval(rafRef.current);
}
}, [status, startedAt]);
}, [status, startedAt, replayMode, lastEventTime]);

return display;
}
Expand Down
24 changes: 22 additions & 2 deletions src/conductor/web/frontend/src/components/layout/Header.tsx
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import { useState, useEffect } from 'react';
import { Activity, Square, Play, X, Download } from 'lucide-react';
import { Activity, Square, Play, X, Download, FileCode } from 'lucide-react';
import { useWorkflowStore } from '@/stores/workflow-store';
import { YamlViewer } from '@/components/layout/YamlViewer';

export function Header() {
const workflowName = useWorkflowStore((s) => s.workflowName);
const workflowStatus = useWorkflowStore((s) => s.workflowStatus);
const isPaused = useWorkflowStore((s) => s.isPaused);
const workflowYaml = useWorkflowStore((s) => s.workflowYaml);
const conductorVersion = useWorkflowStore((s) => s.conductorVersion);
const [stopping, setStopping] = useState(false);
const [resuming, setResuming] = useState(false);
const [killing, setKilling] = useState(false);
const [showYaml, setShowYaml] = useState(false);

const isRunning = workflowStatus === 'running' || workflowStatus === 'pending';

Expand Down Expand Up @@ -108,6 +112,19 @@ export function Header() {
{stopping ? 'Stopping...' : 'Stop'}
</button>
) : null}
{workflowYaml && (
<button
onClick={() => setShowYaml(true)}
className="flex items-center gap-1.5 px-2.5 py-1 text-xs font-medium rounded
bg-[var(--surface-hover)] text-[var(--text-secondary)] border border-[var(--border)]
hover:text-[var(--text)] hover:bg-[var(--surface)]
transition-colors"
title="View workflow YAML configuration"
>
<FileCode className="w-3 h-3" />
YAML
</button>
)}
<a
href="/api/logs"
download="conductor-logs.json"
Expand All @@ -120,8 +137,11 @@ export function Header() {
<Download className="w-3 h-3" />
Logs
</a>
<span className="text-xs text-[var(--text-muted)]">Dashboard v1.0</span>
<span className="text-xs text-[var(--text-muted)]">v{conductorVersion ?? '—'}</span>
</div>
{showYaml && workflowYaml && (
<YamlViewer yaml={workflowYaml} onClose={() => setShowYaml(false)} />
)}
</header>
);
}
Loading
Loading