From 49ba5f34d5fc920cd44ee88c50d4e3cf2d027475 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Thu, 19 Mar 2026 16:34:14 +0000 Subject: [PATCH 1/7] feat(gastown): expose getAgentEvents via tRPC Add AgentEventOutput schema and RpcAgentEventOutput wrapper to trpc/schemas.ts, then wire up a getAgentEvents gastownProcedure in trpc/router.ts that delegates to TownDO.getAgentEvents() with cursor-based pagination (afterId, limit). --- cloudflare-gastown/src/trpc/router.ts | 17 +++++++++++++++++ cloudflare-gastown/src/trpc/schemas.ts | 10 ++++++++++ 2 files changed, 27 insertions(+) diff --git a/cloudflare-gastown/src/trpc/router.ts b/cloudflare-gastown/src/trpc/router.ts index 45ee0260d0..87801f58f3 100644 --- a/cloudflare-gastown/src/trpc/router.ts +++ b/cloudflare-gastown/src/trpc/router.ts @@ -24,6 +24,7 @@ import { RpcBeadOutput, RpcAgentOutput, RpcBeadEventOutput, + RpcAgentEventOutput, RpcMayorSendResultOutput, RpcMayorStatusOutput, RpcStreamTicketOutput, @@ -905,6 +906,22 @@ export const gastownRouter = router({ }); }), + getAgentEvents: gastownProcedure + .input( + z.object({ + rigId: z.string().uuid(), + agentId: z.string().uuid(), + afterId: z.number().int().nonneg().optional(), + limit: z.number().int().positive().max(500).default(100), + }) + ) + .output(z.array(RpcAgentEventOutput)) + .query(async ({ ctx, input }) => { + const rig = await verifyRigOwnership(ctx.env, ctx.userId, input.rigId, ctx.orgMemberships); + const townStub = getTownDOStub(ctx.env, rig.town_id); + return townStub.getAgentEvents(input.agentId, input.afterId, input.limit); + }), + listConvoys: gastownProcedure .input( z.object({ diff --git a/cloudflare-gastown/src/trpc/schemas.ts b/cloudflare-gastown/src/trpc/schemas.ts index 65067ec784..a65e0dae45 100644 --- a/cloudflare-gastown/src/trpc/schemas.ts +++ b/cloudflare-gastown/src/trpc/schemas.ts @@ -66,6 +66,15 @@ export const AgentOutput = z.object({ agent_status_updated_at: z.string().nullable().optional().default(null), }); +// AgentEvent (output shape, after transforms) +export const AgentEventOutput = z.object({ + id: z.number(), + agent_id: z.string(), + event_type: z.string(), + data: z.record(z.string(), z.unknown()), + created_at: z.string(), +}); + // BeadEvent (output shape, after transforms) export const BeadEventOutput = z.object({ bead_event_id: z.string(), @@ -180,6 +189,7 @@ export const RpcRigOutput = rpcSafe(RigOutput); export const RpcBeadOutput = rpcSafe(BeadOutput); export const RpcAgentOutput = rpcSafe(AgentOutput); export const RpcBeadEventOutput = rpcSafe(BeadEventOutput); +export const RpcAgentEventOutput = rpcSafe(AgentEventOutput); export const RpcMayorSendResultOutput = rpcSafe(MayorSendResultOutput); export const RpcMayorStatusOutput = rpcSafe(MayorStatusOutput); export const RpcStreamTicketOutput = rpcSafe(StreamTicketOutput); From 14a48c8f3211da9d0df98a4df16bbf857b57c081 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Thu, 19 Mar 2026 18:57:51 +0000 Subject: [PATCH 2/7] feat(agent-do): add conversation reconstruction utility Adds reconstructConversation() that takes a sequence of AgentDO streaming events and reassembles them into clean { role, content } turns. Handles message.updated / message.completed (info payload), message_part.updated (both underscore and dot variants), tool-only turns, synthetic/ignored parts, parts arriving before message info, and malformed events. Supports configurable maxTurns truncation (default 50, most-recent kept). 21 unit tests covering happy path, edge cases, and truncation. --- .../src/util/reconstruct-conversation.util.ts | 254 ++++++++++++++ .../unit/reconstruct-conversation.test.ts | 320 ++++++++++++++++++ 2 files changed, 574 insertions(+) create mode 100644 cloudflare-gastown/src/util/reconstruct-conversation.util.ts create mode 100644 cloudflare-gastown/test/unit/reconstruct-conversation.test.ts diff --git a/cloudflare-gastown/src/util/reconstruct-conversation.util.ts b/cloudflare-gastown/src/util/reconstruct-conversation.util.ts new file mode 100644 index 0000000000..b9d6f4acd3 --- /dev/null +++ b/cloudflare-gastown/src/util/reconstruct-conversation.util.ts @@ -0,0 +1,254 @@ +/** + * Reconstructs a human-readable conversation transcript from a sequence of + * AgentDO streaming events. + * + * The SDK emits a stream of fine-grained events as an agent works. This + * utility reassembles those events into clean `{ role, content }` turns that + * can be injected into a prompt for context recovery after a container restart. + * + * Event types consumed: + * - `message.updated` / `message.completed` — carries the final Message + * object (UserMessage | AssistantMessage) in `data.info`. + * - `message_part.updated` — carries a streaming Part in `data.part`; + * TextParts are accumulated per messageID so assistant text is captured + * even when no `message.updated` event follows (e.g. mid-stream crash). + * + * The returned transcript is ordered chronologically and truncated to + * `maxTurns` (keeping the most recent turns). + */ + +import { z } from 'zod'; +import { type RigAgentEventRecord } from '../db/tables/rig-agent-events.table'; + +// ── Output type ──────────────────────────────────────────────────────────── + +export type ConversationTurn = { + role: 'user' | 'assistant'; + content: string; +}; + +// ── Zod schemas for the event data payloads ──────────────────────────────── +// We only validate the fields we actually use, using .passthrough() to ignore +// everything else. This makes the schemas resilient to schema evolution. + +const UserMessageSummary = z.object({ + title: z.string().optional(), + body: z.string().optional(), +}); + +const UserMessageInfo = z + .object({ + id: z.string(), + role: z.literal('user'), + // User text lives inside summary.body (generated later) or is not in the + // message object at all — the raw prompt is only visible in part events. + summary: UserMessageSummary.optional(), + }) + .passthrough(); + +const AssistantMessageInfo = z + .object({ + id: z.string(), + role: z.literal('assistant'), + error: z.record(z.string(), z.unknown()).optional(), + }) + .passthrough(); + +const MessageInfo = z.union([UserMessageInfo, AssistantMessageInfo]); + +// Payload of message.updated / message.completed events +const MessageEventData = z + .object({ + info: MessageInfo, + }) + .passthrough(); + +// TextPart from message_part.updated +const TextPartData = z + .object({ + id: z.string(), + messageID: z.string(), + type: z.literal('text'), + text: z.string(), + synthetic: z.boolean().optional(), + ignored: z.boolean().optional(), + }) + .passthrough(); + +// Minimal Part schema — we only care about TextPart; everything else is +// parsed as an unknown part so we can skip it gracefully. +const PartData = z.discriminatedUnion('type', [ + TextPartData, + z.object({ type: z.literal('reasoning'), messageID: z.string() }).passthrough(), + z.object({ type: z.literal('tool'), messageID: z.string() }).passthrough(), + z.object({ type: z.literal('file'), messageID: z.string() }).passthrough(), + z.object({ type: z.literal('step-start'), messageID: z.string() }).passthrough(), + z.object({ type: z.literal('step-finish'), messageID: z.string() }).passthrough(), + z.object({ type: z.literal('snapshot'), messageID: z.string() }).passthrough(), + z.object({ type: z.literal('patch'), messageID: z.string() }).passthrough(), + z.object({ type: z.literal('agent'), messageID: z.string() }).passthrough(), + z.object({ type: z.literal('retry'), messageID: z.string() }).passthrough(), + z.object({ type: z.literal('compaction'), messageID: z.string() }).passthrough(), + z.object({ type: z.literal('subtask'), messageID: z.string() }).passthrough(), +]); + +// Payload of message_part.updated events +const PartEventData = z + .object({ + part: PartData, + }) + .passthrough(); + +// ── Internal state during reconstruction ────────────────────────────────── + +type UserInfo = z.infer; +type AssistantInfo = z.infer; + +type MessageAccumulator = { + role: 'user' | 'assistant'; + // Latest snapshot of the message metadata (may be null if we only saw parts) + info: UserInfo | AssistantInfo | null; + // Text parts keyed by part id; stored in insertion order + textParts: Map; + // Whether this message had any non-text parts (tool calls, etc.) + hasNonTextParts: boolean; +}; + +// ── Main export ──────────────────────────────────────────────────────────── + +/** + * Reconstruct a conversation transcript from a flat list of AgentDO events. + * + * @param events Ordered sequence of `RigAgentEventRecord` rows from AgentDO. + * @param maxTurns Maximum number of turns to return. When the transcript + * exceeds this, the oldest turns are dropped so the most + * recent context is preserved. Pass `Infinity` to keep all. + * Defaults to 50. + * + * @returns Array of `{ role, content }` turns in chronological order. + */ +export function reconstructConversation( + events: RigAgentEventRecord[], + maxTurns = 50 +): ConversationTurn[] { + // messageId → accumulator, insertion-ordered + const messages = new Map(); + + for (const event of events) { + const { event_type, data } = event; + + if (event_type === 'message.updated' || event_type === 'message.completed') { + const parsed = MessageEventData.safeParse(data); + if (!parsed.success) continue; + + const { info } = parsed.data; + let acc = messages.get(info.id); + if (!acc) { + acc = { + role: info.role, + info, + textParts: new Map(), + hasNonTextParts: false, + }; + messages.set(info.id, acc); + } else { + // Update with the latest message metadata + acc.info = info; + acc.role = info.role; + } + } else if (event_type === 'message_part.updated' || event_type === 'message.part.updated') { + // The SDK emits both forms depending on version; handle both. + const parsed = PartEventData.safeParse(data); + if (!parsed.success) continue; + + const { part } = parsed.data; + const messageId = part.messageID; + + let acc = messages.get(messageId); + if (!acc) { + // We may see part events before the message.updated event — create a + // placeholder accumulator. Role will be filled in when we see the info. + acc = { + role: 'assistant', // default; corrected when message info arrives + info: null, + textParts: new Map(), + hasNonTextParts: false, + }; + messages.set(messageId, acc); + } + + if (part.type === 'text') { + // Skip synthetic / ignored parts (used for internal context injection) + if (part.synthetic || part.ignored) continue; + acc.textParts.set(part.id, part.text); + } else { + acc.hasNonTextParts = true; + } + } + } + + // ── Assemble turns ─────────────────────────────────────────────────────── + + const turns: ConversationTurn[] = []; + + for (const acc of messages.values()) { + const content = buildContent(acc); + if (content === null) continue; // skip tool-only or empty turns + turns.push({ role: acc.role, content }); + } + + // ── Truncate ───────────────────────────────────────────────────────────── + + if (turns.length > maxTurns) { + return turns.slice(turns.length - maxTurns); + } + + return turns; +} + +// ── Helpers ──────────────────────────────────────────────────────────────── + +function buildContent(acc: MessageAccumulator): string | null { + if (acc.role === 'user') { + return buildUserContent(acc); + } + return buildAssistantContent(acc); +} + +function buildUserContent(acc: MessageAccumulator): string | null { + // User messages: text parts hold the raw prompt text. + // If no text parts are present, fall back to summary.body if available. + const fromParts = joinTextParts(acc.textParts); + if (fromParts !== '') return fromParts; + + const summaryBody = extractSummaryBody(acc.info); + if (summaryBody) return summaryBody; + + // No text content found — user turn is unreadable (e.g. file-only message) + return null; +} + +function buildAssistantContent(acc: MessageAccumulator): string | null { + const fromParts = joinTextParts(acc.textParts); + if (fromParts !== '') return fromParts; + + // Assistant messages with only tool calls (no text) are tool-only turns. + // These are not meaningful for a human-readable transcript. + if (acc.hasNonTextParts) return null; + + // No content at all (e.g. message was created but never had parts — perhaps + // due to a crash mid-stream). Skip. + return null; +} + +function joinTextParts(parts: Map): string { + return [...parts.values()].join('').trim(); +} + +function extractSummaryBody(info: UserInfo | AssistantInfo | null): string | null { + if (!info || info.role !== 'user') return null; + // info.summary is typed as { title?: string; body?: string } | undefined + const parsed = UserMessageSummary.safeParse(info.summary); + if (!parsed.success) return null; + return parsed.data.body ?? null; +} diff --git a/cloudflare-gastown/test/unit/reconstruct-conversation.test.ts b/cloudflare-gastown/test/unit/reconstruct-conversation.test.ts new file mode 100644 index 0000000000..7daa365e35 --- /dev/null +++ b/cloudflare-gastown/test/unit/reconstruct-conversation.test.ts @@ -0,0 +1,320 @@ +import { describe, it, expect } from 'vitest'; +import { + reconstructConversation, + type ConversationTurn, +} from '../../src/util/reconstruct-conversation.util'; +import { type RigAgentEventRecord } from '../../src/db/tables/rig-agent-events.table'; + +// ── Test fixtures ────────────────────────────────────────────────────────── + +let nextId = 1; + +function makeEvent( + event_type: string, + data: Record +): RigAgentEventRecord { + return { + id: nextId++, + agent_id: 'test-agent', + event_type, + data, + created_at: new Date().toISOString(), + }; +} + +function messageUpdated(id: string, role: 'user' | 'assistant', extra: Record = {}) { + return makeEvent('message.updated', { + sessionID: 'sess-1', + info: { id, role, sessionID: 'sess-1', ...extra }, + }); +} + +function messageCompleted(id: string, role: 'user' | 'assistant', extra: Record = {}) { + return makeEvent('message.completed', { + sessionID: 'sess-1', + info: { id, role, sessionID: 'sess-1', ...extra }, + }); +} + +function textPartUpdated(messageID: string, partId: string, text: string, extra: Record = {}) { + return makeEvent('message_part.updated', { + sessionID: 'sess-1', + part: { id: partId, messageID, type: 'text', text, ...extra }, + }); +} + +function toolPartUpdated(messageID: string, partId: string) { + return makeEvent('message_part.updated', { + sessionID: 'sess-1', + part: { + id: partId, + messageID, + type: 'tool', + callID: 'call-1', + tool: 'bash', + state: { status: 'completed', input: {}, output: 'ok', title: 'bash', metadata: {}, time: { start: 1, end: 2 } }, + }, + }); +} + +// ── Tests ────────────────────────────────────────────────────────────────── + +describe('reconstructConversation', () => { + describe('basic happy path', () => { + it('reconstructs a simple user → assistant exchange', () => { + const events = [ + textPartUpdated('msg-u1', 'part-u1', 'Hello, world!'), + messageUpdated('msg-u1', 'user'), + textPartUpdated('msg-a1', 'part-a1', 'Hi there!'), + messageCompleted('msg-a1', 'assistant'), + ]; + + const turns = reconstructConversation(events); + + expect(turns).toEqual([ + { role: 'user', content: 'Hello, world!' }, + { role: 'assistant', content: 'Hi there!' }, + ]); + }); + + it('concatenates multiple text parts in a single assistant message', () => { + const events = [ + textPartUpdated('msg-a1', 'part-1', 'First '), + textPartUpdated('msg-a1', 'part-2', 'second '), + textPartUpdated('msg-a1', 'part-3', 'third'), + messageCompleted('msg-a1', 'assistant'), + ]; + + const turns = reconstructConversation(events); + + expect(turns).toHaveLength(1); + expect(turns[0]).toEqual({ role: 'assistant', content: 'First second third' }); + }); + + it('uses the most recent text for a part when updated multiple times', () => { + // Part events carry the full accumulated text on each update + const events = [ + textPartUpdated('msg-a1', 'part-1', 'Hel'), + textPartUpdated('msg-a1', 'part-1', 'Hello'), + textPartUpdated('msg-a1', 'part-1', 'Hello world'), + messageCompleted('msg-a1', 'assistant'), + ]; + + const turns = reconstructConversation(events); + + expect(turns[0]?.content).toBe('Hello world'); + }); + + it('preserves message order (insertion order)', () => { + const events = [ + textPartUpdated('msg-u1', 'p-u1', 'Question'), + messageUpdated('msg-u1', 'user'), + textPartUpdated('msg-a1', 'p-a1', 'Answer'), + messageCompleted('msg-a1', 'assistant'), + textPartUpdated('msg-u2', 'p-u2', 'Follow-up'), + messageUpdated('msg-u2', 'user'), + textPartUpdated('msg-a2', 'p-a2', 'Follow answer'), + messageCompleted('msg-a2', 'assistant'), + ]; + + const turns = reconstructConversation(events); + + expect(turns.map(t => t.content)).toEqual(['Question', 'Answer', 'Follow-up', 'Follow answer']); + }); + }); + + describe('message event variants', () => { + it('handles message.completed in addition to message.updated', () => { + const events = [ + textPartUpdated('msg-a1', 'p1', 'Done.'), + makeEvent('message.completed', { + sessionID: 'sess-1', + info: { id: 'msg-a1', role: 'assistant', sessionID: 'sess-1' }, + }), + ]; + + const turns = reconstructConversation(events); + expect(turns).toEqual([{ role: 'assistant', content: 'Done.' }]); + }); + + it('handles message.part.updated (dot variant) as well as message_part.updated', () => { + const events = [ + makeEvent('message.part.updated', { + sessionID: 'sess-1', + part: { id: 'p1', messageID: 'msg-a1', type: 'text', text: 'dot variant' }, + }), + messageCompleted('msg-a1', 'assistant'), + ]; + + const turns = reconstructConversation(events); + expect(turns).toEqual([{ role: 'assistant', content: 'dot variant' }]); + }); + + it('accepts parts before the message info event arrives', () => { + // Parts may arrive before message.updated in the event stream + const events = [ + textPartUpdated('msg-a1', 'p1', 'early text'), + messageUpdated('msg-a1', 'assistant'), + ]; + + const turns = reconstructConversation(events); + expect(turns).toEqual([{ role: 'assistant', content: 'early text' }]); + }); + }); + + describe('edge cases', () => { + it('skips tool-only assistant turns (no text content)', () => { + const events = [ + toolPartUpdated('msg-a1', 'tool-p1'), + messageCompleted('msg-a1', 'assistant'), + ]; + + const turns = reconstructConversation(events); + expect(turns).toHaveLength(0); + }); + + it('includes assistant turns that mix text and tool calls', () => { + const events = [ + textPartUpdated('msg-a1', 'p-text', 'Let me run that for you.'), + toolPartUpdated('msg-a1', 'p-tool'), + messageCompleted('msg-a1', 'assistant'), + ]; + + const turns = reconstructConversation(events); + expect(turns).toHaveLength(1); + expect(turns[0]).toEqual({ role: 'assistant', content: 'Let me run that for you.' }); + }); + + it('skips synthetic text parts', () => { + const events = [ + textPartUpdated('msg-a1', 'p-synth', 'injected context', { synthetic: true }), + textPartUpdated('msg-a1', 'p-real', 'real reply'), + messageCompleted('msg-a1', 'assistant'), + ]; + + const turns = reconstructConversation(events); + expect(turns[0]?.content).toBe('real reply'); + }); + + it('skips ignored text parts', () => { + const events = [ + textPartUpdated('msg-a1', 'p-ignored', 'ignored text', { ignored: true }), + textPartUpdated('msg-a1', 'p-real', 'visible text'), + messageCompleted('msg-a1', 'assistant'), + ]; + + const turns = reconstructConversation(events); + expect(turns[0]?.content).toBe('visible text'); + }); + + it('skips assistant messages with no content at all', () => { + // message.updated with no parts — likely a partial/crashed session + const events = [messageUpdated('msg-a1', 'assistant')]; + + const turns = reconstructConversation(events); + expect(turns).toHaveLength(0); + }); + + it('skips user messages with no text and no summary', () => { + const events = [messageUpdated('msg-u1', 'user')]; + + const turns = reconstructConversation(events); + expect(turns).toHaveLength(0); + }); + + it('uses summary.body as fallback for user messages without text parts', () => { + const events = [ + messageUpdated('msg-u1', 'user', { + summary: { title: 'A question', body: 'What is 2+2?' }, + }), + ]; + + const turns = reconstructConversation(events); + expect(turns).toEqual([{ role: 'user', content: 'What is 2+2?' }]); + }); + + it('handles malformed events gracefully (non-object data)', () => { + const events: RigAgentEventRecord[] = [ + makeEvent('message.updated', { bad: 'no info field here' }), + textPartUpdated('msg-a1', 'p1', 'still works'), + messageCompleted('msg-a1', 'assistant'), + ]; + + const turns = reconstructConversation(events); + expect(turns).toEqual([{ role: 'assistant', content: 'still works' }]); + }); + + it('handles unknown event types gracefully', () => { + const events = [ + makeEvent('session.idle', { sessionID: 'sess-1' }), + makeEvent('agent.exited', { reason: 'completed' }), + textPartUpdated('msg-a1', 'p1', 'the answer'), + messageCompleted('msg-a1', 'assistant'), + ]; + + const turns = reconstructConversation(events); + expect(turns).toEqual([{ role: 'assistant', content: 'the answer' }]); + }); + + it('returns empty array for empty event list', () => { + expect(reconstructConversation([])).toEqual([]); + }); + }); + + describe('truncation', () => { + it('truncates to the most recent maxTurns turns', () => { + const events: RigAgentEventRecord[] = []; + for (let i = 0; i < 10; i++) { + events.push(textPartUpdated(`msg-u${i}`, `p-u${i}`, `question ${i}`)); + events.push(messageUpdated(`msg-u${i}`, 'user')); + events.push(textPartUpdated(`msg-a${i}`, `p-a${i}`, `answer ${i}`)); + events.push(messageCompleted(`msg-a${i}`, 'assistant')); + } + + const turns = reconstructConversation(events, 4); + + expect(turns).toHaveLength(4); + // Should keep the last 4 turns (turns 8 and 9 of 10) + expect(turns[0]?.content).toBe('question 8'); + expect(turns[1]?.content).toBe('answer 8'); + expect(turns[2]?.content).toBe('question 9'); + expect(turns[3]?.content).toBe('answer 9'); + }); + + it('does not truncate when turns <= maxTurns', () => { + const events = [ + textPartUpdated('msg-u1', 'p-u1', 'hi'), + messageUpdated('msg-u1', 'user'), + textPartUpdated('msg-a1', 'p-a1', 'hello'), + messageCompleted('msg-a1', 'assistant'), + ]; + + expect(reconstructConversation(events, 2)).toHaveLength(2); + expect(reconstructConversation(events, 10)).toHaveLength(2); + }); + + it('respects maxTurns of 1', () => { + const events = [ + textPartUpdated('msg-u1', 'p-u1', 'first'), + messageUpdated('msg-u1', 'user'), + textPartUpdated('msg-a1', 'p-a1', 'last'), + messageCompleted('msg-a1', 'assistant'), + ]; + + const turns = reconstructConversation(events, 1); + expect(turns).toEqual([{ role: 'assistant', content: 'last' }]); + }); + + it('uses default maxTurns of 50', () => { + const events: RigAgentEventRecord[] = []; + for (let i = 0; i < 60; i++) { + events.push(textPartUpdated(`msg-a${i}`, `p-a${i}`, `turn ${i}`)); + events.push(messageCompleted(`msg-a${i}`, 'assistant')); + } + + const turns = reconstructConversation(events); + expect(turns).toHaveLength(50); + expect(turns[49]?.content).toBe('turn 59'); + }); + }); +}); From a8cc1020717234a31614860523e62831ff24b83b Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Thu, 19 Mar 2026 18:58:24 +0000 Subject: [PATCH 3/7] style: format reconstruct-conversation files with oxfmt --- .../unit/reconstruct-conversation.test.ts | 40 ++++++++++++++----- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/cloudflare-gastown/test/unit/reconstruct-conversation.test.ts b/cloudflare-gastown/test/unit/reconstruct-conversation.test.ts index 7daa365e35..d62ffb3989 100644 --- a/cloudflare-gastown/test/unit/reconstruct-conversation.test.ts +++ b/cloudflare-gastown/test/unit/reconstruct-conversation.test.ts @@ -9,10 +9,7 @@ import { type RigAgentEventRecord } from '../../src/db/tables/rig-agent-events.t let nextId = 1; -function makeEvent( - event_type: string, - data: Record -): RigAgentEventRecord { +function makeEvent(event_type: string, data: Record): RigAgentEventRecord { return { id: nextId++, agent_id: 'test-agent', @@ -22,21 +19,34 @@ function makeEvent( }; } -function messageUpdated(id: string, role: 'user' | 'assistant', extra: Record = {}) { +function messageUpdated( + id: string, + role: 'user' | 'assistant', + extra: Record = {} +) { return makeEvent('message.updated', { sessionID: 'sess-1', info: { id, role, sessionID: 'sess-1', ...extra }, }); } -function messageCompleted(id: string, role: 'user' | 'assistant', extra: Record = {}) { +function messageCompleted( + id: string, + role: 'user' | 'assistant', + extra: Record = {} +) { return makeEvent('message.completed', { sessionID: 'sess-1', info: { id, role, sessionID: 'sess-1', ...extra }, }); } -function textPartUpdated(messageID: string, partId: string, text: string, extra: Record = {}) { +function textPartUpdated( + messageID: string, + partId: string, + text: string, + extra: Record = {} +) { return makeEvent('message_part.updated', { sessionID: 'sess-1', part: { id: partId, messageID, type: 'text', text, ...extra }, @@ -52,7 +62,14 @@ function toolPartUpdated(messageID: string, partId: string) { type: 'tool', callID: 'call-1', tool: 'bash', - state: { status: 'completed', input: {}, output: 'ok', title: 'bash', metadata: {}, time: { start: 1, end: 2 } }, + state: { + status: 'completed', + input: {}, + output: 'ok', + title: 'bash', + metadata: {}, + time: { start: 1, end: 2 }, + }, }, }); } @@ -119,7 +136,12 @@ describe('reconstructConversation', () => { const turns = reconstructConversation(events); - expect(turns.map(t => t.content)).toEqual(['Question', 'Answer', 'Follow-up', 'Follow answer']); + expect(turns.map(t => t.content)).toEqual([ + 'Question', + 'Answer', + 'Follow-up', + 'Follow answer', + ]); }); }); From 28f23e547e36b0c25739ed7ce37974d4548fa745 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Thu, 19 Mar 2026 19:12:33 +0000 Subject: [PATCH 4/7] feat(mayor): inject conversation transcript and checkpoint on re-dispatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the Mayor container is dead and needs re-dispatch, reconstruct the prior conversation using reconstructConversation() and inject it into beadBody so the Mayor resumes with full context. Also fix the checkpoint: null bug in sendMayorMessage — now reads mayor.checkpoint from the agent record instead of always passing null. --- cloudflare-gastown/src/dos/Town.do.ts | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index 211639da78..f5ccd17910 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -52,6 +52,8 @@ import { getAgentDOStub } from './Agent.do'; import { getTownContainerStub } from './TownContainer.do'; import { writeEvent, type GastownEventData } from '../util/analytics.util'; +import { reconstructConversation } from '../util/reconstruct-conversation.util'; +import { RigAgentEventRecord } from '../db/tables/rig-agent-events.table'; import { BeadPriority } from '../types'; import type { TownConfig, @@ -1834,6 +1836,17 @@ export class TownDO extends DurableObject { } } + // Reconstruct the prior conversation so the Mayor resumes with full context. + const rawEvents = await this.getAgentEvents(mayor.id); + const priorEvents = RigAgentEventRecord.array().safeParse(rawEvents); + const priorTurns = priorEvents.success ? reconstructConversation(priorEvents.data) : []; + const priorTranscript = + priorTurns.length > 0 + ? priorTurns + .map(t => `[${t.role === 'user' ? 'User' : 'Assistant'}]: ${t.content}`) + .join('\n\n') + : ''; + const started = await dispatch.startAgentInContainer(this.env, this.ctx.storage, { townId, rigId: `mayor-${townId}`, @@ -1844,8 +1857,10 @@ export class TownDO extends DurableObject { identity: mayor.identity, beadId: '', beadTitle: message, - beadBody: '', - checkpoint: null, + beadBody: priorTranscript + ? `Prior conversation:\n\n${priorTranscript}` + : '', + checkpoint: mayor.checkpoint, gitUrl: rigConfig?.gitUrl ?? '', defaultBranch: rigConfig?.defaultBranch ?? 'main', kilocodeToken, From e523b2d5e3c4c9f4f9fa576881de9fa82236c37a Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Thu, 19 Mar 2026 19:26:20 +0000 Subject: [PATCH 5/7] feat(polecat): inject prior session transcript on re-dispatch When a polecat is re-dispatched after a container restart, reconstruct the agent's prior session from AgentDO events and inject it as 'Prior conversation:...' in beadBody, matching the same pattern used for Mayor re-dispatch in sendMayorMessage(). This prevents duplicate work and gives the new container full context of what was done before. --- cloudflare-gastown/src/dos/Town.do.ts | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index f5ccd17910..1a32b1c331 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -3112,6 +3112,20 @@ export class TownDO extends DurableObject { [timestamp, agent.id] ); + // Reconstruct the agent's prior session transcript and inject it on + // re-dispatch (after a container restart) so work isn't duplicated. + // The presence of prior events is the signal: a fresh container has none. + const rawEvents = await this.getAgentEvents(agent.id); + const priorEvents = RigAgentEventRecord.array().safeParse(rawEvents); + const priorTurns = priorEvents.success ? reconstructConversation(priorEvents.data) : []; + let beadBody = bead.body ?? ''; + if (priorTurns.length > 0) { + const priorTranscript = priorTurns + .map(t => `[${t.role === 'user' ? 'User' : 'Assistant'}]: ${t.content}`) + .join('\n\n'); + beadBody = `Prior conversation:\n\n${priorTranscript}`; + } + const started = await dispatch.startAgentInContainer(this.env, this.ctx.storage, { townId: this.townId, rigId, @@ -3122,7 +3136,7 @@ export class TownDO extends DurableObject { identity: agent.identity, beadId: bead.bead_id, beadTitle: bead.title, - beadBody: bead.body ?? '', + beadBody, checkpoint: agent.checkpoint, gitUrl: rigConfig.gitUrl, defaultBranch: rigConfig.defaultBranch, From 835a624cebcc1e260a1168ce16f61a387ed912d8 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Thu, 19 Mar 2026 19:26:50 +0000 Subject: [PATCH 6/7] style: apply formatter --- cloudflare-gastown/src/dos/Town.do.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index 1a32b1c331..1b83d5b4de 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -1857,9 +1857,7 @@ export class TownDO extends DurableObject { identity: mayor.identity, beadId: '', beadTitle: message, - beadBody: priorTranscript - ? `Prior conversation:\n\n${priorTranscript}` - : '', + beadBody: priorTranscript ? `Prior conversation:\n\n${priorTranscript}` : '', checkpoint: mayor.checkpoint, gitUrl: rigConfig?.gitUrl ?? '', defaultBranch: rigConfig?.defaultBranch ?? 'main', From 0e4a464aa122d3a47aee8b5e4200dda7f679ebc8 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Thu, 19 Mar 2026 19:30:35 +0000 Subject: [PATCH 7/7] fix(trpc): replace z.number().nonneg() with z.number().min(0) nonneg() does not exist in the installed version of zod; min(0) is the correct equivalent. --- cloudflare-gastown/src/trpc/router.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudflare-gastown/src/trpc/router.ts b/cloudflare-gastown/src/trpc/router.ts index 87801f58f3..d7851efb8e 100644 --- a/cloudflare-gastown/src/trpc/router.ts +++ b/cloudflare-gastown/src/trpc/router.ts @@ -911,7 +911,7 @@ export const gastownRouter = router({ z.object({ rigId: z.string().uuid(), agentId: z.string().uuid(), - afterId: z.number().int().nonneg().optional(), + afterId: z.number().int().min(0).optional(), limit: z.number().int().positive().max(500).default(100), }) )