diff --git a/packages/opencode/src/flag/flag.ts b/packages/opencode/src/flag/flag.ts index b11058b34058..3f67baff22e8 100644 --- a/packages/opencode/src/flag/flag.ts +++ b/packages/opencode/src/flag/flag.ts @@ -21,10 +21,8 @@ export namespace Flag { export const OPENCODE_DISABLE_CLAUDE_CODE = truthy("OPENCODE_DISABLE_CLAUDE_CODE") export const OPENCODE_DISABLE_CLAUDE_CODE_PROMPT = OPENCODE_DISABLE_CLAUDE_CODE || truthy("OPENCODE_DISABLE_CLAUDE_CODE_PROMPT") - export const OPENCODE_DISABLE_CLAUDE_CODE_SKILLS = - OPENCODE_DISABLE_CLAUDE_CODE || truthy("OPENCODE_DISABLE_CLAUDE_CODE_SKILLS") - export const OPENCODE_DISABLE_EXTERNAL_SKILLS = - OPENCODE_DISABLE_CLAUDE_CODE_SKILLS || truthy("OPENCODE_DISABLE_EXTERNAL_SKILLS") + export declare const OPENCODE_DISABLE_CLAUDE_CODE_SKILLS: boolean + export declare const OPENCODE_DISABLE_EXTERNAL_SKILLS: boolean export declare const OPENCODE_DISABLE_PROJECT_CONFIG: boolean export const OPENCODE_FAKE_VCS = process.env["OPENCODE_FAKE_VCS"] export declare const OPENCODE_CLIENT: string @@ -47,6 +45,7 @@ export namespace Flag { export const OPENCODE_EXPERIMENTAL_LSP_TOOL = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_LSP_TOOL") export const OPENCODE_DISABLE_FILETIME_CHECK = truthy("OPENCODE_DISABLE_FILETIME_CHECK") export const OPENCODE_EXPERIMENTAL_PLAN_MODE = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_PLAN_MODE") + export declare const OPENCODE_EXPERIMENTAL_AGENT_TEAMS: boolean export const OPENCODE_EXPERIMENTAL_MARKDOWN = truthy("OPENCODE_EXPERIMENTAL_MARKDOWN") export const OPENCODE_MODELS_URL = process.env["OPENCODE_MODELS_URL"] export const OPENCODE_MODELS_PATH = process.env["OPENCODE_MODELS_PATH"] @@ -91,3 +90,35 @@ Object.defineProperty(Flag, "OPENCODE_CLIENT", { enumerable: true, configurable: false, }) + +// Dynamic getter for OPENCODE_DISABLE_CLAUDE_CODE_SKILLS +// Evaluated at access time so tests and external tooling can toggle it +Object.defineProperty(Flag, "OPENCODE_DISABLE_CLAUDE_CODE_SKILLS", { + get() { + return truthy("OPENCODE_DISABLE_CLAUDE_CODE") || truthy("OPENCODE_DISABLE_CLAUDE_CODE_SKILLS") + }, + enumerable: true, + configurable: false, +}) + +// Dynamic getter for OPENCODE_DISABLE_EXTERNAL_SKILLS +// Independent of OPENCODE_DISABLE_CLAUDE_CODE so disabling Claude Code +// doesn't block .agents/skills/ loading +Object.defineProperty(Flag, "OPENCODE_DISABLE_EXTERNAL_SKILLS", { + get() { + return truthy("OPENCODE_DISABLE_EXTERNAL_SKILLS") + }, + enumerable: true, + configurable: false, +}) + +// Dynamic getter for OPENCODE_EXPERIMENTAL_AGENT_TEAMS +// This must be evaluated at access time, not module load time, +// because integration tests and external tooling may set this env var at runtime +Object.defineProperty(Flag, "OPENCODE_EXPERIMENTAL_AGENT_TEAMS", { + get() { + return Flag.OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_AGENT_TEAMS") + }, + enumerable: true, + configurable: false, +}) diff --git a/packages/opencode/src/project/bootstrap.ts b/packages/opencode/src/project/bootstrap.ts index efdcaba99094..16f704b46b05 100644 --- a/packages/opencode/src/project/bootstrap.ts +++ b/packages/opencode/src/project/bootstrap.ts @@ -13,6 +13,7 @@ import { Log } from "@/util/log" import { ShareNext } from "@/share/share-next" import { Snapshot } from "../snapshot" import { Truncate } from "../tool/truncation" +import { Flag } from "@/flag/flag" export async function InstanceBootstrap() { Log.Default.info("bootstrapping", { directory: Instance.directory }) @@ -32,4 +33,25 @@ export async function InstanceBootstrap() { await Project.setInitialized(Instance.project.id) } }) + + // Team features — order matters: + // 1. onCleanedRestorePermissions() registers synchronously so it's ready + // before recover(), which could trigger cleanup if all members are shutdown. + // 2. recover() marks stale busy executions as cancelled, transitions members to ready, and notifies leads. + // 3. autoCleanup() subscribes AFTER recover finishes (.finally()) to avoid + // spurious MemberStatusChanged events during recovery triggering premature cleanup. + // Fire-and-forget: don't block bootstrap completion. + if (Flag.OPENCODE_EXPERIMENTAL_AGENT_TEAMS) { + // Dynamic import — only load team module when the feature flag is enabled + import("../team").then(({ Team }) => { + Team.onCleanedRestorePermissions() + Team.recover() + .catch((err) => { + Log.Default.warn("team recovery failed", { error: err instanceof Error ? err.message : err }) + }) + .finally(() => { + Team.autoCleanup() + }) + }) + } } diff --git a/packages/opencode/src/team/events.ts b/packages/opencode/src/team/events.ts new file mode 100644 index 000000000000..8b17b93ce1a2 --- /dev/null +++ b/packages/opencode/src/team/events.ts @@ -0,0 +1,163 @@ +import z from "zod" +import { BusEvent } from "../bus/bus-event" + +export const MemberStatus = z.enum(["ready", "busy", "shutdown_requested", "shutdown", "error"]) +export type MemberStatus = z.infer + +export const ExecutionStatus = z.enum([ + "idle", + "starting", + "running", + "cancel_requested", + "cancelling", + "cancelled", + "completing", + "completed", + "failed", + "timed_out", +]) +export type ExecutionStatus = z.infer + +/** Validates safe identifiers for team/member names — prevents path traversal */ +const SafeName = z + .string() + .regex(/^[a-z0-9][a-z0-9-]{0,63}$/, "Must be lowercase alphanumeric with hyphens, 1-64 chars") + +export const TeamMemberSchema = z.object({ + name: SafeName, + sessionID: z.string(), + agent: z.string(), + status: MemberStatus, + execution_status: ExecutionStatus.optional(), + prompt: z.string().optional(), + /** Model this teammate is using, in "providerID/modelID" format. */ + model: z.string().optional(), + planApproval: z.enum(["none", "pending", "approved", "rejected"]).optional(), +}) +export type TeamMember = z.infer + +export const TeamInfoSchema = z.object({ + name: SafeName, + leadSessionID: z.string(), + members: z.array(TeamMemberSchema), + created: z.number(), + delegate: z.boolean().optional(), +}) +export type TeamInfo = z.infer + +export const TeamTaskSchema = z.object({ + id: z.string(), + content: z.string(), + status: z.enum(["pending", "in_progress", "completed", "cancelled", "blocked"]), + priority: z.enum(["high", "medium", "low"]), + assignee: z.string().optional(), + depends_on: z.array(z.string()).optional(), +}) +export type TeamTask = z.infer + +export namespace TeamEvent { + export const Created = BusEvent.define( + "team.created", + z.object({ + team: TeamInfoSchema, + }), + ) + + export const MemberSpawned = BusEvent.define( + "team.member.spawned", + z.object({ + teamName: z.string(), + member: TeamMemberSchema, + }), + ) + + export const MemberStatusChanged = BusEvent.define( + "team.member.status", + z.object({ + teamName: z.string(), + memberName: z.string(), + status: MemberStatus, + }), + ) + + export const MemberExecutionChanged = BusEvent.define( + "team.member.execution", + z.object({ + teamName: z.string(), + memberName: z.string(), + status: ExecutionStatus, + }), + ) + + export const Message = BusEvent.define( + "team.message", + z.object({ + teamName: z.string(), + from: z.string(), + to: z.string(), + text: z.string(), + }), + ) + + export const Broadcast = BusEvent.define( + "team.broadcast", + z.object({ + teamName: z.string(), + from: z.string(), + text: z.string(), + }), + ) + + export const TaskUpdated = BusEvent.define( + "team.task.updated", + z.object({ + teamName: z.string(), + tasks: z.array(TeamTaskSchema), + }), + ) + + export const TaskClaimed = BusEvent.define( + "team.task.claimed", + z.object({ + teamName: z.string(), + taskId: z.string(), + memberName: z.string(), + }), + ) + + export const ShutdownRequest = BusEvent.define( + "team.shutdown.request", + z.object({ + teamName: z.string(), + memberName: z.string(), + }), + ) + + export const PlanApproval = BusEvent.define( + "team.plan.approval", + z.object({ + teamName: z.string(), + memberName: z.string(), + approved: z.boolean(), + feedback: z.string().optional(), + }), + ) + + export const MessageRead = BusEvent.define( + "team.message.read", + z.object({ + teamName: z.string(), + agentName: z.string(), + count: z.number(), + }), + ) + + export const Cleaned = BusEvent.define( + "team.cleaned", + z.object({ + teamName: z.string(), + leadSessionID: z.string(), + delegate: z.boolean(), + }), + ) +} diff --git a/packages/opencode/src/team/inbox.ts b/packages/opencode/src/team/inbox.ts new file mode 100644 index 000000000000..24e6a66e8de1 --- /dev/null +++ b/packages/opencode/src/team/inbox.ts @@ -0,0 +1,117 @@ +import { Log } from "../util/log" +import { Lock } from "../util/lock" +import { Global } from "../global" +import { Instance } from "../project/instance" +import { Bus } from "../bus" +import { TeamEvent } from "./events" +import path from "path" +import fs from "fs/promises" + +const log = Log.create({ service: "team.inbox" }) + +export interface InboxMessage { + id: string + from: string + text: string + timestamp: number + read: boolean +} + +/** Resolve the JSONL file path for an agent's inbox */ +function filepath(teamName: string, agentName: string): string { + return path.join(Global.Path.data, "storage", "team_inbox", Instance.project.id, teamName, agentName + ".jsonl") +} + +/** Parse a JSONL file into an array of InboxMessage */ +function parse(content: string): InboxMessage[] { + if (!content.trim()) return [] + return content + .split("\n") + .filter((line) => line.trim()) + .map((line) => JSON.parse(line) as InboxMessage) +} + +export namespace Inbox { + /** + * Write a message to an agent's inbox. + * Appends a single JSON line — O(1), no read-modify-write. + */ + export async function write(teamName: string, to: string, message: Omit): Promise { + const target = filepath(teamName, to) + using _ = await Lock.write(target) + await fs.mkdir(path.dirname(target), { recursive: true }) + await fs.appendFile(target, JSON.stringify({ ...message, read: false }) + "\n") + log.info("inbox write", { teamName, to, from: message.from, id: message.id }) + } + + /** + * Read all unread messages from an agent's inbox. + */ + export async function unread(teamName: string, agentName: string): Promise { + const target = filepath(teamName, agentName) + using _ = await Lock.read(target) + const content = await Bun.file(target) + .text() + .catch(() => "") + return parse(content).filter((m) => !m.read) + } + + /** + * Read all messages (read and unread) from an agent's inbox. + */ + export async function all(teamName: string, agentName: string): Promise { + const target = filepath(teamName, agentName) + using _ = await Lock.read(target) + const content = await Bun.file(target) + .text() + .catch(() => "") + return parse(content) + } + + /** + * Mark all unread messages as read for an agent. + * Returns the newly-read messages so callers can send delivery receipts. + * Publishes TeamEvent.MessageRead with the count. + * + * This is the only operation that rewrites the entire file. + */ + export async function markRead(teamName: string, agentName: string): Promise { + const target = filepath(teamName, agentName) + using _ = await Lock.write(target) + const content = await Bun.file(target) + .text() + .catch(() => "") + const messages = parse(content) + const read: InboxMessage[] = [] + for (const msg of messages) { + if (msg.read) continue + msg.read = true + read.push({ ...msg }) + } + if (read.length === 0) return [] + // Rewrite entire file with updated read flags + await Bun.write(target, messages.map((m) => JSON.stringify(m)).join("\n") + "\n") + log.info("inbox marked read", { teamName, agentName, count: read.length }) + await Bus.publish(TeamEvent.MessageRead, { teamName, agentName, count: read.length }) + return read + } + + /** + * Remove an agent's inbox entirely. + */ + export async function remove(teamName: string, agentName: string): Promise { + const target = filepath(teamName, agentName) + await fs.unlink(target).catch(() => {}) + } + + /** + * Remove all inboxes for a team. + */ + export async function removeAll(teamName: string, agentNames: string[]): Promise { + for (const name of agentNames) { + await remove(teamName, name) + } + // Also remove the lead inbox + await remove(teamName, "lead") + } +} diff --git a/packages/opencode/src/team/index.ts b/packages/opencode/src/team/index.ts new file mode 100644 index 000000000000..e651dd09d7c4 --- /dev/null +++ b/packages/opencode/src/team/index.ts @@ -0,0 +1,957 @@ +import z from "zod" +import { Log } from "../util/log" +import { Bus } from "../bus" +import { Instance } from "../project/instance" +import { Storage } from "../storage/storage" +import { Lock } from "../util/lock" +import { fn } from "../util/fn" +import { + TeamEvent, + MemberStatus as MemberStatusSchema, + ExecutionStatus, + TeamInfoSchema, + TeamTaskSchema, + type TeamInfo, + type TeamMember, + type TeamTask, + type MemberStatus, + type ExecutionStatus as ExecutionStatusType, +} from "./events" + +export { + TeamEvent, + ExecutionStatus, + TeamInfoSchema, + TeamTaskSchema, + type TeamInfo, + type TeamMember, + type TeamTask, +} from "./events" + +/** Write tools that are denied during plan-approval or delegate mode */ +export const WRITE_TOOLS = ["bash", "write", "edit", "multiedit", "apply_patch"] as const + +const log = Log.create({ service: "team" }) + +/** Storage key for a team's config */ +function configKey(name: string): string[] { + return ["team", Instance.project.id, name] +} + +/** Storage key for a team's task list — separate prefix from "team" so + * Storage.list(["team", projectID]) only returns config keys, not task data */ +function tasksKey(name: string): string[] { + return ["team_tasks", Instance.project.id, name] +} + +const TERMINAL_EXECUTION_STATES = new Set([ + "idle", + "cancelled", + "completed", + "failed", + "timed_out", +]) + +const CREATE_LOCK_KEY = () => `team:create:${Instance.project.id}` + +const MEMBER_TRANSITIONS: Record = { + ready: ["busy", "shutdown_requested", "shutdown", "error"], + busy: ["ready", "shutdown_requested", "error"], + shutdown_requested: ["shutdown", "error"], + shutdown: [], + error: ["ready", "shutdown_requested", "shutdown"], +} + +const EXECUTION_TRANSITIONS: Record = { + idle: ["starting"], + starting: ["running", "cancel_requested", "cancelling", "failed", "timed_out"], + running: ["cancel_requested", "cancelling", "completing", "failed", "timed_out"], + cancel_requested: ["cancelling", "cancelled", "failed", "timed_out"], + cancelling: ["cancelled", "failed", "timed_out"], + cancelled: ["idle"], + completing: ["completed", "failed", "timed_out"], + completed: ["idle"], + failed: ["idle"], + timed_out: ["idle"], +} + +function normalizeMember(member: TeamMember): TeamMember { + const status = MemberStatusSchema.parse(member.status) + const execution_status = ExecutionStatus.safeParse(member.execution_status).success + ? member.execution_status + : status === "busy" + ? "running" + : "idle" + return { + ...member, + status, + execution_status, + } +} + +function normalizeTeam(team: TeamInfo): TeamInfo { + return { + ...team, + members: team.members.map(normalizeMember), + } +} + +function canTransition(current: T, next: T, map: Record) { + if (current === next) return true + return map[current]?.includes(next) === true +} + +export namespace Team { + /** + * Subscribe to member status changes and auto-cleanup teams + * when all members have reached "shutdown" status. + * Called once during InstanceBootstrap. + */ + export function autoCleanup(): () => void { + return Bus.subscribe(TeamEvent.MemberStatusChanged, async (event) => { + if (event.properties.status !== "shutdown") return + + const team = await get(event.properties.teamName) + if (!team) return + if (team.members.length === 0) return + if (team.members.some((m) => m.status !== "shutdown")) return + + log.info("all members shutdown, auto-cleaning team", { teamName: team.name }) + try { + await cleanup(team.name) + } catch (err: unknown) { + log.warn("auto-cleanup failed", { + teamName: team.name, + error: err instanceof Error ? err.message : String(err), + }) + } + }) + } + + /** + * Listen for TeamEvent.Cleaned and restore session permissions. + * This decouples the team module from the session module — + * cleanup only publishes the event, this listener handles session side-effects. + */ + export function onCleanedRestorePermissions(): () => void { + return Bus.subscribe(TeamEvent.Cleaned, async (event) => { + if (!event.properties.delegate) return + + try { + const { Session } = await import("../session") + await Session.update(event.properties.leadSessionID, (draft) => { + draft.permission = (draft.permission ?? []).filter( + (rule) => !((WRITE_TOOLS as readonly string[]).includes(rule.permission) && rule.action === "deny"), + ) + }) + log.info("restored lead session permissions", { + teamName: event.properties.teamName, + sessionID: event.properties.leadSessionID, + }) + } catch (err: unknown) { + log.warn("failed to restore lead session permissions", { + teamName: event.properties.teamName, + error: err instanceof Error ? err.message : String(err), + }) + } + }) + } + + /** + * Create a new team. The lead session is the caller's session. + */ + export const create = fn( + z.object({ + name: z.string(), + leadSessionID: z.string(), + delegate: z.boolean().optional(), + }), + async (input) => { + using _ = await Lock.write(CREATE_LOCK_KEY()) + + const existing = await get(input.name) + if (existing) throw new Error(`Team "${input.name}" already exists`) + + const lead = await findBySession(input.leadSessionID) + if (lead?.role === "lead") + throw new Error(`Session is already leading team "${lead.team.name}". Only one team per session is allowed.`) + if (lead?.role === "member") + throw new Error(`This session is a teammate in "${lead.team.name}". Teammates cannot create new teams.`) + + const team: TeamInfo = { + name: input.name, + leadSessionID: input.leadSessionID, + members: [], + created: Date.now(), + ...(input.delegate ? { delegate: true } : {}), + } + + await Storage.write(configKey(input.name), team) + await Storage.write(tasksKey(input.name), [] as TeamTask[]) + + log.info("team created", { name: input.name, leadSessionID: input.leadSessionID }) + await Bus.publish(TeamEvent.Created, { team }) + return team + }, + ) + + /** + * Get a team by name. Returns undefined if not found. + */ + export const get = fn(z.string(), async (name) => { + try { + return normalizeTeam(await Storage.read(configKey(name))) + } catch { + return undefined + } + }) + + /** + * List all teams in this project. + */ + export async function list(): Promise { + try { + const keys = await Storage.list(["team", Instance.project.id]) + return (await Promise.all(keys.map((key) => Storage.read(key).catch(() => undefined)))) + .filter((t): t is TeamInfo => t !== undefined) + .map(normalizeTeam) + } catch { + return [] + } + } + + /** + * Add a member to a team (atomic via Storage.update). + * Rejects duplicate names (case-insensitive), duplicate sessionIDs, and "lead" as a name. + */ + export async function addMember(teamName: string, member: TeamMember): Promise { + const lower = member.name.toLowerCase() + if (lower === "lead") throw new Error(`Name "lead" is reserved and cannot be used for a teammate.`) + + await Storage.update(configKey(teamName), (draft) => { + if (draft.members.some((m) => m.name.toLowerCase() === lower)) + throw new Error(`Teammate "${member.name}" already exists in team "${teamName}" (case-insensitive)`) + if (draft.members.some((m) => m.sessionID === member.sessionID)) + throw new Error(`Session "${member.sessionID}" is already registered in team "${teamName}"`) + draft.members.push(member) + }) + + log.info("member added", { teamName, member: member.name, agent: member.agent }) + await Bus.publish(TeamEvent.MemberSpawned, { teamName, member }) + } + + export async function transitionMemberStatus( + teamName: string, + memberName: string, + status: MemberStatus, + options?: { guard?: boolean; force?: boolean }, + ): Promise { + let changed = false + try { + await Storage.update(configKey(teamName), (draft) => { + const member = draft.members.find((m) => m.name === memberName) + if (!member) return + if (options?.guard && member.status === "shutdown") return + const from = member.status + if (!options?.force && !canTransition(from, status, MEMBER_TRANSITIONS)) return + if (from === status) return + member.status = status + changed = true + }) + } catch { + return false + } + if (!changed) return false + await Bus.publish(TeamEvent.MemberStatusChanged, { teamName, memberName, status }) + return true + } + + export async function transitionExecutionStatus( + teamName: string, + memberName: string, + status: ExecutionStatusType, + options?: { force?: boolean }, + ): Promise { + let changed = false + try { + await Storage.update(configKey(teamName), (draft) => { + const member = draft.members.find((m) => m.name === memberName) + if (!member) return + const from = normalizeMember(member).execution_status ?? "idle" + if (!options?.force && !canTransition(from, status, EXECUTION_TRANSITIONS)) return + if (from === status) return + member.execution_status = status + changed = true + }) + } catch { + return false + } + if (!changed) return false + await Bus.publish(TeamEvent.MemberExecutionChanged, { teamName, memberName, status }) + return true + } + + /** + * Backward-compatible setter for tests and call sites that need direct status updates. + */ + export async function setMemberStatus( + teamName: string, + memberName: string, + status: MemberStatus, + options?: { guard?: boolean }, + ): Promise { + await transitionMemberStatus(teamName, memberName, status, { guard: options?.guard, force: true }) + } + + /** + * Toggle delegate mode on a team. + */ + export async function setDelegate(teamName: string, delegate: boolean): Promise { + try { + await Storage.update(configKey(teamName), (draft) => { + draft.delegate = delegate + }) + } catch { + // Team not found — ignore + } + } + + /** + * Update a member's plan approval status. + */ + export async function setMemberPlanApproval( + teamName: string, + memberName: string, + planApproval: "none" | "pending" | "approved" | "rejected", + ): Promise { + try { + await Storage.update(configKey(teamName), (draft) => { + const member = draft.members.find((m) => m.name === memberName) + if (!member) return + member.planApproval = planApproval + }) + } catch { + // Team not found — ignore + } + } + + /** + * Remove a member from a team. + */ + export async function removeMember(teamName: string, memberName: string): Promise { + try { + await Storage.update(configKey(teamName), (draft) => { + draft.members = draft.members.filter((m) => m.name !== memberName) + }) + } catch { + // Team not found — ignore + } + log.info("member removed", { teamName, memberName }) + } + + /** + * Find which team a session belongs to (as lead or member). + */ + export async function findBySession( + sessionID: string, + ): Promise<{ team: TeamInfo; role: "lead" | "member"; memberName?: string } | undefined> { + const teams = await list() + for (const team of teams) { + if (team.leadSessionID === sessionID) return { team, role: "lead" } + const member = team.members.find((m) => m.sessionID === sessionID) + if (member) return { team, role: "member", memberName: member.name } + } + return undefined + } + + /** + * Resolve the model for a teammate. + * Priority: explicit model param > agent model > lead's last model > global default. + * Returns `{ error }` if the explicit model is not found. + */ + export async function resolveModel(input: { + model?: string + agent: { model?: { providerID: string; modelID: string } } + messages: Array<{ info: { role: string; model?: { providerID: string; modelID: string } } }> + }): Promise<{ providerID: string; modelID: string } | { error: string }> { + const { Provider } = await import("../provider/provider") + + if (input.model) { + const parsed = Provider.parseModel(input.model) + try { + await Provider.getModel(parsed.providerID, parsed.modelID) + } catch (e: unknown) { + if (Provider.ModelNotFoundError.isInstance(e)) { + const hint = e.data.suggestions?.length ? ` Did you mean: ${e.data.suggestions.join(", ")}?` : "" + return { error: `Model not found: ${input.model}.${hint}` } + } + throw e + } + return parsed + } + if (input.agent.model) return input.agent.model + const lastUser = input.messages.findLast((m) => m.info.role === "user") + if (lastUser?.info.model) return lastUser.info.model + return await Provider.defaultModel() + } + + /** + * Spawn a teammate — creates session, registers member, starts prompt loop. + * On addMember failure, cleans up the orphaned session. + */ + export async function spawnMember(input: { + teamName: string + name: string + parentSessionID: string + agent: { name: string; prompt?: string; skills?: string[] } + model: { providerID: string; modelID: string } + prompt: string + claimTask?: string + planApproval: boolean + }): Promise<{ sessionID: string; label: string }> { + const { Session } = await import("../session") + const { SessionPrompt } = await import("../session/prompt") + const { Identifier } = await import("../id/id") + const { Instance: Inst } = await import("../project/instance") + const { TeamMessaging } = await import("./messaging") + + const label = `${input.model.providerID}/${input.model.modelID}` + + // Build permission rules for the child session + const rules: Array<{ permission: string; pattern: string; action: "deny" | "allow" }> = [ + { permission: "team_create", pattern: "*", action: "deny" }, + { permission: "team_spawn", pattern: "*", action: "deny" }, + { permission: "team_shutdown", pattern: "*", action: "deny" }, + { permission: "team_cleanup", pattern: "*", action: "deny" }, + { permission: "team_approve_plan", pattern: "*", action: "deny" }, + ] + if (input.planApproval) { + // Pattern "*:plan-approval" is intentionally NOT "*" — PermissionNext.disabled() only + // strips tools with pattern "*", so these remain visible to the model but are denied at + // execution time. The ":plan-approval" tag lets approvePlan() remove only these rules. + rules.push( + ...WRITE_TOOLS.map((tool) => ({ permission: tool, pattern: "*:plan-approval", action: "deny" as const })), + ) + } + + const session = await Session.createNext({ + parentID: input.parentSessionID, + directory: Inst.directory, + title: `${input.name} (@${input.agent.name} teammate, ${label})${input.planApproval ? " [plan mode]" : ""}`, + permission: rules, + }) + + // Register member — if this fails, clean up the orphaned session + try { + await addMember(input.teamName, { + name: input.name, + sessionID: session.id, + agent: input.agent.name, + status: "busy", + execution_status: "idle", + prompt: input.prompt, + model: label, + planApproval: input.planApproval ? "pending" : "none", + }) + } catch (err) { + // Orphaned session cleanup + try { + await Session.remove(session.id) + } catch { + log.warn("failed to clean up orphaned session", { sessionID: session.id }) + } + throw err + } + + if (input.claimTask) { + await TeamTasks.claim(input.teamName, input.claimTask, input.name).catch(() => {}) + } + + // Build teammate context message + const planInstructions = input.planApproval + ? [ + "", + "IMPORTANT: You are in PLAN MODE (read-only). You can read files, search, and explore,", + "but you CANNOT write, edit, or run bash commands until the lead approves your plan.", + "", + "Your workflow:", + "1. Research and explore the codebase to understand the problem", + "2. Formulate a detailed implementation plan", + "3. Send your plan to the lead using team_message (to: 'lead')", + "4. Wait for the lead to approve your plan (you'll receive a message when approved)", + "5. Once approved, your write permissions will be unlocked and you can implement", + "", + ] + : [] + + const skillContext = input.agent.skills?.length + ? [ + "", + `Preloaded skills: ${input.agent.skills.join(", ")}`, + "These skills are already loaded into your context — you do not need to invoke the skill tool for them.", + "", + ] + : [] + + const context = [ + `You are "${input.name}", a teammate in team "${input.teamName}".`, + `Your agent type is "${input.agent.name}", using model ${label}.`, + "", + "Team tools available to you:", + "- team_message: send a message to the lead or another teammate", + "- team_broadcast: send a message to all teammates", + "- team_tasks: view/add/complete tasks on the shared task list", + "- team_claim: claim a pending task from the shared task list", + "", + "You do NOT have access to team_create, team_spawn, team_shutdown, or team_cleanup.", + "Only the team lead can manage the team structure.", + ...skillContext, + ...planInstructions, + "When you finish a task, mark it done with team_tasks and send a summary to the lead with team_message.", + "You can message any teammate by name — not just the lead. Coordinate directly with peers when useful.", + "", + "SUBAGENT RELAY: If you use the task tool to spawn subagents, they CANNOT communicate with the team.", + "You are responsible for relaying any relevant subagent findings via team_message or team_broadcast.", + "", + "IMPORTANT: Your plain text output is NOT visible to the team lead or other teammates.", + "You MUST use team_message or team_broadcast to communicate. Just typing a response is not enough.", + "", + "Your instructions:", + input.prompt, + ].join("\n") + + const msgId = Identifier.ascending("message") + await Session.updateMessage({ + id: msgId, + sessionID: session.id, + role: "user", + agent: input.agent.name, + model: input.model, + time: { created: Date.now() }, + }) + await Session.updatePart({ + id: Identifier.ascending("part"), + messageID: msgId, + sessionID: session.id, + type: "text", + text: context, + }) + + await transitionMemberStatus(input.teamName, input.name, "busy") + await transitionExecutionStatus(input.teamName, input.name, "starting") + + // Fire-and-forget the teammate's prompt loop. + // Wrapped in Promise.resolve().then() to guard against synchronous throws. + log.info("spawning teammate", { teamName: input.teamName, name: input.name, sessionID: session.id }) + Promise.resolve() + .then(async () => { + await transitionExecutionStatus(input.teamName, input.name, "running") + return SessionPrompt.loop({ sessionID: session.id }) + }) + .then(async () => { + log.info("teammate loop ended", { teamName: input.teamName, name: input.name }) + await transitionExecutionStatus(input.teamName, input.name, "completing") + await transitionExecutionStatus(input.teamName, input.name, "completed") + await transitionExecutionStatus(input.teamName, input.name, "idle") + const team = await get(input.teamName) + const member = team?.members.find((m) => m.name === input.name) + if (member?.status === "shutdown_requested") { + await transitionMemberStatus(input.teamName, input.name, "shutdown") + } else { + await transitionMemberStatus(input.teamName, input.name, "ready") + } + await notifyLead(input.teamName, input.name, session.id, "completed") + }) + .catch(async (err) => { + log.warn("teammate loop error", { teamName: input.teamName, name: input.name, error: err.message }) + await transitionExecutionStatus(input.teamName, input.name, "failed") + await transitionExecutionStatus(input.teamName, input.name, "idle") + await transitionMemberStatus(input.teamName, input.name, "error") + await notifyLead(input.teamName, input.name, session.id, "errored", err.message) + }) + + return { sessionID: session.id, label } + } + + /** + * Approve or reject a teammate's plan. On approval, removes plan-approval + * deny rules and notifies the teammate. + */ + export async function approvePlan(input: { + teamName: string + memberName: string + approved: boolean + feedback?: string + }): Promise { + const { Session } = await import("../session") + const { TeamMessaging } = await import("./messaging") + + const team = await get(input.teamName) + if (!team) throw new Error(`Team "${input.teamName}" not found`) + + const member = team.members.find((m) => m.name === input.memberName) + if (!member) throw new Error(`Teammate "${input.memberName}" not found`) + + if (input.approved) { + await Session.update(member.sessionID, (draft) => { + if (draft.permission) { + draft.permission = draft.permission.filter((rule) => rule.pattern !== "*:plan-approval") + } + }) + await setMemberPlanApproval(input.teamName, input.memberName, "approved") + await TeamMessaging.send({ + teamName: input.teamName, + from: "lead", + to: input.memberName, + text: input.feedback + ? `Your plan has been APPROVED. You now have full write access. Feedback: ${input.feedback}` + : "Your plan has been APPROVED. You now have full write access. Proceed with implementation.", + }) + } else { + await setMemberPlanApproval(input.teamName, input.memberName, "rejected") + await TeamMessaging.send({ + teamName: input.teamName, + from: "lead", + to: input.memberName, + text: `Your plan has been REJECTED. Please revise and resubmit. Feedback: ${input.feedback ?? "No specific feedback provided."}`, + }) + } + + await Bus.publish(TeamEvent.PlanApproval, { + teamName: input.teamName, + memberName: input.memberName, + approved: input.approved, + feedback: input.feedback, + }) + } + + /** + * Notify the lead that a teammate's loop finished or errored. + * Uses guard option because the lead may have already sent a shutdown request + * (setting status to "shutdown") while the loop was finishing — without guard, + * this would overwrite "shutdown" with "ready", preventing auto-cleanup. + */ + async function notifyLead( + teamName: string, + name: string, + sessionID: string, + status: "completed" | "cancelled" | "errored", + error?: string, + ) { + try { + const { TeamMessaging } = await import("./messaging") + + const team = await get(teamName) + if (!team) return + + const member = team.members.find((m) => m.name === name) + if (member?.status === "shutdown") return + + const text = + status === "cancelled" + ? `I was interrupted by the lead and am now idle. Send me a message to resume work.` + : status === "completed" + ? `I have finished my current work and am now idle. Review my session (${sessionID}) for detailed results. You can use team_shutdown to shut me down if no more work is needed.` + : `I encountered an error and stopped: ${error ?? "unknown error"}. Review my session (${sessionID}). You can use team_shutdown to shut me down, or send me a message to retry.` + + await TeamMessaging.send({ + teamName, + from: name, + to: "lead", + text, + }) + } catch (err: unknown) { + log.warn("failed to notify lead of teammate completion", { + teamName, + name, + error: err instanceof Error ? err.message : String(err), + }) + } + } + + /** + * Clean up a team — removes config and task data. + * Fails if any members are still active. + * Publishes TeamEvent.Cleaned so listeners can handle side-effects + * (e.g. restoring lead session permissions). + */ + export async function cleanup(teamName: string): Promise { + const team = await get(teamName) + if (!team) throw new Error(`Team "${teamName}" not found`) + + const alive = team.members.filter((m) => m.status !== "shutdown") + if (alive.length > 0) { + throw new Error( + `Cannot clean up team "${teamName}": ${alive.length} non-shutdown member(s): ${alive.map((m) => m.name).join(", ")}. Shut them down first.`, + ) + } + + const { Inbox } = await import("./inbox") + await Inbox.removeAll( + teamName, + team.members.map((m) => m.name), + ) + await Storage.remove(configKey(teamName)) + await Storage.remove(tasksKey(teamName)) + + log.info("team cleaned up", { teamName }) + await Bus.publish(TeamEvent.Cleaned, { + teamName, + leadSessionID: team.leadSessionID, + delegate: !!team.delegate, + }) + } + + /** + * Cancel a single teammate's prompt loop by calling SessionPrompt.cancel. + * This mirrors how the Task tool propagates abort to subagents (task.ts:121-125). + * Returns true if the member was found and cancelled. + */ + export async function cancelMember(teamName: string, memberName: string): Promise { + const { SessionPrompt } = await import("../session/prompt") + const { SessionStatus } = await import("../session/status") + + const team = await get(teamName) + if (!team) return false + + const member = team.members.find((m) => m.name === memberName) + if (!member) return false + // Allow cancel for busy members and shutdown_requested members + // (shutdown sets shutdown_requested before calling cancelMember, + // so the member is no longer "busy" by the time we get here) + if (member.status !== "busy" && member.status !== "shutdown_requested") return false + if (TERMINAL_EXECUTION_STATES.has(member.execution_status ?? "idle")) return false + + log.info("cancelling member", { teamName, memberName, sessionID: member.sessionID }) + await transitionExecutionStatus(teamName, memberName, "cancel_requested") + + for (const _ of [0, 1, 2]) { + SessionPrompt.cancel(member.sessionID) + await transitionExecutionStatus(teamName, memberName, "cancelling") + await Bun.sleep(120) + const next = await get(teamName) + const current = next?.members.find((m) => m.name === memberName) + if (!current) break + if (TERMINAL_EXECUTION_STATES.has(current.execution_status ?? "idle")) break + if (current.status !== "busy" && current.status !== "shutdown_requested") break + } + + const next = await get(teamName) + const current = next?.members.find((m) => m.name === memberName) + if (!current) return true + if (TERMINAL_EXECUTION_STATES.has(current.execution_status ?? "idle")) return true + + const runtime = SessionStatus.get(member.sessionID) + if (runtime.type !== "idle") return false + + await transitionExecutionStatus(teamName, memberName, "cancelled", { force: true }) + await transitionExecutionStatus(teamName, memberName, "idle", { force: true }) + if (current.status === "busy") { + await transitionMemberStatus(teamName, memberName, "ready", { force: true }) + } + return true + } + + /** + * Cancel all active teammates' prompt loops. + * Returns the count of members that were cancelled. + */ + export async function cancelAllMembers(teamName: string): Promise { + const { SessionPrompt } = await import("../session/prompt") + + const team = await get(teamName) + if (!team) return 0 + + let count = 0 + for (const member of team.members) { + if (member.status !== "busy") continue + if (TERMINAL_EXECUTION_STATES.has(member.execution_status ?? "idle")) continue + log.info("cancelling member", { teamName, memberName: member.name, sessionID: member.sessionID }) + await transitionExecutionStatus(teamName, member.name, "cancel_requested") + SessionPrompt.cancel(member.sessionID) + await transitionExecutionStatus(teamName, member.name, "cancelling") + count++ + } + return count + } + + /** + * Mark teammates that were busy when the server died as cancelled + * and inject a notification into the lead session. + * Called once during InstanceBootstrap. + */ + export async function recover(): Promise<{ interrupted: number }> { + const teams = await list() + let count = 0 + + for (const team of teams) { + const active = team.members.filter((m) => m.status === "busy") + if (active.length === 0) continue + + log.info("marking interrupted teammates", { teamName: team.name, count: active.length }) + + const names: string[] = [] + for (const member of active) { + await transitionExecutionStatus(team.name, member.name, "cancelled", { force: true }) + await transitionExecutionStatus(team.name, member.name, "idle", { force: true }) + await transitionMemberStatus(team.name, member.name, "ready", { force: true }) + names.push(member.name) + count++ + } + + // Recover undelivered inbox messages for interrupted members and the lead + try { + const { TeamMessaging } = await import("./messaging") + for (const member of active) { + await TeamMessaging.recoverInbox(team.name, member.name, member.sessionID) + } + await TeamMessaging.recoverInbox(team.name, "lead", team.leadSessionID) + } catch (err: unknown) { + log.warn("inbox recovery failed", { + teamName: team.name, + error: err instanceof Error ? err.message : String(err), + }) + } + + try { + const { Session } = await import("../session") + const { Identifier } = await import("../id/id") + const msgs = await Session.messages({ sessionID: team.leadSessionID }) + const lastUser = msgs.findLast((m) => m.info.role === "user") + if (lastUser) { + const info = lastUser.info as { agent: string; model: { providerID: string; modelID: string } } + const msgId = Identifier.ascending("message") + await Session.updateMessage({ + id: msgId, + sessionID: team.leadSessionID, + role: "user", + agent: info.agent, + model: info.model, + time: { created: Date.now() }, + }) + await Session.updatePart({ + id: Identifier.ascending("part"), + messageID: msgId, + sessionID: team.leadSessionID, + type: "text", + text: `[System]: Server was restarted. The following teammates in team "${team.name}" were interrupted and need to be resumed: ${names.join(", ")}. Use team_message or team_broadcast to tell them to continue their work.`, + synthetic: true, + }) + } + } catch (err: unknown) { + log.warn("failed to notify lead of interrupted teammates", { + teamName: team.name, + error: err instanceof Error ? err.message : String(err), + }) + } + } + + if (count > 0) log.info("team recovery complete", { interrupted: count }) + return { interrupted: count } + } +} + +export namespace TeamTasks { + /** + * Read all tasks for a team. + */ + export async function list(teamName: string): Promise { + try { + return await Storage.read(tasksKey(teamName)) + } catch { + return [] + } + } + + /** + * Write the full task list for a team (replaces). + */ + export async function update(teamName: string, tasks: TeamTask[]): Promise { + const resolved = resolveDependencies(tasks) + await Storage.write(tasksKey(teamName), resolved) + await Bus.publish(TeamEvent.TaskUpdated, { teamName, tasks: resolved }) + } + + /** + * Add tasks to the team's task list. + */ + export async function add(teamName: string, newTasks: TeamTask[]): Promise { + const existing = await list(teamName) + const resolved = resolveDependencies([...existing, ...newTasks]) + await Storage.write(tasksKey(teamName), resolved) + await Bus.publish(TeamEvent.TaskUpdated, { teamName, tasks: resolved }) + } + + /** + * Atomically claim a task. Returns true if claimed, false if already taken. + */ + export async function claim(teamName: string, taskId: string, memberName: string): Promise { + let claimed = false + try { + await Storage.update(tasksKey(teamName), (tasks) => { + const task = tasks.find((t) => t.id === taskId) + if (!task) return + if (task.status !== "pending") return + if (task.assignee) return + + if (task.depends_on?.length) { + const unresolved = task.depends_on.some((depId) => { + const dep = tasks.find((t) => t.id === depId) + return !dep || (dep.status !== "completed" && dep.status !== "cancelled") + }) + if (unresolved) return + } + + task.status = "in_progress" + task.assignee = memberName + claimed = true + }) + } catch { + return false + } + + if (claimed) await Bus.publish(TeamEvent.TaskClaimed, { teamName, taskId, memberName }) + return claimed + } + + /** + * Mark a task as completed. + */ + export async function complete(teamName: string, taskId: string): Promise { + let tasks: TeamTask[] = [] + try { + tasks = await Storage.update(tasksKey(teamName), (draft) => { + const task = draft.find((t) => t.id === taskId) + if (task) task.status = "completed" + const resolved = resolveDependencies(draft) + // Mutate in-place — Storage.update serializes the original reference, + // so reassignment (draft = resolved) wouldn't propagate + draft.length = 0 + draft.push(...resolved) + }) + } catch { + return + } + await Bus.publish(TeamEvent.TaskUpdated, { teamName, tasks }) + } + + function resolveDependencies(tasks: TeamTask[]): TeamTask[] { + const validIds = new Set(tasks.map((t) => t.id)) + + return tasks.map((task) => { + if (task.depends_on) { + task = { ...task, depends_on: task.depends_on.filter((id) => validIds.has(id) && id !== task.id) } + } + if (!task.depends_on?.length) return task + + const unresolved = task.depends_on.some((depId) => { + const dep = tasks.find((t) => t.id === depId) + return !dep || (dep.status !== "completed" && dep.status !== "cancelled") + }) + + if (unresolved && task.status === "pending") return { ...task, status: "blocked" } + if (!unresolved && task.status === "blocked") return { ...task, status: "pending" } + return task + }) + } +} diff --git a/packages/opencode/src/team/messaging.ts b/packages/opencode/src/team/messaging.ts new file mode 100644 index 000000000000..c973e9cce559 --- /dev/null +++ b/packages/opencode/src/team/messaging.ts @@ -0,0 +1,319 @@ +import { Log } from "../util/log" +import { Bus } from "../bus" +import { Session } from "../session" +import { SessionPrompt } from "../session/prompt" +import { SessionStatus } from "../session/status" +import { Identifier } from "../id/id" +import { Team, TeamEvent } from "./index" +import { Inbox } from "./inbox" + +const log = Log.create({ service: "team.messaging" }) +const MAX_TEXT = 10 * 1024 + +function validateText(text: string) { + if (text.length <= MAX_TEXT) return + throw new Error(`Team message too large (${text.length} chars). Maximum is ${MAX_TEXT} chars.`) +} + +function messageId(): string { + return `im_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}` +} + +export namespace TeamMessaging { + /** + * Send a message from one team member to another. + * Writes to the recipient's inbox (source of truth), then injects + * a synthetic user message into their session (delivery mechanism), + * then auto-wakes if idle. + */ + export async function send(input: { teamName: string; from: string; to: string; text: string }): Promise { + validateText(input.text) + const team = await Team.get(input.teamName) + if (!team) throw new Error(`Team "${input.teamName}" not found`) + + // Find recipient session + let targetSessionID: string | undefined + if (input.to === "lead") { + targetSessionID = team.leadSessionID + } else { + const member = team.members.find((m) => m.name === input.to) + if (!member) throw new Error(`Member "${input.to}" not found in team "${input.teamName}"`) + if (member.status === "shutdown") throw new Error(`Member "${input.to}" has shut down`) + targetSessionID = member.sessionID + } + + if (!targetSessionID) throw new Error(`Could not find session for "${input.to}"`) + + // Write to inbox (source of truth) + const inboxId = messageId() + await Inbox.write(input.teamName, input.to, { + id: inboxId, + from: input.from, + text: input.text, + timestamp: Date.now(), + }) + + // Inject into session (delivery mechanism), tagged with inbox ID for dedup + await injectMessage(targetSessionID, input.from, input.text, inboxId) + + log.info("message sent", { teamName: input.teamName, from: input.from, to: input.to }) + await Bus.publish(TeamEvent.Message, { + teamName: input.teamName, + from: input.from, + to: input.to, + text: input.text, + }) + + // Auto-wake: if the recipient session is idle, start its prompt loop + // so the LLM processes the injected message. + autoWake(targetSessionID, input.from) + } + + /** + * Broadcast a message from one member to all other members. + */ + export async function broadcast(input: { teamName: string; from: string; text: string }): Promise { + validateText(input.text) + const team = await Team.get(input.teamName) + if (!team) throw new Error(`Team "${input.teamName}" not found`) + + // Send to all active members except the sender + const memberTargets = team.members + .filter((m) => m.name !== input.from && m.status !== "shutdown") + .map((m) => ({ name: m.name, sessionID: m.sessionID })) + + const targets = + input.from !== "lead" && team.leadSessionID + ? [{ name: "lead", sessionID: team.leadSessionID }, ...memberTargets] + : memberTargets + + const errors: Array<{ target: string; phase: string; error: string }> = [] + for (const target of targets) { + const inboxId = messageId() + + // Write to inbox (source of truth) + const wrote = await Inbox.write(input.teamName, target.name, { + id: inboxId, + from: input.from, + text: input.text, + timestamp: Date.now(), + }).then( + () => true, + (err) => { + const msg = err instanceof Error ? err.message : String(err) + log.warn("broadcast inbox write failed", { target: target.name, error: msg }) + errors.push({ target: target.name, phase: "inbox", error: msg }) + return false + }, + ) + + // Only inject if inbox write succeeded — no point delivering a message + // that won't survive recovery + if (wrote) { + await injectMessage(target.sessionID, input.from, input.text, inboxId).catch((err) => { + const msg = err instanceof Error ? err.message : String(err) + log.warn("broadcast inject failed", { target: target.name, error: msg }) + errors.push({ target: target.name, phase: "inject", error: msg }) + }) + } + } + + const delivered = targets.length - errors.filter((e) => e.phase === "inbox").length + log.info("broadcast sent", { + teamName: input.teamName, + from: input.from, + targets: targets.length, + delivered, + errors: errors.length, + }) + if (errors.length > 0) log.warn("broadcast partial failure", { teamName: input.teamName, errors }) + + await Bus.publish(TeamEvent.Broadcast, { + teamName: input.teamName, + from: input.from, + text: input.text, + }) + + // Auto-wake all idle recipient sessions + for (const target of targets) { + autoWake(target.sessionID, input.from) + } + } + + /** + * Mark all messages as read in an agent's inbox, then send + * delivery receipts back to each sender. Receipts are batched + * per sender and flow through the same inbox + inject + auto-wake + * path as regular team messages. + */ + export async function markRead(teamName: string, agentName: string): Promise { + const read = await Inbox.markRead(teamName, agentName) + if (read.length === 0) return 0 + + // Group by sender for batched receipts + const bySender = new Map() + for (const msg of read) { + bySender.set(msg.from, (bySender.get(msg.from) ?? 0) + 1) + } + + // Send a receipt to each distinct sender + const team = await Team.get(teamName) + if (team) { + for (const [sender, count] of bySender) { + // Find sender's session + let senderSessionID: string | undefined + if (sender === "lead") { + senderSessionID = team.leadSessionID + } else { + const member = team.members.find((m) => m.name === sender) + if (member && member.status !== "shutdown") senderSessionID = member.sessionID + } + if (!senderSessionID) continue + + const text = count === 1 ? `${agentName} has read your message` : `${agentName} has read your ${count} messages` + + const receiptId = messageId() + await Inbox.write(teamName, sender, { + id: receiptId, + from: agentName, + text: `[receipt] ${text}`, + timestamp: Date.now(), + }).catch((err: unknown) => { + log.warn("receipt inbox write failed", { + teamName, + sender, + error: err instanceof Error ? err.message : String(err), + }) + }) + + await injectMessage(senderSessionID, agentName, `[receipt] ${text}`, receiptId).catch((err: unknown) => { + log.warn("receipt inject failed", { + teamName, + sender, + error: err instanceof Error ? err.message : String(err), + }) + }) + + autoWake(senderSessionID, agentName) + } + log.info("delivery receipts sent", { teamName, from: agentName, senders: [...bySender.keys()] }) + } + + return read.length + } + + /** + * Reinject unread inbox messages that were never delivered to the session. + * Deduplicates by inboxMessageId stored in part metadata. + * Returns the number of messages reinjected. + */ + export async function recoverInbox(teamName: string, agentName: string, sessionID: string): Promise { + const pending = await Inbox.unread(teamName, agentName) + if (pending.length === 0) return 0 + + // Find inbox message IDs already present in the session + const msgs = await Session.messages({ sessionID }) + const delivered = new Set() + for (const msg of msgs) { + for (const part of msg.parts) { + const meta = (part as { metadata?: Record }).metadata + if (meta?.inboxMessageId) delivered.add(meta.inboxMessageId as string) + } + } + + let count = 0 + for (const msg of pending) { + if (delivered.has(msg.id)) continue + await injectMessage(sessionID, msg.from, msg.text, msg.id) + count++ + } + + if (count > 0) + log.info("inbox recovery", { teamName, agentName, reinjected: count, skipped: pending.length - count }) + return count + } + + /** + * Auto-wake an idle session after a team message is injected. + * If the session is idle (no active prompt loop), starts a new loop + * so the LLM picks up and processes the injected message. + */ + async function autoWake(sessionID: string, from: string) { + try { + const status = SessionStatus.get(sessionID) + if (status.type !== "idle") return + // Don't wake a teammate that's fully shut down. + // We DO wake for shutdown_requested — the teammate needs to process + // the shutdown message and wrap up. The .then() handler below + // transitions shutdown_requested → shutdown when the loop ends. + const info = await Team.findBySession(sessionID) + if (info && info.role === "member") { + const member = info.team.members.find((m) => m.name === info.memberName) + if (member?.status === "shutdown") return + } + log.info("auto-waking idle session", { sessionID, from }) + SessionPrompt.loop({ sessionID }) + .then(async () => { + // When an auto-woken loop ends, check if shutdown was requested. + // Shutdown is authoritative — the teammate gets one loop to wrap up + // (summarize findings, send final messages) then transitions to shutdown. + // Both this handler and the spawn .then() check for shutdown_requested; + // transitionMemberStatus is idempotent (from === status returns early). + const match = await Team.findBySession(sessionID) + if (!match || match.role !== "member") return + const team = await Team.get(match.team.name) + const member = team?.members.find((m) => m.name === match.memberName) + if (member?.status === "shutdown_requested") { + await Team.transitionMemberStatus(match.team.name, match.memberName!, "shutdown") + log.info("auto-wake loop completed shutdown", { teamName: match.team.name, name: match.memberName }) + } + }) + .catch((err: unknown) => { + log.warn("auto-wake loop failed", { sessionID, error: err instanceof Error ? err.message : String(err) }) + }) + } catch (err) { + log.warn("auto-wake failed", { sessionID, error: err instanceof Error ? (err as Error).message : String(err) }) + } + } + + /** + * Inject a synthetic user message into a session from a teammate. + * This is how teammates "receive" messages — as user messages + * with a TeamMessagePart that the prompt loop will process. + */ + async function injectMessage( + sessionID: string, + fromName: string, + text: string, + inboxMessageId?: string, + ): Promise { + // Get the session to find the current agent and model + // Don't limit — we need to find the last user message which may not be the most recent + const msgs = await Session.messages({ sessionID }) + const lastUser = msgs.findLast((m) => m.info.role === "user") + if (!lastUser) { + throw new Error(`No user message found in session ${sessionID}`) + } + const userInfo = lastUser.info as { agent: string; model: { providerID: string; modelID: string } } + + const msgId = Identifier.ascending("message") + await Session.updateMessage({ + id: msgId, + sessionID, + role: "user", + agent: userInfo.agent, + model: userInfo.model, + time: { created: Date.now() }, + }) + + await Session.updatePart({ + id: Identifier.ascending("part"), + messageID: msgId, + sessionID, + type: "text", + text: `[Team message from ${fromName}]: ${text}`, + synthetic: true, + ...(inboxMessageId ? { metadata: { inboxMessageId } } : {}), + }) + } +} diff --git a/packages/opencode/test/team/team-autowake.test.ts b/packages/opencode/test/team/team-autowake.test.ts new file mode 100644 index 000000000000..319c71b03777 --- /dev/null +++ b/packages/opencode/test/team/team-autowake.test.ts @@ -0,0 +1,639 @@ +/** + * Tests for the autoWake mechanism in TeamMessaging. + * + * autoWake is a private function called inside TeamMessaging.send() and + * TeamMessaging.broadcast(). When a team message is injected into a + * recipient's session, autoWake checks SessionStatus — if the session is + * idle, it fires SessionPrompt.loop() so the LLM picks up the new message. + * + * In the test environment there is no LLM, so SessionPrompt.loop() will + * fail. The important property autoWake guarantees is that the failure is + * caught (logged, not thrown) so message delivery always succeeds. + * + * We verify: + * 1. send() succeeds for idle recipients (auto-wake error is swallowed) + * 2. send() succeeds for busy recipients (no wake attempted) + * 3. broadcast() delivers to all non-shutdown members regardless of status + * 4. Message format is correct: "[Team message from {name}]: {text}" + * 5. Bus events (TeamEvent.Message / TeamEvent.Broadcast) are published + * 6. Shutdown members are skipped during broadcast + */ +import { describe, expect, test } from "bun:test" +import path from "path" +import { Instance } from "../../src/project/instance" +import { Team } from "../../src/team" +import { TeamMessaging } from "../../src/team/messaging" +import { TeamEvent } from "../../src/team/events" +import { Session } from "../../src/session" +import { SessionStatus } from "../../src/session/status" +import { Bus } from "../../src/bus" +import { Identifier } from "../../src/id/id" +import { Log } from "../../src/util/log" +import { tmpdir } from "../fixture/fixture" + +Log.init({ print: false }) + +async function seedUserMessage(sessionID: string, text = "init") { + const mid = Identifier.ascending("message") + await Session.updateMessage({ + id: mid, + sessionID, + role: "user", + agent: "general", + model: { providerID: "anthropic", modelID: "claude-3-5-sonnet-20241022" }, + time: { created: Date.now() }, + }) + await Session.updatePart({ + id: Identifier.ascending("part"), + messageID: mid, + sessionID, + type: "text", + text, + }) + return mid +} + +// --------------------------------------------------------------------------- +// send() – idle recipient +// --------------------------------------------------------------------------- + +describe("autoWake: send to idle recipient", () => { + test("send succeeds and message is injected even though auto-wake loop will fail (no LLM)", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + const member = await Session.create({ parentID: lead.id }) + await seedUserMessage(lead.id) + await seedUserMessage(member.id) + + await Team.create({ name: "wake-idle", leadSessionID: lead.id }) + await Team.addMember("wake-idle", { + name: "worker", + sessionID: member.id, + agent: "general", + status: "busy", + }) + + // Confirm member session is idle (default state — no prompt loop running) + const before = SessionStatus.get(member.id) + expect(before.type).toBe("idle") + + // send() should NOT throw even though autoWake fires and loop() fails + await TeamMessaging.send({ + teamName: "wake-idle", + from: "lead", + to: "worker", + text: "Please start task A", + }) + + // Verify the synthetic message was injected + const msgs = await Session.messages({ sessionID: member.id }) + const received = msgs.find((m) => + m.parts.some((p) => p.type === "text" && p.text.includes("[Team message from lead]")), + ) + expect(received).toBeDefined() + const part = received!.parts.find((p) => p.type === "text") as any + expect(part.text).toBe("[Team message from lead]: Please start task A") + + await Team.setMemberStatus("wake-idle", "worker", "shutdown") + await Team.cleanup("wake-idle") + }, + }) + }) + + test("message format is correct: [Team message from {name}]: {text}", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + const member = await Session.create({ parentID: lead.id }) + await seedUserMessage(lead.id) + await seedUserMessage(member.id) + + await Team.create({ name: "fmt-team", leadSessionID: lead.id }) + await Team.addMember("fmt-team", { + name: "reviewer", + sessionID: member.id, + agent: "general", + status: "busy", + }) + + await TeamMessaging.send({ + teamName: "fmt-team", + from: "reviewer", + to: "lead", + text: "Found 3 issues in auth module", + }) + + const msgs = await Session.messages({ sessionID: lead.id }) + const injected = msgs.find((m) => + m.parts.some((p) => p.type === "text" && p.text.startsWith("[Team message from reviewer]:")), + ) + expect(injected).toBeDefined() + + const textPart = injected!.parts.find((p) => p.type === "text") as any + expect(textPart.text).toBe("[Team message from reviewer]: Found 3 issues in auth module") + expect(textPart.synthetic).toBe(true) + + await Team.setMemberStatus("fmt-team", "reviewer", "shutdown") + await Team.cleanup("fmt-team") + }, + }) + }) +}) + +// --------------------------------------------------------------------------- +// send() – busy recipient +// --------------------------------------------------------------------------- + +describe("autoWake: send to busy recipient", () => { + test("send succeeds and message is injected when recipient is busy (no wake attempted)", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + const member = await Session.create({ parentID: lead.id }) + await seedUserMessage(lead.id) + await seedUserMessage(member.id) + + await Team.create({ name: "wake-busy", leadSessionID: lead.id }) + await Team.addMember("wake-busy", { + name: "worker", + sessionID: member.id, + agent: "general", + status: "busy", + }) + + // Simulate a busy session (prompt loop already running) + SessionStatus.set(member.id, { type: "busy" }) + expect(SessionStatus.get(member.id).type).toBe("busy") + + // send() should succeed — autoWake skips because status !== "idle" + await TeamMessaging.send({ + teamName: "wake-busy", + from: "lead", + to: "worker", + text: "Update on requirements", + }) + + // Message was still injected + const msgs = await Session.messages({ sessionID: member.id }) + const received = msgs.find((m) => + m.parts.some((p) => p.type === "text" && p.text.includes("[Team message from lead]")), + ) + expect(received).toBeDefined() + + // Status should still be busy (autoWake did nothing) + expect(SessionStatus.get(member.id).type).toBe("busy") + + // Reset status for cleanup + SessionStatus.set(member.id, { type: "idle" }) + await Team.setMemberStatus("wake-busy", "worker", "shutdown") + await Team.cleanup("wake-busy") + }, + }) + }) + + test("send succeeds when recipient is in retry state", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + const member = await Session.create({ parentID: lead.id }) + await seedUserMessage(lead.id) + await seedUserMessage(member.id) + + await Team.create({ name: "wake-retry", leadSessionID: lead.id }) + await Team.addMember("wake-retry", { + name: "worker", + sessionID: member.id, + agent: "general", + status: "busy", + }) + + // Set retry state — autoWake should skip (type !== "idle") + SessionStatus.set(member.id, { type: "retry", attempt: 1, message: "rate limited", next: Date.now() + 5000 }) + expect(SessionStatus.get(member.id).type).toBe("retry") + + await TeamMessaging.send({ + teamName: "wake-retry", + from: "lead", + to: "worker", + text: "Just checking in", + }) + + // Message still injected + const msgs = await Session.messages({ sessionID: member.id }) + const received = msgs.find((m) => m.parts.some((p) => p.type === "text" && p.text.includes("Just checking in"))) + expect(received).toBeDefined() + + // Status unchanged + expect(SessionStatus.get(member.id).type).toBe("retry") + + SessionStatus.set(member.id, { type: "idle" }) + await Team.setMemberStatus("wake-retry", "worker", "shutdown") + await Team.cleanup("wake-retry") + }, + }) + }) +}) + +// --------------------------------------------------------------------------- +// broadcast() – autoWake across multiple members +// --------------------------------------------------------------------------- + +describe("autoWake: broadcast", () => { + test("broadcast delivers to all active members regardless of idle/busy status", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + const idle1 = await Session.create({ parentID: lead.id }) + const idle2 = await Session.create({ parentID: lead.id }) + const busy1 = await Session.create({ parentID: lead.id }) + + await seedUserMessage(lead.id) + await seedUserMessage(idle1.id) + await seedUserMessage(idle2.id) + await seedUserMessage(busy1.id) + + await Team.create({ name: "bcast-wake", leadSessionID: lead.id }) + await Team.addMember("bcast-wake", { name: "idle-a", sessionID: idle1.id, agent: "general", status: "busy" }) + await Team.addMember("bcast-wake", { name: "idle-b", sessionID: idle2.id, agent: "general", status: "busy" }) + await Team.addMember("bcast-wake", { name: "busy-c", sessionID: busy1.id, agent: "general", status: "busy" }) + + // idle-a and idle-b are idle (default), busy-c is busy + SessionStatus.set(busy1.id, { type: "busy" }) + + // Broadcast from lead to all members + await TeamMessaging.broadcast({ + teamName: "bcast-wake", + from: "lead", + text: "New priority: focus on auth module", + }) + + // All three members should have received the message + for (const [name, sid] of [ + ["idle-a", idle1.id], + ["idle-b", idle2.id], + ["busy-c", busy1.id], + ] as const) { + const msgs = await Session.messages({ sessionID: sid }) + const received = msgs.find((m) => m.parts.some((p) => p.type === "text" && p.text.includes("New priority"))) + expect(received).toBeDefined() + } + + // busy-c should still be busy + expect(SessionStatus.get(busy1.id).type).toBe("busy") + + // Cleanup + SessionStatus.set(busy1.id, { type: "idle" }) + for (const name of ["idle-a", "idle-b", "busy-c"]) { + await Team.setMemberStatus("bcast-wake", name, "shutdown") + } + await Team.cleanup("bcast-wake") + }, + }) + }) + + test("broadcast skips shutdown members", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + const active = await Session.create({ parentID: lead.id }) + const shutdown = await Session.create({ parentID: lead.id }) + + await seedUserMessage(lead.id) + await seedUserMessage(active.id) + await seedUserMessage(shutdown.id) + + await Team.create({ name: "bcast-skip", leadSessionID: lead.id }) + await Team.addMember("bcast-skip", { name: "alive", sessionID: active.id, agent: "general", status: "busy" }) + await Team.addMember("bcast-skip", { + name: "dead", + sessionID: shutdown.id, + agent: "general", + status: "shutdown", + }) + + await TeamMessaging.broadcast({ + teamName: "bcast-skip", + from: "lead", + text: "Are you there?", + }) + + // Active member gets the message + const activeMsgs = await Session.messages({ sessionID: active.id }) + const received = activeMsgs.find((m) => + m.parts.some((p) => p.type === "text" && p.text.includes("Are you there?")), + ) + expect(received).toBeDefined() + + // Shutdown member does NOT get the message + const shutdownMsgs = await Session.messages({ sessionID: shutdown.id }) + const skipped = shutdownMsgs.find((m) => + m.parts.some((p) => p.type === "text" && p.text.includes("Are you there?")), + ) + expect(skipped).toBeUndefined() + + await Team.setMemberStatus("bcast-skip", "alive", "shutdown") + await Team.cleanup("bcast-skip") + }, + }) + }) + + test("broadcast from member excludes sender but includes lead", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + const memberA = await Session.create({ parentID: lead.id }) + const memberB = await Session.create({ parentID: lead.id }) + + await seedUserMessage(lead.id) + await seedUserMessage(memberA.id) + await seedUserMessage(memberB.id) + + await Team.create({ name: "bcast-sender", leadSessionID: lead.id }) + await Team.addMember("bcast-sender", { + name: "alice", + sessionID: memberA.id, + agent: "general", + status: "busy", + }) + await Team.addMember("bcast-sender", { name: "bob", sessionID: memberB.id, agent: "general", status: "busy" }) + + // alice broadcasts + await TeamMessaging.broadcast({ + teamName: "bcast-sender", + from: "alice", + text: "I found something important", + }) + + // Lead should receive it + const leadMsgs = await Session.messages({ sessionID: lead.id }) + const leadReceived = leadMsgs.find((m) => + m.parts.some((p) => p.type === "text" && p.text.includes("I found something important")), + ) + expect(leadReceived).toBeDefined() + + // bob should receive it + const bobMsgs = await Session.messages({ sessionID: memberB.id }) + const bobReceived = bobMsgs.find((m) => + m.parts.some((p) => p.type === "text" && p.text.includes("I found something important")), + ) + expect(bobReceived).toBeDefined() + + // alice (sender) should NOT receive it + const aliceMsgs = await Session.messages({ sessionID: memberA.id }) + const aliceReceived = aliceMsgs.find((m) => + m.parts.some((p) => p.type === "text" && p.text.includes("I found something important")), + ) + expect(aliceReceived).toBeUndefined() + + for (const name of ["alice", "bob"]) { + await Team.setMemberStatus("bcast-sender", name, "shutdown") + } + await Team.cleanup("bcast-sender") + }, + }) + }) +}) + +// --------------------------------------------------------------------------- +// Bus events +// --------------------------------------------------------------------------- + +describe("autoWake: bus events are published", () => { + test("send publishes TeamEvent.Message", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + const member = await Session.create({ parentID: lead.id }) + await seedUserMessage(lead.id) + await seedUserMessage(member.id) + + await Team.create({ name: "event-send", leadSessionID: lead.id }) + await Team.addMember("event-send", { name: "worker", sessionID: member.id, agent: "general", status: "busy" }) + + const events: any[] = [] + const unsub = Bus.subscribe(TeamEvent.Message, (event) => { + events.push(event.properties) + }) + + await TeamMessaging.send({ + teamName: "event-send", + from: "lead", + to: "worker", + text: "Do the thing", + }) + + unsub() + + expect(events).toHaveLength(1) + expect(events[0].teamName).toBe("event-send") + expect(events[0].from).toBe("lead") + expect(events[0].to).toBe("worker") + expect(events[0].text).toBe("Do the thing") + + await Team.setMemberStatus("event-send", "worker", "shutdown") + await Team.cleanup("event-send") + }, + }) + }) + + test("broadcast publishes TeamEvent.Broadcast", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + const member = await Session.create({ parentID: lead.id }) + await seedUserMessage(lead.id) + await seedUserMessage(member.id) + + await Team.create({ name: "event-bcast", leadSessionID: lead.id }) + await Team.addMember("event-bcast", { + name: "worker", + sessionID: member.id, + agent: "general", + status: "busy", + }) + + const events: any[] = [] + const unsub = Bus.subscribe(TeamEvent.Broadcast, (event) => { + events.push(event.properties) + }) + + await TeamMessaging.broadcast({ + teamName: "event-bcast", + from: "lead", + text: "All hands update", + }) + + unsub() + + expect(events).toHaveLength(1) + expect(events[0].teamName).toBe("event-bcast") + expect(events[0].from).toBe("lead") + expect(events[0].text).toBe("All hands update") + + await Team.setMemberStatus("event-bcast", "worker", "shutdown") + await Team.cleanup("event-bcast") + }, + }) + }) +}) + +// --------------------------------------------------------------------------- +// Error resilience +// --------------------------------------------------------------------------- + +describe("autoWake: error resilience", () => { + test("send to idle recipient does not throw even though loop() fails internally", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + const member = await Session.create({ parentID: lead.id }) + await seedUserMessage(lead.id) + await seedUserMessage(member.id) + + await Team.create({ name: "resilient", leadSessionID: lead.id }) + await Team.addMember("resilient", { name: "worker", sessionID: member.id, agent: "general", status: "busy" }) + + // Member session is idle → autoWake will try SessionPrompt.loop() + // which will fail (no LLM/agent config in test). The error must be caught. + expect(SessionStatus.get(member.id).type).toBe("idle") + + // This must NOT throw + await TeamMessaging.send({ + teamName: "resilient", + from: "lead", + to: "worker", + text: "This should not fail", + }) + + // The message was still delivered + const msgs = await Session.messages({ sessionID: member.id }) + const received = msgs.find((m) => + m.parts.some((p) => p.type === "text" && p.text.includes("This should not fail")), + ) + expect(received).toBeDefined() + + await Team.setMemberStatus("resilient", "worker", "shutdown") + await Team.cleanup("resilient") + }, + }) + }) + + test("broadcast with mix of idle and busy members does not throw", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + const s1 = await Session.create({ parentID: lead.id }) + const s2 = await Session.create({ parentID: lead.id }) + + await seedUserMessage(lead.id) + await seedUserMessage(s1.id) + await seedUserMessage(s2.id) + + await Team.create({ name: "resilient-bcast", leadSessionID: lead.id }) + await Team.addMember("resilient-bcast", { + name: "idle-one", + sessionID: s1.id, + agent: "general", + status: "busy", + }) + await Team.addMember("resilient-bcast", { + name: "busy-one", + sessionID: s2.id, + agent: "general", + status: "busy", + }) + + SessionStatus.set(s2.id, { type: "busy" }) + + // Must NOT throw despite idle member triggering a failing loop() + await TeamMessaging.broadcast({ + teamName: "resilient-bcast", + from: "lead", + text: "Broadcast that must not fail", + }) + + // Both members got the message + for (const sid of [s1.id, s2.id]) { + const msgs = await Session.messages({ sessionID: sid }) + const received = msgs.find((m) => + m.parts.some((p) => p.type === "text" && p.text.includes("Broadcast that must not fail")), + ) + expect(received).toBeDefined() + } + + SessionStatus.set(s2.id, { type: "idle" }) + for (const name of ["idle-one", "busy-one"]) { + await Team.setMemberStatus("resilient-bcast", name, "shutdown") + } + await Team.cleanup("resilient-bcast") + }, + }) + }) + + test("multiple rapid sends to idle session all succeed", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + const member = await Session.create({ parentID: lead.id }) + await seedUserMessage(lead.id) + await seedUserMessage(member.id) + + await Team.create({ name: "rapid-wake", leadSessionID: lead.id }) + await Team.addMember("rapid-wake", { name: "worker", sessionID: member.id, agent: "general", status: "busy" }) + + // Fire multiple sends rapidly — all should succeed + await Promise.all([ + TeamMessaging.send({ teamName: "rapid-wake", from: "lead", to: "worker", text: "msg-1" }), + TeamMessaging.send({ teamName: "rapid-wake", from: "lead", to: "worker", text: "msg-2" }), + TeamMessaging.send({ teamName: "rapid-wake", from: "lead", to: "worker", text: "msg-3" }), + ]) + + const msgs = await Session.messages({ sessionID: member.id }) + const teamMsgs = msgs.filter((m) => + m.parts.some((p) => p.type === "text" && p.text.startsWith("[Team message from lead]:")), + ) + expect(teamMsgs).toHaveLength(3) + + await Team.setMemberStatus("rapid-wake", "worker", "shutdown") + await Team.cleanup("rapid-wake") + }, + }) + }) +}) diff --git a/packages/opencode/test/team/team-cancel.test.ts b/packages/opencode/test/team/team-cancel.test.ts new file mode 100644 index 000000000000..092fb315d7e8 --- /dev/null +++ b/packages/opencode/test/team/team-cancel.test.ts @@ -0,0 +1,427 @@ +/** + * Tests for Team.cancelMember and Team.cancelAllMembers. + * + * These methods propagate abort from the lead session to teammate sessions + * by calling SessionPrompt.cancel on each active member, mirroring the + * Task tool's abort propagation pattern (task.ts:121-125). + */ +import { describe, expect, test } from "bun:test" +import { Instance } from "../../src/project/instance" +import { Team } from "../../src/team" +import { Session } from "../../src/session" +import { SessionStatus } from "../../src/session/status" +import { Log } from "../../src/util/log" +import { tmpdir } from "../fixture/fixture" + +Log.init({ print: false }) + +describe("Team.cancelMember", () => { + test("returns false for non-existent team", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const result = await Team.cancelMember("no-such-team", "alice") + expect(result).toBe(false) + }, + }) + }) + + test("returns false for non-existent member", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + await Team.create({ name: "cancel-test-1", leadSessionID: lead.id }) + + const result = await Team.cancelMember("cancel-test-1", "ghost") + expect(result).toBe(false) + + await Team.cleanup("cancel-test-1") + }, + }) + }) + + test("returns false for non-active member (idle)", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + await Team.create({ name: "cancel-test-2", leadSessionID: lead.id }) + + const member = await Session.create({ parentID: lead.id }) + await Team.addMember("cancel-test-2", { + name: "idle-worker", + sessionID: member.id, + agent: "general", + status: "busy", + }) + + // Set to idle first + await Team.setMemberStatus("cancel-test-2", "idle-worker", "ready") + + const result = await Team.cancelMember("cancel-test-2", "idle-worker") + expect(result).toBe(false) + + await Team.setMemberStatus("cancel-test-2", "idle-worker", "shutdown") + await Team.cleanup("cancel-test-2") + }, + }) + }) + + test("returns false for shutdown member", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + await Team.create({ name: "cancel-test-3", leadSessionID: lead.id }) + + const member = await Session.create({ parentID: lead.id }) + await Team.addMember("cancel-test-3", { + name: "done-worker", + sessionID: member.id, + agent: "general", + status: "busy", + }) + + await Team.setMemberStatus("cancel-test-3", "done-worker", "shutdown") + + const result = await Team.cancelMember("cancel-test-3", "done-worker") + expect(result).toBe(false) + + await Team.cleanup("cancel-test-3") + }, + }) + }) + + test("cancels active member and sets session status to idle", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + await Team.create({ name: "cancel-test-4", leadSessionID: lead.id }) + + const member = await Session.create({ parentID: lead.id }) + await Team.addMember("cancel-test-4", { + name: "busy-worker", + sessionID: member.id, + agent: "general", + status: "busy", + }) + + // Simulate the member being busy + SessionStatus.set(member.id, { type: "busy" }) + expect(SessionStatus.get(member.id).type).toBe("busy") + + const result = await Team.cancelMember("cancel-test-4", "busy-worker") + expect(result).toBe(true) + + // SessionPrompt.cancel sets status to idle + expect(SessionStatus.get(member.id).type).toBe("idle") + + await Team.setMemberStatus("cancel-test-4", "busy-worker", "shutdown") + await Team.cleanup("cancel-test-4") + }, + }) + }) +}) + +describe("Team.cancelAllMembers", () => { + test("returns 0 for non-existent team", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const result = await Team.cancelAllMembers("no-such-team") + expect(result).toBe(0) + }, + }) + }) + + test("returns 0 when no active members", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + await Team.create({ name: "cancel-all-1", leadSessionID: lead.id }) + + const member = await Session.create({ parentID: lead.id }) + await Team.addMember("cancel-all-1", { + name: "shutdown-worker", + sessionID: member.id, + agent: "general", + status: "busy", + }) + await Team.setMemberStatus("cancel-all-1", "shutdown-worker", "shutdown") + + const result = await Team.cancelAllMembers("cancel-all-1") + expect(result).toBe(0) + + await Team.cleanup("cancel-all-1") + }, + }) + }) + + test("cancels all active members and returns count", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + await Team.create({ name: "cancel-all-2", leadSessionID: lead.id }) + + const m1 = await Session.create({ parentID: lead.id }) + const m2 = await Session.create({ parentID: lead.id }) + const m3 = await Session.create({ parentID: lead.id }) + + await Team.addMember("cancel-all-2", { + name: "worker-a", + sessionID: m1.id, + agent: "general", + status: "busy", + }) + await Team.addMember("cancel-all-2", { + name: "worker-b", + sessionID: m2.id, + agent: "general", + status: "busy", + }) + await Team.addMember("cancel-all-2", { + name: "worker-c", + sessionID: m3.id, + agent: "general", + status: "busy", + }) + + // One member is shutdown — should not be cancelled + await Team.setMemberStatus("cancel-all-2", "worker-c", "shutdown") + + // Simulate busy sessions + SessionStatus.set(m1.id, { type: "busy" }) + SessionStatus.set(m2.id, { type: "busy" }) + + const result = await Team.cancelAllMembers("cancel-all-2") + expect(result).toBe(2) + + // Both active members should now be idle + expect(SessionStatus.get(m1.id).type).toBe("idle") + expect(SessionStatus.get(m2.id).type).toBe("idle") + + // Cleanup + await Team.setMemberStatus("cancel-all-2", "worker-a", "shutdown") + await Team.setMemberStatus("cancel-all-2", "worker-b", "shutdown") + await Team.cleanup("cancel-all-2") + }, + }) + }) + + test("skips interrupted members", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + await Team.create({ name: "cancel-all-3", leadSessionID: lead.id }) + + const m1 = await Session.create({ parentID: lead.id }) + const m2 = await Session.create({ parentID: lead.id }) + + await Team.addMember("cancel-all-3", { + name: "active-one", + sessionID: m1.id, + agent: "general", + status: "busy", + }) + await Team.addMember("cancel-all-3", { + name: "interrupted-one", + sessionID: m2.id, + agent: "general", + status: "busy", + }) + await Team.setMemberStatus("cancel-all-3", "interrupted-one", "ready") + + SessionStatus.set(m1.id, { type: "busy" }) + + const result = await Team.cancelAllMembers("cancel-all-3") + expect(result).toBe(1) // Only the active one + + expect(SessionStatus.get(m1.id).type).toBe("idle") + + await Team.setMemberStatus("cancel-all-3", "active-one", "shutdown") + await Team.setMemberStatus("cancel-all-3", "interrupted-one", "shutdown") + await Team.cleanup("cancel-all-3") + }, + }) + }) +}) + +describe("Abort propagation: lead abort cancels teammates", () => { + test("findBySession + cancelAllMembers cancels teammates when lead is aborted", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + await Team.create({ name: "abort-prop-1", leadSessionID: lead.id }) + + const m1 = await Session.create({ parentID: lead.id }) + const m2 = await Session.create({ parentID: lead.id }) + + await Team.addMember("abort-prop-1", { + name: "worker-x", + sessionID: m1.id, + agent: "general", + status: "busy", + }) + await Team.addMember("abort-prop-1", { + name: "worker-y", + sessionID: m2.id, + agent: "general", + status: "busy", + }) + + SessionStatus.set(m1.id, { type: "busy" }) + SessionStatus.set(m2.id, { type: "busy" }) + + // Simulate what the session.abort route does: + // 1. Cancel lead session (SessionPrompt.cancel) + // 2. Find team by session → cancel all members + const match = await Team.findBySession(lead.id) + expect(match).toBeDefined() + expect(match!.role).toBe("lead") + + const cancelled = await Team.cancelAllMembers(match!.team.name) + expect(cancelled).toBe(2) + + expect(SessionStatus.get(m1.id).type).toBe("idle") + expect(SessionStatus.get(m2.id).type).toBe("idle") + + await Team.setMemberStatus("abort-prop-1", "worker-x", "shutdown") + await Team.setMemberStatus("abort-prop-1", "worker-y", "shutdown") + await Team.cleanup("abort-prop-1") + }, + }) + }) + + test("findBySession returns undefined for non-team session — no propagation", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const standalone = await Session.create({}) + const match = await Team.findBySession(standalone.id) + expect(match).toBeUndefined() + // cancelAllMembers would not be called — no-op + }, + }) + }) + + test("member abort does not cascade to other members", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + await Team.create({ name: "abort-prop-2", leadSessionID: lead.id }) + + const m1 = await Session.create({ parentID: lead.id }) + const m2 = await Session.create({ parentID: lead.id }) + + await Team.addMember("abort-prop-2", { + name: "member-a", + sessionID: m1.id, + agent: "general", + status: "busy", + }) + await Team.addMember("abort-prop-2", { + name: "member-b", + sessionID: m2.id, + agent: "general", + status: "busy", + }) + + SessionStatus.set(m1.id, { type: "busy" }) + SessionStatus.set(m2.id, { type: "busy" }) + + // When a member session is aborted, findBySession returns "member" role + const match = await Team.findBySession(m1.id) + expect(match).toBeDefined() + expect(match!.role).toBe("member") + + // The route only propagates for role === "lead", so member-b stays busy + // (cancelAllMembers is NOT called for member aborts) + expect(SessionStatus.get(m2.id).type).toBe("busy") + + await Team.setMemberStatus("abort-prop-2", "member-a", "shutdown") + await Team.setMemberStatus("abort-prop-2", "member-b", "shutdown") + await Team.cleanup("abort-prop-2") + }, + }) + }) +}) + +describe("Cancel vs finish notification", () => { + test("cancelMember marks session as cancelled so notifyLead can distinguish from natural finish", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const lead = await Session.create({}) + await Team.create({ name: "cancel-notify-1", leadSessionID: lead.id }) + + const m1 = await Session.create({ parentID: lead.id }) + const m2 = await Session.create({ parentID: lead.id }) + + await Team.addMember("cancel-notify-1", { + name: "will-cancel", + sessionID: m1.id, + agent: "general", + status: "busy", + }) + await Team.addMember("cancel-notify-1", { + name: "not-cancelled", + sessionID: m2.id, + agent: "general", + status: "busy", + }) + + SessionStatus.set(m1.id, { type: "busy" }) + + // Cancel one member + const ok = await Team.cancelMember("cancel-notify-1", "will-cancel") + expect(ok).toBe(true) + + // cancelAllMembers also marks sessions + SessionStatus.set(m2.id, { type: "busy" }) + const count = await Team.cancelAllMembers("cancel-notify-1") + // m1 is no longer active (was cancelled above), only m2 gets cancelled + // But m1 status wasn't updated to non-active in Team storage by cancelMember + // (cancelMember only calls SessionPrompt.cancel, doesn't update member status) + // So cancelAllMembers may try m1 again — but it's still "busy" in storage + expect(count).toBeGreaterThanOrEqual(1) + + await Team.setMemberStatus("cancel-notify-1", "will-cancel", "shutdown") + await Team.setMemberStatus("cancel-notify-1", "not-cancelled", "shutdown") + await Team.cleanup("cancel-notify-1") + }, + }) + }) +}) diff --git a/packages/opencode/test/team/team-persistence.test.ts b/packages/opencode/test/team/team-persistence.test.ts new file mode 100644 index 000000000000..ec2f23c36d2b --- /dev/null +++ b/packages/opencode/test/team/team-persistence.test.ts @@ -0,0 +1,198 @@ +import { describe, expect, test } from "bun:test" +import path from "path" +import fs from "fs/promises" +import { Instance } from "../../src/project/instance" +import { Team, TeamTasks } from "../../src/team" +import { Env } from "../../src/env" +import { Log } from "../../src/util/log" + +Log.init({ print: false }) + +/** + * Tests that team state persists via the Storage namespace and can be read + * back after a simulated server restart (same Instance.provide context since + * Storage is global, keyed by project.id). + * + * Each test must clean up its teams to avoid polluting other tests. + */ +describe("Team persistence across restarts", () => { + test("Team.get reads team created in a previous context", async () => { + const dir = await fs.mkdtemp(path.join(import.meta.dir, ".tmp-persist-")) + + try { + await Instance.provide({ + directory: dir, + init: async () => Env.set("ANTHROPIC_API_KEY", "test-key"), + fn: async () => { + await Team.create({ + name: "persist-test", + leadSessionID: "ses_lead_abc", + }) + await Team.addMember("persist-test", { + name: "worker-1", + sessionID: "ses_worker_1", + agent: "general", + status: "busy", + prompt: "do stuff", + model: "anthropic/claude-sonnet-4-20250514", + planApproval: "none", + }) + + // Verify data persists via API (Storage is global, not file-local) + const team = await Team.get("persist-test") + expect(team).toBeDefined() + expect(team!.name).toBe("persist-test") + expect(team!.leadSessionID).toBe("ses_lead_abc") + expect(team!.members).toHaveLength(1) + expect(team!.members[0].name).toBe("worker-1") + expect(team!.members[0].status).toBe("busy") + expect(team!.members[0].model).toBe("anthropic/claude-sonnet-4-20250514") + + // Cleanup + await Team.setMemberStatus("persist-test", "worker-1", "shutdown") + await Team.cleanup("persist-test") + }, + }) + } finally { + await fs.rm(dir, { recursive: true, force: true }) + } + }) + + test("Team.list finds all teams after restart", async () => { + const dir = await fs.mkdtemp(path.join(import.meta.dir, ".tmp-persist-")) + + try { + await Instance.provide({ + directory: dir, + init: async () => Env.set("ANTHROPIC_API_KEY", "test-key"), + fn: async () => { + await Team.create({ name: "alpha", leadSessionID: "ses_alpha_p" }) + await Team.create({ name: "beta", leadSessionID: "ses_beta_p" }) + + const teams = await Team.list() + const names = teams.map((t) => t.name).sort() + expect(names).toContain("alpha") + expect(names).toContain("beta") + + // Cleanup + await Team.cleanup("alpha") + await Team.cleanup("beta") + }, + }) + } finally { + await fs.rm(dir, { recursive: true, force: true }) + } + }) + + test("Team.findBySession works after restart", async () => { + const dir = await fs.mkdtemp(path.join(import.meta.dir, ".tmp-persist-")) + + try { + await Instance.provide({ + directory: dir, + init: async () => Env.set("ANTHROPIC_API_KEY", "test-key"), + fn: async () => { + await Team.create({ name: "find-test", leadSessionID: "ses_lead_find_p" }) + await Team.addMember("find-test", { + name: "searcher", + sessionID: "ses_member_find_p", + agent: "explore", + status: "busy", + prompt: "search", + planApproval: "none", + }) + + // Find lead + const lead = await Team.findBySession("ses_lead_find_p") + expect(lead).toBeDefined() + expect(lead!.role).toBe("lead") + expect(lead!.team.name).toBe("find-test") + + // Find member + const member = await Team.findBySession("ses_member_find_p") + expect(member).toBeDefined() + expect(member!.role).toBe("member") + expect(member!.memberName).toBe("searcher") + + // Non-existent session + const none = await Team.findBySession("ses_nonexistent_p") + expect(none).toBeUndefined() + + // Cleanup + await Team.setMemberStatus("find-test", "searcher", "shutdown") + await Team.cleanup("find-test") + }, + }) + } finally { + await fs.rm(dir, { recursive: true, force: true }) + } + }) + + test("TeamTasks persist after restart", async () => { + const dir = await fs.mkdtemp(path.join(import.meta.dir, ".tmp-persist-")) + + try { + await Instance.provide({ + directory: dir, + init: async () => Env.set("ANTHROPIC_API_KEY", "test-key"), + fn: async () => { + await Team.create({ name: "tasks-test", leadSessionID: "ses_tasks_p" }) + await TeamTasks.add("tasks-test", [ + { id: "t1", content: "Research", status: "completed", priority: "high" }, + { id: "t2", content: "Implement", status: "pending", priority: "high", depends_on: ["t1"] }, + { id: "t3", content: "Test", status: "pending", priority: "medium", depends_on: ["t2"] }, + ]) + + const tasks = await TeamTasks.list("tasks-test") + expect(tasks).toHaveLength(3) + + const t1 = tasks.find((t) => t.id === "t1") + expect(t1!.status).toBe("completed") + + const t2 = tasks.find((t) => t.id === "t2") + expect(t2!.status).toBe("pending") + + const t3 = tasks.find((t) => t.id === "t3") + expect(t3!.status).toBe("blocked") + + // Cleanup + await Team.cleanup("tasks-test") + }, + }) + } finally { + await fs.rm(dir, { recursive: true, force: true }) + } + }) + + test("Member status updates persist after restart", async () => { + const dir = await fs.mkdtemp(path.join(import.meta.dir, ".tmp-persist-")) + + try { + await Instance.provide({ + directory: dir, + init: async () => Env.set("ANTHROPIC_API_KEY", "test-key"), + fn: async () => { + await Team.create({ name: "status-test", leadSessionID: "ses_st_p" }) + await Team.addMember("status-test", { + name: "agent-a", + sessionID: "ses_a_p", + agent: "general", + status: "busy", + prompt: "work", + planApproval: "none", + }) + await Team.setMemberStatus("status-test", "agent-a", "ready") + + const team = await Team.get("status-test") + expect(team!.members[0].status).toBe("ready") + + // Cleanup + await Team.setMemberStatus("status-test", "agent-a", "shutdown") + await Team.cleanup("status-test") + }, + }) + } finally { + await fs.rm(dir, { recursive: true, force: true }) + } + }) +}) diff --git a/packages/opencode/test/team/team-recovery-e2e.test.ts b/packages/opencode/test/team/team-recovery-e2e.test.ts new file mode 100644 index 000000000000..1050e4969fcd --- /dev/null +++ b/packages/opencode/test/team/team-recovery-e2e.test.ts @@ -0,0 +1,321 @@ +import { afterAll, beforeAll, beforeEach, describe, expect, test } from "bun:test" +import path from "path" +import { Instance } from "../../src/project/instance" +import { Team } from "../../src/team" +import { TeamMessaging } from "../../src/team/messaging" +import { Session } from "../../src/session" +import { SessionPrompt } from "../../src/session/prompt" +import { SessionStatus } from "../../src/session/status" +import { Identifier } from "../../src/id/id" +import { Log } from "../../src/util/log" +import { Bus } from "../../src/bus" +import { TeamEvent } from "../../src/team/events" +import { tmpdir } from "../fixture/fixture" + +Log.init({ print: false }) + +// ---------- Mock Anthropic SSE server ---------- + +const serverState = { + server: null as ReturnType | null, + responses: [] as Array<{ response: Response }>, + requests: [] as Array<{ url: string; body: any }>, +} + +function anthropicSSE(text: string) { + const chunks = [ + { + type: "message_start", + message: { + id: "msg-recovery-test", + model: "claude-3-5-sonnet-20241022", + usage: { + input_tokens: 10, + cache_creation_input_tokens: null, + cache_read_input_tokens: null, + }, + }, + }, + { + type: "content_block_start", + index: 0, + content_block: { type: "text", text: "" }, + }, + { + type: "content_block_delta", + index: 0, + delta: { type: "text_delta", text }, + }, + { type: "content_block_stop", index: 0 }, + { + type: "message_delta", + delta: { stop_reason: "end_turn", stop_sequence: null, container: null }, + usage: { + input_tokens: 10, + output_tokens: 5, + cache_creation_input_tokens: null, + cache_read_input_tokens: null, + }, + }, + { type: "message_stop" }, + ] + + const payload = chunks.map((c) => `event: ${c.type}\ndata: ${JSON.stringify(c)}`).join("\n\n") + "\n\n" + const encoder = new TextEncoder() + return new Response( + new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode(payload)) + controller.close() + }, + }), + { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }, + ) +} + +function queueResponse(text: string) { + serverState.responses.push({ response: anthropicSSE(text) }) +} + +beforeAll(() => { + serverState.server = Bun.serve({ + port: 0, + async fetch(req) { + const body = await req.json().catch(() => ({})) + serverState.requests.push({ url: req.url, body }) + const next = serverState.responses.shift() + if (!next) return anthropicSSE("(no queued response)") + return next.response + }, + }) +}) + +beforeEach(() => { + serverState.responses.length = 0 + serverState.requests.length = 0 +}) + +afterAll(() => { + serverState.server?.stop() +}) + +describe("Team recovery e2e: full restart cycle", () => { + test("recovery marks active members interrupted, team_message auto-wakes them", async () => { + const server = serverState.server! + + await using tmp = await tmpdir({ + git: true, + init: async (dir) => { + await Bun.write( + path.join(dir, "opencode.json"), + JSON.stringify({ + $schema: "https://opencode.ai/config.json", + enabled_providers: ["anthropic"], + provider: { + anthropic: { + options: { + apiKey: "test-anthropic-key", + baseURL: `${server.url.origin}/v1`, + }, + }, + }, + }), + ) + }, + }) + + let leadSessionID: string + let memberSessionID: string + + // ===== PHASE 1: First boot — create team with active teammate ===== + await Instance.provide({ + directory: tmp.path, + fn: async () => { + // Create lead session + const leadSession = await Session.create({}) + leadSessionID = leadSession.id + + // Create a user message in lead session (needed for message injection later) + const leadMsgId = Identifier.ascending("message") + await Session.updateMessage({ + id: leadMsgId, + sessionID: leadSession.id, + role: "user", + agent: "build", + model: { providerID: "anthropic", modelID: "claude-3-5-sonnet-20241022" }, + time: { created: Date.now() }, + }) + await Session.updatePart({ + id: Identifier.ascending("part"), + messageID: leadMsgId, + sessionID: leadSession.id, + type: "text", + text: "Create a team and spawn a researcher", + }) + + // Create team + await Team.create({ + name: "recovery-e2e", + leadSessionID: leadSession.id, + }) + + // Create child session for the teammate + const memberSession = await Session.create({ + parentID: leadSession.id, + title: "researcher (teammate)", + }) + memberSessionID = memberSession.id + + // Register as team member with status "busy" + await Team.addMember("recovery-e2e", { + name: "researcher", + sessionID: memberSession.id, + agent: "explore", + status: "busy", + prompt: "Research the session module", + model: "anthropic/claude-3-5-sonnet-20241022", + planApproval: "none", + }) + + // Create a user message in the member session (needed for loop to work) + const memberMsgId = Identifier.ascending("message") + await Session.updateMessage({ + id: memberMsgId, + sessionID: memberSession.id, + role: "user", + agent: "explore", + model: { providerID: "anthropic", modelID: "claude-3-5-sonnet-20241022" }, + time: { created: Date.now() }, + }) + await Session.updatePart({ + id: Identifier.ascending("part"), + messageID: memberMsgId, + sessionID: memberSession.id, + type: "text", + text: "You are researcher, a teammate. Research the session module.", + }) + + // Verify setup + const team = await Team.get("recovery-e2e") + expect(team).toBeDefined() + expect(team!.members).toHaveLength(1) + expect(team!.members[0].status).toBe("busy") + expect(team!.members[0].sessionID).toBe(memberSession.id) + }, + }) + + // ===== PHASE 2: "Server dies" — instance is gone, no loops running ===== + // (Instance.provide already disposed the context above) + + // ===== PHASE 3: Second boot — recovery runs ===== + await Instance.provide({ + directory: tmp.path, + fn: async () => { + // Run recovery + const result = await Team.recover() + expect(result.interrupted).toBe(1) + + // Verify member is now "ready" + const team = await Team.get("recovery-e2e") + expect(team!.members[0].status).toBe("ready") + + // Verify lead session got a notification message + const msgs = await Session.messages({ sessionID: leadSessionID! }) + const lastMsg = msgs[msgs.length - 1] + expect(lastMsg.info.role).toBe("user") + const textParts = lastMsg.parts.filter((p) => p.type === "text") + const hasNotification = textParts.some( + (p) => p.type === "text" && p.text.includes("Server was restarted"), + ) + expect(hasNotification).toBe(true) + + // ===== PHASE 4: User says "continue" — lead LLM sends team_message ===== + // Simulate what the LLM would do: send a team_message to the researcher + // Queue a mock LLM response for the researcher's auto-waked loop + queueResponse("I have resumed my research after the restart.") + + // Verify the member session is idle (no loop running) + const statusBefore = SessionStatus.get(memberSessionID!) + expect(statusBefore.type).toBe("idle") + + // Send team_message — this should trigger auto-wake + await TeamMessaging.send({ + teamName: "recovery-e2e", + from: "lead", + to: "researcher", + text: "Continue your work on the session module research.", + }) + + // Give the auto-waked loop time to process + await Bun.sleep(500) + + // Verify the teammate's loop ran (mock LLM was called) + const anthropicRequests = serverState.requests.filter((r) => + r.url.includes("/v1/messages"), + ) + expect(anthropicRequests.length).toBeGreaterThanOrEqual(1) + + // Wait for loop to fully complete + await Bun.sleep(500) + + // The session status should return to idle after the loop finishes + const statusAfter = SessionStatus.get(memberSessionID!) + expect(statusAfter.type).toBe("idle") + }, + }) + }) + + test("recovery with no active members is a no-op", async () => { + const server = serverState.server! + + await using tmp = await tmpdir({ + git: true, + init: async (dir) => { + await Bun.write( + path.join(dir, "opencode.json"), + JSON.stringify({ + $schema: "https://opencode.ai/config.json", + enabled_providers: ["anthropic"], + provider: { + anthropic: { + options: { + apiKey: "test-anthropic-key", + baseURL: `${server.url.origin}/v1`, + }, + }, + }, + }), + ) + }, + }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const leadSession = await Session.create({}) + await Team.create({ name: "noop-team", leadSessionID: leadSession.id }) + await Team.addMember("noop-team", { + name: "worker", + sessionID: "ses_fake", + agent: "general", + status: "ready", + planApproval: "none", + }) + }, + }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const result = await Team.recover() + expect(result.interrupted).toBe(0) + + const team = await Team.get("noop-team") + expect(team!.members[0].status).toBe("ready") + }, + }) + }) +}) diff --git a/packages/opencode/test/team/team-recovery.test.ts b/packages/opencode/test/team/team-recovery.test.ts new file mode 100644 index 000000000000..acf4351b639b --- /dev/null +++ b/packages/opencode/test/team/team-recovery.test.ts @@ -0,0 +1,324 @@ +import { describe, expect, test } from "bun:test" +import path from "path" +import fs from "fs/promises" +import { Instance } from "../../src/project/instance" +import { Team } from "../../src/team" +import { Session } from "../../src/session" +import { Env } from "../../src/env" +import { Log } from "../../src/util/log" + +Log.init({ print: false }) + +/** + * Tests for Team.recover() — marking active teammates as "ready" + * after a server restart so the user can explicitly resume them. + * + * Note: Since teams are now stored via the global Storage namespace (keyed by + * project.id), team data persists across Instance.provide() calls even with + * different directories — which is exactly what we want for recovery tests. + * Each test must clean up its teams afterward to avoid polluting other tests. + */ +describe("Team recovery after restart", () => { + test("marks active members as interrupted", async () => { + const dir = await fs.mkdtemp(path.join(import.meta.dir, ".tmp-recover-")) + + try { + await Instance.provide({ + directory: dir, + init: async () => Env.set("ANTHROPIC_API_KEY", "test-key"), + fn: async () => { + await Team.create({ + name: "recover-test", + leadSessionID: "ses_lead", + }) + await Team.addMember("recover-test", { + name: "worker-1", + sessionID: "ses_w1", + agent: "general", + status: "busy", + prompt: "work on stuff", + planApproval: "none", + }) + await Team.addMember("recover-test", { + name: "worker-2", + sessionID: "ses_w2", + agent: "explore", + status: "busy", + prompt: "research things", + planApproval: "none", + }) + + const result = await Team.recover() + expect(result.interrupted).toBe(2) + + const team = await Team.get("recover-test") + expect(team).toBeDefined() + expect(team!.members[0].status).toBe("ready") + expect(team!.members[1].status).toBe("ready") + + // Cleanup: mark all as shutdown so cleanup succeeds + await Team.setMemberStatus("recover-test", "worker-1", "shutdown") + await Team.setMemberStatus("recover-test", "worker-2", "shutdown") + await Team.cleanup("recover-test") + }, + }) + } finally { + await fs.rm(dir, { recursive: true, force: true }) + } + }) + + test("skips members with non-active status", async () => { + const dir = await fs.mkdtemp(path.join(import.meta.dir, ".tmp-recover-")) + + try { + await Instance.provide({ + directory: dir, + init: async () => Env.set("ANTHROPIC_API_KEY", "test-key"), + fn: async () => { + await Team.create({ + name: "recover-skip", + leadSessionID: "ses_lead_skip", + }) + await Team.addMember("recover-skip", { + name: "idle-worker", + sessionID: "ses_idle", + agent: "general", + status: "ready", + prompt: "done", + planApproval: "none", + }) + await Team.addMember("recover-skip", { + name: "shutdown-worker", + sessionID: "ses_shutdown", + agent: "general", + status: "shutdown", + prompt: "bye", + planApproval: "none", + }) + + const result = await Team.recover() + expect(result.interrupted).toBe(0) + + const team = await Team.get("recover-skip") + expect(team!.members[0].status).toBe("ready") + expect(team!.members[1].status).toBe("shutdown") + + // Cleanup + await Team.setMemberStatus("recover-skip", "idle-worker", "shutdown") + await Team.cleanup("recover-skip") + }, + }) + } finally { + await fs.rm(dir, { recursive: true, force: true }) + } + }) + + test("marks members as interrupted even when session exists", async () => { + const dir = await fs.mkdtemp(path.join(import.meta.dir, ".tmp-recover-")) + + try { + await Instance.provide({ + directory: dir, + init: async () => Env.set("ANTHROPIC_API_KEY", "test-key"), + fn: async () => { + const leadSession = await Session.create({}) + const memberSession = await Session.create({ parentID: leadSession.id }) + + await Team.create({ + name: "recover-real", + leadSessionID: leadSession.id, + }) + await Team.addMember("recover-real", { + name: "real-worker", + sessionID: memberSession.id, + agent: "general", + status: "busy", + prompt: "do real work", + planApproval: "none", + }) + + const result = await Team.recover() + expect(result.interrupted).toBe(1) + + const team = await Team.get("recover-real") + expect(team).toBeDefined() + expect(team!.members[0].status).toBe("ready") + + // Cleanup + await Team.setMemberStatus("recover-real", "real-worker", "shutdown") + await Team.cleanup("recover-real") + }, + }) + } finally { + await fs.rm(dir, { recursive: true, force: true }) + } + }) + + test("handles mix of active and non-active members", async () => { + const dir = await fs.mkdtemp(path.join(import.meta.dir, ".tmp-recover-")) + + try { + await Instance.provide({ + directory: dir, + init: async () => Env.set("ANTHROPIC_API_KEY", "test-key"), + fn: async () => { + await Team.create({ + name: "recover-mix", + leadSessionID: "ses_lead_mix", + }) + await Team.addMember("recover-mix", { + name: "worker-a", + sessionID: "ses_a", + agent: "general", + status: "busy", + prompt: "task a", + planApproval: "none", + }) + await Team.addMember("recover-mix", { + name: "worker-b", + sessionID: "ses_b", + agent: "explore", + status: "ready", + prompt: "task b", + planApproval: "none", + }) + await Team.addMember("recover-mix", { + name: "worker-c", + sessionID: "ses_c", + agent: "general", + status: "busy", + prompt: "task c", + planApproval: "none", + }) + + const result = await Team.recover() + expect(result.interrupted).toBe(2) + + const team = await Team.get("recover-mix") + expect(team!.members.find((m) => m.name === "worker-a")!.status).toBe("ready") + expect(team!.members.find((m) => m.name === "worker-b")!.status).toBe("ready") + expect(team!.members.find((m) => m.name === "worker-c")!.status).toBe("ready") + + // Cleanup + await Team.setMemberStatus("recover-mix", "worker-a", "shutdown") + await Team.setMemberStatus("recover-mix", "worker-b", "shutdown") + await Team.setMemberStatus("recover-mix", "worker-c", "shutdown") + await Team.cleanup("recover-mix") + }, + }) + } finally { + await fs.rm(dir, { recursive: true, force: true }) + } + }) + + test("returns zero when no teams exist", async () => { + const dir = await fs.mkdtemp(path.join(import.meta.dir, ".tmp-recover-")) + + try { + await Instance.provide({ + directory: dir, + init: async () => Env.set("ANTHROPIC_API_KEY", "test-key"), + fn: async () => { + const result = await Team.recover() + expect(result.interrupted).toBe(0) + }, + }) + } finally { + await fs.rm(dir, { recursive: true, force: true }) + } + }) + + test("handles multiple teams", async () => { + const dir = await fs.mkdtemp(path.join(import.meta.dir, ".tmp-recover-")) + + try { + await Instance.provide({ + directory: dir, + init: async () => Env.set("ANTHROPIC_API_KEY", "test-key"), + fn: async () => { + await Team.create({ name: "team-alpha", leadSessionID: "ses_alpha" }) + await Team.addMember("team-alpha", { + name: "alpha-1", + sessionID: "ses_a1", + agent: "general", + status: "busy", + prompt: "work", + planApproval: "none", + }) + + await Team.create({ name: "team-beta", leadSessionID: "ses_beta" }) + await Team.addMember("team-beta", { + name: "beta-1", + sessionID: "ses_b1", + agent: "explore", + status: "busy", + prompt: "research", + planApproval: "none", + }) + await Team.addMember("team-beta", { + name: "beta-2", + sessionID: "ses_b2", + agent: "general", + status: "busy", + prompt: "implement", + planApproval: "none", + }) + + const result = await Team.recover() + expect(result.interrupted).toBe(3) + + const alpha = await Team.get("team-alpha") + expect(alpha!.members[0].status).toBe("ready") + + const beta = await Team.get("team-beta") + expect(beta!.members[0].status).toBe("ready") + expect(beta!.members[1].status).toBe("ready") + + // Cleanup + await Team.setMemberStatus("team-alpha", "alpha-1", "shutdown") + await Team.cleanup("team-alpha") + await Team.setMemberStatus("team-beta", "beta-1", "shutdown") + await Team.setMemberStatus("team-beta", "beta-2", "shutdown") + await Team.cleanup("team-beta") + }, + }) + } finally { + await fs.rm(dir, { recursive: true, force: true }) + } + }) + + test("recover is idempotent — already interrupted members are skipped", async () => { + const dir = await fs.mkdtemp(path.join(import.meta.dir, ".tmp-recover-")) + + try { + await Instance.provide({ + directory: dir, + init: async () => Env.set("ANTHROPIC_API_KEY", "test-key"), + fn: async () => { + await Team.create({ name: "idem-test", leadSessionID: "ses_idem" }) + await Team.addMember("idem-test", { + name: "worker", + sessionID: "ses_w", + agent: "general", + status: "busy", + prompt: "work", + planApproval: "none", + }) + + const r1 = await Team.recover() + expect(r1.interrupted).toBe(1) + + // Already interrupted, skip + const r2 = await Team.recover() + expect(r2.interrupted).toBe(0) + + // Cleanup + await Team.setMemberStatus("idem-test", "worker", "shutdown") + await Team.cleanup("idem-test") + }, + }) + } finally { + await fs.rm(dir, { recursive: true, force: true }) + } + }) +})