From 7b0693ede372b9014c30315ea4cda67be0bd5497 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Thu, 12 Mar 2026 16:57:11 -0500 Subject: [PATCH 1/2] feat(gastown): add cloud-native nudge system for reliable agent message delivery Replaces fire-and-hope mail push with a nudge queue that guarantees delivery. Adds agent_nudges table and TownDO nudge queue methods (queueNudge, getPendingNudges, acknowledgeNudge). Non-mayor agents enter idle-but-available state on session.idle instead of exiting, checking for pending nudges before terminating. Adds gt_nudge tool for inter-agent messaging, nudge() client method, and handleNudge worker endpoint. Patrol functions use queueNudge instead of sendMail for time-sensitive messages. Dashboard shows pending nudge counts. Closes #1032 --- cloudflare-gastown/container/plugin/client.ts | 30 +++ .../container/plugin/mayor-tools.test.ts | 31 +++ .../container/plugin/mayor-tools.ts | 29 +++ cloudflare-gastown/container/plugin/tools.ts | 29 ++- .../container/src/control-server.ts | 74 ++++++ .../container/src/process-manager.ts | 246 ++++++++++++++++-- .../src/db/tables/agent-nudges.table.ts | 44 ++++ cloudflare-gastown/src/dos/Town.do.ts | 199 +++++++++++++- cloudflare-gastown/src/dos/town/patrol.ts | 65 ++--- cloudflare-gastown/src/gastown.worker.ts | 16 ++ .../src/handlers/mayor-tools.handler.ts | 24 ++ .../src/handlers/rig-agents.handler.ts | 70 +++++ cloudflare-gastown/src/ui/dashboard.ui.ts | 89 ++++++- 13 files changed, 879 insertions(+), 67 deletions(-) create mode 100644 cloudflare-gastown/src/db/tables/agent-nudges.table.ts diff --git a/cloudflare-gastown/container/plugin/client.ts b/cloudflare-gastown/container/plugin/client.ts index fc8cd871c5..cb728fde8f 100644 --- a/cloudflare-gastown/container/plugin/client.ts +++ b/cloudflare-gastown/container/plugin/client.ts @@ -157,6 +157,17 @@ export class GastownClient { }); } + async nudge(input: { + target_agent_id: string; + message: string; + mode: 'wait-idle' | 'immediate' | 'queue'; + }): Promise<{ nudge_id: string }> { + return this.request<{ nudge_id: string }>(this.rigPath('/nudge'), { + method: 'POST', + body: JSON.stringify(input), + }); + } + async createEscalation(input: { title: string; body?: string; @@ -351,6 +362,25 @@ export class MayorGastownClient { }); } + async nudge(input: { + rig_id: string; + target_agent_id: string; + message: string; + mode: 'wait-idle' | 'immediate' | 'queue'; + }): Promise<{ nudge_id: string }> { + return this.request<{ nudge_id: string }>( + `${this.baseUrl}/api/towns/${this.townId}/rigs/${input.rig_id}/nudge`, + { + method: 'POST', + body: JSON.stringify({ + target_agent_id: input.target_agent_id, + message: input.message, + mode: input.mode, + }), + } + ); + } + async listConvoys(): Promise { return this.request(this.mayorPath('/convoys')); } diff --git a/cloudflare-gastown/container/plugin/mayor-tools.test.ts b/cloudflare-gastown/container/plugin/mayor-tools.test.ts index fe21e25673..7f7e299c7f 100644 --- a/cloudflare-gastown/container/plugin/mayor-tools.test.ts +++ b/cloudflare-gastown/container/plugin/mayor-tools.test.ts @@ -91,6 +91,7 @@ function makeFakeMayorClient(overrides: Partial = {}): Mayor listBeads: vi.fn<() => Promise>().mockResolvedValue([]), listAgents: vi.fn<() => Promise>().mockResolvedValue([]), sendMail: vi.fn().mockResolvedValue(undefined), + nudge: vi.fn<() => Promise<{ nudge_id: string }>>().mockResolvedValue({ nudge_id: 'nudge-1' }), slingBatch: vi.fn<() => Promise>().mockResolvedValue({ convoy: FAKE_CONVOY, beads: [ @@ -409,4 +410,34 @@ describe('mayor tools', () => { expect(client.startConvoy).toHaveBeenCalledWith('convoy-staged-1'); }); }); + + describe('gt_nudge', () => { + it('sends a nudge and returns the nudge_id', async () => { + const result = await tools.gt_nudge.execute( + { rig_id: 'rig-1', target_agent_id: 'agent-1', message: 'Wake up!' }, + CTX + ); + expect(result).toContain('nudge-1'); + expect(result).toContain('wait-idle'); + expect(client.nudge).toHaveBeenCalledWith({ + rig_id: 'rig-1', + target_agent_id: 'agent-1', + message: 'Wake up!', + mode: 'wait-idle', + }); + }); + + it('passes explicit mode through to the client', async () => { + await tools.gt_nudge.execute( + { rig_id: 'rig-1', target_agent_id: 'agent-2', message: 'Urgent!', mode: 'immediate' }, + CTX + ); + expect(client.nudge).toHaveBeenCalledWith({ + rig_id: 'rig-1', + target_agent_id: 'agent-2', + message: 'Urgent!', + mode: 'immediate', + }); + }); + }); }); diff --git a/cloudflare-gastown/container/plugin/mayor-tools.ts b/cloudflare-gastown/container/plugin/mayor-tools.ts index a0d5a2eec3..b7d833e76f 100644 --- a/cloudflare-gastown/container/plugin/mayor-tools.ts +++ b/cloudflare-gastown/container/plugin/mayor-tools.ts @@ -446,5 +446,34 @@ export function createMayorTools(client: MayorGastownClient) { return `UI action "${action.type}" broadcast to dashboard.`; }, }), + + gt_nudge: tool({ + description: + 'Send a real-time nudge to a polecat agent in any rig. Unlike gt_mail_send (which queues ' + + 'a formal persistent message), gt_nudge delivers immediately at the agent\'s next idle moment. ' + + 'Use this for time-sensitive coordination: wake up an agent, request a status check, ' + + 'or notify of a blocking issue.', + args: { + rig_id: tool.schema.string().describe('The UUID of the rig the target agent belongs to'), + target_agent_id: tool.schema.string().describe('UUID of the agent to nudge'), + message: tool.schema.string().describe('The message to deliver'), + mode: tool.schema + .enum(['wait-idle', 'immediate', 'queue']) + .describe( + 'Delivery mode: wait-idle (default) delivers at next idle moment; ' + + 'immediate injects mid-task; queue delivers with TTL' + ) + .optional(), + }, + async execute(args) { + const result = await client.nudge({ + rig_id: args.rig_id, + target_agent_id: args.target_agent_id, + message: args.message, + mode: args.mode ?? 'wait-idle', + }); + return `Nudge queued: ${result.nudge_id} (mode: ${args.mode ?? 'wait-idle'})`; + }, + }), }; } diff --git a/cloudflare-gastown/container/plugin/tools.ts b/cloudflare-gastown/container/plugin/tools.ts index c3769f5056..0df5d6ce67 100644 --- a/cloudflare-gastown/container/plugin/tools.ts +++ b/cloudflare-gastown/container/plugin/tools.ts @@ -217,7 +217,7 @@ export function createTools(client: GastownClient) { 'Emit a plain-language status update visible on the dashboard. ' + 'Call this when starting a new phase of work (e.g. "Installing dependencies", ' + '"Writing tests", "Fixing lint errors"). Write it as a brief sentence for a teammate, ' + - 'not a log line. Do NOT call this on every tool use — only at meaningful phase transitions.', + 'not a log line. Do NOT call this on every tool use â only at meaningful phase transitions.', args: { message: tool.schema .string() @@ -228,5 +228,32 @@ export function createTools(client: GastownClient) { return 'Status updated.'; }, }), + + gt_nudge: tool({ + description: + 'Send a real-time nudge to another agent. Unlike gt_mail_send (which queues a formal ' + + "persistent message), gt_nudge delivers immediately at the agent's next idle moment. " + + 'Use this for time-sensitive coordination: wake up an agent, request a status check, ' + + 'or notify of a blocking issue.', + args: { + target_agent_id: tool.schema.string().describe('UUID of the agent to nudge'), + message: tool.schema.string().describe('The message to deliver'), + mode: tool.schema + .enum(['wait-idle', 'immediate', 'queue']) + .describe( + 'Delivery mode: wait-idle (default) delivers at next idle moment; ' + + 'immediate injects mid-task; queue delivers with TTL' + ) + .optional(), + }, + async execute(args) { + const result = await client.nudge({ + target_agent_id: args.target_agent_id, + message: args.message, + mode: args.mode ?? 'wait-idle', + }); + return `Nudge queued: ${result.nudge_id} (mode: ${args.mode ?? 'wait-idle'})`; + }, + }), }; } diff --git a/cloudflare-gastown/container/src/control-server.ts b/cloudflare-gastown/container/src/control-server.ts index b2ff615ad4..af825d6987 100644 --- a/cloudflare-gastown/container/src/control-server.ts +++ b/cloudflare-gastown/container/src/control-server.ts @@ -392,6 +392,80 @@ app.post('/git/merge', async c => { return c.json(result, 202); }); +// GET /agents/:agentId/pending-nudges +// Proxies to the gastown worker to fetch undelivered nudges for an agent. +// Called by the process-manager when the agent goes idle. +app.get('/agents/:agentId/pending-nudges', async c => { + const { agentId } = c.req.param(); + const apiUrl = process.env.GASTOWN_API_URL; + const token = process.env.GASTOWN_CONTAINER_TOKEN ?? process.env.GASTOWN_SESSION_TOKEN; + const townId = process.env.GASTOWN_TOWN_ID; + const rigId = process.env.GASTOWN_RIG_ID; + + if (!apiUrl || !token || !townId || !rigId) { + return c.json({ error: 'Missing gastown configuration' }, 503); + } + + try { + const resp = await fetch( + `${apiUrl}/api/towns/${townId}/rigs/${rigId}/agents/${agentId}/pending-nudges`, + { + headers: { + Authorization: `Bearer ${token}`, + 'X-Gastown-Agent-Id': agentId, + 'X-Gastown-Rig-Id': rigId, + }, + } + ); + const body: unknown = await resp.json(); + return c.json(body, resp.status as 200); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + return c.json({ error: message }, 500); + } +}); + +// POST /agents/:agentId/nudge-delivered +// Marks a nudge as delivered via the gastown worker. +// Body: { nudge_id: string } +app.post('/agents/:agentId/nudge-delivered', async c => { + const { agentId } = c.req.param(); + const apiUrl = process.env.GASTOWN_API_URL; + const token = process.env.GASTOWN_CONTAINER_TOKEN ?? process.env.GASTOWN_SESSION_TOKEN; + const townId = process.env.GASTOWN_TOWN_ID; + const rigId = process.env.GASTOWN_RIG_ID; + + if (!apiUrl || !token || !townId || !rigId) { + return c.json({ error: 'Missing gastown configuration' }, 503); + } + + const body: unknown = await c.req.json().catch(() => null); + if (!body || typeof body !== 'object' || !('nudge_id' in body) || typeof body.nudge_id !== 'string') { + return c.json({ error: 'Missing or invalid nudge_id field' }, 400); + } + + try { + const resp = await fetch( + `${apiUrl}/api/towns/${townId}/rigs/${rigId}/agents/${agentId}/nudge-delivered`, + { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${token}`, + 'X-Gastown-Agent-Id': agentId, + 'X-Gastown-Rig-Id': rigId, + }, + body: JSON.stringify({ nudge_id: body.nudge_id }), + } + ); + const respBody: unknown = await resp.json(); + return c.json(respBody, resp.status as 200); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + return c.json({ error: message }, 500); + } +}); + // ── PTY proxy routes ────────────────────────────────────────────────── // Proxy PTY operations to the agent's internal SDK server. // The SDK server (kilo serve) exposes /pty/* routes on 127.0.0.1:. diff --git a/cloudflare-gastown/container/src/process-manager.ts b/cloudflare-gastown/container/src/process-manager.ts index cb42998aa8..ed848c7509 100644 --- a/cloudflare-gastown/container/src/process-manager.ts +++ b/cloudflare-gastown/container/src/process-manager.ts @@ -31,6 +31,8 @@ const sdkInstances = new Map(); const eventAbortControllers = new Map(); // Event sinks for WebSocket forwarding const eventSinks = new Set<(agentId: string, event: string, data: unknown) => void>(); +// Per-agent idle timers — fires exit when no nudges arrive +const idleTimers = new Map>(); let nextPort = 4096; const startTime = Date.now(); @@ -215,6 +217,174 @@ async function ensureSDKServer( } } +/** + * Zod schema for a single pending nudge returned by the gastown worker. + */ +const PendingNudge = z.object({ + nudge_id: z.string(), + message: z.string(), + mode: z.string(), + priority: z.string(), + source: z.string(), +}); + +const PendingNudgesResponse = z.object({ + success: z.boolean(), + data: z.array(PendingNudge), +}); + +/** + * Fetch pending nudges for an agent from the gastown worker. + * Returns the array (may be empty), or null on error. + */ +async function fetchPendingNudges( + agent: ManagedAgent +): Promise[] | null> { + const authToken = + process.env.GASTOWN_CONTAINER_TOKEN ?? + agent.gastownContainerToken ?? + agent.gastownSessionToken; + if (!agent.gastownApiUrl || !authToken || !agent.townId || !agent.rigId) return null; + + try { + const headers: Record = { + Authorization: `Bearer ${authToken}`, + 'X-Gastown-Agent-Id': agent.agentId, + 'X-Gastown-Rig-Id': agent.rigId, + }; + const resp = await fetch( + `${agent.gastownApiUrl}/api/towns/${agent.townId}/rigs/${agent.rigId}/agents/${agent.agentId}/pending-nudges`, + { headers } + ); + if (!resp.ok) { + console.warn( + `${MANAGER_LOG} fetchPendingNudges: non-ok status ${resp.status} for agent ${agent.agentId}` + ); + return null; + } + const raw: unknown = await resp.json(); + const parsed = PendingNudgesResponse.safeParse(raw); + if (!parsed.success) { + console.warn(`${MANAGER_LOG} fetchPendingNudges: unexpected response shape`, parsed.error.issues); + return null; + } + return parsed.data.data; + } catch (err) { + console.warn(`${MANAGER_LOG} fetchPendingNudges: error for agent ${agent.agentId}:`, err); + return null; + } +} + +/** + * Mark a nudge as delivered via the gastown worker. + */ +async function markNudgeDelivered(agent: ManagedAgent, nudgeId: string): Promise { + const authToken = + process.env.GASTOWN_CONTAINER_TOKEN ?? + agent.gastownContainerToken ?? + agent.gastownSessionToken; + if (!agent.gastownApiUrl || !authToken || !agent.townId || !agent.rigId) return; + + try { + const headers: Record = { + 'Content-Type': 'application/json', + Authorization: `Bearer ${authToken}`, + 'X-Gastown-Agent-Id': agent.agentId, + 'X-Gastown-Rig-Id': agent.rigId, + }; + await fetch( + `${agent.gastownApiUrl}/api/towns/${agent.townId}/rigs/${agent.rigId}/agents/${agent.agentId}/nudge-delivered`, + { + method: 'POST', + headers, + body: JSON.stringify({ nudge_id: nudgeId }), + } + ); + } catch (err) { + console.warn(`${MANAGER_LOG} markNudgeDelivered: error for nudge ${nudgeId}:`, err); + } +} + +/** + * Clear the idle timer for an agent (if any). + */ +function clearIdleTimer(agentId: string): void { + const timer = idleTimers.get(agentId); + if (timer !== undefined) { + clearTimeout(timer); + idleTimers.delete(agentId); + } +} + +/** + * Handle a session.idle event for a non-mayor agent. + * + * - Checks for pending nudges and injects the highest-priority one if found. + * - If no nudges are pending, starts (or restarts) an idle timeout that will + * exit the agent after AGENT_IDLE_TIMEOUT_MS (default 2 min). + * + * Returns true if the agent should continue (nudge injected or timer started), + * false if the agent should exit immediately (injection failed unrecoverably). + */ +async function handleIdleEvent( + agent: ManagedAgent, + onExit: () => void +): Promise { + const agentId = agent.agentId; + console.log(`${MANAGER_LOG} handleIdleEvent: checking nudges for agent ${agentId}`); + + const nudges = await fetchPendingNudges(agent); + + if (nudges === null) { + // Error fetching — treat as no nudges, start idle timer + console.warn(`${MANAGER_LOG} handleIdleEvent: could not fetch nudges for ${agentId}, starting idle timer`); + } else if (nudges.length > 0 && agent.status === 'running') { + // There is at least one pending nudge — inject the first (highest priority) + const nudge = nudges[0]; + console.log( + `${MANAGER_LOG} handleIdleEvent: injecting nudge ${nudge.nudge_id} (priority=${nudge.priority}) for agent ${agentId}` + ); + // Cancel any existing idle timer since the agent will keep working + clearIdleTimer(agentId); + try { + await sendMessage(agentId, nudge.message); + // Mark delivered (fire-and-forget is fine — best effort) + void markNudgeDelivered(agent, nudge.nudge_id); + } catch (err) { + console.warn( + `${MANAGER_LOG} handleIdleEvent: sendMessage failed for agent ${agentId} (status=${agent.status}), exiting:`, + err + ); + onExit(); + } + return; + } + + // No nudges (or fetch error) — (re)start the idle timeout + clearIdleTimer(agentId); + const timeoutMs = + process.env.AGENT_IDLE_TIMEOUT_MS !== undefined + ? Number(process.env.AGENT_IDLE_TIMEOUT_MS) + : 120_000; + + console.log( + `${MANAGER_LOG} handleIdleEvent: no nudges for ${agentId}, idle timeout in ${timeoutMs}ms` + ); + + idleTimers.set( + agentId, + setTimeout(() => { + idleTimers.delete(agentId); + if (agent.status === 'running') { + console.log( + `${MANAGER_LOG} handleIdleEvent: idle timeout fired for agent ${agentId}, exiting` + ); + onExit(); + } + }, timeoutMs) + ); +} + /** * Subscribe to SDK events for an agent's session and forward them. */ @@ -226,6 +396,32 @@ async function subscribeToEvents( const controller = new AbortController(); eventAbortControllers.set(agent.agentId, controller); + // Called when the agent should exit cleanly after idle timeout or nudge failure. + const exitAgent = () => { + if (agent.status !== 'running') return; + log.info('agent.exit', { + agentId: agent.agentId, + name: agent.name, + reason: 'completed', + exitReason: 'completed', + }); + agent.status = 'exited'; + agent.exitReason = 'completed'; + broadcastEvent(agent.agentId, 'agent.exited', { reason: 'completed' }); + void reportAgentCompleted(agent, 'completed'); + + // Release SDK session so the server can shut down when idle + const inst = sdkInstances.get(agent.workdir); + if (inst) { + inst.sessionCount--; + if (inst.sessionCount <= 0) { + inst.server.close(); + sdkInstances.delete(agent.workdir); + } + } + controller.abort(); + }; + try { console.log(`${MANAGER_LOG} Subscribing to events for agent ${agent.agentId}...`); const result = await client.event.subscribe(); @@ -267,34 +463,21 @@ async function subscribeToEvents( // Broadcast to WebSocket sinks broadcastEvent(agent.agentId, event.type ?? 'unknown', event.properties ?? {}); - // Detect completion. session.idle means "done processing this turn." - // Mayor agents are persistent — session.idle for them means "turn done," - // not "task finished." Only non-mayor agents exit on idle. - const isTerminal = event.type === 'session.idle' && request.role !== 'mayor'; - - if (isTerminal) { - log.info('agent.exit', { - agentId: agent.agentId, - name: agent.name, - reason: 'completed', - exitReason: 'completed', - }); - agent.status = 'exited'; - agent.exitReason = 'completed'; - broadcastEvent(agent.agentId, 'agent.exited', { reason: 'completed' }); - void reportAgentCompleted(agent, 'completed'); - - // Release SDK session so the server can shut down when idle - const inst = sdkInstances.get(agent.workdir); - if (inst) { - inst.sessionCount--; - if (inst.sessionCount <= 0) { - inst.server.close(); - sdkInstances.delete(agent.workdir); - } + if (event.type === 'session.idle') { + if (request.role === 'mayor') { + // Mayor agents are persistent — session.idle means "turn done", not exit. + continue; } - break; + // Non-mayor: check for pending nudges before deciding to exit. + // handleIdleEvent is async; we run it in the background so the event + // loop continues. The exitAgent callback will abort the stream if needed. + void handleIdleEvent(agent, exitAgent); + } else { + // Non-idle event means the agent resumed work — cancel any pending idle timer. + clearIdleTimer(agent.agentId); } + + if (controller.signal.aborted) break; } } catch (err) { if (!controller.signal.aborted) { @@ -303,6 +486,7 @@ async function subscribeToEvents( error: err instanceof Error ? err.message : String(err), }); if (agent.status === 'running') { + clearIdleTimer(agent.agentId); agent.status = 'failed'; agent.exitReason = 'Event stream error'; broadcastEvent(agent.agentId, 'agent.exited', { reason: 'stream error' }); @@ -320,6 +504,7 @@ async function subscribeToEvents( } } } finally { + clearIdleTimer(agent.agentId); eventAbortControllers.delete(agent.agentId); } } @@ -447,6 +632,9 @@ export async function stopAgent(agentId: string): Promise { agent.status = 'stopping'; + // Cancel any pending idle timer + clearIdleTimer(agentId); + // Abort event subscription const controller = eventAbortControllers.get(agentId); if (controller) controller.abort(); @@ -537,6 +725,12 @@ export function activeServerCount(): number { } export async function stopAll(): Promise { + // Cancel all idle timers + for (const [, timer] of idleTimers) { + clearTimeout(timer); + } + idleTimers.clear(); + // Abort all event subscriptions for (const [, controller] of eventAbortControllers) { controller.abort(); diff --git a/cloudflare-gastown/src/db/tables/agent-nudges.table.ts b/cloudflare-gastown/src/db/tables/agent-nudges.table.ts new file mode 100644 index 0000000000..ac246f2a3c --- /dev/null +++ b/cloudflare-gastown/src/db/tables/agent-nudges.table.ts @@ -0,0 +1,44 @@ +import { z } from 'zod'; +import { getTableFromZodSchema, getCreateTableQueryFromTable } from '../../util/table'; + +export const NudgeMode = z.enum(['wait-idle', 'immediate', 'queue']); +export type NudgeMode = z.output; + +export const NudgePriority = z.enum(['normal', 'urgent']); +export type NudgePriority = z.output; + +export const AgentNudgeRecord = z.object({ + nudge_id: z.string(), + agent_bead_id: z.string(), + message: z.string(), + mode: NudgeMode, + priority: NudgePriority, + source: z.string(), + created_at: z.string(), + delivered_at: z.string().nullable(), + expires_at: z.string().nullable(), +}); + +export type AgentNudgeRecord = z.output; + +export const agent_nudges = getTableFromZodSchema('agent_nudges', AgentNudgeRecord); + +export function createTableAgentNudges(): string { + return getCreateTableQueryFromTable(agent_nudges, { + nudge_id: `text primary key`, + agent_bead_id: `text not null`, + message: `text not null`, + mode: `text not null default 'wait-idle' check(mode in ('wait-idle', 'immediate', 'queue'))`, + priority: `text not null default 'normal' check(priority in ('normal', 'urgent'))`, + source: `text not null default 'system'`, + created_at: `text not null default (datetime('now'))`, + delivered_at: `text`, + expires_at: `text`, + }); +} + +export function getIndexesAgentNudges(): string[] { + return [ + `CREATE INDEX IF NOT EXISTS idx_agent_nudges_pending ON ${agent_nudges}(${agent_nudges.columns.agent_bead_id}) WHERE ${agent_nudges.columns.delivered_at} IS NULL`, + ]; +} diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index c37b34a968..4fafc408d9 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -41,6 +41,12 @@ import { review_metadata } from '../db/tables/review-metadata.table'; import { escalation_metadata } from '../db/tables/escalation-metadata.table'; import { convoy_metadata } from '../db/tables/convoy-metadata.table'; import { bead_dependencies } from '../db/tables/bead-dependencies.table'; +import { + agent_nudges, + AgentNudgeRecord, + createTableAgentNudges, + getIndexesAgentNudges, +} from '../db/tables/agent-nudges.table'; import { query } from '../util/query.util'; import { getAgentDOStub } from './Agent.do'; import { getTownContainerStub } from './TownContainer.do'; @@ -431,6 +437,12 @@ export class TownDO extends DurableObject { // Rig registry rigs.initRigTables(this.sql); + // Nudges + query(this.sql, createTableAgentNudges(), []); + for (const idx of getIndexesAgentNudges()) { + query(this.sql, idx, []); + } + // Ensure the alarm loop is running. After a deploy/restart, the // Cloudflare runtime normally delivers missed alarms, but if the alarm // was never set or was deleted by destroy(), the loop is dead. Re-arm @@ -1043,6 +1055,164 @@ export class TownDO extends DurableObject { return mail.checkMail(this.sql, agentId); } + // ══════════════════════════════════════════════════════════════════ + // Nudges + // ══════════════════════════════════════════════════════════════════ + + /** + * Queue a nudge for an agent. If mode is 'immediate', attempts to push + * the message directly via the container and marks it delivered on success. + * Returns the nudge_id. + */ + async queueNudge( + agentId: string, + message: string, + options?: { + mode?: 'wait-idle' | 'immediate' | 'queue'; + priority?: 'normal' | 'urgent'; + source?: string; + ttlSeconds?: number; + } + ): Promise { + await this.ensureInitialized(); + + const nudgeId = crypto.randomUUID(); + const mode = options?.mode ?? 'wait-idle'; + const priority = options?.priority ?? 'normal'; + const source = options?.source ?? 'system'; + + let expiresAt: string | null = null; + if (mode === 'queue' && options?.ttlSeconds != null) { + expiresAt = new Date(Date.now() + options.ttlSeconds * 1000).toISOString(); + } + + query( + this.sql, + /* sql */ ` + INSERT INTO ${agent_nudges} ( + ${agent_nudges.columns.nudge_id}, + ${agent_nudges.columns.agent_bead_id}, + ${agent_nudges.columns.message}, + ${agent_nudges.columns.mode}, + ${agent_nudges.columns.priority}, + ${agent_nudges.columns.source}, + ${agent_nudges.columns.expires_at} + ) VALUES (?, ?, ?, ?, ?, ?, ?) + `, + [nudgeId, agentId, message, mode, priority, source, expiresAt] + ); + + console.log( + `${TOWN_LOG} queueNudge: nudge_id=${nudgeId} agent=${agentId} mode=${mode} priority=${priority} source=${source}` + ); + + if (mode === 'immediate') { + const sent = await dispatch.sendMessageToAgent(this.env, this.townId, agentId, message); + if (sent) { + query( + this.sql, + /* sql */ ` + UPDATE ${agent_nudges} + SET ${agent_nudges.columns.delivered_at} = datetime('now') + WHERE ${agent_nudges.nudge_id} = ? + `, + [nudgeId] + ); + console.log(`${TOWN_LOG} queueNudge: immediate nudge delivered to agent=${agentId}`); + } else { + console.warn( + `${TOWN_LOG} queueNudge: immediate delivery failed for agent=${agentId}, nudge queued for retry` + ); + } + } + + return nudgeId; + } + + /** + * Return undelivered, non-expired nudges for an agent. + * Urgent nudges are returned first, then FIFO within same priority. + */ + async getPendingNudges( + agentId: string + ): Promise< + { nudge_id: string; message: string; mode: string; priority: string; source: string }[] + > { + await this.ensureInitialized(); + + const rows = [ + ...query( + this.sql, + /* sql */ ` + SELECT + ${agent_nudges.nudge_id}, + ${agent_nudges.message}, + ${agent_nudges.mode}, + ${agent_nudges.priority}, + ${agent_nudges.source} + FROM ${agent_nudges} + WHERE ${agent_nudges.agent_bead_id} = ? + AND ${agent_nudges.delivered_at} IS NULL + AND (${agent_nudges.expires_at} IS NULL OR ${agent_nudges.expires_at} > datetime('now')) + ORDER BY + CASE ${agent_nudges.priority} WHEN 'urgent' THEN 0 ELSE 1 END ASC, + ${agent_nudges.created_at} ASC + `, + [agentId] + ), + ]; + + return AgentNudgeRecord.pick({ + nudge_id: true, + message: true, + mode: true, + priority: true, + source: true, + }) + .array() + .parse(rows); + } + + /** Mark a nudge as delivered. */ + async markNudgeDelivered(nudgeId: string): Promise { + await this.ensureInitialized(); + + query( + this.sql, + /* sql */ ` + UPDATE ${agent_nudges} + SET ${agent_nudges.columns.delivered_at} = datetime('now') + WHERE ${agent_nudges.nudge_id} = ? + `, + [nudgeId] + ); + } + + /** + * Expire nudges whose expires_at has passed. + * Called from the alarm loop. Returns the count of nudges expired. + */ + async expireStaleNudges(): Promise { + await this.ensureInitialized(); + + const result = [ + ...query( + this.sql, + /* sql */ ` + UPDATE ${agent_nudges} + SET ${agent_nudges.columns.delivered_at} = datetime('now') + WHERE ${agent_nudges.expires_at} IS NOT NULL + AND ${agent_nudges.expires_at} < datetime('now') + AND ${agent_nudges.delivered_at} IS NULL + RETURNING ${agent_nudges.nudge_id} + `, + [] + ), + ]; + + return result.length; + } + // ══════════════════════════════════════════════════════════════════ // Review Queue & Molecules // ══════════════════════════════════════════════════════════════════ @@ -1289,16 +1459,16 @@ export class TownDO extends DurableObject { break; } case 'NUDGE': { - // Send a nudge message to the stuck agent - if (targetAgent) { - mail.sendMail(this.sql, { - from_agent_id: 'patrol', - to_agent_id: targetAgentId, - subject: 'TRIAGE_NUDGE', - body: - input.resolution_notes || + // Nudge the stuck agent — time-sensitive, deliver immediately + if (targetAgent && targetAgentId) { + this.queueNudge( + targetAgentId, + input.resolution_notes || 'The triage system has flagged you as potentially stuck. Please report your status.', - }); + { mode: 'immediate', source: 'triage', priority: 'urgent' } + ).catch(err => + console.warn(`${TOWN_LOG} resolveTriage: nudge failed for agent=${targetAgentId}:`, err) + ); this.emitEvent({ event: 'nudge.queued', townId: this.townId, @@ -2672,6 +2842,11 @@ export class TownDO extends DurableObject { } catch (err) { console.warn(`${TOWN_LOG} alarm: deliverPendingMail failed`, err); } + try { + await this.expireStaleNudges(); + } catch (err) { + console.warn(`${TOWN_LOG} alarm: expireStaleNudges failed`, err); + } try { await this.reEscalateStaleEscalations(); } catch (err) { @@ -3088,7 +3263,11 @@ export class TownDO extends DurableObject { ), ]); - const forceStopIds = patrol.detectGUPPViolations(this.sql, currentWorking); + const forceStopIds = patrol.detectGUPPViolations( + this.sql, + currentWorking, + (agentId, message, opts) => this.queueNudge(agentId, message, opts) + ); // Force-stop agents in the container (best-effort) for (const agentId of forceStopIds) { diff --git a/cloudflare-gastown/src/dos/town/patrol.ts b/cloudflare-gastown/src/dos/town/patrol.ts index f434a9a815..0d587fe4d5 100644 --- a/cloudflare-gastown/src/dos/town/patrol.ts +++ b/cloudflare-gastown/src/dos/town/patrol.ts @@ -11,10 +11,10 @@ import { z } from 'zod'; import { beads, BeadRecord as BeadRecordSchema } from '../../db/tables/beads.table'; import { agent_metadata, AgentMetadataRecord } from '../../db/tables/agent-metadata.table'; +import { agent_nudges } from '../../db/tables/agent-nudges.table'; import { bead_dependencies } from '../../db/tables/bead-dependencies.table'; import { convoy_metadata } from '../../db/tables/convoy-metadata.table'; import { query } from '../../util/query.util'; -import { sendMail } from './mail'; import { deleteAgent, getOrCreateAgent, hookBead, unhookBead } from './agents'; import { createBead, updateBeadStatus } from './beads'; @@ -163,12 +163,15 @@ export function createTriageRequest( /** * Tiered GUPP violation handling: - * - 30 min: send GUPP_CHECK mail (existing behavior) - * - 1h: escalate to mayor + * - 30 min: nudge agent with GUPP_CHECK warning + * - 1h: nudge agent with GUPP_ESCALATION, create triage request * - 2h: force-stop agent, create triage request for dirty polecat * * Returns agent IDs that were force-stopped (caller should stop them * in the container). + * + * The `queueNudge` callback sends a time-sensitive message to the agent. + * It is fire-and-forget (returns a Promise that the caller ignores). */ export function detectGUPPViolations( sql: SqlStorage, @@ -176,7 +179,12 @@ export function detectGUPPViolations( bead_id: string; current_hook_bead_id: string | null; last_activity_at: string | null; - }> + }>, + queueNudge: ( + agentId: string, + message: string, + opts: { mode: 'immediate'; source: string; priority: 'urgent' } + ) => Promise ): string[] { const nowMs = Date.now(); const forceStopIds: string[] = []; @@ -219,29 +227,28 @@ export function detectGUPPViolations( } else if (staleMs >= GUPP_ESCALATE_MS) { // Tier 2: create a triage request for the stuck agent. The triage // agent (or mayor, if escalated) will decide whether to restart, - // nudge, or force-stop. Also warn the stuck agent directly. + // nudge, or force-stop. Also nudge the stuck agent directly. const existingEsc = [ ...query( sql, /* sql */ ` - SELECT ${beads.bead_id} FROM ${beads} - WHERE ${beads.type} = 'message' - AND ${beads.assignee_agent_bead_id} = ? - AND ${beads.title} = 'GUPP_ESCALATION' - AND ${beads.status} = 'open' + SELECT ${agent_nudges.nudge_id} FROM ${agent_nudges} + WHERE ${agent_nudges.agent_bead_id} = ? + AND ${agent_nudges.source} = 'witness' + AND ${agent_nudges.message} LIKE '%GUPP_ESCALATION%' + AND ${agent_nudges.delivered_at} IS NULL LIMIT 1 `, [agent.bead_id] ), ]; if (existingEsc.length === 0) { - // Notify the stuck agent - sendMail(sql, { - from_agent_id: 'patrol', - to_agent_id: agent.bead_id, - subject: 'GUPP_ESCALATION', - body: `You have been inactive for ${Math.round(staleMs / 60_000)} minutes. This has been escalated. You will be force-stopped if inactivity continues.`, - }); + // Nudge the stuck agent — time-sensitive, deliver immediately + queueNudge( + agent.bead_id, + `GUPP_ESCALATION: You have been inactive for ${Math.round(staleMs / 60_000)} minutes. This has been escalated. You will be force-stopped if inactivity continues.`, + { mode: 'immediate', source: 'witness', priority: 'urgent' } + ).catch(() => {}); // Create a triage request so the triage agent (or mayor) is aware createTriageRequest(sql, { @@ -260,28 +267,28 @@ export function detectGUPPViolations( console.log(`${LOG} GUPP escalation: agent=${agent.bead_id}`); } } else if (staleMs >= GUPP_WARN_MS) { - // Tier 1: send warning mail (existing behavior, idempotent) + // Tier 1: nudge agent with GUPP_CHECK warning (idempotent) const existingGupp = [ ...query( sql, /* sql */ ` - SELECT ${beads.bead_id} FROM ${beads} - WHERE ${beads.type} = 'message' - AND ${beads.assignee_agent_bead_id} = ? - AND ${beads.title} = 'GUPP_CHECK' - AND ${beads.status} = 'open' + SELECT ${agent_nudges.nudge_id} FROM ${agent_nudges} + WHERE ${agent_nudges.agent_bead_id} = ? + AND ${agent_nudges.source} = 'witness' + AND ${agent_nudges.message} LIKE '%GUPP_CHECK%' + AND ${agent_nudges.delivered_at} IS NULL LIMIT 1 `, [agent.bead_id] ), ]; if (existingGupp.length === 0) { - sendMail(sql, { - from_agent_id: 'patrol', - to_agent_id: agent.bead_id, - subject: 'GUPP_CHECK', - body: 'You have had work hooked for 30+ minutes with no activity. Are you stuck? If so, call gt_escalate.', - }); + queueNudge( + agent.bead_id, + 'GUPP_CHECK: You have had work hooked for 30+ minutes with no activity. Are you stuck? If so, call gt_escalate.', + { mode: 'immediate', source: 'witness', priority: 'urgent' } + ).catch(() => {}); + console.log(`${LOG} GUPP warn: agent=${agent.bead_id} stale=${Math.round(staleMs / 60_000)}min`); } } } diff --git a/cloudflare-gastown/src/gastown.worker.ts b/cloudflare-gastown/src/gastown.worker.ts index 38f6b93cd4..8282446cdf 100644 --- a/cloudflare-gastown/src/gastown.worker.ts +++ b/cloudflare-gastown/src/gastown.worker.ts @@ -42,6 +42,9 @@ import { handleGetOrCreateAgent, handleDeleteAgent, handleUpdateAgentStatusMessage, + handleGetPendingNudges, + handleNudgeDelivered, + handleNudge, } from './handlers/rig-agents.handler'; import { handleSendMail } from './handlers/rig-mail.handler'; import { handleAppendAgentEvent, handleGetAgentEvents } from './handlers/rig-agent-events.handler'; @@ -110,6 +113,7 @@ import { handleMayorEscalationAcknowledge, handleMayorConvoyStart, handleMayorUiAction, + handleMayorGetPendingNudges, } from './handlers/mayor-tools.handler'; import { mayorAuthMiddleware } from './middleware/mayor-auth.middleware'; import { townAuthMiddleware } from './middleware/town-auth.middleware'; @@ -325,6 +329,15 @@ app.post('/api/towns/:townId/rigs/:rigId/agents/:agentId/status', c => handleUpdateAgentStatusMessage(c, c.req.param()) ) ); +app.get('/api/towns/:townId/rigs/:rigId/agents/:agentId/pending-nudges', c => + handleGetPendingNudges(c, c.req.param()) +); +app.post('/api/towns/:townId/rigs/:rigId/agents/:agentId/nudge-delivered', c => + handleNudgeDelivered(c, c.req.param()) +); + +// Agent-to-agent nudge: any authenticated agent can nudge another agent in the rig +app.post('/api/towns/:townId/rigs/:rigId/nudge', c => handleNudge(c, c.req.param())); // ── Agent Events ───────────────────────────────────────────────────────── @@ -658,6 +671,9 @@ app.post('/api/mayor/:townId/tools/ui-action', c => handleMayorUiAction(c, c.req.param()) ) ); +app.get('/api/mayor/:townId/tools/rigs/:rigId/agents/:agentId/pending-nudges', c => + handleMayorGetPendingNudges(c, c.req.param()) +); app.post('/api/mayor/:townId/tools/sling', c => instrumented(c, 'POST /api/mayor/:townId/tools/sling', () => handleMayorSling(c, c.req.param())) diff --git a/cloudflare-gastown/src/handlers/mayor-tools.handler.ts b/cloudflare-gastown/src/handlers/mayor-tools.handler.ts index 00ca78649f..6f778d45d0 100644 --- a/cloudflare-gastown/src/handlers/mayor-tools.handler.ts +++ b/cloudflare-gastown/src/handlers/mayor-tools.handler.ts @@ -336,6 +336,30 @@ export async function handleMayorListConvoys(c: Context, params: { t return c.json(resSuccess(convoys)); } +/** + * GET /api/mayor/:townId/tools/rigs/:rigId/agents/:agentId/pending-nudges + * Returns undelivered, non-expired nudges for the given agent. + * Allows the mayor to inspect an agent's nudge queue and decide whether to intervene. + */ +export async function handleMayorGetPendingNudges( + c: Context, + params: { townId: string; rigId: string; agentId: string } +) { + const rigOwned = await verifyRigBelongsToTown(c, params.townId, params.rigId); + if (!rigOwned) { + return c.json(resError('Rig not found in this town'), 403); + } + + console.log( + `${HANDLER_LOG} handleMayorGetPendingNudges: townId=${params.townId} rigId=${params.rigId} agentId=${params.agentId}` + ); + + const town = getTownDOStub(c.env, params.townId); + const nudges = await town.getPendingNudges(params.agentId); + + return c.json(resSuccess(nudges)); +} + /** * GET /api/mayor/:townId/tools/convoys/:convoyId * Detailed convoy status with per-bead breakdown. diff --git a/cloudflare-gastown/src/handlers/rig-agents.handler.ts b/cloudflare-gastown/src/handlers/rig-agents.handler.ts index 02813a7b56..c93235ba48 100644 --- a/cloudflare-gastown/src/handlers/rig-agents.handler.ts +++ b/cloudflare-gastown/src/handlers/rig-agents.handler.ts @@ -5,6 +5,7 @@ import { resSuccess, resError } from '../util/res.util'; import { parseJsonBody } from '../util/parse-json-body.util'; import { AgentRole, AgentStatus } from '../types'; import type { GastownEnv } from '../gastown.worker'; +import { getEnforcedAgentId } from '../middleware/auth.middleware'; const AGENT_LOG = '[rig-agents.handler]'; @@ -257,3 +258,72 @@ export async function handleDeleteAgent( await town.deleteAgent(params.agentId); return c.json(resSuccess({ deleted: true })); } + +/** + * Returns undelivered, non-expired nudges for the agent. + * Called by the container's process-manager when the agent goes idle. + */ +export async function handleGetPendingNudges( + c: Context, + params: { rigId: string; agentId: string } +) { + const townId = c.get('townId'); + const town = getTownDOStub(c.env, townId); + const nudges = await town.getPendingNudges(params.agentId); + return c.json(resSuccess(nudges)); +} + +const QueueNudgeBody = z.object({ + target_agent_id: z.string().min(1), + message: z.string().min(1), + mode: z.enum(['wait-idle', 'immediate', 'queue']).optional(), +}); + +const NudgeDeliveredBody = z.object({ + nudge_id: z.string().min(1), +}); + +/** + * Agent-facing endpoint: queues a nudge from one agent to another. + * The requesting agent's identity is taken from the auth token. + */ +export async function handleNudge(c: Context, params: { rigId: string }) { + const parsed = QueueNudgeBody.safeParse(await parseJsonBody(c)); + if (!parsed.success) { + return c.json( + { success: false, error: 'Invalid request body', issues: parsed.error.issues }, + 400 + ); + } + const sourceAgentId = getEnforcedAgentId(c) ?? 'unknown'; + console.log( + `${AGENT_LOG} handleNudge: rigId=${params.rigId} from=${sourceAgentId} target=${parsed.data.target_agent_id}` + ); + const townId = c.get('townId'); + const town = getTownDOStub(c.env, townId); + const nudgeId = await town.queueNudge(parsed.data.target_agent_id, parsed.data.message, { + mode: parsed.data.mode, + source: 'agent', + }); + return c.json(resSuccess({ nudge_id: nudgeId })); +} + +/** + * Marks a nudge as delivered after the container has injected it into the agent. + */ +export async function handleNudgeDelivered( + c: Context, + params: { rigId: string; agentId: string } +) { + const parsed = NudgeDeliveredBody.safeParse(await parseJsonBody(c)); + if (!parsed.success) { + return c.json( + { success: false, error: 'Invalid request body', issues: parsed.error.issues }, + 400 + ); + } + const townId = c.get('townId'); + const town = getTownDOStub(c.env, townId); + await town.markNudgeDelivered(parsed.data.nudge_id); + return c.json(resSuccess({ marked: true })); +} diff --git a/cloudflare-gastown/src/ui/dashboard.ui.ts b/cloudflare-gastown/src/ui/dashboard.ui.ts index adc783ce90..90c0c1d722 100644 --- a/cloudflare-gastown/src/ui/dashboard.ui.ts +++ b/cloudflare-gastown/src/ui/dashboard.ui.ts @@ -71,6 +71,12 @@ export function dashboardHtml(): string { .chip.active { background: #1f6feb33; border-color: #58a6ff; color: #58a6ff; } .chip .remove { margin-left: 4px; color: #484f58; font-size: 10px; } .chip .remove:hover { color: #f85149; } + .badge.nudge-count { background: #d29922aa; color: #e3b341; cursor: pointer; } + .badge.nudge-count:hover { background: #e3b34133; } + .nudge-list { margin-top: 6px; font-size: 11px; } + .nudge-item { border: 1px solid #21262d; border-radius: 4px; padding: 6px 8px; margin-bottom: 4px; } + .nudge-item .nudge-meta { color: #484f58; font-size: 10px; margin-bottom: 2px; } + .nudge-item .nudge-msg { color: #c9d1d9; word-break: break-word; } @@ -127,6 +133,30 @@ export function dashboardHtml(): string {
+
+

