Skip to content
33 changes: 30 additions & 3 deletions cloudflare-gastown/src/dos/Town.do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ import { getAgentDOStub } from './Agent.do';
import { getTownContainerStub } from './TownContainer.do';

import { writeEvent, type GastownEventData } from '../util/analytics.util';
import { reconstructConversation } from '../util/reconstruct-conversation.util';
import { RigAgentEventRecord } from '../db/tables/rig-agent-events.table';
import { BeadPriority } from '../types';
import type {
TownConfig,
Expand Down Expand Up @@ -1834,6 +1836,17 @@ export class TownDO extends DurableObject<Env> {
}
}

// Reconstruct the prior conversation so the Mayor resumes with full context.
const rawEvents = await this.getAgentEvents(mayor.id);
const priorEvents = RigAgentEventRecord.array().safeParse(rawEvents);
const priorTurns = priorEvents.success ? reconstructConversation(priorEvents.data) : [];
const priorTranscript =
priorTurns.length > 0
? priorTurns
.map(t => `[${t.role === 'user' ? 'User' : 'Assistant'}]: ${t.content}`)
.join('\n\n')
: '';

const started = await dispatch.startAgentInContainer(this.env, this.ctx.storage, {
townId,
rigId: `mayor-${townId}`,
Expand All @@ -1844,8 +1857,8 @@ export class TownDO extends DurableObject<Env> {
identity: mayor.identity,
beadId: '',
beadTitle: message,
beadBody: '',
checkpoint: null,
beadBody: priorTranscript ? `Prior conversation:\n\n${priorTranscript}` : '',
checkpoint: mayor.checkpoint,
gitUrl: rigConfig?.gitUrl ?? '',
defaultBranch: rigConfig?.defaultBranch ?? 'main',
kilocodeToken,
Expand Down Expand Up @@ -3097,6 +3110,20 @@ export class TownDO extends DurableObject<Env> {
[timestamp, agent.id]
);

// Reconstruct the agent's prior session transcript and inject it on
// re-dispatch (after a container restart) so work isn't duplicated.
// The presence of prior events is the signal: a fresh container has none.
const rawEvents = await this.getAgentEvents(agent.id);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WARNING: Prior transcript is not scoped to the current bead/session

getAgentEvents(agent.id) returns the agent's entire event log. Polecats are reused once they go idle, and refineries are singleton per rig, so after a later container restart this will replay transcript from previous beads into the next assignment. This needs a bead/session boundary or an event-log reset before reconstructing context.

const priorEvents = RigAgentEventRecord.array().safeParse(rawEvents);
const priorTurns = priorEvents.success ? reconstructConversation(priorEvents.data) : [];
let beadBody = bead.body ?? '';
if (priorTurns.length > 0) {
const priorTranscript = priorTurns
.map(t => `[${t.role === 'user' ? 'User' : 'Assistant'}]: ${t.content}`)
.join('\n\n');
beadBody = `Prior conversation:\n\n${priorTranscript}`;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WARNING: This drops the bead body on resume

When prior turns exist, beadBody becomes only the reconstructed transcript, so the original bead description and acceptance criteria in bead.body disappear from the restart prompt. A restarted agent can lose the task details even though the transcript is restored.

}

const started = await dispatch.startAgentInContainer(this.env, this.ctx.storage, {
townId: this.townId,
rigId,
Expand All @@ -3107,7 +3134,7 @@ export class TownDO extends DurableObject<Env> {
identity: agent.identity,
beadId: bead.bead_id,
beadTitle: bead.title,
beadBody: bead.body ?? '',
beadBody,
checkpoint: agent.checkpoint,
gitUrl: rigConfig.gitUrl,
defaultBranch: rigConfig.defaultBranch,
Expand Down
17 changes: 17 additions & 0 deletions cloudflare-gastown/src/trpc/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
RpcBeadOutput,
RpcAgentOutput,
RpcBeadEventOutput,
RpcAgentEventOutput,
RpcMayorSendResultOutput,
RpcMayorStatusOutput,
RpcStreamTicketOutput,
Expand Down Expand Up @@ -905,6 +906,22 @@ export const gastownRouter = router({
});
}),

getAgentEvents: gastownProcedure
.input(
z.object({
rigId: z.string().uuid(),
agentId: z.string().uuid(),
afterId: z.number().int().min(0).optional(),
limit: z.number().int().positive().max(500).default(100),
})
)
.output(z.array(RpcAgentEventOutput))
.query(async ({ ctx, input }) => {
const rig = await verifyRigOwnership(ctx.env, ctx.userId, input.rigId, ctx.orgMemberships);
const townStub = getTownDOStub(ctx.env, rig.town_id);
return townStub.getAgentEvents(input.agentId, input.afterId, input.limit);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CRITICAL: Missing rig-to-agent authorization check

This endpoint authorizes input.rigId but then returns events for an arbitrary input.agentId. Because TownDO.getAgentEvents() simply proxies to the AgentDO by id, a caller who knows another agent UUID can read its full transcript without proving that agent belongs to the requested rig/town.

}),

listConvoys: gastownProcedure
.input(
z.object({
Expand Down
10 changes: 10 additions & 0 deletions cloudflare-gastown/src/trpc/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ export const AgentOutput = z.object({
agent_status_updated_at: z.string().nullable().optional().default(null),
});

// AgentEvent (output shape, after transforms)
export const AgentEventOutput = z.object({
id: z.number(),
agent_id: z.string(),
event_type: z.string(),
data: z.record(z.string(), z.unknown()),
created_at: z.string(),
});

// BeadEvent (output shape, after transforms)
export const BeadEventOutput = z.object({
bead_event_id: z.string(),
Expand Down Expand Up @@ -180,6 +189,7 @@ export const RpcRigOutput = rpcSafe(RigOutput);
export const RpcBeadOutput = rpcSafe(BeadOutput);
export const RpcAgentOutput = rpcSafe(AgentOutput);
export const RpcBeadEventOutput = rpcSafe(BeadEventOutput);
export const RpcAgentEventOutput = rpcSafe(AgentEventOutput);
export const RpcMayorSendResultOutput = rpcSafe(MayorSendResultOutput);
export const RpcMayorStatusOutput = rpcSafe(MayorStatusOutput);
export const RpcStreamTicketOutput = rpcSafe(StreamTicketOutput);
Expand Down
254 changes: 254 additions & 0 deletions cloudflare-gastown/src/util/reconstruct-conversation.util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
/**
* Reconstructs a human-readable conversation transcript from a sequence of
* AgentDO streaming events.
*
* The SDK emits a stream of fine-grained events as an agent works. This
* utility reassembles those events into clean `{ role, content }` turns that
* can be injected into a prompt for context recovery after a container restart.
*
* Event types consumed:
* - `message.updated` / `message.completed` — carries the final Message
* object (UserMessage | AssistantMessage) in `data.info`.
* - `message_part.updated` — carries a streaming Part in `data.part`;
* TextParts are accumulated per messageID so assistant text is captured
* even when no `message.updated` event follows (e.g. mid-stream crash).
*
* The returned transcript is ordered chronologically and truncated to
* `maxTurns` (keeping the most recent turns).
*/

import { z } from 'zod';
import { type RigAgentEventRecord } from '../db/tables/rig-agent-events.table';

// ── Output type ────────────────────────────────────────────────────────────

export type ConversationTurn = {
role: 'user' | 'assistant';
content: string;
};

// ── Zod schemas for the event data payloads ────────────────────────────────
// We only validate the fields we actually use, using .passthrough() to ignore
// everything else. This makes the schemas resilient to schema evolution.

const UserMessageSummary = z.object({
title: z.string().optional(),
body: z.string().optional(),
});

const UserMessageInfo = z
.object({
id: z.string(),
role: z.literal('user'),
// User text lives inside summary.body (generated later) or is not in the
// message object at all — the raw prompt is only visible in part events.
summary: UserMessageSummary.optional(),
})
.passthrough();

const AssistantMessageInfo = z
.object({
id: z.string(),
role: z.literal('assistant'),
error: z.record(z.string(), z.unknown()).optional(),
})
.passthrough();

const MessageInfo = z.union([UserMessageInfo, AssistantMessageInfo]);

// Payload of message.updated / message.completed events
const MessageEventData = z
.object({
info: MessageInfo,
})
.passthrough();

// TextPart from message_part.updated
const TextPartData = z
.object({
id: z.string(),
messageID: z.string(),
type: z.literal('text'),
text: z.string(),
synthetic: z.boolean().optional(),
ignored: z.boolean().optional(),
})
.passthrough();

// Minimal Part schema — we only care about TextPart; everything else is
// parsed as an unknown part so we can skip it gracefully.
const PartData = z.discriminatedUnion('type', [
TextPartData,
z.object({ type: z.literal('reasoning'), messageID: z.string() }).passthrough(),
z.object({ type: z.literal('tool'), messageID: z.string() }).passthrough(),
z.object({ type: z.literal('file'), messageID: z.string() }).passthrough(),
z.object({ type: z.literal('step-start'), messageID: z.string() }).passthrough(),
z.object({ type: z.literal('step-finish'), messageID: z.string() }).passthrough(),
z.object({ type: z.literal('snapshot'), messageID: z.string() }).passthrough(),
z.object({ type: z.literal('patch'), messageID: z.string() }).passthrough(),
z.object({ type: z.literal('agent'), messageID: z.string() }).passthrough(),
z.object({ type: z.literal('retry'), messageID: z.string() }).passthrough(),
z.object({ type: z.literal('compaction'), messageID: z.string() }).passthrough(),
z.object({ type: z.literal('subtask'), messageID: z.string() }).passthrough(),
]);

// Payload of message_part.updated events
const PartEventData = z
.object({
part: PartData,
})
.passthrough();

// ── Internal state during reconstruction ──────────────────────────────────

type UserInfo = z.infer<typeof UserMessageInfo>;
type AssistantInfo = z.infer<typeof AssistantMessageInfo>;

type MessageAccumulator = {
role: 'user' | 'assistant';
// Latest snapshot of the message metadata (may be null if we only saw parts)
info: UserInfo | AssistantInfo | null;
// Text parts keyed by part id; stored in insertion order
textParts: Map<string, string>;
// Whether this message had any non-text parts (tool calls, etc.)
hasNonTextParts: boolean;
};

// ── Main export ────────────────────────────────────────────────────────────

/**
* Reconstruct a conversation transcript from a flat list of AgentDO events.
*
* @param events Ordered sequence of `RigAgentEventRecord` rows from AgentDO.
* @param maxTurns Maximum number of turns to return. When the transcript
* exceeds this, the oldest turns are dropped so the most
* recent context is preserved. Pass `Infinity` to keep all.
* Defaults to 50.
*
* @returns Array of `{ role, content }` turns in chronological order.
*/
export function reconstructConversation(
events: RigAgentEventRecord[],
maxTurns = 50
): ConversationTurn[] {
// messageId → accumulator, insertion-ordered
const messages = new Map<string, MessageAccumulator>();

for (const event of events) {
const { event_type, data } = event;

if (event_type === 'message.updated' || event_type === 'message.completed') {
const parsed = MessageEventData.safeParse(data);
if (!parsed.success) continue;

const { info } = parsed.data;
let acc = messages.get(info.id);
if (!acc) {
acc = {
role: info.role,
info,
textParts: new Map(),
hasNonTextParts: false,
};
messages.set(info.id, acc);
} else {
// Update with the latest message metadata
acc.info = info;
acc.role = info.role;
}
} else if (event_type === 'message_part.updated' || event_type === 'message.part.updated') {
// The SDK emits both forms depending on version; handle both.
const parsed = PartEventData.safeParse(data);
if (!parsed.success) continue;

const { part } = parsed.data;
const messageId = part.messageID;

let acc = messages.get(messageId);
if (!acc) {
// We may see part events before the message.updated event — create a
// placeholder accumulator. Role will be filled in when we see the info.
acc = {
role: 'assistant', // default; corrected when message info arrives
info: null,
textParts: new Map(),
hasNonTextParts: false,
};
messages.set(messageId, acc);
}

if (part.type === 'text') {
// Skip synthetic / ignored parts (used for internal context injection)
if (part.synthetic || part.ignored) continue;
acc.textParts.set(part.id, part.text);
} else {
acc.hasNonTextParts = true;
}
}
}

// ── Assemble turns ───────────────────────────────────────────────────────

const turns: ConversationTurn[] = [];

for (const acc of messages.values()) {
const content = buildContent(acc);
if (content === null) continue; // skip tool-only or empty turns
turns.push({ role: acc.role, content });
}

// ── Truncate ─────────────────────────────────────────────────────────────

if (turns.length > maxTurns) {
return turns.slice(turns.length - maxTurns);
}

return turns;
}

// ── Helpers ────────────────────────────────────────────────────────────────

function buildContent(acc: MessageAccumulator): string | null {
if (acc.role === 'user') {
return buildUserContent(acc);
}
return buildAssistantContent(acc);
}

function buildUserContent(acc: MessageAccumulator): string | null {
// User messages: text parts hold the raw prompt text.
// If no text parts are present, fall back to summary.body if available.
const fromParts = joinTextParts(acc.textParts);
if (fromParts !== '') return fromParts;

const summaryBody = extractSummaryBody(acc.info);
if (summaryBody) return summaryBody;

// No text content found — user turn is unreadable (e.g. file-only message)
return null;
}

function buildAssistantContent(acc: MessageAccumulator): string | null {
const fromParts = joinTextParts(acc.textParts);
if (fromParts !== '') return fromParts;

// Assistant messages with only tool calls (no text) are tool-only turns.
// These are not meaningful for a human-readable transcript.
if (acc.hasNonTextParts) return null;

// No content at all (e.g. message was created but never had parts — perhaps
// due to a crash mid-stream). Skip.
return null;
}

function joinTextParts(parts: Map<string, string>): string {
return [...parts.values()].join('').trim();
}

function extractSummaryBody(info: UserInfo | AssistantInfo | null): string | null {
if (!info || info.role !== 'user') return null;
// info.summary is typed as { title?: string; body?: string } | undefined
const parsed = UserMessageSummary.safeParse(info.summary);
if (!parsed.success) return null;
return parsed.data.body ?? null;
}
Loading
Loading