diff --git a/docs/architecture.md b/docs/architecture.md index c9534f8..9627eb1 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -56,6 +56,8 @@ The primary service. Connects to Slack via Socket Mode, routes messages to Agent | `src/evolve/evolve-tools.ts` | Evolve MCP tools (`evolve_list`, `evolve_show`, `evolve_approve`, `evolve_reject`, `evolve_summarize_critical`) for the orchestrator | | `src/slack/preflight.ts` | Boot-time Slack cleanup — patches interrupted messages and removes dangling emoji reactions from previous crashes | | `src/slack/image-fetch.ts` | Authenticated download of Slack private image files; returns base64-encoded `ImageAttachment[]` | +| `src/slack/thread-registry.ts` | Thread connection persistence + lifecycle — in-memory maps (`agentName↔threadTs`), SQLite rows, 2-hour idle timers, startup recovery | +| `src/slack/thread-tools.ts` | Orchestrator-only MCP tools: `thread_connect` / `thread_disconnect` — manages registry, `:link:` reaction, Slack posts, and agent mail | | `src/comms/mail.ts` | Beads-backed inter-agent mail system with push delivery via EventEmitter. Uses `execFileSync` (not shell) to avoid injection. | | `src/comms/mail-tools.ts` | Mail MCP tools (`mail_send`, `mail_check`, `mail_read`, `mail_close`) | | `src/comms/mail-poller.ts` | Polls for orchestrator mail and triggers turns via `sendToAgent` | @@ -193,6 +195,23 @@ When the Agent SDK detects conversation compaction: 4. Handler updates message to "🗜️ Conversation was compacted" ``` +### Thread Message Path + +When a Slack thread is connected to a Builder or Helper, messages in that thread bypass the orchestrator entirely: + +``` +1. User posts a reply in a thread that is connected to an agent +2. events.ts detects thread_ts on the message event +3. thread-registry.getByThread(thread_ts) returns the connected agent +4. mailSend({ to: agentName, subject: '[thread] ', ... }) forwards the message +5. 
touchActivity(agentName) resets the 2-hour idle timer +6. :eyes: reaction added briefly for acknowledgment, removed after 3s +7. Agent receives the mail and wakes up (mail-wakeup IPC) +8. Agent calls slack_reply with channel_id + thread_ts to reply directly in thread +``` + +Thread replies from unconnected threads fall through to normal processing. + ### Slash Commands | Command | Behavior | @@ -299,6 +318,7 @@ the daemon and the dashboard process. Schema lives in | `memories` | Mirror of `memory/entries/*.md` plus DB-only `recall_count` / `last_recalled_at`. Backed by `memories_fts` (FTS5 virtual table) for keyword search. | `.md` files | | `transcript_index` | First/last timestamp + mtime per `~/.claude/projects/**/.jsonl`. Lets the dashboard avoid per-session partial reads. | SDK JSONL files | | `db_meta` | Generic key/value (e.g. `memories.last_reconciled_at`). | DB | +| `thread_connections` | Active Slack thread ↔ agent links. `agent_name` PK + `thread_ts` UNIQUE enforces 0-or-1 at DB level. `last_activity_at` (Unix ms) drives idle-timeout recovery on restart. | DB | **Boundaries:** opaque operational data (usage rows) lives in the DB. 
User- editable files (`config.json`, `agents.json`, `*.md`) stay as files; the DB @@ -530,7 +550,7 @@ pnpm --filter @friday/cli exec vitest run src/commands/start.test.ts | `@friday/memory` | `store.test.ts`, `search.test.ts` | Memory CRUD, serialization roundtrip, recall tracking, hybrid search scoring, tag filtering, recall frequency boosting, event logging | | `@friday/evolve` | `store.test.ts`, `scan.test.ts`, `rank.test.ts`, `propose.test.ts`, `clusters.test.ts`, `apply.test.ts` | Proposal CRUD + frontmatter roundtrip; deterministic scanners with `scheduled-meta-*` self-exclusion (daemon, feedback, usage spike, transcript retry); scoring + critical thresholds; merge-by-hash and rerank-all; Jaccard cluster merge with union-find; apply pipeline for memory/prompt/config/code (code dispatches via injected `bd` runner — asserts epic body, mail labels, error propagation, self-modification guard) | | `@friday/cli` | `help.test.ts`, `services.test.ts`, `state.test.ts`, `migrate.test.ts`, `tmux.ts` (no test — exec wrapper), `freshness.test.ts`, command tests for `start/stop/restart/status/attach/logs/reset-orchestrator/inspect/transcript/schedule/setup/usage/config/doctor` | Help text, state-backed PID management, isRunning, parseServiceArg, findMonorepoRoot, state file round-trip + atomicity, legacy pids → state migration with PID validation, prod artifact freshness check, full command surface including `--dev` paths, mode-preserving restart and assertion-flag rejection, four-state status with `--json` contract, log tail spanning rotated `.gz` siblings, attach error paths and crashed-pane notice, recovery hints | -| `@friday/daemon` | `queue.test.ts`, `manager.test.ts`, `helpers.test.ts`, `usage.test.ts`, `config.test.ts`, `registry.test.ts`, `workspace.test.ts`, `workspace-guard.test.ts`, `prime.test.ts`, `client.test.ts`, `agent-tools.test.ts`, `preflight.test.ts`, `image-fetch.test.ts`, `agent-health.test.ts`, `file-tracker.test.ts`, `interrupt.test.ts`, 
`mail.test.ts`, `mail-poller.test.ts`, `lifecycle.test.ts`, `auto-recall.test.ts`, `events/bus.test.ts`, `events/server.test.ts`, `scheduler/scheduler.test.ts`, `scheduler/trigger.test.ts` | FIFO queue ops, session persistence, Slack helpers (including interrupt signal detection), usage logging, runtime config, agent registry CRUD, workspace/worktree lifecycle, builder workspace path guard (PreToolCall hook), system prompt generation, thinking indicator and status callbacks, MCP agent tools (including `agent_kill`/`agent_refork`), boot preflight cleanup, Slack image fetch and base64 encoding, 3-condition IPC stall detection and crash detection, turn-scoped file tracking sliding window, mail CRUD and push/poll delivery, fork-based agent supervisor (spawn/kill/SIGKILL-fallback/refork/multi-agent isolation/daemon restart restore), memory auto-recall context block assembly, EventBus publish/replay/ring buffer, SSE server endpoints/streaming/reconnect replay, scheduler check loop and cron parsing, scheduled agent triggering and state injection | +| `@friday/daemon` | `queue.test.ts`, `manager.test.ts`, `helpers.test.ts`, `usage.test.ts`, `config.test.ts`, `registry.test.ts`, `workspace.test.ts`, `workspace-guard.test.ts`, `prime.test.ts`, `client.test.ts`, `agent-tools.test.ts`, `preflight.test.ts`, `image-fetch.test.ts`, `agent-health.test.ts`, `file-tracker.test.ts`, `interrupt.test.ts`, `mail.test.ts`, `mail-poller.test.ts`, `lifecycle.test.ts`, `auto-recall.test.ts`, `events/bus.test.ts`, `events/server.test.ts`, `scheduler/scheduler.test.ts`, `scheduler/trigger.test.ts`, `thread-registry.test.ts`, `thread-tools.test.ts` | FIFO queue ops, session persistence, Slack helpers (including interrupt signal detection, addReaction/removeReaction error classification), usage logging, runtime config, agent registry CRUD, workspace/worktree lifecycle, builder workspace path guard (PreToolCall hook), system prompt generation, thinking indicator and status callbacks, MCP agent 
tools (including `agent_kill`/`agent_refork`), boot preflight cleanup, Slack image fetch and base64 encoding, 3-condition IPC stall detection and crash detection, turn-scoped file tracking sliding window, mail CRUD and push/poll delivery, fork-based agent supervisor (spawn/kill/SIGKILL-fallback/refork/multi-agent isolation/daemon restart restore), memory auto-recall context block assembly, EventBus publish/replay/ring buffer, SSE server endpoints/streaming/reconnect replay, scheduler check loop and cron parsing, scheduled agent triggering and state injection, thread registry connect/disconnect/idle-timer/startup-recovery, thread_connect/thread_disconnect MCP tool handlers | ### Conventions diff --git a/docs/decisions.md b/docs/decisions.md index 454173a..9c69644 100644 --- a/docs/decisions.md +++ b/docs/decisions.md @@ -521,3 +521,37 @@ The MCP server name `"friday-evolve"` is **preserved** (it's a tool-call namespa - **Existing in-flight Beads epics** are not auto-migrated; a one-time script (`scripts/migrate-beads-to-linear.ts`, run separately at cutover, not shipped) ports active beads epics to Linear Backlog. Closed beads epics stay as historical record. - **Linear team config:** `FRIDAY_TEAM_ID` is hardcoded in `@friday/shared/src/linear.ts`. Acceptable because Friday's Linear team is a single shared workspace per install; changing it would be a deliberate operational decision. +## ADR-027: Thread Connections in Existing SQLite DB (Not a Separate File) + +**Status:** Accepted + +**Context:** The thread-linking feature needs to persist active Slack thread ↔ agent connections across daemon restarts and support two lookup patterns efficiently: by `agent_name` (to check what thread an agent is connected to) and by `thread_ts` (to route incoming thread messages to the right agent). A 2-hour idle timeout also requires storing `last_activity_at` to recover remaining timer duration on restart. + +The options were: +1. 
A new separate SQLite file (`~/.friday/threads.db`) with a direct `better-sqlite3` client +2. A JSON file (`~/.friday/threads.json`) with in-memory maps + periodic flush +3. Add a `thread_connections` table to the existing `~/.friday/friday.db` via Drizzle + +**Decision:** Add `thread_connections` to the existing `friday.db` using a Drizzle schema addition and generated migration. No new packages, no new database file. + +```sql +CREATE TABLE thread_connections ( + agent_name TEXT PRIMARY KEY, -- enforces one thread per agent + channel_id TEXT NOT NULL, + thread_ts TEXT NOT NULL UNIQUE, -- enforces one agent per thread + last_activity_at INTEGER NOT NULL, -- Unix ms, for idle-timer recovery on restart + created_at INTEGER NOT NULL +); +``` + +**Reasons:** +- `better-sqlite3` and Drizzle are already deps of `@friday/shared` — no install cost +- The DB's WAL mode, foreign-key enforcement, and `busy_timeout` are already configured +- Drizzle's `drizzle-kit generate` + `runMigrations()` on boot is the established pattern — zero ad-hoc SQL +- A JSON file would require O(n) scans for `getByThread`, manual conflict resolution, and a mtime-based reconciler on restart — all complexity already solved by SQLite +- `agent_name PRIMARY KEY` + `thread_ts UNIQUE` enforce the 0-or-1 constraint at the DB level, not just in application code + +**Consequences:** +- `thread_connections` is visible in the dashboard's DB alongside usage/memories. This is acceptable — it's operational data with the same access pattern. +- The `initThreadRegistry()` call at startup reads all rows, prunes expired ones (> 2h), and rebuilds in-memory maps. This is a sub-millisecond read at typical connection counts. +- On forced disconnect or crash without cleanup, stale rows are pruned silently on next startup — no manual intervention needed. 
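Reviewer sketch (not part of the patch): the restart recovery described in the ADR's consequences can be pictured as a pure function over the stored rows. This is a minimal illustration under stated assumptions, not the code in `thread-registry.ts`. The row shape mirrors the `thread_connections` columns; `planRecovery`, `RecoveryPlan`, and `IDLE_TIMEOUT_MS` are hypothetical names introduced here.

```typescript
// Hypothetical sketch of idle-timer recovery on restart; names are illustrative only.
const IDLE_TIMEOUT_MS = 2 * 60 * 60 * 1000; // the 2-hour idle timeout from the ADR

// Mirrors the thread_connections columns (timestamps are Unix ms).
interface ThreadConnectionRow {
  agentName: string;
  channelId: string;
  threadTs: string;
  lastActivityAt: number;
  createdAt: number;
}

interface RecoveryPlan {
  // Rows idle for more than the timeout: candidates for pruning.
  expired: ThreadConnectionRow[];
  // Rows still alive, with the time left on their idle timer.
  live: Array<{ row: ThreadConnectionRow; remainingMs: number }>;
}

// Split stored rows into expired and live, computing the remaining timer
// duration for each live row from last_activity_at.
function planRecovery(rows: ThreadConnectionRow[], now: number): RecoveryPlan {
  const plan: RecoveryPlan = { expired: [], live: [] };
  for (const row of rows) {
    const idleMs = now - row.lastActivityAt;
    if (idleMs > IDLE_TIMEOUT_MS) {
      plan.expired.push(row);
    } else {
      plan.live.push({ row, remainingMs: IDLE_TIMEOUT_MS - idleMs });
    }
  }
  return plan;
}
```

Under this reading, a caller would delete each `expired` row and re-arm a timer of `remainingMs` for each `live` row while rebuilding the in-memory maps. Passing `now` in as a parameter keeps the function deterministic and easy to test.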
diff --git a/packages/shared/drizzle/0001_aromatic_shocker.sql b/packages/shared/drizzle/0001_aromatic_shocker.sql new file mode 100644 index 0000000..d958baf --- /dev/null +++ b/packages/shared/drizzle/0001_aromatic_shocker.sql @@ -0,0 +1,9 @@ +CREATE TABLE `thread_connections` ( + `agent_name` text PRIMARY KEY NOT NULL, + `channel_id` text NOT NULL, + `thread_ts` text NOT NULL, + `last_activity_at` integer NOT NULL, + `created_at` integer NOT NULL +); +--> statement-breakpoint +CREATE UNIQUE INDEX `thread_connections_thread_ts_unique` ON `thread_connections` (`thread_ts`); \ No newline at end of file diff --git a/packages/shared/drizzle/meta/0001_snapshot.json b/packages/shared/drizzle/meta/0001_snapshot.json new file mode 100644 index 0000000..8c6f493 --- /dev/null +++ b/packages/shared/drizzle/meta/0001_snapshot.json @@ -0,0 +1,404 @@ +{ + "version": "6", + "dialect": "sqlite", + "id": "524a911a-b6f0-499a-a14a-4731e5429329", + "prevId": "1dc07bd8-8ed2-4de7-a875-5664b3f914a8", + "tables": { + "db_meta": { + "name": "db_meta", + "columns": { + "key": { + "name": "key", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "value": { + "name": "value", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "memories": { + "name": "memories", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "title": { + "name": "title", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "content": { + "name": "content", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "tags": { + "name": "tags", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "'[]'" + }, + 
"created_by": { + "name": "created_by", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "updated_at": { + "name": "updated_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "file_mtime": { + "name": "file_mtime", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "recall_count": { + "name": "recall_count", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "last_recalled_at": { + "name": "last_recalled_at", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "thread_connections": { + "name": "thread_connections", + "columns": { + "agent_name": { + "name": "agent_name", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "channel_id": { + "name": "channel_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "thread_ts": { + "name": "thread_ts", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "last_activity_at": { + "name": "last_activity_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": { + "thread_connections_thread_ts_unique": { + "name": "thread_connections_thread_ts_unique", + "columns": [ + "thread_ts" + ], + "isUnique": true + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + 
"transcript_index": { + "name": "transcript_index", + "columns": { + "session_id": { + "name": "session_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "encoded_cwd": { + "name": "encoded_cwd", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "file_path": { + "name": "file_path", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "first_timestamp": { + "name": "first_timestamp", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "last_timestamp": { + "name": "last_timestamp", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "turn_count": { + "name": "turn_count", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "file_size_bytes": { + "name": "file_size_bytes", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "file_mtime": { + "name": "file_mtime", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "indexed_at": { + "name": "indexed_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": { + "transcript_index_session_id_encoded_cwd_pk": { + "columns": [ + "session_id", + "encoded_cwd" + ], + "name": "transcript_index_session_id_encoded_cwd_pk" + } + }, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "usage": { + "name": "usage", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "autoincrement": true + }, + "timestamp": { + "name": "timestamp", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "channel_id": { + "name": "channel_id", + "type": "text", + "primaryKey": false, + "notNull": true, + 
"autoincrement": false, + "default": "''" + }, + "session_type": { + "name": "session_type", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "session_id": { + "name": "session_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "agent_name": { + "name": "agent_name", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "model": { + "name": "model", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "cost_usd": { + "name": "cost_usd", + "type": "real", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "input_tokens": { + "name": "input_tokens", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "output_tokens": { + "name": "output_tokens", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "cache_creation_tokens": { + "name": "cache_creation_tokens", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "cache_read_tokens": { + "name": "cache_read_tokens", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "turn_number": { + "name": "turn_number", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "duration_ms": { + "name": "duration_ms", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": { + "usage_session_timestamp": { + "name": "usage_session_timestamp", + "columns": [ + "session_id", + "timestamp" + ], + "isUnique": false + }, + "usage_channel_type": { + "name": "usage_channel_type", + "columns": [ + "channel_id", + "session_type", + "timestamp" + ], + "isUnique": false + }, + "usage_agent_timestamp": { + "name": 
"usage_agent_timestamp", + "columns": [ + "agent_name", + "timestamp" + ], + "isUnique": false + }, + "usage_timestamp": { + "name": "usage_timestamp", + "columns": [ + "timestamp" + ], + "isUnique": false + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + } + }, + "views": {}, + "enums": {}, + "_meta": { + "schemas": {}, + "tables": {}, + "columns": {} + }, + "internal": { + "indexes": {} + } +} \ No newline at end of file diff --git a/packages/shared/drizzle/meta/_journal.json b/packages/shared/drizzle/meta/_journal.json index 67ba057..94cc30d 100644 --- a/packages/shared/drizzle/meta/_journal.json +++ b/packages/shared/drizzle/meta/_journal.json @@ -8,6 +8,13 @@ "when": 1777260602358, "tag": "0000_same_fabian_cortez", "breakpoints": true + }, + { + "idx": 1, + "version": "6", + "when": 1777784192230, + "tag": "0001_aromatic_shocker", + "breakpoints": true } ] } \ No newline at end of file diff --git a/packages/shared/src/db/index.ts b/packages/shared/src/db/index.ts index 29316ea..d7530a7 100644 --- a/packages/shared/src/db/index.ts +++ b/packages/shared/src/db/index.ts @@ -4,3 +4,4 @@ export * from "./meta.js"; export * from "./usage.js"; export * from "./memory.js"; export * from "./transcripts.js"; +export * from "./threads.js"; diff --git a/packages/shared/src/db/schema.ts b/packages/shared/src/db/schema.ts index d9d0f58..7f55714 100644 --- a/packages/shared/src/db/schema.ts +++ b/packages/shared/src/db/schema.ts @@ -81,9 +81,23 @@ export const dbMeta = sqliteTable("db_meta", { value: text("value").notNull(), }); +// ── Thread connections ─────────────────────────────────────────────────────── +// Bidirectional link between a Slack thread and a running agent. +// Enforces 0-or-1: agent_name PK means one thread per agent; thread_ts UNIQUE +// means one agent per thread. last_activity_at drives idle-timeout recovery. 
+export const threadConnections = sqliteTable("thread_connections", { + agentName: text("agent_name").primaryKey(), + channelId: text("channel_id").notNull(), + threadTs: text("thread_ts").notNull().unique(), + lastActivityAt: integer("last_activity_at").notNull(), // Unix ms + createdAt: integer("created_at").notNull(), // Unix ms +}); + export type UsageRow = typeof usage.$inferSelect; export type UsageInsert = typeof usage.$inferInsert; export type MemoryRow = typeof memories.$inferSelect; export type MemoryInsert = typeof memories.$inferInsert; export type TranscriptIndexRow = typeof transcriptIndex.$inferSelect; export type TranscriptIndexInsert = typeof transcriptIndex.$inferInsert; +export type ThreadConnectionRow = typeof threadConnections.$inferSelect; +export type ThreadConnectionInsert = typeof threadConnections.$inferInsert; diff --git a/packages/shared/src/db/threads.ts b/packages/shared/src/db/threads.ts new file mode 100644 index 0000000..78ad7fc --- /dev/null +++ b/packages/shared/src/db/threads.ts @@ -0,0 +1,40 @@ +import { eq } from "drizzle-orm"; +import { getDb } from "./client.js"; +import { threadConnections } from "./schema.js"; +import type { ThreadConnectionInsert, ThreadConnectionRow } from "./schema.js"; + +export function insertThreadConnection(row: ThreadConnectionInsert): void { + getDb().insert(threadConnections).values(row).run(); +} + +export function deleteThreadConnection(agentName: string): void { + getDb().delete(threadConnections).where(eq(threadConnections.agentName, agentName)).run(); +} + +export function getThreadConnectionByAgent(agentName: string): ThreadConnectionRow | undefined { + return getDb() + .select() + .from(threadConnections) + .where(eq(threadConnections.agentName, agentName)) + .get(); +} + +export function getThreadConnectionByThread(threadTs: string): ThreadConnectionRow | undefined { + return getDb() + .select() + .from(threadConnections) + .where(eq(threadConnections.threadTs, threadTs)) + .get(); +} + 
+export function updateThreadActivity(agentName: string, lastActivityAt: number): void { + getDb() + .update(threadConnections) + .set({ lastActivityAt }) + .where(eq(threadConnections.agentName, agentName)) + .run(); +} + +export function getAllThreadConnections(): ThreadConnectionRow[] { + return getDb().select().from(threadConnections).all(); +} diff --git a/services/friday/src/agent/lifecycle.ts b/services/friday/src/agent/lifecycle.ts index 665fbe2..d4c948d 100644 --- a/services/friday/src/agent/lifecycle.ts +++ b/services/friday/src/agent/lifecycle.ts @@ -27,6 +27,7 @@ import { recordActivity, clearActivity } from "../monitor/agent-health.js"; import { recordTurnFiles, clearFileTracking } from "../monitor/file-tracker.js"; import { eventBus } from "../events/bus.js"; import type { WorkerCommand, WorkerEvent, WorkerSpawnOptions } from "./worker-protocol.js"; +import { mailSend } from "../comms/mail.js"; // ── Worker path ──────────────────────────────────────────────────────────── @@ -277,6 +278,54 @@ export function getAgentStallState(name: string): StallState | null { return runningAgents.get(name)?.stall ?? null; } +// ── Thread connection notifications ────────────────────────────────────── + +/** + * Notify an agent that a Slack thread has been connected to it. + * The agent will receive the channel_id and thread_ts and should use + * slack_reply with thread_ts to post directly into the thread. 
+ */ +export function notifyThreadConnect( + agentName: string, + channelId: string, + threadTs: string +): void { + mailSend({ + from: "orchestrator", + to: agentName, + subject: "Thread connected", + body: [ + "A Slack thread has been connected to you for direct communication.", + "", + `channel_id: ${channelId}`, + `thread_ts: ${threadTs}`, + "", + "How to use:", + "- User messages from this thread arrive as mail with subject `[thread] `.", + "- To reply into the thread: call `slack_reply` with the channel_id and thread_ts above.", + " This posts directly to the user without routing through the Orchestrator.", + "- The idle timeout is 2 hours, reset by any message in either direction.", + " If you go idle for 2 hours the connection is severed automatically.", + ].join("\n"), + }); +} + +/** + * Notify an agent that its Slack thread connection has been severed. + */ +export function notifyThreadDisconnect(agentName: string, reason: string): void { + mailSend({ + from: "orchestrator", + to: agentName, + subject: "Thread disconnected", + body: [ + `Your Slack thread connection has been severed (reason: ${reason}).`, + "", + "The thread link is gone. Resume communicating only through the Orchestrator via mail.", + ].join("\n"), + }); +} + // ── Restore on daemon restart ───────────────────────────────────────────── export function restoreActiveAgents( diff --git a/services/friday/src/agent/prime.ts b/services/friday/src/agent/prime.ts index 391f4c0..f0ce165 100644 --- a/services/friday/src/agent/prime.ts +++ b/services/friday/src/agent/prime.ts @@ -317,6 +317,19 @@ Use Slack mrkdwn — *bold*, \`code\`, bullet lists with •. 
NOT Markdown heade - \`schedule_create\` / \`schedule_list\` / \`schedule_show\` / \`schedule_preview\` / \`schedule_pause\` / \`schedule_resume\` / \`schedule_update\` / \`schedule_revert\` / \`schedule_delete\` / \`schedule_trigger\` — manage scheduled agents that run autonomously on cron schedules or one-shot timers. 
Scheduled agents do their work without your involvement, but can escalate to you via mail if they hit issues. - \`evolve_list\` / \`evolve_show\` / \`evolve_approve\` / \`evolve_reject\` / \`evolve_summarize_critical\` — the self-improvement backlog (see "Improvements backlog" below). +## Thread connections + +A thread connection links a Slack thread directly to a running Builder or Helper, letting the user converse with the agent without routing every message through you. + +- \`thread_connect(agent_name, channel_id, thread_ts, anchor_ts)\` — connects a Slack thread to an agent. \`anchor_ts\` is the message ts where the \`:link:\` reaction appears (usually the same as \`thread_ts\`). Posts a confirmation in the thread and mails the agent its connection details. Use when the user asks to "connect" or "link" a thread to an agent. +- \`thread_disconnect(agent_name)\` — severs the connection, removes the \`:link:\` reaction, and notifies the agent. + +Constraints: +- An agent can only be connected to one thread at a time. Connecting to a second thread auto-disconnects the first (the old thread gets a notice). +- A thread can only be connected to one agent. Attempting to connect a thread already owned by a different agent returns an error — disconnect the current owner first. + +Once connected, messages the user posts in that thread are forwarded directly to the agent as mail (subject: \`[thread] \`). The agent replies via \`slack_reply\` with \`thread_ts\`, bypassing you. The connection idles out after 2 hours of inactivity. + ## Improvements backlog A scheduled meta-agent (\`scheduled-meta-daily\`) scans Friday's own logs and writes proposed improvements to a backlog at \`~/.friday/evolve/proposals/\`. Each proposal is one of: \`memory\` (a lesson to remember), \`prompt\`/\`config\` (a tweak to your own brain), or \`code\` (work for a Builder). @@ -437,7 +450,15 @@ You cannot talk to the user. 
ALL communication goes through mail to the Orchestr - \`gh\` — GitHub operations (auth handled). Only use after receiving push approval. - \`bd\` — task tracking. All commands: \`cd ${BEADS_DIR} && bd ...\` - \`agent_create\` — spawn Helpers (not Builders) for subtasks -- Work exclusively within your workspace worktree. Commit locally and often. Do not push until told to.`; +- Work exclusively within your workspace worktree. Commit locally and often. Do not push until told to. + +## Thread connection + +You may be connected to a Slack thread for direct communication with the user. If so, you will receive a mail with subject **"Thread connected"** containing the \`channel_id\` and \`thread_ts\` for that thread. + +- User messages from the thread arrive as mail with subject **\`[thread] \`**. Handle them promptly — the idle timeout is 2 hours (reset by any message in either direction). +- To reply into the thread: call \`slack_reply\` with \`channel_id\` and \`thread_ts\`. This bypasses the Orchestrator and posts directly to the user. +- When you receive mail with subject **"Thread disconnected"**, the thread link is gone — resume communicating only through the Orchestrator.`; } // ── Helper ────────────────────────────────────────────────────── @@ -481,6 +502,14 @@ Include a summary of what you did and any issues encountered. - You cannot create other agents - You cannot talk to the user +## Thread connection + +You may be connected to a Slack thread for direct communication with the user. If so, you will receive a mail with subject **"Thread connected"** containing the \`channel_id\` and \`thread_ts\` for that thread. + +- User messages from the thread arrive as mail with subject **\`[thread] \`**. Handle them promptly — the idle timeout is 2 hours (reset by any message in either direction). +- To reply into the thread: call \`slack_reply\` with \`channel_id\` and \`thread_ts\`. This bypasses your parent and posts directly to the user. 
+- When you receive mail with subject **"Thread disconnected"**, the thread link is gone — resume communicating only through your parent. + ## Tools - \`gh\` — GitHub operations (auth handled) diff --git a/services/friday/src/agent/tools.ts b/services/friday/src/agent/tools.ts index 695ce8d..f8fdcf3 100644 --- a/services/friday/src/agent/tools.ts +++ b/services/friday/src/agent/tools.ts @@ -15,18 +15,27 @@ export function createSlackTools(client: WebClient) { tools: [ tool( "slack_reply", - "Post a message to the current Slack channel. Use this to send status updates, " + + "Post a message to a Slack channel or thread. Use this to send status updates, " + "progress reports, or intermediate results proactively — without waiting for " + - "the turn to complete. Each call posts a separate message.", + "the turn to complete. Each call posts a separate message. When connected to a " + + "Slack thread, pass thread_ts to reply directly into that thread.", { text: z.string().describe("The message text to post (supports Slack mrkdwn formatting)"), channel_id: z.string().describe("The Slack channel ID to post to"), + thread_ts: z + .string() + .optional() + .describe( + "Thread timestamp to reply in a thread. When set, the reply posts as a " + + "thread reply rather than a new channel message." + ), }, async (args) => { try { await client.chat.postMessage({ channel: args.channel_id, text: args.text, + ...(args.thread_ts ? { thread_ts: args.thread_ts } : {}), }); return { content: [{ type: "text" as const, text: "Message posted." 
}], diff --git a/services/friday/src/agent/worker.ts b/services/friday/src/agent/worker.ts index c102b77..8440f3a 100644 --- a/services/friday/src/agent/worker.ts +++ b/services/friday/src/agent/worker.ts @@ -8,7 +8,9 @@ */ import { query } from "@anthropic-ai/claude-agent-sdk"; +import { WebClient } from "@slack/web-api"; import { buildAgentSystemPrompt, buildFirstTurnPrompt } from "./prime.js"; +import { createSlackTools } from "./tools.js"; import { createMailTools } from "../comms/mail-tools.js"; import { mailCheck, mailEvents, buildMailPrompt } from "../comms/mail.js"; import { buildLinearMcpServer } from "../linear/mcp.js"; @@ -111,7 +113,9 @@ async function runAgentLoop( "with `bd ready --json` and continue where you left off."; } - // Reconstruct MCP servers inside the worker process + // Reconstruct MCP servers inside the worker process. + // Workers run in a child_process.fork() — live objects cannot be passed over + // IPC. Each server is reconstructed here from env/config available in the child. const mailMcp = createMailTools({ callerName: agentName }); const agentMcp = createAgentTools({ callerName: agentName, @@ -129,6 +133,13 @@ async function runAgentLoop( allMcpServers[LINEAR_MCP_NAME] = linearMcp; } + // Inject slack_reply (with thread_ts support) if bot token is available. + // This lets builders/helpers post directly to Slack and reply into connected threads. 
+ const slackBotToken = process.env.SLACK_BOT_TOKEN; + if (slackBotToken) { + allMcpServers["friday-slack"] = createSlackTools(new WebClient(slackBotToken)); + } + const queryOptions: Record = { allowedTools, cwd, diff --git a/services/friday/src/index.ts b/services/friday/src/index.ts index 2df568b..5b0a444 100644 --- a/services/friday/src/index.ts +++ b/services/friday/src/index.ts @@ -29,6 +29,8 @@ import { createScheduleTools } from "./scheduler/schedule-tools.js"; import { seedScheduledMetaAgents } from "./evolve/seed.js"; import { createEvolveTools } from "./evolve/evolve-tools.js"; import { reconcileLinearTickets } from "./linear/reconcile.js"; +import { initThreadRegistry } from "./slack/thread-registry.js"; +import { createThreadTools } from "./slack/thread-tools.js"; async function main() { const startTime = Date.now(); @@ -131,6 +133,23 @@ async function main() { botUserId, }); + // Initialize thread registry: recovers live connections from SQLite and prunes + // expired ones. Must run after slackPreflight so the Slack client is ready. + initThreadRegistry({ + onIdleDisconnect: async (conn) => { + await app.client.chat + .postMessage({ + channel: conn.channelId, + text: "Disconnected (idle timeout).", + thread_ts: conn.threadTs, + }) + .catch(() => {}); + await app.client.reactions + .remove({ channel: conn.channelId, timestamp: conn.threadTs, name: "link" }) + .catch(() => {}); + }, + }); + // Mail poller: when agents mail the orchestrator, enqueue a turn through // the per-channel turn-queue so it serializes against in-flight Slack turns. 
// Building the prompt inside run() (not at notify time) keeps the mailbox @@ -185,6 +204,7 @@ async function main() { defaultCwd: config.agent.workingDirectory, }), "friday-evolve": createEvolveTools({ callerName: "orchestrator" }), + "friday-threads": createThreadTools(app.client), }, systemPrompt: buildSystemPrompt( config, diff --git a/services/friday/src/slack/events.ts b/services/friday/src/slack/events.ts index 3b0a6a1..21b52b3 100644 --- a/services/friday/src/slack/events.ts +++ b/services/friday/src/slack/events.ts @@ -52,6 +52,9 @@ import { createEvolveTools } from "../evolve/evolve-tools.js"; import { buildLinearMcpServer } from "../linear/mcp.js"; import { LINEAR_MCP_NAME } from "../linear/constants.js"; import { logFeedback } from "./feedback.js"; +import { getByThread, touchActivity } from "./thread-registry.js"; +import { createThreadTools } from "./thread-tools.js"; +import { mailSend } from "../comms/mail.js"; export function registerEventHandlers(app: App, config: RuntimeConfig): void { const orchestratorChannelId = config.slack.orchestratorChannelId; @@ -427,6 +430,29 @@ export function registerEventHandlers(app: App, config: RuntimeConfig): void { const ts = message.ts; const userId = message.user as string; + // Thread message routing: if this is a reply inside a connected thread, + // forward it directly to the connected agent and skip the orchestrator queue. 
+ const msgThreadTs = rawMsg.thread_ts as string | undefined; + if (msgThreadTs && msgThreadTs !== ts) { + const threadConn = getByThread(msgThreadTs); + if (threadConn) { + mailSend({ + from: "orchestrator", + to: threadConn.agentName, + subject: `[thread] ${text}`, + body: text, + }); + touchActivity(threadConn.agentName); + // Show processing emoji briefly so the user knows the message was received + await client.reactions.add({ channel: channelId, timestamp: ts, name: emojis.processing }).catch(() => {}); + setTimeout(() => { + client.reactions.remove({ channel: channelId, timestamp: ts, name: emojis.processing }).catch(() => {}); + }, 3000); + return; + } + // Thread not connected — fall through to normal processing + } + // Fetch image attachments (non-image files and download failures are skipped) const images = hasFiles ? await fetchSlackImages(rawMsg.files, config.slackBotToken) @@ -599,6 +625,7 @@ export function registerEventHandlers(app: App, config: RuntimeConfig): void { defaultCwd: config.agent.workingDirectory, }), "friday-evolve": createEvolveTools({ callerName: "orchestrator" }), + "friday-threads": createThreadTools(client), }; const baseBare = { "friday-memory": createMemoryTools({ callerName: `bare-${channelId}` }), diff --git a/services/friday/src/slack/helpers.test.ts b/services/friday/src/slack/helpers.test.ts index 1d97a2c..ff034f8 100644 --- a/services/friday/src/slack/helpers.test.ts +++ b/services/friday/src/slack/helpers.test.ts @@ -1,4 +1,4 @@ -import { describe, it, expect } from "vitest"; +import { describe, it, expect, vi } from "vitest"; import { buildSystemPrompt, chunkMessage, @@ -8,6 +8,8 @@ import { buildSessionFields, buildBatchDisplayText, buildBatchContent, + addReaction, + removeReaction, } from "./helpers.js"; import type { QueuedMessage } from "../sessions/queue.js"; import type { RuntimeConfig } from "../config.js"; @@ -287,6 +289,84 @@ describe("buildBatchContent", () => { }); }); +// ── Reaction helpers
───────────────────────────────────────────────────── + +function makeClient(overrides: Record = {}) { + return { + reactions: { + add: vi.fn().mockResolvedValue({}), + remove: vi.fn().mockResolvedValue({}), + ...overrides, + }, + } as any; +} + +describe("addReaction", () => { + it("returns {ok: true} on success", async () => { + const client = makeClient(); + const result = await addReaction(client, "C001", "111.222", "link"); + expect(result).toEqual({ ok: true }); + expect(client.reactions.add).toHaveBeenCalledWith({ + channel: "C001", + timestamp: "111.222", + name: "link", + }); + }); + + it("returns permanent error for message_not_found", async () => { + const client = makeClient({ + add: vi.fn().mockRejectedValue({ data: { error: "message_not_found" } }), + }); + const result = await addReaction(client, "C001", "111.222", "link"); + expect(result).toEqual({ ok: false, type: "permanent", code: "message_not_found" }); + }); + + it("returns permanent error for already_reacted", async () => { + const client = makeClient({ + add: vi.fn().mockRejectedValue({ data: { error: "already_reacted" } }), + }); + const result = await addReaction(client, "C001", "111.222", "link"); + expect(result).toEqual({ ok: false, type: "permanent", code: "already_reacted" }); + }); + + it("returns transient error for ratelimited", async () => { + const client = makeClient({ + add: vi.fn().mockRejectedValue({ data: { error: "ratelimited" } }), + }); + const result = await addReaction(client, "C001", "111.222", "link"); + expect(result).toEqual({ ok: false, type: "transient", code: "ratelimited" }); + }); +}); + +describe("removeReaction", () => { + it("returns {ok: true} on success", async () => { + const client = makeClient(); + const result = await removeReaction(client, "C001", "111.222", "link"); + expect(result).toEqual({ ok: true }); + expect(client.reactions.remove).toHaveBeenCalledWith({ + channel: "C001", + timestamp: "111.222", + name: "link", + }); + }); + + it("returns 
permanent error for no_reaction", async () => { + const client = makeClient({ + remove: vi.fn().mockRejectedValue({ data: { error: "no_reaction" } }), + }); + const result = await removeReaction(client, "C001", "111.222", "link"); + expect(result).toEqual({ ok: false, type: "permanent", code: "no_reaction" }); + }); + + it("returns transient error for request_timeout", async () => { + const client = makeClient({ + remove: vi.fn().mockRejectedValue({ data: { error: "request_timeout" } }), + }); + const result = await removeReaction(client, "C001", "111.222", "link"); + expect(result).toEqual({ ok: false, type: "transient", code: "request_timeout" }); + }); +}); + describe("buildSessionFields", () => { it("builds fields with stats", () => { const stats = { diff --git a/services/friday/src/slack/helpers.ts b/services/friday/src/slack/helpers.ts index 9d74946..6beebba 100644 --- a/services/friday/src/slack/helpers.ts +++ b/services/friday/src/slack/helpers.ts @@ -1,5 +1,6 @@ import type { SessionType } from "@friday/shared"; import type { RuntimeConfig } from "../config.js"; +import type { WebClient } from "@slack/web-api"; import { buildAgentSystemPrompt } from "../agent/prime.js"; import type { QueuedMessage, MultimodalPrompt } from "../sessions/queue.js"; @@ -208,6 +209,55 @@ export function isInterruptSignal(text: string): boolean { return !falsePositive; } +// ── Slack reaction helpers ──────────────────────────────────────────────── + +export type ReactionError = { type: "permanent" | "transient"; code: string }; +export type ReactionResult = { ok: true } | ({ ok: false } & ReactionError); + +const PERMANENT_CODES = new Set([ + "message_not_found", + "channel_not_found", + "not_in_channel", + "already_reacted", + "no_reaction", + "invalid_name", +]); + +function classifyReactionError(err: unknown): ReactionResult { + const code = + (err as any)?.data?.error ?? (err as any)?.code ?? String(err); + const type = PERMANENT_CODES.has(code) ?
"permanent" : "transient"; + return { ok: false, type, code }; +} + +export async function addReaction( + client: WebClient, + channel: string, + ts: string, + name: string +): Promise<ReactionResult> { + try { + await client.reactions.add({ channel, timestamp: ts, name }); + return { ok: true }; + } catch (err) { + return classifyReactionError(err); + } +} + +export async function removeReaction( + client: WebClient, + channel: string, + ts: string, + name: string +): Promise<ReactionResult> { + try { + await client.reactions.remove({ channel, timestamp: ts, name }); + return { ok: true }; + } catch (err) { + return classifyReactionError(err); + } +} + /** * Build the slash command response for /friday session. */ diff --git a/services/friday/src/slack/thread-registry.test.ts b/services/friday/src/slack/thread-registry.test.ts new file mode 100644 index 0000000..c9c4774 --- /dev/null +++ b/services/friday/src/slack/thread-registry.test.ts @@ -0,0 +1,221 @@ +import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; +import type { Mock } from "vitest"; + +vi.mock("../log.js", () => ({ log: vi.fn() })); + +// ── Mock DB helpers ─────────────────────────────────────────────────────── +// Use importOriginal so path constants (DAEMON_LOG_PATH etc.) are preserved +// while only the DB helpers are mocked.
+vi.mock("@friday/shared", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + insertThreadConnection: vi.fn(), + deleteThreadConnection: vi.fn(), + getThreadConnectionByAgent: vi.fn(), + getThreadConnectionByThread: vi.fn(), + updateThreadActivity: vi.fn(), + getAllThreadConnections: vi.fn(() => []), + }; +}); + +import { + insertThreadConnection, + deleteThreadConnection, + updateThreadActivity, + getAllThreadConnections, +} from "@friday/shared"; + +// Import after mocking so the module uses the mocked helpers +import { + connect, + disconnect, + getByAgent, + getByThread, + touchActivity, + initThreadRegistry, +} from "./thread-registry.js"; + +// ── Helpers ─────────────────────────────────────────────────────────────── + +/** Clear in-memory state between tests by re-initialising with empty DB */ +function resetRegistry() { + (getAllThreadConnections as Mock).mockReturnValue([]); + initThreadRegistry({ onIdleDisconnect: vi.fn() }); +} + +describe("thread-registry", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.clearAllMocks(); + resetRegistry(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + // ── connect ────────────────────────────────────────────────────────── + + it("connect: inserts row and updates in-memory maps", () => { + const result = connect("builder-foo", "C001", "111.222"); + + expect(result.ok).toBe(true); + expect(insertThreadConnection).toHaveBeenCalledWith( + expect.objectContaining({ + agentName: "builder-foo", + channelId: "C001", + threadTs: "111.222", + }) + ); + + expect(getByAgent("builder-foo")).toMatchObject({ agentName: "builder-foo" }); + expect(getByThread("111.222")).toMatchObject({ agentName: "builder-foo" }); + }); + + it("connect: returns error when thread is already owned by a different agent", () => { + connect("builder-a", "C001", "111.222"); + const result = connect("builder-b", "C001", "111.222"); + + expect(result.ok).toBe(false); + if (!result.ok) { + 
expect(result.error).toMatch(/builder-a/); + } + }); + + it("connect: stolen — disconnects old thread when agent connects to new thread", () => { + connect("builder-a", "C001", "111.222"); + const result = connect("builder-a", "C001", "333.444"); + + expect(result.ok).toBe(true); + if (result.ok) { + expect(result.stolen).toMatchObject({ agentName: "builder-a", threadTs: "111.222" }); + } + // Old thread gone, new thread present + expect(getByThread("111.222")).toBeUndefined(); + expect(getByThread("333.444")).toMatchObject({ agentName: "builder-a" }); + expect(deleteThreadConnection).toHaveBeenCalledWith("builder-a"); + // insertThreadConnection called twice (original + new) + expect(insertThreadConnection).toHaveBeenCalledTimes(2); + }); + + // ── disconnect ─────────────────────────────────────────────────────── + + it("disconnect: removes row and clears maps", () => { + connect("builder-foo", "C001", "111.222"); + const result = disconnect("builder-foo", "manual"); + + expect(result).toMatchObject({ agentName: "builder-foo", threadTs: "111.222" }); + expect(deleteThreadConnection).toHaveBeenCalledWith("builder-foo"); + expect(getByAgent("builder-foo")).toBeUndefined(); + expect(getByThread("111.222")).toBeUndefined(); + }); + + it("disconnect: returns null when agent not connected", () => { + expect(disconnect("builder-none", "manual")).toBeNull(); + }); + + // ── getByAgent / getByThread ───────────────────────────────────────── + + it("getByAgent: returns undefined before connect", () => { + expect(getByAgent("builder-missing")).toBeUndefined(); + }); + + it("getByThread: returns connection after connect, undefined after disconnect", () => { + connect("builder-foo", "C001", "111.222"); + expect(getByThread("111.222")).toBeDefined(); + disconnect("builder-foo", "manual"); + expect(getByThread("111.222")).toBeUndefined(); + }); + + // ── touchActivity ──────────────────────────────────────────────────── + + it("touchActivity: updates last_activity_at in
SQLite", () => { + connect("builder-foo", "C001", "111.222"); + vi.advanceTimersByTime(1000); + touchActivity("builder-foo"); + + expect(updateThreadActivity).toHaveBeenCalledWith("builder-foo", expect.any(Number)); + }); + + it("touchActivity: is a no-op when agent not connected", () => { + touchActivity("builder-gone"); + expect(updateThreadActivity).not.toHaveBeenCalled(); + }); + + // ── idle timer ─────────────────────────────────────────────────────── + + it("idle timer: fires after 2 hours and calls onIdleDisconnect", () => { + const onIdleDisconnect = vi.fn(); + initThreadRegistry({ onIdleDisconnect }); + + connect("builder-foo", "C001", "111.222"); + + vi.advanceTimersByTime(7_200_000); + + expect(onIdleDisconnect).toHaveBeenCalledWith( + expect.objectContaining({ agentName: "builder-foo" }) + ); + expect(getByAgent("builder-foo")).toBeUndefined(); + }); + + // ── initThreadRegistry ─────────────────────────────────────────────── + + it("initThreadRegistry: prunes expired rows silently", () => { + const now = Date.now(); + (getAllThreadConnections as Mock).mockReturnValue([ + { + agentName: "builder-old", + channelId: "C001", + threadTs: "111.222", + lastActivityAt: now - 8_000_000, // > 2h ago + createdAt: now - 8_000_000, + }, + ]); + + initThreadRegistry({ onIdleDisconnect: vi.fn() }); + + expect(deleteThreadConnection).toHaveBeenCalledWith("builder-old"); + expect(getByAgent("builder-old")).toBeUndefined(); + }); + + it("initThreadRegistry: restores live connections with maps populated", () => { + const now = Date.now(); + (getAllThreadConnections as Mock).mockReturnValue([ + { + agentName: "builder-live", + channelId: "C002", + threadTs: "222.333", + lastActivityAt: now - 1_000_000, // ~16 min ago — still live + createdAt: now - 1_000_000, + }, + ]); + + initThreadRegistry({ onIdleDisconnect: vi.fn() }); + + expect(getByAgent("builder-live")).toMatchObject({ agentName: "builder-live" }); + expect(getByThread("222.333")).toMatchObject({ agentName:
"builder-live" }); + expect(deleteThreadConnection).not.toHaveBeenCalled(); + }); + + it("initThreadRegistry: restored connection fires idle timer with remaining time", () => { + const onIdleDisconnect = vi.fn(); + const now = Date.now(); + const elapsed = 3_600_000; // 1 hour elapsed + (getAllThreadConnections as Mock).mockReturnValue([ + { + agentName: "builder-half", + channelId: "C003", + threadTs: "333.444", + lastActivityAt: now - elapsed, + createdAt: now - elapsed, + }, + ]); + + initThreadRegistry({ onIdleDisconnect }); + + // Should fire after the remaining ~1h, not after 2h + vi.advanceTimersByTime(3_600_000); // advance remaining 1h + expect(onIdleDisconnect).toHaveBeenCalled(); + }); +}); diff --git a/services/friday/src/slack/thread-registry.ts b/services/friday/src/slack/thread-registry.ts new file mode 100644 index 0000000..3b136ac --- /dev/null +++ b/services/friday/src/slack/thread-registry.ts @@ -0,0 +1,219 @@ +import { + insertThreadConnection, + deleteThreadConnection, + getThreadConnectionByAgent, + getThreadConnectionByThread, + updateThreadActivity, + getAllThreadConnections, +} from "@friday/shared"; +import { log } from "../log.js"; + +// ── Types ───────────────────────────────────────────────────────────────── + +export interface ThreadConnection { + agentName: string; + channelId: string; + threadTs: string; + lastActivityAt: Date; + idleTimer: ReturnType<typeof setTimeout>; +} + +export type DisconnectReason = "idle_timeout" | "manual" | "stolen"; + +export interface DisconnectResult { + agentName: string; + channelId: string; + threadTs: string; +} + +export type ConnectResult = + | { ok: true; stolen?: DisconnectResult } + | { ok: false; error: string }; + +// ── In-memory state ─────────────────────────────────────────────────────── + +const byAgent = new Map<string, ThreadConnection>(); +const byThread = new Map<string, string>(); // threadTs → agentName + +const IDLE_TIMEOUT_MS = 7_200_000; // 2 hours + +let _onIdleDisconnect: ((conn: DisconnectResult) => void) | null = null; + +// ──
Internal helpers ────────────────────────────────────────────────────── + +function startIdleTimer(agentName: string): ReturnType<typeof setTimeout> { + return setTimeout(() => { + const conn = byAgent.get(agentName); + if (!conn) return; + + const result = disconnectInternal(agentName); + if (result && _onIdleDisconnect) { + _onIdleDisconnect(result); + } + log("info", "thread_idle_disconnect", { agentName }); + }, IDLE_TIMEOUT_MS); +} + +function disconnectInternal(agentName: string): DisconnectResult | null { + const conn = byAgent.get(agentName); + if (!conn) return null; + + clearTimeout(conn.idleTimer); + byAgent.delete(agentName); + byThread.delete(conn.threadTs); + deleteThreadConnection(agentName); + + return { + agentName: conn.agentName, + channelId: conn.channelId, + threadTs: conn.threadTs, + }; +} + +// ── Public API ──────────────────────────────────────────────────────────── + +/** + * Connect an agent to a Slack thread. Enforces 0-or-1 constraints: + * - If the agent is already connected to a different thread, that connection + * is severed first (stolen path) and returned in result.stolen. + * - If the thread is already connected to a different agent, returns an error. + */ +export function connect( + agentName: string, + channelId: string, + threadTs: string +): ConnectResult { + // Check if thread is already owned by a different agent + const existingAgentForThread = byThread.get(threadTs); + if (existingAgentForThread && existingAgentForThread !== agentName) { + return { + ok: false, + error: `Thread is already connected to agent "${existingAgentForThread}".
Disconnect it first.`, + }; + } + + // If agent already has a connection to a different thread, steal it + let stolen: DisconnectResult | undefined; + const existingConnForAgent = byAgent.get(agentName); + if (existingConnForAgent && existingConnForAgent.threadTs !== threadTs) { + const result = disconnectInternal(agentName); + if (result) stolen = result; + } + + const now = Date.now(); + const idleTimer = startIdleTimer(agentName); + + const conn: ThreadConnection = { + agentName, + channelId, + threadTs, + lastActivityAt: new Date(now), + idleTimer, + }; + + byAgent.set(agentName, conn); + byThread.set(threadTs, agentName); + + insertThreadConnection({ + agentName, + channelId, + threadTs, + lastActivityAt: now, + createdAt: now, + }); + + log("info", "thread_connected", { agentName, channelId, threadTs, stolen: !!stolen }); + + return { ok: true, ...(stolen ? { stolen } : {}) }; +} + +/** + * Disconnect an agent from its thread. Returns connection info so the + * caller can post a Slack message and remove the :link: reaction. + */ +export function disconnect( + agentName: string, + _reason: DisconnectReason +): DisconnectResult | null { + return disconnectInternal(agentName); +} + +export function getByAgent(agentName: string): ThreadConnection | undefined { + return byAgent.get(agentName); +} + +export function getByThread(threadTs: string): ThreadConnection | undefined { + const agentName = byThread.get(threadTs); + if (!agentName) return undefined; + return byAgent.get(agentName); +} + +/** + * Reset the idle timer and update last_activity_at in SQLite. + */ +export function touchActivity(agentName: string): void { + const conn = byAgent.get(agentName); + if (!conn) return; + + clearTimeout(conn.idleTimer); + conn.idleTimer = startIdleTimer(agentName); + conn.lastActivityAt = new Date(); + updateThreadActivity(agentName, conn.lastActivityAt.getTime()); +} + +/** + * Called at daemon startup. 
Reads all rows from SQLite, prunes expired + * connections (> 2h since last activity) silently, and rebuilds in-memory + * maps with live connections, restarting idle timers. + */ +export function initThreadRegistry(opts: { + onIdleDisconnect: (conn: DisconnectResult) => void; +}): void { + _onIdleDisconnect = opts.onIdleDisconnect; + + // Clear existing in-memory state before rebuilding from DB. + // This ensures a clean slate whether called at startup or in tests. + for (const conn of byAgent.values()) clearTimeout(conn.idleTimer); + byAgent.clear(); + byThread.clear(); + + const rows = getAllThreadConnections(); + const now = Date.now(); + let restored = 0; + let pruned = 0; + + for (const row of rows) { + const age = now - row.lastActivityAt; + + if (age >= IDLE_TIMEOUT_MS) { + // Expired — prune silently (session is gone, no reaction removal) + deleteThreadConnection(row.agentName); + pruned++; + continue; + } + + // Live — rebuild in-memory state and restart timer with remaining time + const remaining = IDLE_TIMEOUT_MS - age; + const idleTimer = setTimeout(() => { + const result = disconnectInternal(row.agentName); + if (result && _onIdleDisconnect) { + _onIdleDisconnect(result); + } + log("info", "thread_idle_disconnect", { agentName: row.agentName }); + }, remaining); + + const conn: ThreadConnection = { + agentName: row.agentName, + channelId: row.channelId, + threadTs: row.threadTs, + lastActivityAt: new Date(row.lastActivityAt), + idleTimer, + }; + + byAgent.set(row.agentName, conn); + byThread.set(row.threadTs, row.agentName); + restored++; + } + + log("info", "thread_registry_init", { restored, pruned }); +} diff --git a/services/friday/src/slack/thread-tools.test.ts b/services/friday/src/slack/thread-tools.test.ts new file mode 100644 index 0000000..7e888db --- /dev/null +++ b/services/friday/src/slack/thread-tools.test.ts @@ -0,0 +1,213 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import type { Mock } from "vitest"; + +//
── Capture tool handlers via SDK mock ──────────────────────────────────── + +const capturedTools = new Map(); +vi.mock("@anthropic-ai/claude-agent-sdk", () => ({ + createSdkMcpServer: vi.fn(({ tools }: { tools: any[] }) => { + capturedTools.clear(); + for (const t of tools) capturedTools.set(t._name, t._handler); + return { type: "sdk", name: "friday-threads" }; + }), + tool: vi.fn( + (name: string, _desc: string, _schema: any, handler: Function) => ({ + _name: name, + _handler: handler, + }) + ), +})); + +// ── Other mocks ─────────────────────────────────────────────────────────── + +vi.mock("../sessions/registry.js", () => ({ + getAgent: vi.fn(), +})); + +vi.mock("./thread-registry.js", () => ({ + connect: vi.fn(), + disconnect: vi.fn(), + getByAgent: vi.fn(), + getByThread: vi.fn(), +})); + +vi.mock("./helpers.js", () => ({ + addReaction: vi.fn().mockResolvedValue({ ok: true }), + removeReaction: vi.fn().mockResolvedValue({ ok: true }), +})); + +vi.mock("../agent/lifecycle.js", () => ({ + notifyThreadConnect: vi.fn(), + notifyThreadDisconnect: vi.fn(), +})); + +vi.mock("../log.js", () => ({ log: vi.fn() })); + +// ── Import after mocking ────────────────────────────────────────────────── + +import { getAgent } from "../sessions/registry.js"; +import { + connect, + disconnect, + getByAgent, + getByThread, +} from "./thread-registry.js"; +import { addReaction, removeReaction } from "./helpers.js"; +import { notifyThreadConnect, notifyThreadDisconnect } from "../agent/lifecycle.js"; +import { createThreadTools } from "./thread-tools.js"; + +// ── Helpers ─────────────────────────────────────────────────────────────── + +function makeClient() { + return { + chat: { + postMessage: vi.fn().mockResolvedValue({ ts: "999.000" }), + }, + } as any; +} + +async function callTool( + client: any, + toolName: string, + args: Record +): Promise<{ text: string; isError?: boolean }> { + createThreadTools(client); // triggers createSdkMcpServer, populates capturedTools + const 
handler = capturedTools.get(toolName); + if (!handler) throw new Error(`Tool "${toolName}" not captured`); + const result = await handler(args); + return { text: result.content[0].text, isError: result.isError }; +} + +// ── Tests ───────────────────────────────────────────────────────────────── + +describe("thread_connect", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("happy path: connects, adds :link: reaction, posts message, notifies agent", async () => { + const client = makeClient(); + (getAgent as Mock).mockReturnValue({ status: "active" }); + (getByThread as Mock).mockReturnValue(undefined); + (connect as Mock).mockReturnValue({ ok: true }); + + const result = await callTool(client, "thread_connect", { + agent_name: "builder-foo", + channel_id: "C001", + thread_ts: "111.222", + anchor_ts: "111.222", + }); + + expect(result.isError).toBeFalsy(); + expect(result.text).toContain("builder-foo"); + expect(addReaction).toHaveBeenCalledWith(expect.anything(), "C001", "111.222", "link"); + expect(client.chat.postMessage).toHaveBeenCalledWith( + expect.objectContaining({ thread_ts: "111.222" }) + ); + expect(notifyThreadConnect).toHaveBeenCalledWith("builder-foo", "C001", "111.222"); + }); + + it("returns error when agent not found", async () => { + const client = makeClient(); + (getAgent as Mock).mockReturnValue(undefined); + + const result = await callTool(client, "thread_connect", { + agent_name: "builder-missing", + channel_id: "C001", + thread_ts: "111.222", + anchor_ts: "111.222", + }); + + expect(result.isError).toBe(true); + expect(result.text).toContain("not found"); + expect(connect).not.toHaveBeenCalled(); + }); + + it("returns error when thread already owned by different agent", async () => { + const client = makeClient(); + (getAgent as Mock).mockReturnValue({ status: "active" }); + (getByThread as Mock).mockReturnValue({ + agentName: "builder-other", + channelId: "C001", + threadTs: "111.222", + }); + + const result = await 
callTool(client, "thread_connect", { + agent_name: "builder-new", + channel_id: "C001", + thread_ts: "111.222", + anchor_ts: "111.222", + }); + + expect(result.isError).toBe(true); + expect(result.text).toContain("builder-other"); + }); + + it("handles stolen connection: notifies old thread and removes old reaction", async () => { + const client = makeClient(); + (getAgent as Mock).mockReturnValue({ status: "active" }); + (getByThread as Mock).mockReturnValue(undefined); + (connect as Mock).mockReturnValue({ + ok: true, + stolen: { agentName: "builder-foo", channelId: "C001", threadTs: "000.111" }, + }); + + await callTool(client, "thread_connect", { + agent_name: "builder-foo", + channel_id: "C002", + thread_ts: "222.333", + anchor_ts: "222.333", + }); + + // Should post to OLD thread and remove old :link: + expect(client.chat.postMessage).toHaveBeenCalledWith( + expect.objectContaining({ thread_ts: "000.111", channel: "C001" }) + ); + expect(removeReaction).toHaveBeenCalledWith(expect.anything(), "C001", "000.111", "link"); + expect(notifyThreadDisconnect).toHaveBeenCalledWith("builder-foo", "stolen"); + }); +}); + +describe("thread_disconnect", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("happy path: disconnects, removes :link: reaction, posts message, notifies agent", async () => { + const client = makeClient(); + (getByAgent as Mock).mockReturnValue({ + agentName: "builder-foo", + channelId: "C001", + threadTs: "111.222", + }); + (disconnect as Mock).mockReturnValue({ + agentName: "builder-foo", + channelId: "C001", + threadTs: "111.222", + }); + + const result = await callTool(client, "thread_disconnect", { + agent_name: "builder-foo", + }); + + expect(result.isError).toBeFalsy(); + expect(client.chat.postMessage).toHaveBeenCalledWith( + expect.objectContaining({ thread_ts: "111.222", text: "Disconnected." 
}) + ); + expect(removeReaction).toHaveBeenCalledWith(expect.anything(), "C001", "111.222", "link"); + expect(notifyThreadDisconnect).toHaveBeenCalledWith("builder-foo", "manual"); + }); + + it("returns error when agent not connected", async () => { + const client = makeClient(); + (getByAgent as Mock).mockReturnValue(undefined); + + const result = await callTool(client, "thread_disconnect", { + agent_name: "builder-unconnected", + }); + + expect(result.isError).toBe(true); + expect(result.text).toContain("not connected"); + expect(disconnect).not.toHaveBeenCalled(); + }); +}); diff --git a/services/friday/src/slack/thread-tools.ts b/services/friday/src/slack/thread-tools.ts new file mode 100644 index 0000000..7f3e83a --- /dev/null +++ b/services/friday/src/slack/thread-tools.ts @@ -0,0 +1,183 @@ +import { createSdkMcpServer, tool } from "@anthropic-ai/claude-agent-sdk"; +import { z } from "zod"; +import type { WebClient } from "@slack/web-api"; +import { getAgent } from "../sessions/registry.js"; +import { + connect, + disconnect, + getByAgent, + getByThread, +} from "./thread-registry.js"; +import { addReaction, removeReaction } from "./helpers.js"; +import { notifyThreadConnect, notifyThreadDisconnect } from "../agent/lifecycle.js"; +import { log } from "../log.js"; + +/** + * Creates the friday-threads MCP server — orchestrator-only. + * Provides thread_connect and thread_disconnect tools for bidirectional + * Slack thread ↔ agent linking. + */ +export function createThreadTools(client: WebClient) { + return createSdkMcpServer({ + name: "friday-threads", + tools: [ + tool( + "thread_connect", + "Connect a Slack thread directly to a running Builder or Helper agent for " + + "bidirectional communication. The user's messages in that thread are forwarded " + + "to the agent as mail, and the agent can reply directly via slack_reply.
" + + "An agent can only be connected to one thread; a thread to one agent.", + { + agent_name: z.string().describe("Name of the Builder or Helper agent to connect"), + channel_id: z.string().describe("Slack channel ID containing the thread"), + thread_ts: z.string().describe("Timestamp of the root thread message"), + anchor_ts: z + .string() + .describe( + "Timestamp of the message where the :link: reaction will be placed " + + "(usually the same as thread_ts)." + ), + }, + async (args) => { + const { agent_name, channel_id, thread_ts, anchor_ts } = args; + + // Validate agent exists + const entry = getAgent(agent_name); + if (!entry) { + return { + content: [{ type: "text" as const, text: `Error: agent "${agent_name}" not found.` }], + isError: true, + }; + } + if (entry.status === "destroyed") { + return { + content: [{ type: "text" as const, text: `Error: agent "${agent_name}" is destroyed.` }], + isError: true, + }; + } + + // Handle stolen-connection case: if thread is already owned by a different agent, + // refuse — caller must explicitly disconnect the current owner first. + const existingOwner = getByThread(thread_ts); + if (existingOwner && existingOwner.agentName !== agent_name) { + return { + content: [ + { + type: "text" as const, + text: + `Error: thread is already connected to agent "${existingOwner.agentName}".
` + + `Disconnect it with thread_disconnect first.`, + }, + ], + isError: true, + }; + } + + // Connect (handles agent-stolen case internally β€” old thread gets disconnect) + const result = connect(agent_name, channel_id, thread_ts); + if (!result.ok) { + return { + content: [{ type: "text" as const, text: `Error: ${result.error}` }], + isError: true, + }; + } + + // If the agent was stolen from a previous thread, notify the old thread + if (result.stolen) { + const old = result.stolen; + const slackLink = `https://slack.com/archives/${old.channelId}/p${old.threadTs.replace(".", "")}`; + await client.chat.postMessage({ + channel: old.channelId, + text: `Disconnected β€” agent connected to new thread: ${slackLink}`, + thread_ts: old.threadTs, + }).catch(() => {}); + await removeReaction(client, old.channelId, old.threadTs, "link"); + notifyThreadDisconnect(agent_name, "stolen"); + } + + // Add :link: reaction to anchor message + await addReaction(client, channel_id, anchor_ts, "link"); + + // Post confirmation in thread + await client.chat.postMessage({ + channel: channel_id, + text: `Connected to \`${agent_name}\`. Messages you send here go directly to the agent.`, + thread_ts, + }).catch(() => {}); + + // Notify agent + notifyThreadConnect(agent_name, channel_id, thread_ts); + + log("info", "thread_connect_tool", { agentName: agent_name, channelId: channel_id, threadTs: thread_ts }); + + return { + content: [ + { + type: "text" as const, + text: `Connected. Thread ${thread_ts} in ${channel_id} is now linked to ${agent_name}.`, + }, + ], + }; + } + ), + + tool( + "thread_disconnect", + "Disconnect a Builder or Helper agent from its connected Slack thread. 
" + + "Removes the :link: reaction, posts a disconnect notice in the thread, " + + "and notifies the agent.", + { + agent_name: z.string().describe("Name of the agent to disconnect"), + }, + async (args) => { + const { agent_name } = args; + + const conn = getByAgent(agent_name); + if (!conn) { + return { + content: [ + { + type: "text" as const, + text: `Error: agent "${agent_name}" is not connected to any thread.`, + }, + ], + isError: true, + }; + } + + const result = disconnect(agent_name, "manual"); + if (!result) { + return { + content: [{ type: "text" as const, text: `Error: disconnect failed for "${agent_name}".` }], + isError: true, + }; + } + + // Post disconnect notice in thread + await client.chat.postMessage({ + channel: result.channelId, + text: "Disconnected.", + thread_ts: result.threadTs, + }).catch(() => {}); + + // Remove :link: reaction + await removeReaction(client, result.channelId, result.threadTs, "link"); + + // Notify agent + notifyThreadDisconnect(agent_name, "manual"); + + log("info", "thread_disconnect_tool", { agentName: agent_name }); + + return { + content: [ + { + type: "text" as const, + text: `Disconnected agent "${agent_name}" from thread ${result.threadTs}.`, + }, + ], + }; + } + ), + ], + }); +}