Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 86 additions & 12 deletions desktop/src/features/agents/observerRelayStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,38 @@ import type { RelayEvent, ManagedAgent } from "@/shared/api/types";
import { getIdentity } from "@/shared/api/tauri";
import { decryptObserverEvent } from "@/shared/api/tauriObserver";
import { normalizePubkey } from "@/shared/lib/pubkey";
import type { ConnectionState, ObserverEvent } from "./ui/agentSessionTypes";
import type {
ConnectionState,
ObserverEvent,
TranscriptItem,
} from "./ui/agentSessionTypes";
import {
type TranscriptState,
buildTranscriptState,
createEmptyTranscriptState,
processTranscriptEvent,
} from "./ui/agentSessionTranscript";

const MAX_OBSERVER_EVENTS = 800;

type ObserverSnapshot = {
export type ObserverSnapshot = {
connectionState: ConnectionState;
errorMessage: string | null;
events: ObserverEvent[];
};

const IDLE_SNAPSHOT: ObserverSnapshot = {
connectionState: "idle",
errorMessage: null,
events: [],
};

const EMPTY_TRANSCRIPT: TranscriptItem[] = [];

const listeners = new Set<() => void>();
const eventsByAgent = new Map<string, ObserverEvent[]>();
const transcriptByAgent = new Map<string, TranscriptState>();
const snapshotByAgent = new Map<string, ObserverSnapshot>();

// Normalized pubkeys of agents we are actively managing. Only events whose
// "agent" tag matches an entry here will be decrypted (defense-in-depth).
Expand All @@ -35,12 +55,18 @@ function notifyListeners() {
}
}

function invalidateSnapshot(key: string) {
snapshotByAgent.delete(key);
}

function setConnectionState(
nextState: ConnectionState,
nextErrorMessage: string | null = errorMessage,
) {
connectionState = nextState;
errorMessage = nextErrorMessage;
// Invalidate all cached snapshots since connectionState changed
snapshotByAgent.clear();
notifyListeners();
}

Expand All @@ -60,13 +86,32 @@ function appendAgentEvent(agentPubkey: string, event: ObserverEvent) {
return;
}

const next = [...current, event].sort(compareObserverEvents);
eventsByAgent.set(
key,
next.length > MAX_OBSERVER_EVENTS
? next.slice(next.length - MAX_OBSERVER_EVENTS)
: next,
);
const sorted = [...current, event].sort(compareObserverEvents);
const trimmed = sorted.length > MAX_OBSERVER_EVENTS;
const final = trimmed
? sorted.slice(sorted.length - MAX_OBSERVER_EVENTS)
: sorted;
eventsByAgent.set(key, final);

// Determine whether the new event landed at the end of the sorted array.
// If it did (common case), we can incrementally process just this event.
// If not (out-of-order arrival) or if we trimmed, fall back to full rebuild.
const eventAtEnd = sorted[sorted.length - 1] === event;

if (eventAtEnd && !trimmed) {
// Fast path: incremental update
const transcriptState =
transcriptByAgent.get(key) ?? createEmptyTranscriptState();
const updatedTranscript = processTranscriptEvent(transcriptState, event);
transcriptByAgent.set(key, updatedTranscript);
} else {
// Slow path: full rebuild (out-of-order insertion or trim fired)
transcriptByAgent.set(key, buildTranscriptState(final));
}

// Invalidate cached snapshot for this agent
invalidateSnapshot(key);

notifyListeners();
}

Expand Down Expand Up @@ -188,13 +233,40 @@ export function subscribeAgentObserverStore(listener: () => void) {
}