Nudges

+
+ + + + +
+
+ + + +
+
+ + +
+
+
+

Agent Actions

@@ -719,7 +749,7 @@ function renderAgents() { } function populateAgentSelects() { - const ids = ['mailFrom','mailTo','mailCheckAgent','actionAgent','rqAgent']; + const ids = ['mailFrom','mailTo','mailCheckAgent','actionAgent','rqAgent','nudgeFrom','nudgeTo','nudgeCheckAgent']; for (const id of ids) { const sel = el(id); const prev = sel.value; @@ -935,6 +965,63 @@ async function createEscalation() { if (r.ok) { el('escTitle').value = ''; el('escBody').value = ''; await loadBeads(); toast('Escalation created'); } } +// ── Nudges ────────────────────────────────────────────────────────── + +async function sendNudge() { + if (!rigId()) { toast('Set a Rig ID first', true); return; } + const from = el('nudgeFrom').value; + const to = el('nudgeTo').value; + const message = el('nudgeMessage').value.trim(); + const mode = el('nudgeMode').value; + if (!to || !message) { toast('Select target agent and enter message', true); return; } + const r = await api('POST', '/api/towns/' + townId() + '/rigs/' + rigId() + '/nudge', { + source_agent_id: from, + target_agent_id: to, + message, + mode, + }); + if (r.ok) { + el('nudgeMessage').value = ''; + toast('Nudge sent: ' + (r.data.data?.nudge_id ?? '')); + } +} + +async function checkPendingNudges() { + if (!rigId()) { toast('Set a Rig ID first', true); return; } + const agentId = el('nudgeCheckAgent').value; + if (!agentId) { toast('Select an agent', true); return; } + const r = await api('GET', '/api/towns/' + townId() + '/rigs/' + rigId() + '/agents/' + agentId + '/pending-nudges'); + if (!r.ok) return; + const nudges = r.data.data || []; + if (!nudges.length) { + el('nudgeResult').innerHTML = '

No pending nudges

'; + return; + } + let h = '
'; + for (const n of nudges) { + const preview = n.message.length > 80 ? n.message.slice(0, 80) + '…' : n.message; + h += '
' + + '
source: ' + esc(n.source) + ' | mode: ' + esc(n.mode) + ' | priority: ' + esc(n.priority) + ' | ' + esc(n.created_at ?? '') + '
' + + '
' + esc(preview) + '
' + + '
' + + '' + + '
' + + '
'; + } + h += '
'; + el('nudgeResult').innerHTML = h; +} + +async function deliverNudgeNow(agentId, message) { + if (!rigId()) { toast('Set a Rig ID first', true); return; } + const r = await api('POST', '/api/towns/' + townId() + '/rigs/' + rigId() + '/nudge', { + target_agent_id: agentId, + message: message, + mode: 'immediate', + }); + if (r.ok) toast('Nudge delivered: ' + (r.data.data?.nudge_id ?? '')); +} + // ── Town Container ────────────────────────────────────────────────── const CONTAINER_DIRECT_BASE = 'http://localhost:8080'; From 2193c27aff4ca3a84339ec4b5688773b68b88676 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Wed, 18 Mar 2026 15:58:45 +0000 Subject: [PATCH 2/2] fix(gastown): fix nudge expires_at format and GUPP dedupe window --- .../container/plugin/mayor-tools.ts | 2 +- .../container/src/control-server.ts | 7 +++++- .../container/src/process-manager.ts | 22 +++++++++---------- cloudflare-gastown/src/dos/Town.do.ts | 12 ++++++++-- cloudflare-gastown/src/dos/town/patrol.ts | 8 ++++--- .../src/handlers/rig-agents.handler.ts | 2 +- 6 files changed, 33 insertions(+), 20 deletions(-) diff --git a/cloudflare-gastown/container/plugin/mayor-tools.ts b/cloudflare-gastown/container/plugin/mayor-tools.ts index b7d833e76f..1cec0a37cf 100644 --- a/cloudflare-gastown/container/plugin/mayor-tools.ts +++ b/cloudflare-gastown/container/plugin/mayor-tools.ts @@ -450,7 +450,7 @@ export function createMayorTools(client: MayorGastownClient) { gt_nudge: tool({ description: 'Send a real-time nudge to a polecat agent in any rig. Unlike gt_mail_send (which queues ' + - 'a formal persistent message), gt_nudge delivers immediately at the agent\'s next idle moment. ' + + "a formal persistent message), gt_nudge delivers immediately at the agent's next idle moment. " + 'Use this for time-sensitive coordination: wake up an agent, request a status check, ' + 'or notify of a blocking issue.', args: { diff --git a/cloudflare-gastown/container/src/control-server.ts b/cloudflare-gastown/container/src/control-server.ts index af825d6987..8d77b35be0 100644 --- a/cloudflare-gastown/container/src/control-server.ts +++ b/cloudflare-gastown/container/src/control-server.ts @@ -440,7 +440,12 @@ app.post('/agents/:agentId/nudge-delivered', async c => { } const body: unknown = await c.req.json().catch(() => null); - if (!body || typeof body !== 'object' || !('nudge_id' in body) || typeof body.nudge_id !== 'string') { + if ( + !body || + typeof body !== 'object' || + !('nudge_id' in body) || + typeof body.nudge_id !== 'string' + ) { return c.json({ error: 'Missing or invalid nudge_id field' }, 400); } diff --git a/cloudflare-gastown/container/src/process-manager.ts b/cloudflare-gastown/container/src/process-manager.ts index ed848c7509..1b21f70ff2 100644 --- a/cloudflare-gastown/container/src/process-manager.ts +++ b/cloudflare-gastown/container/src/process-manager.ts @@ -241,9 +241,7 @@ async function fetchPendingNudges( agent: ManagedAgent ): Promise[] | null> { const authToken = - process.env.GASTOWN_CONTAINER_TOKEN ?? - agent.gastownContainerToken ?? - agent.gastownSessionToken; + process.env.GASTOWN_CONTAINER_TOKEN ?? agent.gastownContainerToken ?? agent.gastownSessionToken; if (!agent.gastownApiUrl || !authToken || !agent.townId || !agent.rigId) return null; try { @@ -265,7 +263,10 @@ async function fetchPendingNudges( const raw: unknown = await resp.json(); const parsed = PendingNudgesResponse.safeParse(raw); if (!parsed.success) { - console.warn(`${MANAGER_LOG} fetchPendingNudges: unexpected response shape`, parsed.error.issues); + console.warn( + `${MANAGER_LOG} fetchPendingNudges: unexpected response shape`, + parsed.error.issues + ); return null; } return parsed.data.data; @@ -280,9 +281,7 @@ async function fetchPendingNudges( */ async function markNudgeDelivered(agent: ManagedAgent, nudgeId: string): Promise { const authToken = - process.env.GASTOWN_CONTAINER_TOKEN ?? - agent.gastownContainerToken ?? - agent.gastownSessionToken; + process.env.GASTOWN_CONTAINER_TOKEN ?? agent.gastownContainerToken ?? agent.gastownSessionToken; if (!agent.gastownApiUrl || !authToken || !agent.townId || !agent.rigId) return; try { @@ -326,10 +325,7 @@ function clearIdleTimer(agentId: string): void { * Returns true if the agent should continue (nudge injected or timer started), * false if the agent should exit immediately (injection failed unrecoverably). */ -async function handleIdleEvent( - agent: ManagedAgent, - onExit: () => void -): Promise { +async function handleIdleEvent(agent: ManagedAgent, onExit: () => void): Promise { const agentId = agent.agentId; console.log(`${MANAGER_LOG} handleIdleEvent: checking nudges for agent ${agentId}`); @@ -337,7 +333,9 @@ async function handleIdleEvent( if (nudges === null) { // Error fetching — treat as no nudges, start idle timer - console.warn(`${MANAGER_LOG} handleIdleEvent: could not fetch nudges for ${agentId}, starting idle timer`); + console.warn( + `${MANAGER_LOG} handleIdleEvent: could not fetch nudges for ${agentId}, starting idle timer` + ); } else if (nudges.length > 0 && agent.status === 'running') { // There is at least one pending nudge — inject the first (highest priority) const nudge = nudges[0]; diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index 4fafc408d9..d73cfdded9 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -1083,7 +1083,12 @@ export class TownDO extends DurableObject { let expiresAt: string | null = null; if (mode === 'queue' && options?.ttlSeconds != null) { - expiresAt = new Date(Date.now() + options.ttlSeconds * 1000).toISOString(); + // Use SQLite-compatible datetime format (space separator, no Z suffix) so + // comparisons against datetime('now') work correctly. + expiresAt = new Date(Date.now() + options.ttlSeconds * 1000) + .toISOString() + .replace('T', ' ') + .replace('Z', ''); } query( @@ -1467,7 +1472,10 @@ export class TownDO extends DurableObject { 'The triage system has flagged you as potentially stuck. Please report your status.', { mode: 'immediate', source: 'triage', priority: 'urgent' } ).catch(err => - console.warn(`${TOWN_LOG} resolveTriage: nudge failed for agent=${targetAgentId}:`, err) + console.warn( + `${TOWN_LOG} resolveTriage: nudge failed for agent=${targetAgentId}:`, + err + ) ); this.emitEvent({ event: 'nudge.queued', diff --git a/cloudflare-gastown/src/dos/town/patrol.ts b/cloudflare-gastown/src/dos/town/patrol.ts index 0d587fe4d5..5fb78db442 100644 --- a/cloudflare-gastown/src/dos/town/patrol.ts +++ b/cloudflare-gastown/src/dos/town/patrol.ts @@ -236,7 +236,7 @@ export function detectGUPPViolations( WHERE ${agent_nudges.agent_bead_id} = ? AND ${agent_nudges.source} = 'witness' AND ${agent_nudges.message} LIKE '%GUPP_ESCALATION%' - AND ${agent_nudges.delivered_at} IS NULL + AND (${agent_nudges.delivered_at} IS NULL OR ${agent_nudges.delivered_at} > datetime('now', '-60 minutes')) LIMIT 1 `, [agent.bead_id] @@ -276,7 +276,7 @@ export function detectGUPPViolations( WHERE ${agent_nudges.agent_bead_id} = ? AND ${agent_nudges.source} = 'witness' AND ${agent_nudges.message} LIKE '%GUPP_CHECK%' - AND ${agent_nudges.delivered_at} IS NULL + AND (${agent_nudges.delivered_at} IS NULL OR ${agent_nudges.delivered_at} > datetime('now', '-60 minutes')) LIMIT 1 `, [agent.bead_id] @@ -288,7 +288,9 @@ export function detectGUPPViolations( 'GUPP_CHECK: You have had work hooked for 30+ minutes with no activity. Are you stuck? If so, call gt_escalate.', { mode: 'immediate', source: 'witness', priority: 'urgent' } ).catch(() => {}); - console.log(`${LOG} GUPP warn: agent=${agent.bead_id} stale=${Math.round(staleMs / 60_000)}min`); + console.log( + `${LOG} GUPP warn: agent=${agent.bead_id} stale=${Math.round(staleMs / 60_000)}min` + ); } } } diff --git a/cloudflare-gastown/src/handlers/rig-agents.handler.ts b/cloudflare-gastown/src/handlers/rig-agents.handler.ts index c93235ba48..d7f4408ad7 100644 --- a/cloudflare-gastown/src/handlers/rig-agents.handler.ts +++ b/cloudflare-gastown/src/handlers/rig-agents.handler.ts @@ -313,7 +313,7 @@ export async function handleNudge(c: Context, params: { rigId: strin */ export async function handleNudgeDelivered( c: Context, - params: { rigId: string; agentId: string } + _params: { rigId: string; agentId: string } ) { const parsed = NudgeDeliveredBody.safeParse(await parseJsonBody(c)); if (!parsed.success) {