
[Gastown] PR 5: Rig DO Alarm — Work Scheduler #212

@jrf0110

Parent: #204 | Phase 1: Single Rig, Single Polecat

Revised: This was previously "tRPC Routes — Town & Rig Management." The tRPC routes have been moved to a new issue. This issue is now about the Rig DO alarm — the core scheduler that drives the system.

Goal

The Rig DO becomes the active scheduler. Alarms periodically scan state and signal the container to start/stop agent processes. This is the "DO is the brain, container is the muscle" model.

Alarm Handler

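async alarm(): Promise<void> {
  // Run each phase independently so one failure does not skip the others.
  for (const step of [
    () => this.schedulePendingWork(),
    () => this.witnessPatrol(),
    () => this.processReviewQueue(),
  ]) {
    try {
      await step();
    } catch (err) {
      console.error('Rig alarm step failed', err);
    }
  }

  // Re-arm: 30s while active, 5 min when idle
  const hasActiveWork = this.hasActiveAgentsOrPendingBeads();
  const nextAlarm = hasActiveWork ? 30_000 : 300_000;
  // setAlarm() is async in the DO storage API; await it so failures surface here.
  await this.ctx.storage.setAlarm(Date.now() + nextAlarm);
}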
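A minimal sketch of the tick structure (isolated steps, then an adaptive re-arm) that runs outside Workers. `AlarmStorage`, `FakeAlarmStorage`, and `runAlarmTick` are hypothetical stand-ins, not Cloudflare APIs; the interface models only the `setAlarm()` call the handler needs.

```typescript
// Hypothetical subset of the Durable Object storage alarm API, so the sketch runs outside Workers.
interface AlarmStorage {
  setAlarm(scheduledTime: number): Promise<void>;
}

// In-memory stand-in for ctx.storage, used only for illustration.
class FakeAlarmStorage implements AlarmStorage {
  alarm: number | null = null;
  async setAlarm(scheduledTime: number): Promise<void> {
    this.alarm = scheduledTime;
  }
}

const ACTIVE_INTERVAL_MS = 30_000; // 30s while agents or beads are in flight
const IDLE_INTERVAL_MS = 300_000; // 5 min when nothing is pending

function nextAlarmDelay(hasActiveWork: boolean): number {
  return hasActiveWork ? ACTIVE_INTERVAL_MS : IDLE_INTERVAL_MS;
}

// Run each alarm step in order, isolating failures so one broken step does not skip
// the others, then always re-arm. Returns the errors that were caught.
async function runAlarmTick(
  storage: AlarmStorage,
  steps: Array<() => Promise<void>>,
  hasActiveWork: () => boolean,
  nowMs: () => number,
): Promise<unknown[]> {
  const errors: unknown[] = [];
  for (const step of steps) {
    try {
      await step();
    } catch (err) {
      errors.push(err);
    }
  }
  await storage.setAlarm(nowMs() + nextAlarmDelay(hasActiveWork()));
  return errors;
}
```

Catching per step keeps the handler from throwing, so the adaptive re-arm is always the last thing a tick does.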

schedulePendingWork() — Dispatch beads to agents

Find agents that have a hooked bead but are still idle (not yet started in the container), and signal the container to start them:

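async schedulePendingWork(): Promise<void> {
  // Join on the agent's hooked bead so each idle agent yields exactly one row.
  const pendingAgents = this.ctx.storage.sql.exec(
    `SELECT a.*, b.id as bead_id, b.title as bead_title
     FROM agents a
     JOIN beads b ON b.id = a.current_hook_bead_id
     WHERE a.status = 'idle'
     AND b.status = 'in_progress'
     AND b.assignee_agent_id = a.id`
  ).toArray();

  for (const agent of pendingAgents) {
    try {
      // startAgentInContainer() is expected to move the agent out of 'idle';
      // otherwise the next alarm would dispatch it again.
      await this.startAgentInContainer(agent);
    } catch (err) {
      // Leave the agent idle so the next alarm retries the dispatch.
      console.error(`Failed to start agent ${agent.id}`, err);
    }
  }
}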
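The selection can be sketched as a pure function over in-memory rows, which makes it easy to unit-test without a Durable Object. `AgentRow`, `BeadRow`, and `selectPendingDispatches` are hypothetical names. The sketch keys on `current_hook_bead_id`, so an agent that is also the assignee of other in-progress beads still gets a single dispatch.

```typescript
// Hypothetical row shapes mirroring the agents/beads columns used by the dispatch query.
interface AgentRow {
  id: string;
  status: string; // 'idle' | 'working' | 'blocked' per the issue
  current_hook_bead_id: string | null;
}

interface BeadRow {
  id: string;
  status: string; // only 'in_progress' matters for dispatch
  assignee_agent_id: string | null;
}

interface Dispatch {
  agentId: string;
  beadId: string;
}

// Pure equivalent of the dispatch selection: an idle agent is dispatched for the
// bead it is hooked to, if that bead is in progress and assigned to it.
// Keying on the hooked bead yields at most one dispatch per agent.
function selectPendingDispatches(agents: AgentRow[], beads: BeadRow[]): Dispatch[] {
  const beadsById = new Map<string, BeadRow>();
  for (const bead of beads) beadsById.set(bead.id, bead);

  const dispatches: Dispatch[] = [];
  for (const agent of agents) {
    if (agent.status !== "idle" || agent.current_hook_bead_id === null) continue;
    const bead = beadsById.get(agent.current_hook_bead_id);
    if (bead !== undefined && bead.status === "in_progress" && bead.assignee_agent_id === agent.id) {
      dispatches.push({ agentId: agent.id, beadId: bead.id });
    }
  }
  return dispatches;
}
```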

witnessPatrol() — Health monitoring

The existing witnessPatrol() method is now called by the alarm and has been updated to check container process health instead of cloud-agent-next session health:

async witnessPatrol(): Promise<void> {
  const workingAgents = this.ctx.storage.sql
    .exec(`SELECT * FROM agents WHERE status IN ('working', 'blocked')`)
    .toArray();
  if (workingAgents.length === 0) return;

  // All agents in a town share one container, so resolve the stub once.
  const container = this.env.TOWN_CONTAINER.get(
    this.env.TOWN_CONTAINER.idFromName(this.townId)
  );

  for (const agent of workingAgents) {
    // Check if agent process is alive in container
    let status: string;
    try {
      const statusRes = await container.fetch(`http://container/agents/${agent.id}/status`);
      const body = (await statusRes.json()) as { status: string };
      status = body.status;
    } catch (err) {
      // Status check failed (e.g. container unreachable): retry on the next patrol.
      console.error(`Status check failed for agent ${agent.id}`, err);
      continue;
    }

    if (status === 'not_found' || status === 'exited') {
      if (agent.current_hook_bead_id) {
        await this.restartAgent(agent); // Re-dispatch with checkpoint
      } else {
        this.updateAgentStatus(agent.id, 'idle');
      }
      continue;
    }

    // GUPP violation check (30 min no progress)
    if (agent.last_activity_at) {
      const staleMs = Date.now() - new Date(String(agent.last_activity_at)).getTime();
      if (staleMs > 30 * 60 * 1000) {
        // Sent on every patrol while the agent stays stale; dedupe if that is too noisy.
        await this.sendMail({
          from_agent_id: 'witness',
          to_agent_id: agent.id,
          subject: 'GUPP_CHECK',
          body: 'You have had work hooked for 30+ minutes with no activity. Are you stuck? If so, call gt_escalate.',
        });
      }
    }
  }
}
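The per-agent decision in the patrol can be separated from the I/O as a pure function, so the restart, idle, and GUPP branches can be tested directly. `decideWitnessAction` and its types are hypothetical; `'not_found'` and `'exited'` come from the issue, while `'running'` is an assumed name for a healthy process.

```typescript
// Container-reported process states; 'running' is an assumed name for a live process.
type ProcessStatus = "running" | "not_found" | "exited";

type WitnessAction = "restart" | "mark_idle" | "send_gupp_mail" | "none";

interface WatchedAgent {
  current_hook_bead_id: string | null;
  last_activity_at: string | null; // ISO-8601 timestamp
}

const GUPP_STALE_MS = 30 * 60 * 1000; // 30 minutes without activity

// Decide what the witness should do for one agent, given its container status.
function decideWitnessAction(agent: WatchedAgent, status: ProcessStatus, nowMs: number): WitnessAction {
  if (status === "not_found" || status === "exited") {
    // A dead process with hooked work is re-dispatched; otherwise the agent is simply idle.
    return agent.current_hook_bead_id ? "restart" : "mark_idle";
  }
  if (agent.last_activity_at) {
    const staleMs = nowMs - new Date(agent.last_activity_at).getTime();
    if (staleMs > GUPP_STALE_MS) return "send_gupp_mail";
  }
  return "none";
}
```

The patrol loop would then only fetch the status, call this function, and perform the matching side effect.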

processReviewQueue() — Trigger merge/refinery

async processReviewQueue(): Promise<void> {
  const pendingEntry = this.popReviewQueue();
  if (!pendingEntry) return;
  // Phase 1: deterministic git merge via container
  // Phase 2: start refinery agent in container
  await this.startMergeInContainer(pendingEntry);
}
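processReviewQueue() handles at most one entry per alarm tick, so a backlog of N entries needs at least N ticks to drain. A minimal in-memory sketch of that behavior, assuming popReviewQueue() returns entries oldest-first; `ReviewQueue`, `ReviewEntry`, and `runReviewTicks` are hypothetical, and the entry fields are placeholders since the issue does not define them.

```typescript
// Hypothetical review-queue entry; the real entry shape is not specified in the issue.
interface ReviewEntry {
  beadId: string;
  agentId: string;
}

// In-memory FIFO stand-in for the Rig DO's review queue (ordering is an assumption).
class ReviewQueue {
  private entries: ReviewEntry[] = [];

  enqueue(entry: ReviewEntry): void {
    this.entries.push(entry);
  }

  // Remove and return the oldest entry, or null when the queue is empty.
  pop(): ReviewEntry | null {
    const entry = this.entries.shift();
    return entry === undefined ? null : entry;
  }

  get size(): number {
    return this.entries.length;
  }
}

// Simulate `ticks` alarm firings, each handling at most one entry like processReviewQueue().
function runReviewTicks(queue: ReviewQueue, ticks: number, startMerge: (e: ReviewEntry) => void): void {
  for (let i = 0; i < ticks; i++) {
    const entry = queue.pop();
    if (!entry) return;
    startMerge(entry);
  }
}
```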

Alarm Activation

The alarm is armed when:

  • A new bead is assigned to an agent (hookBead)
  • An agent calls agentDone (to process the review queue)
  • The container reports that an agent process has exited
  • The health check endpoint is called
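
private async armAlarmIfNeeded(): Promise<void> {
  // getAlarm() resolves to the scheduled time in ms, or null if no alarm is set.
  const currentAlarm = await this.ctx.storage.getAlarm();
  const target = Date.now() + 5_000;
  // Arm if nothing is scheduled, or pull a distant (e.g. 5-minute idle) alarm forward;
  // setAlarm() replaces any existing alarm.
  if (currentAlarm === null || currentAlarm > target) {
    await this.ctx.storage.setAlarm(target);
  }
}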
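The arming decision can be isolated as a pure function and exercised against a fake store. This sketch uses an earliest-wins rule: arm when no alarm is set or when the existing one is further out than five seconds, so a pending 5-minute idle alarm does not delay a freshly hooked bead. `AlarmStore`, `FakeAlarmStore`, and `alarmTimeToSet` are hypothetical stand-ins, not Cloudflare APIs.

```typescript
// Hypothetical subset of the Durable Object storage alarm API.
interface AlarmStore {
  getAlarm(): Promise<number | null>;
  setAlarm(scheduledTime: number): Promise<void>;
}

// In-memory stand-in used only for illustration.
class FakeAlarmStore implements AlarmStore {
  alarm: number | null = null;
  async getAlarm(): Promise<number | null> {
    return this.alarm;
  }
  async setAlarm(scheduledTime: number): Promise<void> {
    this.alarm = scheduledTime;
  }
}

const ARM_DELAY_MS = 5_000;

// Earliest-wins: returns the time to set, or null to keep the existing alarm.
function alarmTimeToSet(currentAlarm: number | null, nowMs: number): number | null {
  const target = nowMs + ARM_DELAY_MS;
  return currentAlarm === null || currentAlarm > target ? target : null;
}

// getAlarm() is async; checking the unawaited Promise would always look truthy.
async function armAlarmIfNeeded(store: AlarmStore, nowMs: number): Promise<void> {
  const time = alarmTimeToSet(await store.getAlarm(), nowMs);
  if (time !== null) await store.setAlarm(time);
}
```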

Note: This issue subsumes the witness alarm functionality from the old #217. The witness patrol is now part of the Rig DO alarm handler rather than a separate PR.

Dependencies

  • PR 1 (Rig DO)
  • PR 4 (Town Container — for fetch() communication)

Acceptance Criteria

  • alarm() handler implemented in Rig DO
  • schedulePendingWork() dispatches idle agents to container
  • witnessPatrol() checks container process health, restarts dead agents, detects GUPP violations
  • processReviewQueue() triggers merge processing
  • Alarm auto-arms on bead assignment, agentDone, agent process exit, and health check
  • Adaptive alarm interval (30s active, 5 min idle)
  • Dead agent detection and restart with checkpoint recovery
  • GUPP violation mail sent to stale agents
  • Integration test: bead created → alarm fires → container starts agent
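The integration test in the last criterion could take roughly this shape. `FakeContainer` and `FakeRig` are hypothetical in-memory stand-ins that model only the dispatch path and the alarm timing (5s arm, 30s/5 min re-arm), not SQL, fetch(), or the real Town Container; a real test would drive the Rig DO and container through their bindings.

```typescript
// Records start requests in place of the container's agent-start endpoint.
class FakeContainer {
  started: string[] = [];
  startAgent(agentId: string, beadId: string): void {
    this.started.push(`${agentId}:${beadId}`);
  }
}

interface FakeAgent {
  id: string;
  status: "idle" | "working";
  hook: string | null;
}

// Minimal rig model: hooking arms the alarm, firing the alarm dispatches idle hooked agents.
class FakeRig {
  alarmAt: number | null = null;
  private agents: FakeAgent[] = [];
  private container: FakeContainer;
  private now: () => number;

  constructor(container: FakeContainer, now: () => number) {
    this.container = container;
    this.now = now;
  }

  addAgent(id: string): void {
    this.agents.push({ id, status: "idle", hook: null });
  }

  private find(id: string): FakeAgent {
    const agent = this.agents.find((a) => a.id === id);
    if (!agent) throw new Error(`unknown agent ${id}`);
    return agent;
  }

  // Hooking a bead arms the alarm 5s out, unless an earlier alarm is already set.
  hookBead(agentId: string, beadId: string): void {
    this.find(agentId).hook = beadId;
    const target = this.now() + 5_000;
    if (this.alarmAt === null || this.alarmAt > target) this.alarmAt = target;
  }

  // One alarm tick: start idle agents that have hooked work, then re-arm adaptively.
  fireAlarm(): void {
    for (const agent of this.agents) {
      if (agent.status === "idle" && agent.hook !== null) {
        this.container.startAgent(agent.id, agent.hook);
        agent.status = "working";
      }
    }
    const active = this.agents.some((a) => a.status === "working");
    this.alarmAt = this.now() + (active ? 30_000 : 300_000);
  }

  statusOf(agentId: string): string {
    return this.find(agentId).status;
  }
}
```

Firing the alarm twice and checking that the container saw exactly one start is a cheap guard against double dispatch.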
