From a2f2f58459623bb3c8cb519ea2c9e3831cdf9add Mon Sep 17 00:00:00 2001 From: Liu-KM Date: Thu, 16 Apr 2026 23:30:33 +0800 Subject: [PATCH 1/4] fix(hub): merge histories for duplicate agent sessions --- hub/src/sync/sessionCache.ts | 129 +++++++++++++++++++++--------- hub/src/sync/sessionModel.test.ts | 35 +++++--- hub/src/sync/syncEngine.ts | 1 + 3 files changed, 115 insertions(+), 50 deletions(-) diff --git a/hub/src/sync/sessionCache.ts b/hub/src/sync/sessionCache.ts index 902bda9a6..267f9f85f 100644 --- a/hub/src/sync/sessionCache.ts +++ b/hub/src/sync/sessionCache.ts @@ -11,6 +11,7 @@ export class SessionCache { private readonly lastBroadcastAtBySessionId: Map = new Map() private readonly todoBackfillAttemptedSessionIds: Set = new Set() private readonly deduplicateInProgress: Set = new Set() + private readonly deduplicatePending: Set = new Set() constructor( private readonly store: Store, @@ -406,6 +407,19 @@ export class SessionCache { } async mergeSessions(oldSessionId: string, newSessionId: string, namespace: string): Promise { + await this.mergeSessionData(oldSessionId, newSessionId, namespace, { deleteOldSession: true }) + } + + async mergeSessionHistory(oldSessionId: string, newSessionId: string, namespace: string): Promise { + await this.mergeSessionData(oldSessionId, newSessionId, namespace, { deleteOldSession: false }) + } + + private async mergeSessionData( + oldSessionId: string, + newSessionId: string, + namespace: string, + options: { deleteOldSession: boolean } + ): Promise { if (oldSessionId === newSessionId) { return } @@ -505,19 +519,26 @@ export class SessionCache { ) } - const deleted = this.store.sessions.deleteSession(oldSessionId, namespace) - if (!deleted) { - throw new Error('Failed to delete old session during merge') - } + if (options.deleteOldSession) { + const deleted = this.store.sessions.deleteSession(oldSessionId, namespace) + if (!deleted) { + throw new Error('Failed to delete old session during merge') + } - const existed = this.sessions.delete(oldSessionId) - if (existed) { - this.publisher.emit({ type: 'session-removed', sessionId: oldSessionId, namespace }) + const existed = this.sessions.delete(oldSessionId) + if (existed) { + this.publisher.emit({ type: 'session-removed', sessionId: oldSessionId, namespace }) + } + this.lastBroadcastAtBySessionId.delete(oldSessionId) + this.todoBackfillAttemptedSessionIds.delete(oldSessionId) + } else { + this.refreshSession(oldSessionId) } - this.lastBroadcastAtBySessionId.delete(oldSessionId) - this.todoBackfillAttemptedSessionIds.delete(oldSessionId) - this.refreshSession(newSessionId) + const refreshed = this.refreshSession(newSessionId) + if (refreshed) { + this.publisher.emit({ type: 'session-updated', sessionId: newSessionId, data: refreshed }) + } } private mergeSessionMetadata(oldMetadata: unknown | null, newMetadata: unknown | null): unknown | null { @@ -605,44 +626,72 @@ export class SessionCache { const agentId = this.extractAgentSessionId(session.metadata) if (!agentId) return - // Guard: skip if another dedup for this agent ID is already in progress. - // A skipped trigger is acceptable — the web-side display dedup hides any remaining duplicates. - if (this.deduplicateInProgress.has(agentId.value)) return + // Guard: if another dedup for this agent ID is already in progress, + // coalesce this trigger and run one more pass afterwards. This matters + // for active duplicates: a session can become inactive while the first + // pass is only allowed to move history, and the follow-up pass should + // then be allowed to delete the inactive duplicate record. + if (this.deduplicateInProgress.has(agentId.value)) { + this.deduplicatePending.add(agentId.value) + return + } this.deduplicateInProgress.add(agentId.value) try { - const candidates: { id: string; session: Session }[] = [{ id: sessionId, session }] - for (const [existingId, existing] of this.sessions) { - if (existingId === sessionId) continue - if (existing.namespace !== session.namespace) continue - if (!existing.metadata) continue - if (existing.metadata[agentId.field] !== agentId.value) continue - // Only merge inactive duplicates. Active ones still have a live CLI socket - // whose keepalive/messages would fail if we deleted their session record. - // The web-side display dedup hides active duplicates from the UI. - if (existing.active) continue - candidates.push({ id: existingId, session: existing }) - } + do { + this.deduplicatePending.delete(agentId.value) - if (candidates.length <= 1) return + const currentSession = this.sessions.get(sessionId) + const candidates: { id: string; session: Session }[] = [] + if (currentSession?.metadata && currentSession.metadata[agentId.field] === agentId.value) { + candidates.push({ id: sessionId, session: currentSession }) + } + for (const [existingId, existing] of this.sessions) { + if (existingId === sessionId) continue + if (existing.namespace !== session.namespace) continue + if (!existing.metadata) continue + if (existing.metadata[agentId.field] !== agentId.value) continue + candidates.push({ id: existingId, session: existing }) + } - // Keep the most recent session as the merge target so newer state survives. - candidates.sort((a, b) => - (b.session.activeAt - a.session.activeAt) || (b.session.updatedAt - a.session.updatedAt) - ) - const targetId = candidates[0].id - const targetNamespace = candidates[0].session.namespace - - for (const { id } of candidates.slice(1)) { - if (id === targetId) continue - try { - await this.mergeSessions(id, targetId, targetNamespace) - } catch { - // best-effort: duplicate remains if merge fails + if (candidates.length <= 1) continue + + // Keep the same canonical session the sidebar is likely to show: + // active sessions win, then the most recently updated session wins. + // If timestamps tie, prefer the session that triggered this dedup run + // so callers can intentionally preserve the visible/resumed session. + candidates.sort((a, b) => { + if (a.session.active !== b.session.active) return a.session.active ? -1 : 1 + const updatedDelta = b.session.updatedAt - a.session.updatedAt + if (updatedDelta !== 0) return updatedDelta + if (a.id === sessionId) return -1 + if (b.id === sessionId) return 1 + return b.session.activeAt - a.session.activeAt + }) + const targetId = candidates[0].id + const targetNamespace = candidates[0].session.namespace + + for (const { id } of candidates.slice(1)) { + if (id === targetId) continue + try { + const candidate = this.sessions.get(id) + if (candidate?.active) { + // Keep the live session record/socket intact, but move its already + // persisted history into the visible dedup target. This preserves + // left-sidebar dedup while making resumed/restarted sessions show + // the full conversation history. + await this.mergeSessionHistory(id, targetId, targetNamespace) + } else { + await this.mergeSessions(id, targetId, targetNamespace) + } + } catch { + // best-effort: duplicate remains if merge fails + } } - } + } while (this.deduplicatePending.has(agentId.value)) } finally { this.deduplicateInProgress.delete(agentId.value) + this.deduplicatePending.delete(agentId.value) } } } diff --git a/hub/src/sync/sessionModel.test.ts b/hub/src/sync/sessionModel.test.ts index 8b68c560b..2ea09b74d 100644 --- a/hub/src/sync/sessionModel.test.ts +++ b/hub/src/sync/sessionModel.test.ts @@ -606,7 +606,7 @@ describe('session model', () => { expect(cache.getSession(s1.id)).toBeDefined() }) - it('does not merge active duplicates', async () => { + it('moves history from active duplicates without deleting their live session records', async () => { const store = new Store(':memory:') const events: SyncEvent[] = [] const cache = new SessionCache(store, createPublisher(events)) @@ -618,7 +618,7 @@ describe('session model', () => { 'default' ) - // Mark s1 as active (simulating a live CLI connection) + store.messages.addMessage(s1.id, { type: 'text', text: 'history from s1' }, 'local-s1') cache.handleSessionAlive({ sid: s1.id, time: Date.now(), thinking: false }) const s2 = cache.getOrCreateSession( @@ -627,12 +627,22 @@ describe('session model', () => { null, 'default' ) + store.messages.addMessage(s2.id, { type: 'text', text: 'history from s2' }, 'local-s2') + cache.handleSessionAlive({ sid: s2.id, time: Date.now() + 1000, thinking: false }) await cache.deduplicateByAgentSessionId(s2.id) - // s1 is active, so it should NOT be merged/deleted + // Both live session records stay around so their sockets/keepalives + // remain valid, but the older active session's persisted history is + // moved into the visible dedup target. expect(cache.getSession(s1.id)).toBeDefined() expect(cache.getSession(s2.id)).toBeDefined() + expect(store.messages.getMessages(s1.id, 100)).toHaveLength(0) + const targetMessages = store.messages.getMessages(s2.id, 100) + expect(targetMessages.map((message) => (message.content as { text?: string }).text)).toEqual([ + 'history from s1', + 'history from s2' + ]) }) it('merges duplicate after it becomes inactive via session-end', async () => { @@ -661,12 +671,11 @@ describe('session model', () => { // Mark s1 as active engine.handleSessionAlive({ sid: s1.id, time: Date.now() }) - // s1 is active, dedup from s2 should skip it + // s1 is active, so dedup keeps its live record around const events: SyncEvent[] = [] const cache = (engine as any).sessionCache as SessionCache await cache.deduplicateByAgentSessionId(s2.id) expect(cache.getSession(s1.id)).toBeDefined() - expect(cache.getSession(s2.id)).toBeDefined() // Now s1 ends — handleSessionEnd should trigger dedup retry engine.handleSessionEnd({ sid: s1.id, time: Date.now() }) @@ -701,16 +710,22 @@ describe('session model', () => { 'default' ) - // Mark s1 as active now - cache.handleSessionAlive({ sid: s1.id, time: Date.now() }) + // Mark both duplicates active. The older live record should keep + // existing while active, because its socket may still send keepalives. + const now = Date.now() + cache.handleSessionAlive({ sid: s1.id, time: now }) + cache.handleSessionAlive({ sid: s2.id, time: now }) - // s1 is active — dedup skips it + // s1 is active — dedup only moves history and keeps the record. await cache.deduplicateByAgentSessionId(s2.id) expect(cache.getSession(s1.id)).toBeDefined() + expect(cache.getSession(s2.id)).toBeDefined() - // Simulate time passing beyond the 30s timeout - const expired = cache.expireInactive(Date.now() + 60_000) + // Simulate only s1 passing beyond the 30s timeout. + cache.getSession(s1.id)!.activeAt = now - 31_000 + const expired = cache.expireInactive(now) expect(expired).toContain(s1.id) + expect(expired).not.toContain(s2.id) // Now s1 is inactive — dedup should merge it await cache.deduplicateByAgentSessionId(s2.id) diff --git a/hub/src/sync/syncEngine.ts b/hub/src/sync/syncEngine.ts index b4d391005..04ac09831 100644 --- a/hub/src/sync/syncEngine.ts +++ b/hub/src/sync/syncEngine.ts @@ -202,6 +202,7 @@ export class SyncEngine { collaborationMode?: CodexCollaborationMode }): void { this.sessionCache.handleSessionAlive(payload) + this.triggerDedupIfNeeded(payload.sid) } handleSessionEnd(payload: { sid: string; time: number }): void { From 9e74cd7709962a7683a69e82bdad83c200923faf Mon Sep 17 00:00:00 2001 From: Liu-KM Date: Mon, 20 Apr 2026 12:09:58 +0800 Subject: [PATCH 2/4] fix(hub,web): refresh active duplicate history merges --- hub/src/sync/sessionCache.ts | 29 +++++++++++++++++++++-------- hub/src/sync/sessionModel.test.ts | 19 +++++++++++++++++-- shared/src/schemas.ts | 3 +++ web/src/App.tsx | 10 +++++++++- 4 files changed, 50 insertions(+), 11 deletions(-) diff --git a/hub/src/sync/sessionCache.ts b/hub/src/sync/sessionCache.ts index 267f9f85f..12fb916f4 100644 --- a/hub/src/sync/sessionCache.ts +++ b/hub/src/sync/sessionCache.ts @@ -410,15 +410,23 @@ export class SessionCache { await this.mergeSessionData(oldSessionId, newSessionId, namespace, { deleteOldSession: true }) } - async mergeSessionHistory(oldSessionId: string, newSessionId: string, namespace: string): Promise { - await this.mergeSessionData(oldSessionId, newSessionId, namespace, { deleteOldSession: false }) + async mergeSessionHistory( + oldSessionId: string, + newSessionId: string, + namespace: string, + options: { mergeAgentState?: boolean } = {} + ): Promise { + await this.mergeSessionData(oldSessionId, newSessionId, namespace, { + deleteOldSession: false, + mergeAgentState: options.mergeAgentState ?? true + }) } private async mergeSessionData( oldSessionId: string, newSessionId: string, namespace: string, - options: { deleteOldSession: boolean } + options: { deleteOldSession: boolean; mergeAgentState?: boolean } ): Promise { if (oldSessionId === newSessionId) { return @@ -430,7 +438,10 @@ export class SessionCache { throw new Error('Session not found for merge') } - this.store.messages.mergeSessionMessages(oldSessionId, newSessionId) + const movedMessages = this.store.messages.mergeSessionMessages(oldSessionId, newSessionId) + if (movedMessages.moved > 0) { + this.publisher.emit({ type: 'messages-invalidated', sessionId: newSessionId, namespace }) + } const mergedMetadata = this.mergeSessionMetadata(oldStored.metadata, newStored.metadata) if (mergedMetadata !== null && mergedMetadata !== newStored.metadata) { @@ -490,10 +501,10 @@ export class SessionCache { } // Merge agentState: union requests/completedRequests from both sessions so pending - // approvals on the duplicate are not lost. Only inactive duplicates reach this point - // (active ones are skipped by deduplicateByAgentSessionId). + // approvals on inactive duplicates are not lost. Active duplicates keep their + // own agentState because permission approve/deny RPCs are routed by session id. // Read the latest target state right before writing to avoid overwriting live updates. - if (oldStored.agentState !== null) { + if ((options.mergeAgentState ?? true) && oldStored.agentState !== null) { for (let attempt = 0; attempt < 2; attempt += 1) { const latest = this.store.sessions.getSessionByNamespace(newSessionId, namespace) if (!latest) break @@ -680,7 +691,9 @@ export class SessionCache { // persisted history into the visible dedup target. This preserves // left-sidebar dedup while making resumed/restarted sessions show // the full conversation history. - await this.mergeSessionHistory(id, targetId, targetNamespace) + await this.mergeSessionHistory(id, targetId, targetNamespace, { + mergeAgentState: false + }) } else { await this.mergeSessions(id, targetId, targetNamespace) } diff --git a/hub/src/sync/sessionModel.test.ts b/hub/src/sync/sessionModel.test.ts index 2ea09b74d..8741b1510 100644 --- a/hub/src/sync/sessionModel.test.ts +++ b/hub/src/sync/sessionModel.test.ts @@ -614,7 +614,10 @@ describe('session model', () => { const s1 = cache.getOrCreateSession( 'tag-1', { path: '/tmp/project', host: 'localhost', flavor: 'codex', codexSessionId: 'thread-X' }, - null, + { + requests: { 'req-from-active-duplicate': { tool: 'Bash', arguments: {} } }, + completedRequests: {} + }, 'default' ) @@ -624,7 +627,10 @@ describe('session model', () => { const s2 = cache.getOrCreateSession( 'tag-2', { path: '/tmp/project', host: 'localhost', flavor: 'codex', codexSessionId: 'thread-X' }, - null, + { + requests: { 'req-from-target': { tool: 'Read', arguments: {} } }, + completedRequests: {} + }, 'default' ) store.messages.addMessage(s2.id, { type: 'text', text: 'history from s2' }, 'local-s2') @@ -643,6 +649,15 @@ describe('session model', () => { 'history from s1', 'history from s2' ]) + expect(events).toContainEqual({ type: 'messages-invalidated', sessionId: s2.id, namespace: 'default' }) + + // Active duplicates keep their own pending permission requests because + // approve/deny RPCs still route by the originating HAPI session id. + const sourceRequests = cache.getSession(s1.id)?.agentState?.requests ?? {} + const targetRequests = cache.getSession(s2.id)?.agentState?.requests ?? {} + expect(sourceRequests['req-from-active-duplicate']).toBeDefined() + expect(targetRequests['req-from-active-duplicate']).toBeUndefined() + expect(targetRequests['req-from-target']).toBeDefined() }) it('merges duplicate after it becomes inactive via session-end', async () => { diff --git a/shared/src/schemas.ts b/shared/src/schemas.ts index c802efe51..fa9361081 100644 --- a/shared/src/schemas.ts +++ b/shared/src/schemas.ts @@ -213,6 +213,9 @@ export const SyncEventSchema = z.discriminatedUnion('type', [ type: z.literal('message-received'), message: DecryptedMessageSchema }), + SessionChangedSchema.extend({ + type: z.literal('messages-invalidated') + }), MachineChangedSchema.extend({ type: z.literal('machine-updated'), data: z.unknown().optional() diff --git a/web/src/App.tsx b/web/src/App.tsx index 7ab0798c2..c04638dfc 100644 --- a/web/src/App.tsx +++ b/web/src/App.tsx @@ -229,7 +229,15 @@ function AppInner() { } }, []) - const handleSseEvent = useCallback(() => {}, []) + const handleSseEvent = useCallback((event: SyncEvent) => { + if (event.type !== 'messages-invalidated') { + return + } + if (!api || event.sessionId !== selectedSessionId) { + return + } + void fetchLatestMessages(api, event.sessionId) + }, [api, selectedSessionId]) const handleToast = useCallback((event: ToastEvent) => { addToast({ title: event.data.title, From 645b420c69d7f1125f3905149befe48bfd1a1551 Mon Sep 17 00:00:00 2001 From: Liu-KM Date: Mon, 20 Apr 2026 20:26:35 +0800 Subject: [PATCH 3/4] fix(hub): avoid active-active history merges --- hub/src/sync/sessionCache.ts | 11 ++++++ hub/src/sync/sessionModel.test.ts | 62 ++++++++++++++++++++++++++----- 2 files changed, 63 insertions(+), 10 deletions(-) diff --git a/hub/src/sync/sessionCache.ts b/hub/src/sync/sessionCache.ts index 12fb916f4..be42edf68 100644 --- a/hub/src/sync/sessionCache.ts +++ b/hub/src/sync/sessionCache.ts @@ -440,6 +440,9 @@ export class SessionCache { const movedMessages = this.store.messages.mergeSessionMessages(oldSessionId, newSessionId) if (movedMessages.moved > 0) { + if (!options.deleteOldSession) { + this.publisher.emit({ type: 'messages-invalidated', sessionId: oldSessionId, namespace }) + } this.publisher.emit({ type: 'messages-invalidated', sessionId: newSessionId, namespace }) } @@ -667,6 +670,14 @@ export class SessionCache { if (candidates.length <= 1) continue + const activeCandidates = candidates.filter(({ session }) => session.active) + if (activeCandidates.length > 1) { + // Do not move history between two live session ids. The web may + // intentionally keep the currently selected duplicate visible, + // and the hub does not know which active duplicate that is. + continue + } + // Keep the same canonical session the sidebar is likely to show: // active sessions win, then the most recently updated session wins. // If timestamps tie, prefer the session that triggered this dedup run diff --git a/hub/src/sync/sessionModel.test.ts b/hub/src/sync/sessionModel.test.ts index 8741b1510..37a9f3c97 100644 --- a/hub/src/sync/sessionModel.test.ts +++ b/hub/src/sync/sessionModel.test.ts @@ -606,7 +606,7 @@ describe('session model', () => { expect(cache.getSession(s1.id)).toBeDefined() }) - it('moves history from active duplicates without deleting their live session records', async () => { + it('does not move history while duplicate sessions are both active', async () => { const store = new Store(':memory:') const events: SyncEvent[] = [] const cache = new SessionCache(store, createPublisher(events)) @@ -638,25 +638,67 @@ describe('session model', () => { await cache.deduplicateByAgentSessionId(s2.id) - // Both live session records stay around so their sockets/keepalives - // remain valid, but the older active session's persisted history is - // moved into the visible dedup target. + // Both live session records keep their own histories until one of the + // duplicates becomes inactive. The web may still be showing either + // active session id, so the hub must not pick a canonical target yet. expect(cache.getSession(s1.id)).toBeDefined() expect(cache.getSession(s2.id)).toBeDefined() + expect(store.messages.getMessages(s1.id, 100).map((message) => (message.content as { text?: string }).text)).toEqual([ + 'history from s1' + ]) + expect(store.messages.getMessages(s2.id, 100).map((message) => (message.content as { text?: string }).text)).toEqual([ + 'history from s2' + ]) + expect(events.some((event) => event.type === 'messages-invalidated')).toBe(false) + + const sourceRequests = cache.getSession(s1.id)?.agentState?.requests ?? {} + const targetRequests = cache.getSession(s2.id)?.agentState?.requests ?? {} + expect(sourceRequests['req-from-active-duplicate']).toBeDefined() + expect(targetRequests['req-from-active-duplicate']).toBeUndefined() + expect(targetRequests['req-from-target']).toBeDefined() + }) + + it('invalidates both sessions for history-only merges', async () => { + const store = new Store(':memory:') + const events: SyncEvent[] = [] + const cache = new SessionCache(store, createPublisher(events)) + + const s1 = cache.getOrCreateSession( + 'tag-1', + { path: '/tmp/project', host: 'localhost', flavor: 'codex' }, + { + requests: { 'req-from-source': { tool: 'Bash', arguments: {} } }, + completedRequests: {} + }, + 'default' + ) + const s2 = cache.getOrCreateSession( + 'tag-2', + { path: '/tmp/project', host: 'localhost', flavor: 'codex' }, + { + requests: { 'req-from-target': { tool: 'Read', arguments: {} } }, + completedRequests: {} + }, + 'default' + ) + + store.messages.addMessage(s1.id, { type: 'text', text: 'history from s1' }, 'local-s1') + store.messages.addMessage(s2.id, { type: 'text', text: 'history from s2' }, 'local-s2') + + await cache.mergeSessionHistory(s1.id, s2.id, 'default', { mergeAgentState: false }) + expect(store.messages.getMessages(s1.id, 100)).toHaveLength(0) - const targetMessages = store.messages.getMessages(s2.id, 100) - expect(targetMessages.map((message) => (message.content as { text?: string }).text)).toEqual([ + expect(store.messages.getMessages(s2.id, 100).map((message) => (message.content as { text?: string }).text)).toEqual([ 'history from s1', 'history from s2' ]) + expect(events).toContainEqual({ type: 'messages-invalidated', sessionId: s1.id, namespace: 'default' }) expect(events).toContainEqual({ type: 'messages-invalidated', sessionId: s2.id, namespace: 'default' }) - // Active duplicates keep their own pending permission requests because - // approve/deny RPCs still route by the originating HAPI session id. const sourceRequests = cache.getSession(s1.id)?.agentState?.requests ?? {} const targetRequests = cache.getSession(s2.id)?.agentState?.requests ?? {} - expect(sourceRequests['req-from-active-duplicate']).toBeDefined() - expect(targetRequests['req-from-active-duplicate']).toBeUndefined() + expect(sourceRequests['req-from-source']).toBeDefined() + expect(targetRequests['req-from-source']).toBeUndefined() expect(targetRequests['req-from-target']).toBeDefined() }) From 544db41a43b20daf02c2c4fa1e6a94d00af73149 Mon Sep 17 00:00:00 2001 From: Liu-KM Date: Mon, 20 Apr 2026 20:39:09 +0800 Subject: [PATCH 4/4] fix(web): reset message window on history invalidation --- web/src/App.tsx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/web/src/App.tsx b/web/src/App.tsx index c04638dfc..0045124e2 100644 --- a/web/src/App.tsx +++ b/web/src/App.tsx @@ -13,7 +13,7 @@ import { useViewportHeight } from '@/hooks/useViewportHeight' import { useVisibilityReporter } from '@/hooks/useVisibilityReporter' import { queryKeys } from '@/lib/query-keys' import { AppContextProvider } from '@/lib/app-context' -import { fetchLatestMessages } from '@/lib/message-window-store' +import { clearMessageWindow, fetchLatestMessages } from '@/lib/message-window-store' import { useAppGoBack } from '@/hooks/useAppGoBack' import { useTranslation } from '@/lib/use-translation' import { VoiceProvider } from '@/lib/voice-context' @@ -236,6 +236,7 @@ function AppInner() { if (!api || event.sessionId !== selectedSessionId) { return } + clearMessageWindow(event.sessionId) void fetchLatestMessages(api, event.sessionId) }, [api, selectedSessionId]) const handleToast = useCallback((event: ToastEvent) => {