Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ function useStatusTransition(status: NodeStatus): string {
prevStatusRef.current = status;
if (prev === status) return;

if (prev === 'pending' && status === 'running') {
if (status === 'running') {
setTransitionClass('node-activate');
} else if (prev === 'running' && (status === 'completed' || status === 'failed')) {
setTransitionClass(status === 'completed' ? 'node-complete' : 'node-fail');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ function useStatusTransition(status: NodeStatus): string {
prevStatusRef.current = status;
if (prev === status) return;

if (prev === 'pending' && (status === 'running' || status === 'waiting')) {
if (status === 'running' || status === 'waiting') {
setTransitionClass('node-activate');
} else if ((prev === 'running' || prev === 'waiting') && status === 'completed') {
setTransitionClass('node-complete');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ function useStatusTransition(status: NodeStatus): string {
prevStatusRef.current = status;
if (prev === status) return;

if (prev === 'pending' && status === 'running') {
if (status === 'running') {
setTransitionClass('node-activate');
} else if (prev === 'running' && (status === 'completed' || status === 'failed')) {
setTransitionClass(status === 'completed' ? 'node-complete' : 'node-fail');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ function useStatusTransition(status: NodeStatus): string {
prevStatusRef.current = status;
if (prev === status) return;

if (prev === 'pending' && status === 'running') {
if (status === 'running') {
setTransitionClass('node-activate');
} else if (prev === 'running' && (status === 'completed' || status === 'failed')) {
setTransitionClass(status === 'completed' ? 'node-complete' : 'node-fail');
Expand Down
120 changes: 69 additions & 51 deletions src/conductor/web/frontend/src/hooks/use-websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,49 +13,9 @@ export function useWebSocket() {
const wsRef = useRef<WebSocket | null>(null);
const reconnectDelayRef = useRef(1000);
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);

const connect = useCallback(() => {
const proto = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${proto}//${window.location.host}/ws`;

try {
const ws = new WebSocket(wsUrl);
wsRef.current = ws;

ws.onopen = () => {
reconnectDelayRef.current = 1000;
setWsStatus('connected');
// Expose send function to the store
setWsSend((data: object) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(data));
}
});
};

ws.onmessage = (evt) => {
try {
const event = JSON.parse(evt.data) as WorkflowEvent;
processEvent(event);
} catch (e) {
console.error('Failed to parse WebSocket message:', e);
}
};

ws.onclose = () => {
setWsStatus('disconnected');
setWsSend(null);
wsRef.current = null;
scheduleReconnect();
};

ws.onerror = () => {
// onclose fires after onerror
};
} catch {
scheduleReconnect();
}
}, [processEvent, setWsStatus, setWsSend]);
const fetchAbortRef = useRef<AbortController | null>(null);
// Use a ref to break the circular dependency between connect and scheduleReconnect
const connectRef = useRef<() => void>(() => {});

const scheduleReconnect = useCallback(() => {
setWsStatus('reconnecting');
Expand All @@ -64,28 +24,86 @@ export function useWebSocket() {
reconnectDelayRef.current * 2,
MAX_RECONNECT_DELAY,
);
connect();
connectRef.current();
}, reconnectDelayRef.current);
}, [connect, setWsStatus]);
}, [setWsStatus]);

useEffect(() => {
// Fetch existing state for late-joiners, then connect
const connect = useCallback(() => {
setWsStatus('connecting');

fetch('/api/state')
// Cancel any in-flight fetch from a previous connect attempt
if (fetchAbortRef.current) {
fetchAbortRef.current.abort();
}
const abortController = new AbortController();
fetchAbortRef.current = abortController;

// Always fetch full state before opening WebSocket (handles initial + reconnect)
fetch('/api/state', { signal: abortController.signal })
.then((resp) => resp.json())
.then((events: WorkflowEvent[]) => {
if (events && events.length > 0) {
replayState(events);
}
connect();

const proto = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${proto}//${window.location.host}/ws`;

try {
const ws = new WebSocket(wsUrl);
wsRef.current = ws;

ws.onopen = () => {
reconnectDelayRef.current = 1000;
setWsStatus('connected');
// Expose send function to the store
setWsSend((data: object) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(data));
}
});
};

ws.onmessage = (evt) => {
try {
const event = JSON.parse(evt.data) as WorkflowEvent;
processEvent(event);
} catch (e) {
console.error('Failed to parse WebSocket message:', e);
}
};

ws.onclose = () => {
setWsStatus('disconnected');
setWsSend(null);
wsRef.current = null;
scheduleReconnect();
};

ws.onerror = () => {
// onclose fires after onerror
};
} catch {
scheduleReconnect();
}
})
.catch((err) => {
if (abortController.signal.aborted) return;
console.error('Failed to fetch state:', err);
connect();
scheduleReconnect();
});
}, [processEvent, replayState, setWsStatus, setWsSend, scheduleReconnect]);

// Keep the ref in sync with the latest connect callback
connectRef.current = connect;

useEffect(() => {
connect();

return () => {
if (fetchAbortRef.current) {
fetchAbortRef.current.abort();
}
if (reconnectTimerRef.current) {
clearTimeout(reconnectTimerRef.current);
}
Expand All @@ -94,5 +112,5 @@ export function useWebSocket() {
}
setWsSend(null);
};
}, [connect, replayState, setWsStatus, setWsSend]);
}, [connect, setWsSend]);
}
Loading
Loading