export function getAgentObserverSnapshot(
agentPubkey: string,
agentPubkey?: string | null,
enabled?: boolean,
): ObserverSnapshot {
return {
if (!enabled || !agentPubkey) {
return IDLE_SNAPSHOT;
}
const key = normalizePubkey(agentPubkey);
const cached = snapshotByAgent.get(key);
if (
cached &&
cached.connectionState === connectionState &&
cached.errorMessage === errorMessage
) {
return cached;
}
const snapshot: ObserverSnapshot = {
connectionState,
errorMessage,
events: eventsByAgent.get(normalizePubkey(agentPubkey)) ?? [],
events: eventsByAgent.get(key) ?? [],
};
snapshotByAgent.set(key, snapshot);
return snapshot;
}

export function getAgentTranscript(
agentPubkey?: string | null,
enabled?: boolean,
): TranscriptItem[] {
if (!enabled || !agentPubkey) {
return EMPTY_TRANSCRIPT;
}
const key = normalizePubkey(agentPubkey);
const state = transcriptByAgent.get(key);
return state?.items ?? EMPTY_TRANSCRIPT;
}

export function useManagedAgentObserverBridge(agents: readonly ManagedAgent[]) {
Expand Down Expand Up @@ -229,6 +301,8 @@ export function resetAgentObserverStore() {
startPromise = null;
eventProcessingQueue = Promise.resolve();
eventsByAgent.clear();
transcriptByAgent.clear();
snapshotByAgent.clear();
knownAgentPubkeys.clear();
connectionState = "idle";
errorMessage = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as React from "react";
import { Bot, Brain, ChevronDown, Radio, TerminalSquare } from "lucide-react";

import { cn } from "@/shared/lib/cn";
Expand Down Expand Up @@ -42,7 +43,7 @@ export function AgentSessionTranscriptList({
);
}

function TranscriptItemView({
const TranscriptItemView = React.memo(function TranscriptItemView({
agentName,
item,
}: {
Expand All @@ -62,7 +63,7 @@ function TranscriptItemView({
return <MetadataItem item={item} />;
}
return <LifecycleItem item={item} />;
}
});

function MessageItem({
agentName,
Expand Down
41 changes: 26 additions & 15 deletions desktop/src/features/agents/ui/ManagedAgentSessionPanel.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ import type {
ObserverEvent,
TranscriptItem,
} from "./agentSessionTypes";
import { buildTranscript } from "./agentSessionTranscript";
import { shorten } from "./agentSessionUtils";
import { useObserverEvents } from "./useObserverEvents";
import { useObserverEvents, useAgentTranscript } from "./useObserverEvents";

type ManagedAgentSessionPanelProps = {
agent: ManagedAgent;
Expand All @@ -40,26 +39,38 @@ export function ManagedAgentSessionPanel({
showHeader = true,
showRaw = true,
}: ManagedAgentSessionPanelProps) {
const isRunning = agent.status === "running";
const { connectionState, errorMessage, events } = useObserverEvents(
agent.status === "running",
isRunning,
agent.pubkey,
);
const transcript = useAgentTranscript(isRunning, agent.pubkey);

// Filter transcript items by channelId (lightweight — items now carry channelId)
const scopedTranscript = React.useMemo(
() =>
channelId
? transcript.filter((item) => item.channelId === channelId)
: transcript,
[channelId, transcript],
);

// Filter raw events by channelId for the RawEventRail
const scopedEvents = React.useMemo(
() =>
channelId
? events.filter((event) => event.channelId === channelId)
: events,
[channelId, events],
);
const transcript = React.useMemo(
() => buildTranscript(scopedEvents),
[scopedEvents],
);
const latestSessionId = React.useMemo(
() =>
[...scopedEvents].reverse().find((event) => event.sessionId)?.sessionId,
[scopedEvents],
);

// Derive latestSessionId from channel-scoped events
const latestSessionId = React.useMemo(() => {
for (let i = scopedEvents.length - 1; i >= 0; i--) {
if (scopedEvents[i].sessionId) return scopedEvents[i].sessionId;
}
return null;
}, [scopedEvents]);

return (
<section
Expand All @@ -72,7 +83,7 @@ export function ManagedAgentSessionPanel({
<SessionHeader
connectionState={connectionState}
eventCount={scopedEvents.length}
hasObserver={agent.status === "running"}
hasObserver={isRunning}
latestSessionId={latestSessionId}
/>
) : null}
Expand All @@ -83,9 +94,9 @@ export function ManagedAgentSessionPanel({
emptyDescription={emptyDescription}
errorMessage={errorMessage}
events={scopedEvents}
hasObserver={agent.status === "running"}
hasObserver={isRunning}
showRaw={showRaw}
transcript={transcript}
transcript={scopedTranscript}
/>
</section>
);
Expand Down
Loading
Loading