diff --git a/cdk/src/constructs/fanout-consumer.ts b/cdk/src/constructs/fanout-consumer.ts index a81ede02..18c78552 100644 --- a/cdk/src/constructs/fanout-consumer.ts +++ b/cdk/src/constructs/fanout-consumer.ts @@ -20,6 +20,7 @@ import * as path from 'path'; import { Duration } from 'aws-cdk-lib'; import * as dynamodb from 'aws-cdk-lib/aws-dynamodb'; +import * as iam from 'aws-cdk-lib/aws-iam'; import { StartingPosition, Architecture, Runtime } from 'aws-cdk-lib/aws-lambda'; import { DynamoEventSource, SqsDlq } from 'aws-cdk-lib/aws-lambda-event-sources'; import * as lambda from 'aws-cdk-lib/aws-lambda-nodejs'; @@ -62,6 +63,18 @@ export interface FanOutConsumerProps { */ readonly githubTokenSecret?: sm.ISecret; + /** + * Secrets Manager ARN-prefix pattern for per-workspace Slack bot + * tokens. Required ONLY when the platform deploys SlackIntegration — + * the Slack dispatcher reads bot tokens at this scope. Matches the + * other "guarded by prop" grants (taskTable, repoTable, + * githubTokenSecret): a deployment without Slack onboarding gets no + * dangling IAM permission to ``bgagent/slack/*``. Typically passed + * as ``Stack.of(this).formatArn({ ..., resourceName: + * 'bgagent/slack/*' })``. Found in PR #79 review (#2 CRITICAL). + */ + readonly slackSecretArnPattern?: string; + /** * Maximum batch size delivered to the Lambda per invocation. * @@ -134,6 +147,20 @@ export class FanOutConsumer extends Construct { this.fn.addEnvironment('GITHUB_TOKEN_SECRET_ARN', props.githubTokenSecret.secretArn); } + // Slack dispatcher reads per-workspace bot tokens from Secrets + // Manager (``bgagent/slack/``). Scope the grant to the + // caller-provided prefix so the fan-out Lambda cannot read + // unrelated platform secrets — matches the policy the old + // standalone ``SlackNotifyFn`` held before issue #64. Guarded on + // ``slackSecretArnPattern`` so deployments without Slack + // onboarding don't get a dangling IAM grant (PR #79 review #2). 
+ if (props.slackSecretArnPattern) { + this.fn.addToRolePolicy(new iam.PolicyStatement({ + actions: ['secretsmanager:GetSecretValue'], + resources: [props.slackSecretArnPattern], + })); + } + this.fn.addEventSource(new DynamoEventSource(props.taskEventsTable, { startingPosition: StartingPosition.LATEST, batchSize: props.batchSize ?? 100, diff --git a/cdk/src/constructs/slack-integration.ts b/cdk/src/constructs/slack-integration.ts index b2d287e0..4c452394 100644 --- a/cdk/src/constructs/slack-integration.ts +++ b/cdk/src/constructs/slack-integration.ts @@ -23,8 +23,7 @@ import * as apigw from 'aws-cdk-lib/aws-apigateway'; import * as cognito from 'aws-cdk-lib/aws-cognito'; import * as dynamodb from 'aws-cdk-lib/aws-dynamodb'; import * as iam from 'aws-cdk-lib/aws-iam'; -import { Runtime, Architecture, StartingPosition, FilterCriteria, FilterRule } from 'aws-cdk-lib/aws-lambda'; -import * as lambdaEventSources from 'aws-cdk-lib/aws-lambda-event-sources'; +import { Runtime, Architecture } from 'aws-cdk-lib/aws-lambda'; import * as lambda from 'aws-cdk-lib/aws-lambda-nodejs'; import * as secretsmanager from 'aws-cdk-lib/aws-secretsmanager'; import { NagSuppressions } from 'cdk-nag'; @@ -73,9 +72,14 @@ export interface SlackIntegrationProps { * Creates: * - SlackInstallationTable (per-workspace installation records) * - SlackUserMappingTable (Slack user → platform user mappings) - * - Lambda handlers for OAuth, slash commands, events, notifications, and account linking + * - Lambda handlers for OAuth, slash commands, events, and account linking * - API Gateway routes under /slack/* - * - DynamoDB Streams event source for outbound notifications + * + * Outbound Slack delivery (task lifecycle notifications) runs through + * ``FanOutConsumer`` as a per-channel dispatcher. 
Before issue #64 this + * construct also owned a ``SlackNotifyFn`` DynamoDB Streams consumer on + * ``TaskEventsTable``; that consumer was removed to keep the stream at + * the DynamoDB-documented one-reader-per-shard limit. */ export class SlackIntegration extends Construct { /** The Slack installation table. */ @@ -330,34 +334,8 @@ export class SlackIntegration extends Construct { }); this.userMappingTable.grantReadWriteData(slackLinkFn); - // --- Outbound Notification Handler (DynamoDB Streams trigger) --- - const slackNotifyFn = new lambda.NodejsFunction(this, 'SlackNotifyFn', { - entry: path.join(handlersDir, 'slack-notify.ts'), - handler: 'handler', - runtime: Runtime.NODEJS_24_X, - architecture: Architecture.ARM_64, - timeout: Duration.seconds(30), - environment: { - TASK_TABLE_NAME: props.taskTable.tableName, - }, - bundling: commonBundling, - }); - props.taskTable.grantReadWriteData(slackNotifyFn); - slackNotifyFn.addToRolePolicy(readSlackSecretsPolicy); - - // DynamoDB Streams event source with filtering - slackNotifyFn.addEventSource(new lambdaEventSources.DynamoEventSource(props.taskEventsTable, { - startingPosition: StartingPosition.LATEST, - batchSize: 10, - maxBatchingWindow: Duration.seconds(0), - retryAttempts: 3, - bisectBatchOnError: true, - filters: [ - FilterCriteria.filter({ - eventName: FilterRule.isEqual('INSERT'), - }), - ], - })); + // Outbound Slack delivery runs through FanOutConsumer — see the + // construct doc above for the reader-count rationale (issue #64). 
// ═══════════════════════════════════════════════════════════════════════════ // API Gateway Routes @@ -436,7 +414,7 @@ export class SlackIntegration extends Construct { } // Standard Lambda suppressions - const allFunctions = [oauthCallbackFn, slackEventsFn, slackCommandsFn, commandProcessorFn, slackLinkFn, slackNotifyFn, slackInteractionsFn]; + const allFunctions = [oauthCallbackFn, slackEventsFn, slackCommandsFn, commandProcessorFn, slackLinkFn, slackInteractionsFn]; for (const fn of allFunctions) { NagSuppressions.addResourceSuppressions(fn, [ { diff --git a/cdk/src/handlers/fanout-task-events.ts b/cdk/src/handlers/fanout-task-events.ts index 998da391..07a046c6 100644 --- a/cdk/src/handlers/fanout-task-events.ts +++ b/cdk/src/handlers/fanout-task-events.ts @@ -29,19 +29,12 @@ * status responses) that would be noise on Email, while GitHub only * cares about PR activity + terminal outcomes. * - * This handler is a **skeleton**: per-channel dispatcher stubs log - * each would-be delivery to CloudWatch but don't call Slack / GitHub / - * SES yet. The design explicitly allows this: - * - * "the fan-out Lambda itself can ship later without any change to - * the agent or CLI" — §8.9 - * - * Enabling a real dispatcher is a per-channel PR: add the SDK client - * (e.g. `@slack/web-api`), replace the `log-only` block, add an IAM - * policy (or Secrets Manager grant) on the Lambda's execution role, - * and add the channel's configuration (OAuth token ARN + channel ID, - * GitHub App credentials, SES verified identity) to the construct's - * props. Chunk J ships the first real dispatcher (GitHub edit-in-place). + * Dispatcher state: GitHub edits a single issue comment in place + * (Chunk J). Slack posts threaded Block Kit messages with emoji + * transitions and session-message cleanup via the ``slack-notify`` + * helper (issue #64 migrated the standalone SlackNotifyFn consumer onto + * this router, dropping ``TaskEventsTable`` from two stream readers + * back to one). 
Email remains a log-only stub until SES wiring lands. */ import { DynamoDBClient } from '@aws-sdk/client-dynamodb'; @@ -58,6 +51,7 @@ import { logger } from './shared/logger'; import { coerceNumericOrNull } from './shared/numeric'; import { loadRepoConfig } from './shared/repo-config'; import type { ChannelConfig, TaskNotificationsConfig, TaskRecord } from './shared/types'; +import { dispatchSlackEvent, SlackApiError } from './slack-notify'; // Re-export the shared types so existing test imports (and any future // caller that only imports from the handler module) continue to work. @@ -91,11 +85,27 @@ export type NotificationChannel = 'slack' | 'email' | 'github'; export const CHANNEL_DEFAULTS: Record> = { // Slack is the "on-call" channel per §6.2 — all terminal outcomes - // (including cancellations and strands) plus agent_error and the - // Phase 2/3 interactive signals. + // (including cancellations, strands, and timeouts) plus agent_error + // and the Phase 2/3 interactive signals. ``task_created`` and + // ``session_started`` are additionally delivered for Slack-origin + // tasks so the rocket/hourglass-flowing-sand message sequence lines up + // with the @mention thread — the Slack dispatcher itself enforces + // ``channel_source === 'slack'`` so the noisier early lifecycle + // events do not reach non-Slack tasks. + // + // ``pr_created`` is intentionally NOT in the Slack default — even + // though the original §6.2 design listed it. The + // ``task_completed`` message renders a "View PR" button carrying + // the same URL, and posting both produced visual duplication + // (observed during issue #64 dev-stack verification: two messages + // back-to-back with identical View PR buttons). GitHub's default + // keeps ``pr_created`` because the edit-in-place comment surface + // genuinely benefits from the early checkpoint. 
slack: new Set([ ...TERMINAL_EVENT_TYPES, - 'pr_created', + 'task_timed_out', + 'task_created', + 'session_started', 'agent_error', 'approval_required', // Phase 3 (not yet emitted) 'status_response', // Phase 2 (not yet emitted) @@ -294,29 +304,74 @@ export function shouldFanOut(event: FanOutEvent, overrides?: TaskNotificationsCo } /** - * Per-channel dispatcher stubs. Each currently just logs what it - * WOULD have sent. Replace the body when a real integration lands — - * the interface stays the same. + * Per-channel dispatcher implementations. Slack and GitHub both talk to + * real external APIs today; Email is still a log-only stub until SES + * wiring lands. * - * Dispatchers do NOT catch their own errors. Error isolation lives in - * ``routeEvent`` where ``Promise.allSettled`` records per-channel - * outcomes and a single ``fanout.dispatcher.rejected`` warn fires on - * rejection. Keeping one error sink ensures batch telemetry + * Dispatchers do NOT catch infra errors themselves. Error isolation + * lives in ``routeEvent`` where ``Promise.allSettled`` records + * per-channel outcomes and a single ``fanout.dispatcher.rejected`` warn + * fires on rejection — keeping one error sink ensures batch telemetry * (`dispatched` count) reflects reality: a channel whose dispatcher - * threw is NOT counted as dispatched. + * threw is NOT counted as dispatched. Slack swallows ``SlackApiError`` + * internally (the Slack API rejecting a message — e.g. + * ``channel_not_found`` — is not recoverable by a Lambda retry). + */ +const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({})); + +/** + * Slack dispatcher — hands the event to the in-module + * ``handlers/slack-notify.ts`` helper (issue #64). 
The helper gates on + * ``channel_source === 'slack'`` (so non-Slack tasks short-circuit after + * a single DDB Get without any Slack API call) and preserves every + * behaviour the old standalone ``SlackNotifyFn`` stream consumer had: + * terminal-event dedup, threaded replies, emoji transitions, session + * message cleanup. Slack-specific API failures are tagged with + * ``SlackApiError`` so this dispatcher can log a + * ``fanout.slack.api_error`` warn and swallow them without escalating + * to the partial-batch retry path (a retry can't fix + * ``channel_not_found``). Infra errors (DDB, Secrets Manager) are + * rethrown unchanged so ``routeEvent``'s ``Promise.allSettled`` surfaces + * them alongside any other dispatcher's rejection. */ async function dispatchToSlack(event: FanOutEvent): Promise { - logger.info('[fanout/slack] would dispatch', { - event: 'fanout.slack.dispatch_stub', - task_id: event.task_id, - event_id: event.event_id, - event_type: event.event_type, - effective_event_type: effectiveEventType(event), - }); + // Pass the effective event type to the Slack dispatcher so + // ``agent_milestone`` carriers (e.g. ``pr_created``) reach the + // matching renderer. Without this rewrite, the dispatcher's + // NOTIFIABLE_EVENTS gate would silently drop every milestone-wrapped + // event the router subscribed Slack to, lying in + // ``fanout.batch.complete`` telemetry (issue #64 review Cat 7). + const effectiveType = effectiveEventType(event); + const effectiveEvent = effectiveType === event.event_type + ? event + : { ...event, event_type: effectiveType }; + try { + await dispatchSlackEvent(effectiveEvent, ddb); + } catch (err) { + // Match SlackApiError by class OR by ``name`` so a bundler that + // duplicates the slack-notify module (rare with NodejsFunction + // tree-shaking but possible if the module ever gets dual-bundled) + // can't make ``instanceof`` silently fail and turn a + // channel-terminal swallow into an infinite Lambda retry loop.
+ // Mirrors how ``GitHubCommentError`` is duck-typed by name in + // dispatchToGitHubComment (PR #79 review #7). + const isSlackApiErr = + err instanceof SlackApiError + || (err instanceof Error && err.name === 'SlackApiError'); + if (isSlackApiErr) { + logger.warn('[fanout/slack] Slack API error — swallowing per channel policy', { + event: 'fanout.slack.api_error', + task_id: event.task_id, + event_id: event.event_id, + event_type: event.event_type, + effective_event_type: effectiveType, + error: (err as Error).message, + }); + return; + } + throw err; + } } -const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({})); - /** * Load the TaskRecord fields the GitHub dispatcher needs. Returns * ``null`` if the task vanished (race with TTL cleanup) or if the @@ -560,6 +615,38 @@ async function dispatchToGitHubComment(event: FanOutEvent): Promise { clearTokenCache(); const freshToken = await resolveGitHubToken(tokenArn); result = await upsertTaskComment({ ...upsertParams, token: freshToken }); + } else if ( + isGhErr + && typeof httpStatus === 'number' + && httpStatus >= 400 + && httpStatus < 500 + // 403 (most often "API rate limit exceeded" on GitHub) and 429 + // ("Too Many Requests") are 4xx but **transient** — retrying + // after the rate-limit window opens fixes them. Carving them + // out here keeps a reconciliation wave from permanently + // dropping every GitHub comment under the swallow path. The + // batch retry pumps the backoff naturally; if it never clears, + // the record DLQs after retryAttempts. Found in PR #79 review. + && httpStatus !== 403 + && httpStatus !== 429 + ) { + // Channel-terminal: a 4xx from GitHub (excluding the handled 401 + // rotation path, the 404 re-POST handled inside + // ``upsertTaskComment``, and the 403/429 rate-limit carve-out + // above) means the request itself is malformed or the resource + // is gone — retrying will not change the outcome. 
Swallow the + // rejection so the post-issue-#64 router does not push the + // record into ``batchItemFailures`` and burn Lambda retries. + // Log a dedicated warn so operators can alarm distinctly from + // the retryable infra path. + logger.warn('[fanout/github] terminal 4xx from GitHub — swallowing per channel policy', { + event: 'fanout.github.api_error', + task_id: event.task_id, + event_id: event.event_id, + http_status: httpStatus, + error: err instanceof Error ? err.message : String(err), + }); + return; } else { throw err; } @@ -638,21 +725,44 @@ const DISPATCHERS: Record Promise.api_error`` warn lines. * - * Returns the list of channels that **successfully** dispatched — a - * channel whose dispatcher rejected is omitted so batch telemetry - * (`dispatched` count in the handler) reflects reality. A rejected - * dispatcher is still logged with a ``fanout.dispatcher.rejected`` - * warn line that operators can alert on. + * The handler reads ``infraRejections.length > 0`` to decide whether to + * push the record into ``batchItemFailures`` so Lambda retries the + * record with the partial-batch contract. This restores the retry + * semantic that the standalone ``SlackNotifyFn`` had pre-issue-#64 + * (its handler rethrew non-``SlackApiError`` so Lambda retried the + * batch). Without this distinction, a transient DDB throttle inside the + * Slack dispatcher would be a permanent drop instead of a retry. + */ +export interface RouteOutcome { + readonly dispatched: ReadonlyArray; + readonly infraRejections: ReadonlyArray; +} + +/** + * Route an event to every subscribed channel. A dispatcher rejection + * must NOT block sibling channels — we use ``Promise.allSettled`` so + * one Slack outage can't drop a GitHub comment or vice-versa. + * + * Returns ``{ dispatched, infraRejections }``. 
A successful dispatch + * lands in ``dispatched``; a rejection lands in ``infraRejections`` + * because the dispatcher itself is responsible for swallowing channel- + * terminal errors (Slack ``channel_not_found``, etc.) before throwing. + * Anything that reaches the router as a rejection is, by contract, a + * retryable failure — and the handler will flag the record for + * partial-batch retry. */ export async function routeEvent( ev: FanOutEvent, overrides?: TaskNotificationsConfig, -): Promise { +): Promise { const attempted: NotificationChannel[] = []; const tasks: Promise[] = []; // Match against the effective type so ``agent_milestone`` carriers @@ -671,18 +781,20 @@ export async function routeEvent( const results = await Promise.allSettled(tasks); const dispatched: NotificationChannel[] = []; + const infraRejections: NotificationChannel[] = []; results.forEach((r, i) => { const ch = attempted[i]; if (r.status === 'fulfilled') { dispatched.push(ch); return; } - // Belt-and-braces — the dispatcher stubs catch inside their own - // try/catch so this branch only fires if a future refactor drops - // the inner catch or if the dispatcher throws synchronously before - // entering its try. Record at warn so the signal isn't lost. + // The dispatcher rejected. By contract this is an *infra* error — + // channel-terminal errors are swallowed inside the dispatcher + // before reaching us. Mark for partial-batch retry and emit the + // warn so operators can alert on the rate of retryable failures. + infraRejections.push(ch); const reason = r.reason instanceof Error ? 
r.reason.message : String(r.reason); - logger.warn('[fanout] dispatcher rejected — continuing batch', { + logger.warn('[fanout] dispatcher rejected — flagging record for retry', { event: 'fanout.dispatcher.rejected', channel: ch, task_id: ev.task_id, @@ -690,9 +802,10 @@ export async function routeEvent( event_type: ev.event_type, effective_event_type: effectiveEventType(ev), error: reason, + retryable: true, }); }); - return dispatched; + return { dispatched, infraRejections }; } /** @@ -781,8 +894,18 @@ export const handler = async ( } perTaskCounts.set(ev.task_id, seen + 1); - const channels = await routeEvent(ev, overrides); - if (channels.length > 0) dispatched++; + const outcome = await routeEvent(ev, overrides); + if (outcome.dispatched.length > 0) dispatched++; + // Per-channel infra rejections (DDB throttle, Secrets Manager + // 5xx, transient Slack 5xx) escalate to the partial-batch retry + // path. ``routeEvent`` already logged a warn per rejection; we + // just need to make sure Lambda retries the record so the next + // attempt has a chance to succeed. Without this push, a transient + // failure would be silently dropped — the regression that + // motivated this fix. + if (outcome.infraRejections.length > 0 && record.eventID !== undefined) { + batchItemFailures.push({ itemIdentifier: record.eventID }); + } } catch (err) { // Poison-pill isolation: one record's unhandled throw must not // crash the batch. See the handler doc block for the full list of diff --git a/cdk/src/handlers/shared/slack-blocks.ts b/cdk/src/handlers/shared/slack-blocks.ts index 68a5e336..b717d59b 100644 --- a/cdk/src/handlers/shared/slack-blocks.ts +++ b/cdk/src/handlers/shared/slack-blocks.ts @@ -110,6 +110,14 @@ export function renderSlackBlocks( return simpleStatusMessage(task, ':no_entry_sign: Task cancelled'); case 'task_timed_out': return taskTimedOutMessage(task); + case 'task_stranded': + // Emitted by reconcile-stranded-tasks when a task's heartbeat + // stops. 
Operators see this on stranded Slack-origin tasks; the + // generic "Event: ..." fallback would be a UX regression + // (issue #64 review Cat 7). + return taskStrandedMessage(task, eventMetadata); + case 'agent_error': + return agentErrorMessage(task, eventMetadata); default: return simpleStatusMessage(task, `Event: ${eventType}`); } @@ -193,6 +201,50 @@ function sessionStartedMessage( }; } +function taskStrandedMessage( + task: Pick, + eventMetadata?: Record, +): SlackMessage { + // The reconciler stamps ``code: STRANDED_NO_HEARTBEAT`` and + // ``prior_status`` on the event metadata (see + // handlers/reconcile-stranded-tasks.ts). Surface the prior status so + // operators can tell at a glance whether the task hung in HYDRATING + // vs RUNNING. + const priorStatus = typeof eventMetadata?.prior_status === 'string' + ? eventMetadata.prior_status + : undefined; + const detail = priorStatus ? ` (last status: ${priorStatus})` : ''; + const text = `:warning: *Task stranded* for \`${task.repo}\`${detail}`; + return { + text: `Task stranded for ${task.repo}`, + blocks: [section(text)], + }; +} + +function agentErrorMessage( + task: Pick, + eventMetadata?: Record, +): SlackMessage { + // ``agent/src/progress_writer.py::write_agent_error`` carries + // ``error_type`` and ``message_preview``. Render whichever is + // present without leaking the full preview if it's noisy. + const errorType = typeof eventMetadata?.error_type === 'string' + ? eventMetadata.error_type + : undefined; + const preview = typeof eventMetadata?.message_preview === 'string' + ? eventMetadata.message_preview + : undefined; + const detail = errorType + ? `\n_Type:_ \`${errorType}\`` + : ''; + const previewLine = preview ? 
`\n${truncate(preview, 200)}` : ''; + const text = `:rotating_light: *Agent error* during \`${task.repo}\`${detail}${previewLine}`; + return { + text: `Agent error during ${task.repo}`, + blocks: [section(text)], + }; +} + function simpleStatusMessage( task: Pick, label: string, diff --git a/cdk/src/handlers/slack-notify.ts b/cdk/src/handlers/slack-notify.ts index 553ecb9e..4b391ac7 100644 --- a/cdk/src/handlers/slack-notify.ts +++ b/cdk/src/handlers/slack-notify.ts @@ -17,114 +17,249 @@ * SOFTWARE. */ -import { DynamoDBClient } from '@aws-sdk/client-dynamodb'; -import { DynamoDBDocumentClient, GetCommand, UpdateCommand } from '@aws-sdk/lib-dynamodb'; -import type { DynamoDBStreamEvent, DynamoDBRecord } from 'aws-lambda'; +/** + * Slack dispatcher for the FanOutConsumer (§8.9 fan-out plane). + * + * Before issue #64 this module was a standalone DynamoDB Streams consumer + * that read ``TaskEventsTable`` directly. That put the platform at **two** + * concurrent readers per shard on that stream (``SlackNotifyFn`` plus + * ``FanOutConsumer``), which is the hard DynamoDB Streams limit — any + * additional channel reader would start being throttled. The Slack + * delivery logic now lives behind the fan-out router as a per-channel + * dispatcher, leaving ``TaskEventsTable`` with a single stream consumer. + * + * Behaviour preserved from the old handler bit-for-bit (the fan-out + * router does not change any semantics — it just removes the second + * event-source mapping): + * - ``channel_source === 'slack'`` gate. + * - Terminal-event dedup via a conditional ``UpdateItem`` on + * ``channel_metadata.slack_notified_terminal``. + * - Threaded replies under the original ``@mention`` / ``task_created`` + * message via ``slack_thread_ts``. + * - DM channel-id → user-id rewrite for ``D``-prefixed channels. + * - Emoji reaction swaps on the root message per event type. 
+ * - Intermediate message cleanup (``slack_session_msg_ts`` + + * ``slack_created_msg_ts``) on terminal events. + * - Slack API errors are logged and swallowed at the router boundary; + * infra errors (DDB, Secrets Manager) propagate so the record lands + * in the FanOutConsumer's partial-batch retry path. + */ + +import { type DynamoDBDocumentClient, GetCommand, UpdateCommand } from '@aws-sdk/lib-dynamodb'; +// Type-only import of ``FanOutEvent`` — values flow only one way at +// runtime (fanout-task-events imports + calls ``dispatchSlackEvent``), +// so importing the type back creates no runtime cycle. ``import type`` +// is erased after compile, so the bundler sees a one-way dep. +import type { FanOutEvent } from './fanout-task-events'; import { logger } from './shared/logger'; import { renderSlackBlocks } from './shared/slack-blocks'; import { getSlackSecret, SLACK_SECRET_PREFIX } from './shared/slack-verify'; import type { TaskRecord } from './shared/types'; -const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({})); - -const TASK_TABLE = process.env.TASK_TABLE_NAME!; +/** Terminal event types the Slack dispatcher dedups on. Stored in + * ``channel_metadata.slack_notified_terminal`` via a conditional write + * so a retry can never double-post the final outcome. */ +const TERMINAL_EVENTS = new Set([ + 'task_completed', + 'task_failed', + 'task_cancelled', + 'task_timed_out', + 'task_stranded', +]); -const TERMINAL_EVENTS = new Set(['task_completed', 'task_failed', 'task_cancelled', 'task_timed_out']); +/** + * Map an event type to the ``channel_metadata`` attribute that should + * guard against double-posting on a partial-batch retry. PR #79 review + * #4 surfaced the gap: when GitHub or Email rate-limits and the record + * is replayed, every Slack-subscribed event for that record runs again. 
+ * Terminals were already dedup-protected by ``slack_notified_terminal`` + * but ``agent_error`` was not — operators would page twice on a single + * agent failure if a sibling channel happened to fail. + * + * Each entry is an attribute name; ``null`` means the event type is + * intentionally NOT deduped (lifecycle events ``task_created`` / + * ``session_started`` use the per-event ``slack_*_msg_ts`` conditional + * persists instead, which is the right shape since they need to store + * a value, not just a presence marker). + */ +const SLACK_DEDUP_ATTRIBUTE: Record = { + task_completed: 'slack_notified_terminal', + task_failed: 'slack_notified_terminal', + task_cancelled: 'slack_notified_terminal', + task_timed_out: 'slack_notified_terminal', + task_stranded: 'slack_notified_terminal', + agent_error: 'slack_dispatched_agent_error', + task_created: null, + session_started: null, +}; -/** Event types that trigger Slack notifications. */ -const NOTIFIABLE_EVENTS = new Set([ +/** Event types this dispatcher renders. Must stay in sync with the + * Slack entries in ``CHANNEL_DEFAULTS`` (see fanout-task-events.ts) — + * drift means the router subscribes Slack to events that the + * dispatcher silently ignores, which lies in batch telemetry + * (issue #64 review Cat 7). Forward-compat ``approval_required`` and + * ``status_response`` are deliberately absent until their emitters + * ship; until then they fall through and are dropped at this gate. + * ``pr_created`` is intentionally omitted from Slack — the + * ``task_completed`` block already carries the View PR button, so a + * separate "PR opened" message just produces visible duplication + * (verified during issue #64 dev-stack tests). Exported for the + * cross-file consistency test. 
*/ +export const NOTIFIABLE_EVENTS = new Set([ 'task_created', 'session_started', 'task_completed', 'task_failed', 'task_cancelled', 'task_timed_out', + 'task_stranded', + 'agent_error', ]); /** - * Slack notification handler triggered by DynamoDB Streams on TaskEventsTable. - * - * For each task event: - * 1. Load the task record to check channel_source and channel_metadata. - * 2. If channel_source is 'slack', render a Block Kit message and post to Slack. - * 3. Thread replies under the initial message using stored slack_thread_ts. - * - * Infrastructure errors (DynamoDB, Secrets Manager) are rethrown so Lambda's - * configured retry/bisect behavior can do its job. Slack API errors are - * treated as delivery failures and logged but never fail the batch. + * Minimal event shape the dispatcher needs. Defined as a type alias of + * ``FanOutEvent`` so the contract between the router and the dispatcher + * cannot silently drift if either side adds a field — TypeScript will + * propagate the change automatically (PR #79 review #10). Originally a + * standalone interface, but the structural duplication was a footgun. */ -export async function handler(event: DynamoDBStreamEvent): Promise { - for (const record of event.Records) { - try { - await processRecord(record); - } catch (err) { - // Slack delivery errors are terminal — swallow after logging so the batch - // isn't retried for something a retry can't fix. - if (err instanceof SlackApiError) { - logger.warn('Slack delivery failed', { - error: err.message, - event_id: record.eventID, - }); - continue; - } - // Infrastructure errors (DynamoDB throttling, Secrets Manager outage, etc.) - // rethrow so Lambda retries the batch per the configured retryAttempts + - // bisectBatchOnError behavior. - logger.error('Infrastructure error processing Slack notification', { - error: err instanceof Error ? 
err.message : String(err), - event_id: record.eventID, - }); - throw err; - } - } -} +export type SlackDispatchEvent = FanOutEvent; /** - * Thrown when the Slack API returns an error after a successful HTTP call. - * Tagged so the batch handler can swallow it without retrying — Slack errors - * are not recoverable by retrying the stream record. + * Thrown when the Slack API returns a **terminal** error — one that + * cannot be fixed by a Lambda retry (e.g. ``channel_not_found``, + * ``not_authed``, ``invalid_blocks``). The router catches this class + * specifically and swallows it; the record advances past the cursor + * without tripping the partial-batch retry path. + * + * Retryable Slack errors (``ratelimited``, ``service_unavailable``, + * ``internal_error``) are NOT wrapped in ``SlackApiError`` — they + * propagate as plain ``Error`` so the router classifies them as infra + * rejections and Lambda replays the record. */ -class SlackApiError extends Error { +export class SlackApiError extends Error { constructor(message: string) { super(message); this.name = 'SlackApiError'; } } -async function processRecord(record: DynamoDBRecord): Promise { - if (record.eventName !== 'INSERT' || !record.dynamodb?.NewImage) return; +/** + * Slack API error codes that are terminal — retrying the same request + * yields the same failure. Sourced from the Slack ``chat.postMessage`` + * + ``reactions.add`` documented errors. Codes outside this set are + * treated as retryable so a transient ``ratelimited`` / + * ``service_unavailable`` doesn't get permanently dropped. + */ +const TERMINAL_SLACK_API_ERRORS: ReadonlySet = new Set([ + // Channel-shape failures. + 'channel_not_found', + 'not_in_channel', + 'is_archived', + 'message_not_found', + // Auth failures. 
+ 'not_authed', + 'invalid_auth', + 'token_revoked', + 'token_expired', + 'account_inactive', + // Permission / scope failures (PR #79 review #8): each of these + // means a configuration fix is required before any retry can + // succeed, so swallow them as terminal and let operators alert on + // the dedicated ``fanout.slack.api_error`` warn rate. + 'no_permission', + 'missing_scope', + 'restricted_action', + 'ekm_access_denied', + 'team_access_not_granted', + 'posting_to_general_channel_denied', + 'as_user_not_supported', + // Payload-shape failures. + 'invalid_blocks', + 'invalid_blocks_format', + 'invalid_arguments', + 'msg_too_long', + 'too_many_attachments', +]); - const newImage = record.dynamodb.NewImage; - const eventType = newImage.event_type?.S; - const taskId = newImage.task_id?.S; +/** Tag a Slack ``!result.ok`` error as terminal vs retryable so the + * router can route it to the right outcome. */ +function classifySlackError(slackErrorCode: string): 'terminal' | 'retryable' { + return TERMINAL_SLACK_API_ERRORS.has(slackErrorCode) ? 'terminal' : 'retryable'; +} - if (!eventType || !taskId || !NOTIFIABLE_EVENTS.has(eventType)) return; +/** + * Dispatch a single task event to Slack. + * + * The caller is the fan-out router (``handlers/fanout-task-events.ts``). + * The router already filters by ``CHANNEL_DEFAULTS.slack`` and isolates + * rejections via ``Promise.allSettled``, so this function can throw + * freely on infra problems — the router will log the rejection through + * its standard ``fanout.dispatcher.rejected`` channel without failing + * the batch. 
+ */ +export async function dispatchSlackEvent( + event: SlackDispatchEvent, + ddb: DynamoDBDocumentClient, +): Promise { + const { task_id: taskId, event_type: eventType } = event; + if (!NOTIFIABLE_EVENTS.has(eventType)) return; + + const tableName = process.env.TASK_TABLE_NAME; + if (!tableName) { + // Throw rather than return — a missing env var on a Slack- + // subscribed event is a deployment misconfiguration, not a per- + // record problem. Returning silently used to count as "successful + // dispatch" in batch telemetry, so a broken stack would drop + // every Slack notification indefinitely with only a warn line. + // Throwing routes the rejection through the router's + // ``infraRejections`` path so Lambda retries (until DLQ) and the + // ``fanout.dispatcher.rejected`` metric alarms operators + // (PR #79 review #3). + logger.error('[fanout/slack] TASK_TABLE_NAME not set — cannot dispatch', { + event: 'fanout.slack.missing_env', + error_id: 'FANOUT_SLACK_MISSING_TASK_TABLE', + task_id: taskId, + }); + throw new Error( + `[fanout/slack] TASK_TABLE_NAME env var not set; Slack dispatcher cannot run (task_id=${taskId})`, + ); + } - // Load the task record first so we can skip non-Slack tasks before touching - // their DynamoDB row with dedup writes. const taskResult = await ddb.send(new GetCommand({ - TableName: TASK_TABLE, + TableName: tableName, Key: { task_id: taskId }, })); - const task = taskResult.Item as TaskRecord | undefined; if (!task || task.channel_source !== 'slack') return; - // Deduplicate terminal notifications — the orchestrator may write multiple - // failure/completion events (retries). Use a conditional update to claim - // the right to send the terminal notification. - if (TERMINAL_EVENTS.has(eventType)) { + // Dedup any event that should only ever post once per task even + // under partial-batch retry (terminals, agent_error). 
The orchestrator + // can also write multiple events of the same kind (retries, + // reconciler), so the ``ADD`` on the ``channel_metadata.`` + // marker claims the right to post for the whole event class. + // ``slack_notified_terminal`` covers all 5 terminals collectively; + // ``slack_dispatched_agent_error`` covers agent_error separately so + // the operator gets the first agent_error but not duplicates from + // sibling-channel-failure retries (PR #79 review #4). + const dedupAttr = SLACK_DEDUP_ATTRIBUTE[eventType]; + if (dedupAttr) { try { await ddb.send(new UpdateCommand({ - TableName: TASK_TABLE, + TableName: tableName, Key: { task_id: taskId }, - UpdateExpression: 'SET channel_metadata.slack_notified_terminal = :t', - ConditionExpression: 'attribute_not_exists(channel_metadata.slack_notified_terminal)', + UpdateExpression: `SET channel_metadata.${dedupAttr} = :t`, + ConditionExpression: `attribute_not_exists(channel_metadata.${dedupAttr})`, ExpressionAttributeValues: { ':t': true }, })); } catch (err) { if ((err as Error)?.name === 'ConditionalCheckFailedException') { - logger.info('Terminal notification already sent, skipping duplicate', { task_id: taskId, event_type: eventType }); + logger.info('[fanout/slack] notification already sent, skipping duplicate', { + event: 'fanout.slack.dedup_hit', + task_id: taskId, + event_type: eventType, + dedup_attr: dedupAttr, + }); return; } throw err; @@ -133,35 +268,35 @@ async function processRecord(record: DynamoDBRecord): Promise { const channelMeta = task.channel_metadata; if (!channelMeta?.slack_team_id || !channelMeta?.slack_channel_id) { - logger.warn('Slack task missing channel metadata', { task_id: taskId }); + logger.warn('[fanout/slack] Slack task missing channel metadata', { + event: 'fanout.slack.missing_metadata', + task_id: taskId, + }); return; } - // Fetch the bot token for this workspace. 
const botToken = await getSlackSecret(`${SLACK_SECRET_PREFIX}${channelMeta.slack_team_id}`); if (!botToken) { - logger.warn('Bot token not found for Slack workspace', { + logger.warn('[fanout/slack] bot token not found for Slack workspace', { + event: 'fanout.slack.no_bot_token', team_id: channelMeta.slack_team_id, task_id: taskId, }); return; } - // Parse event metadata if present. Failures are logged and treated as "no metadata" — - // the surfaced fallback reason is "Unknown error" which is user-hostile without a log. - const eventMetadata = newImage.metadata?.S - ? safeJsonParse(newImage.metadata.S, { task_id: taskId, event_type: eventType }) - : undefined; + // The fan-out router already parsed ``metadata`` into a JS map, so no + // JSON re-parse is required here — the old handler had to parse the + // ``metadata: { S: ... }`` shape itself from the raw stream record. + const eventMetadata = event.metadata; - // Render the Slack message. - const message = renderSlackBlocks(eventType, task, eventMetadata ?? undefined); + const message = renderSlackBlocks(eventType, task, eventMetadata); - // For task_created, post a new message. For subsequent events, reply in thread. const threadTs = channelMeta.slack_thread_ts; - // For DM channels (prefix 'D'), post to the user ID instead — chat.postMessage - // opens a DM automatically when given a user ID, which avoids the channel_not_found - // error that occurs with ephemeral DM channel IDs from slash commands. + // DM channels use the user id so ``chat.postMessage`` opens the DM + // automatically — the ephemeral channel id Slack hands out in slash + // command payloads can 404 otherwise. const channel = channelMeta.slack_channel_id.startsWith('D') && channelMeta.slack_user_id ? 
channelMeta.slack_user_id : channelMeta.slack_channel_id; @@ -170,19 +305,12 @@ async function processRecord(record: DynamoDBRecord): Promise { channel, text: message.text, blocks: message.blocks, + unfurl_links: false, }; - - // Thread all messages under the original. For @mentions, threadTs is set to the - // user's mention message by the command processor. For slash commands, threadTs - // is set to the task_created message after it's posted (see below). if (threadTs) { slackPayload.thread_ts = threadTs; } - // Suppress link unfurls — the View PR button is the clean way to access it. - slackPayload.unfurl_links = false; - - // Post to Slack. const response = await fetch('https://slack.com/api/chat.postMessage', { method: 'POST', headers: { @@ -191,42 +319,98 @@ async function processRecord(record: DynamoDBRecord): Promise { }, body: JSON.stringify(slackPayload), }); - const result = await response.json() as { ok: boolean; ts?: string; error?: string }; - if (!result.ok) { - // Slack API errors are not retryable via the Lambda batch (re-processing the - // stream record won't make Slack start accepting the message), so throw a - // tagged error and let the batch handler swallow it after logging. - throw new SlackApiError(`slack chat.postMessage failed: ${result.error ?? 'unknown'} (task_id=${taskId} event_type=${eventType})`); + const errorCode = result.error ?? 'unknown'; + const failureMessage = `slack chat.postMessage failed: ${errorCode} (task_id=${taskId} event_type=${eventType})`; + // Retryable codes (``ratelimited``, ``service_unavailable``, + // ``internal_error``) propagate as a plain Error so the router + // classifies them as infra rejections and Lambda retries the + // record. Terminal codes (``channel_not_found``, ``not_authed``, + // ``invalid_blocks``) are wrapped in SlackApiError so the router + // swallows them — retrying ``channel_not_found`` won't help. 
+ if (classifySlackError(errorCode) === 'retryable') { + // Surface ``Retry-After`` (Slack's rate-limit header, in seconds) + // so operators reading CloudWatch can see when the next retry + // should succeed rather than guessing from sustained warn rate + // (PR #79 review #4 mitigation). Header is a string per fetch + // Headers spec; coerce defensively for the log. + const retryAfter = response.headers.get('retry-after'); + logger.warn('[fanout/slack] retryable Slack API error', { + event: 'fanout.slack.retryable_api_error', + task_id: taskId, + event_type: eventType, + slack_error_code: errorCode, + retry_after_seconds: retryAfter ?? undefined, + }); + throw new Error(failureMessage); + } + throw new SlackApiError(failureMessage); } - // Emoji reaction on the root message — the user's @mention or the task_created message. - // Reactions always use the real channel ID (not user ID), even for DMs. + // Reactions always use the real channel id even for DMs. const reactionChannel = channelMeta.slack_channel_id; const reactionTarget = threadTs ?? result.ts; if (reactionTarget) { await updateReaction(botToken, reactionChannel, reactionTarget, eventType); } - // Store message timestamps for later updates. if (result.ts) { if (eventType === 'task_created') { + // Conditional persist guards against the post-issue-#64 retry + // hazard: under the new ``infraRejections`` escalation path, a + // batch can be replayed after the Slack POST succeeded but the + // UpdateItem failed transiently. Without ``attribute_not_exists`` + // the retry would post a second root, overwrite ``slack_thread_ts``, + // and orphan every threaded reply that had threaded under the + // first ts. The conditional refuses the second write so the + // dedup-by-task is per-attribute, not just per-channel. 
const updates: string[] = ['channel_metadata.slack_created_msg_ts = :created_ts']; const values: Record = { ':created_ts': result.ts }; + const conditions: string[] = ['attribute_not_exists(channel_metadata.slack_created_msg_ts)']; if (!threadTs) { - // Slash commands: also store thread_ts (mentions already have it). updates.push('channel_metadata.slack_thread_ts = :created_ts'); + conditions.push('attribute_not_exists(channel_metadata.slack_thread_ts)'); } try { await ddb.send(new UpdateCommand({ - TableName: TASK_TABLE, + TableName: tableName, Key: { task_id: taskId }, UpdateExpression: `SET ${updates.join(', ')}`, ExpressionAttributeValues: values, + ConditionExpression: conditions.join(' AND '), })); } catch (err) { - logger.warn('Failed to store task_created message ts', { + if ((err as Error)?.name === 'ConditionalCheckFailedException') { + // Sibling retry won the race or the previous attempt's + // UpdateItem succeeded after we returned. Either way the + // stored ts is authoritative; this attempt's ts is a + // duplicate Slack message that should be deleted to avoid a + // hanging extra root in the channel. Best-effort delete; if + // it fails we log and accept the duplicate. + logger.info('[fanout/slack] task_created persist condition failed (sibling retry) — deleting duplicate', { + event: 'fanout.slack.task_created_dup_delete', + task_id: taskId, + duplicate_ts: result.ts, + }); + const deleted = await deleteMessage(botToken, channel, result.ts); + if (!deleted) { + // The duplicate Slack root is now permanently in the + // thread. Dedicated event key + error_id so operators can + // alarm on the rate of ghost task_created messages + // (PR #79 review #6). 
+ logger.error('[fanout/slack] dup-delete failed — ghost task_created message stays in thread', { + event: 'fanout.slack.dup_delete_failed', + error_id: 'FANOUT_SLACK_DUP_DELETE_FAILED', + task_id: taskId, + event_type: eventType, + duplicate_ts: result.ts, + }); + } + return; + } + logger.warn('[fanout/slack] failed to store task_created message ts', { + event: 'fanout.slack.persist_created_ts_failed', task_id: taskId, error: err instanceof Error ? err.message : String(err), }); @@ -234,13 +418,36 @@ async function processRecord(record: DynamoDBRecord): Promise { } else if (eventType === 'session_started') { try { await ddb.send(new UpdateCommand({ - TableName: TASK_TABLE, + TableName: tableName, Key: { task_id: taskId }, UpdateExpression: 'SET channel_metadata.slack_session_msg_ts = :ts', ExpressionAttributeValues: { ':ts': result.ts }, + // Same retry-hazard guard as task_created above: refuse to + // overwrite a previously-persisted ts so a duplicate + // session message can't leak past terminal cleanup. + ConditionExpression: 'attribute_not_exists(channel_metadata.slack_session_msg_ts)', })); } catch (err) { - logger.warn('Failed to store session message ts', { + if ((err as Error)?.name === 'ConditionalCheckFailedException') { + logger.info('[fanout/slack] session_started persist condition failed (sibling retry) — deleting duplicate', { + event: 'fanout.slack.session_dup_delete', + task_id: taskId, + duplicate_ts: result.ts, + }); + const deleted = await deleteMessage(botToken, channel, result.ts); + if (!deleted) { + logger.error('[fanout/slack] dup-delete failed — ghost session_started message stays in thread', { + event: 'fanout.slack.dup_delete_failed', + error_id: 'FANOUT_SLACK_DUP_DELETE_FAILED', + task_id: taskId, + event_type: eventType, + duplicate_ts: result.ts, + }); + } + return; + } + logger.warn('[fanout/slack] failed to store session message ts', { + event: 'fanout.slack.persist_session_ts_failed', task_id: taskId, error: err instanceof Error ? 
err.message : String(err), }); @@ -248,18 +455,50 @@ async function processRecord(record: DynamoDBRecord): Promise { } } - // On terminal events, clean up intermediate messages — only the final - // result message stays in the thread. if (TERMINAL_EVENTS.has(eventType)) { - if (channelMeta.slack_session_msg_ts) { - await deleteMessage(botToken, channel, channelMeta.slack_session_msg_ts); + // Re-read the task record before terminal cleanup. The + // ``channelMeta`` snapshot above was captured at dispatch entry — + // by the time we reach a terminal event, the orchestrator-emitted + // ``task_created`` and ``session_started`` events have run on + // earlier stream batches and persisted their ``slack_*_msg_ts`` + // attributes through conditional UpdateItems. On fast tasks + // (~30s) the terminal event can land **before** those persists + // have propagated to a new GetItem, so the initial read sees a + // stale channel_metadata with no msg_ts attributes — and the + // cleanup below silently does nothing, leaving the 🚀 task_created + // message orphaned in the thread (observed in PR #79 dev-stack + // verification). The fresh read closes that window: by the time + // we get here, the dedup write above (which lands in the same + // table) has linearized our view, so any prior persists are now + // visible. + let latestChannelMeta: TaskRecord['channel_metadata'] = channelMeta; + try { + const refreshed = await ddb.send(new GetCommand({ + TableName: tableName, + Key: { task_id: taskId }, + })); + const refreshedTask = refreshed.Item as TaskRecord | undefined; + latestChannelMeta = refreshedTask?.channel_metadata ?? channelMeta; + } catch (err) { + // Best-effort: a GetItem failure here means we fall back to + // the original snapshot. Log so operators can see the + // refresh-rate vs cleanup-skip-rate gap. 
+ logger.warn('[fanout/slack] terminal cleanup re-read failed — falling back to dispatch-entry snapshot', { + event: 'fanout.slack.cleanup_reread_failed', + task_id: taskId, + error: err instanceof Error ? err.message : String(err), + }); + } + if (latestChannelMeta?.slack_session_msg_ts) { + await deleteMessage(botToken, channel, latestChannelMeta.slack_session_msg_ts); } - if (channelMeta.slack_created_msg_ts) { - await deleteMessage(botToken, channel, channelMeta.slack_created_msg_ts); + if (latestChannelMeta?.slack_created_msg_ts) { + await deleteMessage(botToken, channel, latestChannelMeta.slack_created_msg_ts); } } - logger.info('Slack notification sent', { + logger.info('[fanout/slack] notification sent', { + event: 'fanout.slack.dispatched', task_id: taskId, event_type: eventType, team_id: channelMeta.slack_team_id, @@ -267,7 +506,13 @@ async function processRecord(record: DynamoDBRecord): Promise { }); } -/** Map event types to the emoji reaction that should be on the original message. */ +/** Map event types to the emoji reaction that should be on the original + * message. ``task_stranded`` reuses ``x`` — operators see a stranded + * task as a failure mode with the same visual weight. ``agent_error`` + * is a non-terminal alert: keep the watching ``eyes`` reaction so the + * user sees the warning but knows the agent is still working. + * ``pr_created`` is a non-terminal milestone: leave reactions alone + * (no entry → updateReaction returns immediately). */ const EVENT_REACTIONS: Record = { task_created: 'eyes', session_started: 'hourglass_flowing_sand', @@ -275,6 +520,7 @@ const EVENT_REACTIONS: Record = { task_failed: 'x', task_cancelled: 'no_entry_sign', task_timed_out: 'hourglass', + task_stranded: 'x', }; /** Reactions to remove when transitioning to a new state. 
*/ @@ -292,10 +538,27 @@ async function addReaction(botToken: string, channel: string, timestamp: string, }); const result = await response.json() as { ok: boolean; error?: string }; if (!result.ok && result.error !== 'already_reacted') { - logger.warn('Failed to add Slack reaction', { emoji, error: result.error }); + // API-level rejection: per-message UX problem (channel locked, + // emoji unknown). Stays at warn — operators don't page. + logger.warn('[fanout/slack] failed to add reaction', { + event: 'fanout.slack.reaction_add_api_error', + emoji, + error: result.error, + }); } } catch (err) { - logger.warn('Error adding Slack reaction', { emoji, error: err instanceof Error ? err.message : String(err) }); + // Network / DNS / TLS / timeout / SyntaxError — infra class. + // Promote to error with a dedicated event key so the rate of + // network failures has its own alarmable signal, distinct from + // API-level rejections (PR #79 review #5). User-visible symptom + // when this fires unnoticed: stale ⏳ emoji never swaps to ✅. + logger.error('[fanout/slack] network error adding reaction', { + event: 'fanout.slack.reaction_add_network_error', + error_id: 'FANOUT_SLACK_REACTION_NETWORK', + emoji, + error_name: err instanceof Error ? err.name : undefined, + error: err instanceof Error ? err.message : String(err), + }); } } @@ -311,18 +574,29 @@ async function removeReaction(botToken: string, channel: string, timestamp: stri }); const result = await response.json() as { ok: boolean; error?: string }; if (!result.ok && result.error !== 'no_reaction') { - logger.warn('Failed to remove Slack reaction', { emoji, error: result.error }); + logger.warn('[fanout/slack] failed to remove reaction', { + event: 'fanout.slack.reaction_remove_api_error', + emoji, + error: result.error, + }); } } catch (err) { - logger.warn('Error removing Slack reaction', { emoji, error: err instanceof Error ? 
err.message : String(err) }); + // See addReaction — network failures get their own ``error_id`` + // so operators can alarm on stale-emoji rate distinctly from + // Slack API rejections. + logger.error('[fanout/slack] network error removing reaction', { + event: 'fanout.slack.reaction_remove_network_error', + error_id: 'FANOUT_SLACK_REACTION_NETWORK', + emoji, + error_name: err instanceof Error ? err.name : undefined, + error: err instanceof Error ? err.message : String(err), + }); } } async function updateReaction(botToken: string, channel: string, threadTs: string, eventType: string): Promise { const newEmoji = EVENT_REACTIONS[eventType]; if (!newEmoji) return; - - // Remove stale reactions first, then add the new one. for (const stale of STALE_REACTIONS) { if (stale !== newEmoji) { await removeReaction(botToken, channel, threadTs, stale); @@ -331,7 +605,12 @@ async function updateReaction(botToken: string, channel: string, threadTs: strin await addReaction(botToken, channel, threadTs, newEmoji); } -async function deleteMessage(botToken: string, channel: string, messageTs: string): Promise { +/** Returns ``true`` iff the message was successfully deleted (or was + * already gone — ``message_not_found`` is benign). Callers that care + * about the outcome (the conditional-persist dup-delete path) can + * emit a ``fanout.slack.dup_delete_failed`` event so operators can + * alarm on accumulating ghost messages (PR #79 review #6). 
*/ +async function deleteMessage(botToken: string, channel: string, messageTs: string): Promise { try { const response = await fetch('https://slack.com/api/chat.delete', { method: 'POST', @@ -343,22 +622,32 @@ async function deleteMessage(botToken: string, channel: string, messageTs: strin }); const result = await response.json() as { ok: boolean; error?: string }; if (!result.ok) { - logger.warn('Failed to delete session message', { error: result.error }); + // ``message_not_found`` is benign (message already gone) and is + // treated as a successful delete by the caller's perspective. + // Anything else (e.g. ``cant_delete_message``) leaves an orphan + // in the thread. + if (result.error === 'message_not_found') { + return true; + } + logger.warn('[fanout/slack] failed to delete intermediate message', { + event: 'fanout.slack.message_delete_api_error', + error: result.error, + message_ts: messageTs, + }); + return false; } + return true; } catch (err) { - logger.warn('Error deleting session message', { error: err instanceof Error ? err.message : String(err) }); - } -} - -function safeJsonParse(text: string, context?: Record): Record | null { - try { - return JSON.parse(text); - } catch (err) { - logger.warn('Failed to parse event metadata JSON', { - ...context, + // Network failure → orphan message stays in the thread silently. + // Promote to error so operators can alarm on the orphan rate + // (PR #79 review #5). + logger.error('[fanout/slack] network error deleting intermediate message', { + event: 'fanout.slack.message_delete_network_error', + error_id: 'FANOUT_SLACK_DELETE_NETWORK', + message_ts: messageTs, + error_name: err instanceof Error ? err.name : undefined, error: err instanceof Error ? err.message : String(err), - preview: text.length > 200 ? 
`${text.slice(0, 200)}...` : text, }); - return null; + return false; } } diff --git a/cdk/src/stacks/agent.ts b/cdk/src/stacks/agent.ts index 7d5c3d08..92683bf5 100644 --- a/cdk/src/stacks/agent.ts +++ b/cdk/src/stacks/agent.ts @@ -21,7 +21,7 @@ import * as path from 'path'; import * as agentcore from '@aws-cdk/aws-bedrock-agentcore-alpha'; import * as bedrock from '@aws-cdk/aws-bedrock-alpha'; import * as agentcoremixins from '@aws-cdk/mixins-preview/aws-bedrockagentcore'; -import { Stack, StackProps, RemovalPolicy, CfnOutput, CfnResource, Duration, Fn, Lazy } from 'aws-cdk-lib'; +import { ArnFormat, Stack, StackProps, RemovalPolicy, CfnOutput, CfnResource, Duration, Fn, Lazy } from 'aws-cdk-lib'; import * as ec2 from 'aws-cdk-lib/aws-ec2'; // ecr_assets import is only needed when the ECS block below is uncommented // import * as ecr_assets from 'aws-cdk-lib/aws-ecr-assets'; @@ -494,13 +494,24 @@ export class AgentStack extends Stack { // --- Fan-out plane consumer --- // Consumes TaskEventsTable DynamoDB Streams and dispatches events to // Slack / GitHub / email per per-channel default filters. GitHub - // dispatcher (Chunk J) edits a single issue comment in place with - // If-Match ETag; Slack / Email remain log-only until Phase 2. + // dispatcher edits a single issue comment in place; Slack + // dispatcher (issue #64) reads per-workspace bot tokens from + // ``bgagent/slack/*``. Email remains a log-only stub until Phase 2. new FanOutConsumer(this, 'FanOutConsumer', { taskEventsTable: taskEventsTable.table, taskTable: taskTable.table, repoTable: repoTable.table, githubTokenSecret, + // Slack bot-token grant is guarded on this prop — pass the + // ``bgagent/slack/*`` prefix so the FanOutConsumer can read + // workspace tokens. Same scope SlackIntegration uses for its + // own writers (PR #79 review #2). 
+ slackSecretArnPattern: Stack.of(this).formatArn({ + service: 'secretsmanager', + resource: 'secret', + resourceName: 'bgagent/slack/*', + arnFormat: ArnFormat.COLON_RESOURCE_NAME, + }), }); // --- Operator dashboard --- diff --git a/cdk/test/constructs/fanout-consumer.test.ts b/cdk/test/constructs/fanout-consumer.test.ts new file mode 100644 index 00000000..175f15f8 --- /dev/null +++ b/cdk/test/constructs/fanout-consumer.test.ts @@ -0,0 +1,173 @@ +/** + * MIT No Attribution + * + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +// PR #79 test gap #34. The construct shipped on issue #64 with no +// unit-level coverage of its IAM contract — the only synth-level +// signal lived inside slack-integration.test.ts ("0 EventSourceMapping") +// which proved the migration didn't regress the OTHER construct. +// These tests pin the FanOutConsumer's own surface: the Slack secret +// grant must be guarded by ``slackSecretArnPattern`` (review #2), the +// stream binding must exist, and the DLQ must be wired. 
+ +import { App, Stack } from 'aws-cdk-lib'; +import { Match, Template } from 'aws-cdk-lib/assertions'; +import * as dynamodb from 'aws-cdk-lib/aws-dynamodb'; +import { FanOutConsumer } from '../../src/constructs/fanout-consumer'; + +function makeTaskEventsTable(stack: Stack): dynamodb.Table { + return new dynamodb.Table(stack, 'TaskEventsTable', { + partitionKey: { name: 'task_id', type: dynamodb.AttributeType.STRING }, + sortKey: { name: 'event_id', type: dynamodb.AttributeType.STRING }, + stream: dynamodb.StreamViewType.NEW_IMAGE, + }); +} + +describe('FanOutConsumer', () => { + test('attaches a single DynamoEventSource on the TaskEventsTable stream', () => { + const app = new App(); + const stack = new Stack(app, 'TestStack'); + new FanOutConsumer(stack, 'FanOut', { + taskEventsTable: makeTaskEventsTable(stack), + }); + const template = Template.fromStack(stack); + + // Exactly one event-source mapping — the architectural invariant + // issue #64 was about. Adding a second consumer to TaskEventsTable + // must fail this test loudly. + template.resourceCountIs('AWS::Lambda::EventSourceMapping', 1); + template.hasResourceProperties('AWS::Lambda::EventSourceMapping', { + StartingPosition: 'LATEST', + // Larger batch size than the old SlackNotifyFn (10) because the + // dispatcher fans out across channels — fewer Lambda + // invocations, better throughput. 
+ BatchSize: 100, + MaximumRetryAttempts: 3, + FunctionResponseTypes: ['ReportBatchItemFailures'], + }); + }); + + test('creates a DLQ for the fanout Lambda', () => { + const app = new App(); + const stack = new Stack(app, 'TestStack'); + new FanOutConsumer(stack, 'FanOut', { + taskEventsTable: makeTaskEventsTable(stack), + }); + const template = Template.fromStack(stack); + + template.resourceCountIs('AWS::SQS::Queue', 1); + template.hasResourceProperties('AWS::SQS::Queue', { + MessageRetentionPeriod: 14 * 24 * 60 * 60, // 14 days + }); + }); + + test('omits the bgagent/slack/* grant when slackSecretArnPattern is not provided (PR #79 review #2)', () => { + // Pre-fix: the policy attached unconditionally so dev stacks + // without Slack onboarding accumulated a dangling IAM permission. + // Post-fix: the policy only attaches when the prop is set, so + // construct consumers stay symmetric with taskTable / repoTable / + // githubTokenSecret (also guarded by their respective props). + const app = new App(); + const stack = new Stack(app, 'TestStack'); + new FanOutConsumer(stack, 'FanOut', { + taskEventsTable: makeTaskEventsTable(stack), + // intentionally no slackSecretArnPattern + }); + const template = Template.fromStack(stack); + + // Iterate every IAM::Policy and assert NONE of them grant + // ``secretsmanager:GetSecretValue`` on a ``bgagent/slack/*`` ARN. + const policies = template.findResources('AWS::IAM::Policy'); + for (const policy of Object.values(policies)) { + const stmts = (policy as { Properties?: { PolicyDocument?: { Statement?: unknown[] } } }) + .Properties?.PolicyDocument?.Statement ?? []; + for (const stmt of stmts) { + const action = (stmt as { Action?: unknown }).Action; + const resource = JSON.stringify((stmt as { Resource?: unknown }).Resource ?? 
''); + const isSlackSecretGrant = + (action === 'secretsmanager:GetSecretValue' + || (Array.isArray(action) && action.includes('secretsmanager:GetSecretValue'))) + && resource.includes('bgagent/slack'); + expect(isSlackSecretGrant).toBe(false); + } + } + }); + + test('attaches the bgagent/slack/* grant only when slackSecretArnPattern is provided', () => { + const app = new App(); + const stack = new Stack(app, 'TestStack'); + new FanOutConsumer(stack, 'FanOut', { + taskEventsTable: makeTaskEventsTable(stack), + slackSecretArnPattern: + 'arn:aws:secretsmanager:us-east-1:111122223333:secret:bgagent/slack/*', + }); + const template = Template.fromStack(stack); + + template.hasResourceProperties('AWS::IAM::Policy', { + PolicyDocument: { + Statement: Match.arrayWith([ + Match.objectLike({ + Action: 'secretsmanager:GetSecretValue', + Effect: 'Allow', + Resource: Match.stringLikeRegexp('bgagent/slack/\\*'), + }), + ]), + }, + }); + }); + + test('passes TASK_TABLE_NAME env var when taskTable is provided', () => { + // The Slack dispatcher requires this env var (review #3); the + // construct must wire it from the prop. Its absence triggers the + // FANOUT_SLACK_MISSING_TASK_TABLE error on dispatch. 
+ const app = new App(); + const stack = new Stack(app, 'TestStack'); + const taskTable = new dynamodb.Table(stack, 'TaskTable', { + partitionKey: { name: 'task_id', type: dynamodb.AttributeType.STRING }, + }); + new FanOutConsumer(stack, 'FanOut', { + taskEventsTable: makeTaskEventsTable(stack), + taskTable, + }); + const template = Template.fromStack(stack); + + template.hasResourceProperties('AWS::Lambda::Function', { + Environment: { + Variables: Match.objectLike({ + TASK_TABLE_NAME: Match.anyValue(), + }), + }, + }); + }); + + test('omits TASK_TABLE_NAME env var when taskTable is not provided (graceful degrade)', () => { + const app = new App(); + const stack = new Stack(app, 'TestStack'); + new FanOutConsumer(stack, 'FanOut', { + taskEventsTable: makeTaskEventsTable(stack), + }); + const template = Template.fromStack(stack); + + const fns = template.findResources('AWS::Lambda::Function'); + for (const fn of Object.values(fns)) { + const vars = ((fn as { Properties?: { Environment?: { Variables?: Record } } }) + .Properties?.Environment?.Variables) ?? 
{}; + expect(vars.TASK_TABLE_NAME).toBeUndefined(); + } + }); +}); diff --git a/cdk/test/constructs/slack-integration.test.ts b/cdk/test/constructs/slack-integration.test.ts index 33f14a33..c78b4214 100644 --- a/cdk/test/constructs/slack-integration.test.ts +++ b/cdk/test/constructs/slack-integration.test.ts @@ -57,9 +57,10 @@ describe('SlackIntegration construct', () => { template.resourceCountIs('AWS::DynamoDB::Table', 4); }); - test('creates 7 Lambda functions', () => { - // oauth-callback, events, commands, command-processor, link, notify, interactions - template.resourceCountIs('AWS::Lambda::Function', 7); + test('creates 6 Lambda functions', () => { + // oauth-callback, events, commands, command-processor, link, interactions + // (issue #64: notify migrated onto FanOutConsumer as a dispatcher) + template.resourceCountIs('AWS::Lambda::Function', 6); }); test('creates API Gateway resources under /slack', () => { @@ -93,15 +94,15 @@ describe('SlackIntegration construct', () => { }); }); - test('notification handler has DynamoDB Streams event source', () => { - template.hasResourceProperties('AWS::Lambda::EventSourceMapping', { - EventSourceArn: Match.anyValue(), - StartingPosition: 'LATEST', - BatchSize: 10, - MaximumBatchingWindowInSeconds: 0, - MaximumRetryAttempts: 3, - BisectBatchOnFunctionError: true, - }); + test('construct no longer owns a TaskEventsTable stream consumer (issue #64)', () => { + // Before issue #64 this construct owned ``SlackNotifyFn`` plus its + // own ``DynamoEventSource`` on ``TaskEventsTable``. Outbound Slack + // delivery now runs through ``FanOutConsumer`` as a per-channel + // dispatcher so ``TaskEventsTable`` stays within the DynamoDB + // one-reader-per-shard practical limit. The assertion pins that the + // migration did not regress: this construct must not create any + // ``AWS::Lambda::EventSourceMapping`` at all. 
+ template.resourceCountIs('AWS::Lambda::EventSourceMapping', 0); }); test('creates 3 Secrets Manager secrets for Slack App credentials', () => { diff --git a/cdk/test/handlers/fanout-task-events.test.ts b/cdk/test/handlers/fanout-task-events.test.ts index 14b15119..c8b97ab1 100644 --- a/cdk/test/handlers/fanout-task-events.test.ts +++ b/cdk/test/handlers/fanout-task-events.test.ts @@ -70,6 +70,29 @@ jest.mock('../../src/handlers/shared/context-hydration', () => ({ clearTokenCache: () => mockClearTokenCache(), })); +// Issue #64: SlackNotifyFn migrated onto FanOutConsumer as a dispatcher. +// The dispatcher calls into ``slack-notify.ts::dispatchSlackEvent``; we +// mock that here so the fanout tests focus on routing invariants and +// leave the per-dispatcher Slack behaviour to ``slack-notify.test.ts``. +// Exposing the mock + the tagged ``SlackApiError`` class lets routing +// tests drive the two observable outcomes the dispatcher produces +// (resolve → ``fanout.slack.dispatched``; reject with SlackApiError → +// router-level ``fanout.slack.api_error`` warn without dispatcher +// rejection). +const mockDispatchSlackEvent: jest.Mock = jest.fn(); +jest.mock('../../src/handlers/slack-notify', () => { + class SlackApiError extends Error { + constructor(message: string) { + super(message); + this.name = 'SlackApiError'; + } + } + return { + dispatchSlackEvent: (ev: unknown, ddb: unknown) => mockDispatchSlackEvent(ev, ddb), + SlackApiError, + }; +}); + process.env.TASK_TABLE_NAME = 'Tasks'; process.env.GITHUB_TOKEN_SECRET_ARN = 'arn:aws:secretsmanager:us-east-1:0:secret:platform'; @@ -138,15 +161,20 @@ describe('fanout-task-events: shouldFanOut filter (union of per-channel defaults timestamp: '2026-04-22T04:00:00Z', }); - // Rev-6 design §6.2: chattier event types (task_created, agent_milestone) - // are intentionally dropped from defaults so users don't mute integrations - // on day one. 
The ``--verbose`` opt-in (Chunk K follow-up) will re-enable - // milestone delivery. + // Rev-6 design §6.2 + issue #64: the Slack dispatcher is the only + // channel that consumes ``task_created`` / ``session_started`` / + // ``task_timed_out`` — it gates further on ``channel_source === + // 'slack'`` so the extra lifecycle signals never reach API / webhook + // / Linear tasks. That extra gate lives inside the dispatcher, so at + // the filter layer these events now fan out. test.each([ 'task_failed', 'task_completed', 'task_cancelled', 'task_stranded', + 'task_timed_out', // Slack lifecycle (issue #64) + 'task_created', // Slack lifecycle (issue #64) + 'session_started', // Slack lifecycle (issue #64) 'agent_error', 'pr_created', 'approval_required', // Phase 3 forward-compat @@ -156,7 +184,6 @@ describe('fanout-task-events: shouldFanOut filter (union of per-channel defaults }); test.each([ - 'task_created', // intentionally dropped in rev-6 defaults // Bare ``agent_milestone`` (no ``metadata.milestone``) stays // dropped; wrapped milestones on the ``ROUTABLE_MILESTONES`` // allowlist route by name — see the agent_milestone routing @@ -166,7 +193,6 @@ describe('fanout-task-events: shouldFanOut filter (union of per-channel defaults 'agent_tool_call', 'agent_tool_result', 'agent_cost_update', - 'session_started', 'hydration_started', 'hydration_complete', 'admission_rejected', @@ -179,18 +205,48 @@ describe('fanout-task-events: shouldFanOut filter (union of per-channel defaults describe('fanout-task-events: per-channel filter contract (design §6.2)', () => { // Lock in the exact sets from the design doc so a drift in // CHANNEL_DEFAULTS surfaces here instead of in production telemetry. 
- test('Slack subscribes to terminal + PR + error + approval + status_response', () => { + test('Slack subscribes to terminal + error + approval + status_response + lifecycle (NOT pr_created)', () => { const f = CHANNEL_DEFAULTS.slack; expect([...f].sort()).toEqual([ 'agent_error', 'approval_required', - 'pr_created', + 'session_started', 'status_response', 'task_cancelled', 'task_completed', + 'task_created', 'task_failed', 'task_stranded', + 'task_timed_out', ]); + // ``pr_created`` is deliberately NOT in the Slack default — the + // ``task_completed`` message already renders a "View PR" button + // with the same URL, and posting both would visually duplicate. + // GitHub keeps ``pr_created`` because the edit-in-place comment + // benefits from the early checkpoint. + expect(f.has('pr_created')).toBe(false); + }); + + test('every Slack-default event the dispatcher actually renders today is in NOTIFIABLE_EVENTS (issue #64 review Cat 7 drift guard)', () => { + // The router subscribes Slack to events the dispatcher must + // render. ``approval_required`` and ``status_response`` are + // forward-compat (no emitter today) so they're allowed to be in + // CHANNEL_DEFAULTS.slack but absent from NOTIFIABLE_EVENTS — when + // their emitters land, this test will start failing and force the + // dispatcher update at the same time. Every OTHER Slack default + // must be renderable, otherwise telemetry lies. Use + // ``requireActual`` to bypass the slack-notify mock and read the + // real exported NOTIFIABLE_EVENTS set. 
+ const real = jest.requireActual( + '../../src/handlers/slack-notify', + ); + const forwardCompat = new Set(['approval_required', 'status_response']); + const expectedRenderable = [...CHANNEL_DEFAULTS.slack].filter( + e => !forwardCompat.has(e), + ); + for (const eventType of expectedRenderable) { + expect(real.NOTIFIABLE_EVENTS.has(eventType)).toBe(true); + } }); test('Email subscribes to task_completed + task_failed + approval_required only (minimal per §6.2)', () => { @@ -297,61 +353,66 @@ describe('fanout-task-events: routeEvent (per-channel dispatch)', () => { }); test('task_completed routes to all three channels', async () => { - const channels = await routeEvent(mk('task_completed')); - expect(channels.sort()).toEqual(['email', 'github', 'slack']); + const outcome = await routeEvent(mk('task_completed')); + expect([...outcome.dispatched].sort()).toEqual(['email', 'github', 'slack']); + expect(outcome.infraRejections).toEqual([]); }); test('task_cancelled skips Email per §6.2 (only Slack + GitHub)', async () => { // Regression guard against accidentally folding cancelled+stranded // into Email via a shared TERMINAL spread — design says Email is // minimal (task_completed, task_failed, approval_required only). 
- const channels = await routeEvent(mk('task_cancelled')); - expect(channels.sort()).toEqual(['github', 'slack']); + const outcome = await routeEvent(mk('task_cancelled')); + expect([...outcome.dispatched].sort()).toEqual(['github', 'slack']); }); test('task_stranded skips Email per §6.2', async () => { - const channels = await routeEvent(mk('task_stranded')); - expect(channels.sort()).toEqual(['github', 'slack']); + const outcome = await routeEvent(mk('task_stranded')); + expect([...outcome.dispatched].sort()).toEqual(['github', 'slack']); }); test('agent_error routes only to Slack', async () => { - const channels = await routeEvent(mk('agent_error')); - expect(channels).toEqual(['slack']); + const outcome = await routeEvent(mk('agent_error')); + expect(outcome.dispatched).toEqual(['slack']); }); - test('pr_created routes to Slack + GitHub but not Email', async () => { - const channels = await routeEvent(mk('pr_created')); - expect(channels.sort()).toEqual(['github', 'slack']); + test('pr_created routes to GitHub only (not Slack — task_completed already carries View PR)', async () => { + const outcome = await routeEvent(mk('pr_created')); + expect(outcome.dispatched).toEqual(['github']); + expect(outcome.dispatched).not.toContain('slack'); }); test('event with no subscribers returns an empty channel list', async () => { // ``agent_milestone`` is not in any channel's default — routing // must produce an empty list so the handler records dispatched=0. 
- const channels = await routeEvent(mk('agent_milestone')); - expect(channels).toEqual([]); + const outcome = await routeEvent(mk('agent_milestone')); + expect(outcome.dispatched).toEqual([]); + expect(outcome.infraRejections).toEqual([]); }); test('per-task override silences one channel without affecting others', async () => { const overrides: TaskNotificationsConfig = { slack: { enabled: false } }; - const channels = await routeEvent(mk('task_completed'), overrides); - expect(channels.sort()).toEqual(['email', 'github']); - expect(channels).not.toContain('slack'); + const outcome = await routeEvent(mk('task_completed'), overrides); + expect([...outcome.dispatched].sort()).toEqual(['email', 'github']); + expect(outcome.dispatched).not.toContain('slack'); }); }); describe('fanout-task-events: channel isolation', () => { test('one channel rejecting does NOT prevent the others from dispatching', async () => { - // Simulate a Slack-side failure by making the Slack dispatcher's - // inner ``logger.info`` throw, which escapes its own try-block via - // the caught-and-rethrown path in the stub. The router's - // ``Promise.allSettled`` must record Slack as rejected while - // Email + GitHub complete normally. The assertions verify two - // independent signals: - // (1) the other two dispatchers' stub log calls actually ran - // (proving the work was done, not just that the router - // reported success) + // Simulate a Slack infra failure by making the dispatchSlackEvent + // mock reject with a non-SlackApiError throw (SlackApiError would be + // swallowed at the dispatcher boundary — see the SlackApiError + // suppression test below). The router's ``Promise.allSettled`` must + // record Slack as rejected while Email + GitHub complete normally. 
+ // The assertions verify two independent signals: + // (1) the other two dispatchers' work actually ran (proving the + // channels were attempted, not short-circuited) // (2) Slack is omitted from the ``dispatched`` return so batch // telemetry reflects reality + mockDispatchSlackEvent.mockReset().mockRejectedValueOnce( + Object.assign(new Error('slack infra down'), { name: 'ProvisionedThroughputExceededException' }), + ); const loggerModule = await import('../../src/handlers/shared/logger'); const originalInfo = loggerModule.logger.info.bind(loggerModule.logger); const warnSpy = jest.spyOn(loggerModule.logger, 'warn').mockImplementation(() => undefined); @@ -360,41 +421,97 @@ describe('fanout-task-events: channel isolation', () => { (msg: string, meta?: Record) => { const ev = meta?.event as string | undefined; if (ev) observedEvents.push(ev); - if (ev === 'fanout.slack.dispatch_stub') { - throw new Error('slack is down'); - } return originalInfo(msg, meta); }, ); try { - const channels = await routeEvent({ + const outcome = await routeEvent({ task_id: 't-isol', event_id: 'e-isol', event_type: 'task_completed', timestamp: '2026-04-22T04:00:00Z', }); - // (1) Email actually ran its dispatch path (GitHub short-circuits - // on "task not found" because the shared DDB mock returns no - // Item — that's fine; the key invariant is that one channel's - // failure doesn't block the others). + // (1) Email ran its dispatch path (GitHub short-circuits on + // "task not found" because the shared DDB mock returns no Item — + // that's fine; the key invariant is that one channel's failure + // doesn't block the others). Slack's dispatcher was invoked + // exactly once even though it rejected. expect(observedEvents).toContain('fanout.email.dispatch_stub'); - // Slack also ran (it threw), so its log line was emitted before the throw. 
- expect(observedEvents).toContain('fanout.slack.dispatch_stub'); + expect(mockDispatchSlackEvent).toHaveBeenCalledTimes(1); // (2) Telemetry truthfulness: Slack must NOT be in ``dispatched`` // because its dispatcher rejected. Email + GitHub are. - expect(channels.sort()).toEqual(['email', 'github']); - expect(channels).not.toContain('slack'); + expect([...outcome.dispatched].sort()).toEqual(['email', 'github']); + expect(outcome.dispatched).not.toContain('slack'); + + // (3) Slack landed in ``infraRejections`` so the handler will + // flag this record for partial-batch retry — the BLOCKER that + // motivated the post-issue-#64 review fix. Without this signal, + // a transient Slack-side DDB throttle would be a permanent drop. + expect(outcome.infraRejections).toEqual(['slack']); // The rejection surfaces in a warn log so operators can alert on it. const warnCalls = warnSpy.mock.calls.map(c => c[1] as Record | undefined); const rejectedWarn = warnCalls.find(meta => meta?.event === 'fanout.dispatcher.rejected'); expect(rejectedWarn).toBeDefined(); expect(rejectedWarn?.channel).toBe('slack'); + // The warn flags the rejection as retryable so operators can + // tell the difference between a noisy infra blip (this) and a + // channel-terminal swallow like ``channel_not_found`` (which + // emits ``fanout.slack.api_error`` instead). + expect(rejectedWarn?.retryable).toBe(true); } finally { infoSpy.mockRestore(); warnSpy.mockRestore(); + mockDispatchSlackEvent.mockReset(); + } + }); + + test('SlackApiError from the dispatcher is swallowed (counted as dispatched)', async () => { + // Slack API errors like ``channel_not_found`` are not recoverable + // by a Lambda retry. The fanout Slack dispatcher catches + // SlackApiError internally and logs ``fanout.slack.api_error`` + // without propagating, so the router treats Slack as dispatched. + // This keeps Lambda from burning retries on a bot-token misroute + // while still surfacing the failure in CloudWatch. 
+ const { SlackApiError } = jest.requireMock( + '../../src/handlers/slack-notify', + ); + mockDispatchSlackEvent.mockReset().mockRejectedValueOnce( + new SlackApiError('slack chat.postMessage failed: channel_not_found'), + ); + const loggerModule = await import('../../src/handlers/shared/logger'); + const warnSpy = jest.spyOn(loggerModule.logger, 'warn').mockImplementation(() => undefined); + try { + const outcome = await routeEvent({ + task_id: 't-api-err', + event_id: 'e-api-err', + event_type: 'task_completed', + timestamp: '2026-05-05T00:00:00Z', + }); + + // Slack is listed as dispatched despite the API error — the + // router never saw a rejection because the dispatcher swallowed + // it. The router-level ``fanout.dispatcher.rejected`` warn must + // NOT fire for this case, and ``infraRejections`` must NOT + // include Slack (so the handler does not push the record into + // ``batchItemFailures`` — Lambda must not waste retries on + // ``channel_not_found``). + expect(outcome.dispatched).toContain('slack'); + expect(outcome.infraRejections).not.toContain('slack'); + const rejected = warnSpy.mock.calls.find( + c => (c[1] as Record | undefined)?.event === 'fanout.dispatcher.rejected', + ); + expect(rejected).toBeUndefined(); + // The swallow was observable via ``fanout.slack.api_error``. 
+ const apiErr = warnSpy.mock.calls.find( + c => (c[1] as Record | undefined)?.event === 'fanout.slack.api_error', + ); + expect(apiErr).toBeDefined(); + } finally { + warnSpy.mockRestore(); + mockDispatchSlackEvent.mockReset(); } }); }); @@ -560,12 +677,21 @@ describe('fanout-task-events: GitHub dispatcher (Chunk J)', () => { expect(mockUpsertTaskComment).not.toHaveBeenCalled(); }); - test('upsertTaskComment rejection does NOT break the batch (routeEvent catches)', async () => { + test('upsertTaskComment rejection escalates to partial-batch retry (post-issue-#64-review)', async () => { + // Pre-fix: this test asserted ``batchItemFailures: []`` because + // the router swallowed any dispatcher rejection. That hid + // transient GitHub 5xxs as permanent drops. After the fix, an + // upsertTaskComment rejection lands in ``infraRejections`` and + // the handler escalates the record for partial-batch retry — + // matching the legacy ``SlackNotifyFn`` semantic where infra + // errors triggered Lambda retry. mockDdbSend.mockResolvedValueOnce({ Item: TASK_RECORD_BASE }); mockUpsertTaskComment.mockRejectedValueOnce(new Error('github 500')); - const event: DynamoDBStreamEvent = { Records: [mkEvent('task_completed', 't-gh')] }; - await expect(handler(event)).resolves.toEqual({ batchItemFailures: [] }); + const event = { Records: [mkEvent('task_completed', 't-gh')] } as DynamoDBStreamEvent; + const result = await handler(event); + expect(result.batchItemFailures).toHaveLength(1); + expect(result.batchItemFailures[0].itemIdentifier).toBe(event.Records[0].eventID); // No UpdateCommand fires (no id to persist from a failed upsert). const updateCalls = mockDdbSend.mock.calls.filter( c => (c[0] as { _type?: string })._type === 'Update', @@ -668,19 +794,63 @@ describe('fanout-task-events: GitHub dispatcher (Chunk J)', () => { ); expect(updateCalls).toHaveLength(0); - // The 400 surfaced as a dispatcher-rejected warn, not as a - // silent swallow. 
+ // Post-issue-#64-review Cat 3 fix: GitHub 4xx (excluding 401 + + // 404 which have dedicated handling) is now treated as a + // **channel-terminal** error. The dispatcher swallows it via a + // dedicated ``fanout.github.api_error`` warn, NOT a generic + // ``fanout.dispatcher.rejected``. This keeps Lambda from + // burning retries on a fundamentally bad request — symmetric + // with the SlackApiError swallow on ``channel_not_found``. + const apiErrWarn = warnSpy.mock.calls.find( + c => (c[1] as Record | undefined)?.event === 'fanout.github.api_error', + ); + expect(apiErrWarn).toBeDefined(); + expect((apiErrWarn?.[1] as Record).http_status).toBe(400); + expect(String((apiErrWarn?.[1] as Record).error)).toContain('HTTP 400'); + + // ``fanout.dispatcher.rejected`` must NOT fire — it is reserved + // for retryable infra rejections under the new contract. const rejectedWarn = warnSpy.mock.calls.find( c => (c[1] as Record | undefined)?.event === 'fanout.dispatcher.rejected', ); - expect(rejectedWarn).toBeDefined(); - expect((rejectedWarn?.[1] as Record).channel).toBe('github'); - expect(String((rejectedWarn?.[1] as Record).error)).toContain('HTTP 400'); + expect(rejectedWarn).toBeUndefined(); } finally { warnSpy.mockRestore(); } }); + test.each([403, 429])( + 'HTTP %s from GitHub escalates to partial-batch retry (rate-limit carve-out, PR #79 review #1)', + async (httpStatus) => { + // 403 ("API rate limit exceeded") and 429 ("Too Many Requests") + // are 4xx but transient. The original migration's blanket 4xx + // swallow would permanently drop entire reconciliation waves + // under sustained rate-limiting. The carve-out re-classifies + // them as infra rejections so the record retries until the + // rate-limit window clears (or DLQs after retryAttempts). 
+ mockDdbSend.mockResolvedValueOnce({ + Item: { ...TASK_RECORD_BASE, github_comment_id: 555 }, + }); + const { GitHubCommentError } = jest.requireMock( + '../../src/handlers/shared/github-comment', + ); + mockUpsertTaskComment.mockRejectedValueOnce( + new GitHubCommentError( + `PATCH /repos/owner/repo/issues/comments/555 failed: HTTP ${httpStatus}`, + httpStatus, + ), + ); + + const record = mkEvent('task_completed', 't-gh'); + const result = await handler({ Records: [record] }); + + // Record IS in batchItemFailures — Lambda will replay until + // the rate-limit window opens. Critical: the swallow-as-terminal + // path would have produced an empty array (silent drop). + expect(result.batchItemFailures).toEqual([{ itemIdentifier: record.eventID }]); + }, + ); + test('falls back to issue_number when pr_number is absent', async () => { // Webhook-submitted issue tasks are the common real-world surface. mockDdbSend @@ -918,6 +1088,127 @@ describe('fanout-task-events: GitHub dispatcher (Chunk J)', () => { }); }); +// --------------------------------------------------------------------------- +// Issue #64 — Slack dispatcher integration (SlackNotifyFn migration) +// --------------------------------------------------------------------------- + +describe('fanout-task-events: Slack dispatcher (issue #64 migration)', () => { + // Confirm the Slack dispatcher is wired into the router and receives + // the parsed FanOutEvent (not a raw DynamoDB stream record). Detailed + // per-behaviour coverage (dedup, thread management, reactions, session + // cleanup) lives in ``slack-notify.test.ts``. 
+ + beforeEach(() => { + mockDdbSend.mockReset().mockResolvedValue({ Item: undefined }); + mockDispatchSlackEvent.mockReset().mockResolvedValue(undefined); + }); + + test('task_completed invokes the Slack dispatcher with the parsed event + shared ddb client', async () => { + const event: DynamoDBStreamEvent = { + Records: [mkEvent('task_completed', 't-slack')], + }; + await handler(event); + + expect(mockDispatchSlackEvent).toHaveBeenCalledTimes(1); + const [parsedEvent, ddbClient] = mockDispatchSlackEvent.mock.calls[0]; + // Event is the pre-parsed FanOutEvent shape, not a raw stream record. + expect(parsedEvent).toMatchObject({ + task_id: 't-slack', + event_type: 'task_completed', + }); + // Routing threads the handler's shared DocumentClient through — + // otherwise every dispatched event would pay a fresh client init. + expect(ddbClient).toBeDefined(); + }); + + test('task_created fans out (Slack lifecycle event re-added for issue #64)', async () => { + // Before #64, ``task_created`` was intentionally dropped at the + // filter layer to keep integrations quiet by default. The Slack + // dispatcher now gates further on ``channel_source === 'slack'``, + // so re-admitting it at the filter is safe. 
+ const event: DynamoDBStreamEvent = { + Records: [mkEvent('task_created', 't-slack-created')], + }; + await handler(event); + + expect(mockDispatchSlackEvent).toHaveBeenCalledTimes(1); + expect(mockDispatchSlackEvent.mock.calls[0][0].event_type).toBe('task_created'); + }); + + test('session_started and task_timed_out reach the Slack dispatcher', async () => { + const event: DynamoDBStreamEvent = { + Records: [ + mkEvent('session_started', 't-slack-ss'), + mkEvent('task_timed_out', 't-slack-to'), + ], + }; + await handler(event); + + const types = mockDispatchSlackEvent.mock.calls.map(c => c[0].event_type); + expect(types.sort()).toEqual(['session_started', 'task_timed_out']); + }); + + test('Slack dispatcher infra rejection escalates record to partial-batch retry', async () => { + // Post-issue-#64-review BLOCKER fix: an infra error inside the + // Slack dispatcher (DDB throttling on the task GetItem, Secrets + // Manager 5xx, transient Slack API timeout) must NOT be silently + // dropped. The handler routes the rejection through the new + // ``infraRejections`` channel and pushes the record into + // ``batchItemFailures`` so Lambda retries it. Without this, the + // migration would lose the legacy ``SlackNotifyFn`` retry + // semantic. + mockDispatchSlackEvent.mockReset().mockRejectedValueOnce( + new Error('slack side ddb throttled'), + ); + + const record = mkEvent('task_completed', 't-slack-fail'); + const result = await handler({ Records: [record] }); + expect(result.batchItemFailures).toEqual([{ itemIdentifier: record.eventID }]); + }); + + test('Slack dispatcher SlackApiError swallow does NOT escalate to retry', async () => { + // The other side of the boundary: ``channel_not_found`` and + // similar terminal Slack API errors are wrapped in SlackApiError + // and swallowed inside ``dispatchToSlack``. The router never sees + // the rejection so the record advances cleanly. 
Pinning this + // distinction prevents a future "let's just retry everything" + // refactor from burning Lambda retries on channel_not_found. + const { SlackApiError } = jest.requireMock( + '../../src/handlers/slack-notify', + ); + mockDispatchSlackEvent.mockReset().mockRejectedValueOnce( + new SlackApiError('slack chat.postMessage failed: channel_not_found'), + ); + + const event: DynamoDBStreamEvent = { + Records: [mkEvent('task_completed', 't-slack-terminal')], + }; + await expect(handler(event)).resolves.toEqual({ batchItemFailures: [] }); + }); + + test('SlackApiError matched by name even when instanceof fails (PR #79 review #7)', async () => { + // Defense-in-depth: if a bundler ever duplicates the slack-notify + // module, two distinct SlackApiError classes coexist and + // ``instanceof`` against one fails for instances of the other. + // The dispatcher must fall back to ``err.name === 'SlackApiError'`` + // so a duplicated-class scenario doesn't flip the channel-terminal + // swallow into an infinite retry loop. Synthesise that exact + // shape: a plain Error with name === 'SlackApiError', NOT an + // instance of the mock's SlackApiError class. + const fakeForeignSlackApiError = new Error( + 'slack chat.postMessage failed: not_authed', + ); + fakeForeignSlackApiError.name = 'SlackApiError'; + mockDispatchSlackEvent.mockReset().mockRejectedValueOnce(fakeForeignSlackApiError); + + const event: DynamoDBStreamEvent = { + Records: [mkEvent('task_completed', 't-slack-foreign-class')], + }; + // Must still be caught — record advances, no batchItemFailures. 
+ await expect(handler(event)).resolves.toEqual({ batchItemFailures: [] }); + }); +}); + // --------------------------------------------------------------------------- // Scenario 7-extended — agent_milestone routing regression // --------------------------------------------------------------------------- @@ -954,7 +1245,8 @@ describe('fanout-task-events: agent_milestone routing (effective event type)', ( }); test('shouldFanOut unwraps agent_milestone to its milestone name', () => { - // ``pr_created`` is in Slack + GitHub defaults → fan out. + // ``pr_created`` is in the GitHub default → fan out (Slack + // explicitly excludes pr_created; see CHANNEL_DEFAULTS comment). expect(shouldFanOut(makeMilestone('pr_created'))).toBe(true); }); @@ -1003,16 +1295,16 @@ describe('fanout-task-events: agent_milestone routing (effective event type)', ( expect(shouldFanOut(colliding)).toBe(false); }); - test('routeEvent dispatches agent_milestone(pr_created) to Slack + GitHub, not Email', async () => { - const channels = await routeEvent(makeMilestone('pr_created')); - expect(channels.sort()).toEqual(['github', 'slack']); + test('routeEvent dispatches agent_milestone(pr_created) to GitHub only (Slack opted out to avoid duplicate View PR)', async () => { + const outcome = await routeEvent(makeMilestone('pr_created')); + expect(outcome.dispatched).toEqual(['github']); }); test('routeEvent drops agent_milestone(agent_turn-like) that no channel subscribes to', async () => { // ``nudge_acknowledged`` is in no channel default today. Must // still route cleanly (empty list) rather than throw. 
- const channels = await routeEvent(makeMilestone('nudge_acknowledged')); - expect(channels).toEqual([]); + const outcome = await routeEvent(makeMilestone('nudge_acknowledged')); + expect(outcome.dispatched).toEqual([]); }); test('handler dispatches GitHub comment on agent_milestone(pr_created) stream record', async () => { @@ -1112,15 +1404,17 @@ describe('fanout-task-events: partial-batch response (findings #1 + #5)', () => mockClearTokenCache.mockReset(); }); - test('AccessDeniedException from resolveTokenSecretArn stays isolated via allSettled; batch still succeeds (finding #5 today)', async () => { - // Baseline: today's ``routeEvent`` catches the AccessDenied throw - // via ``Promise.allSettled`` so it surfaces as a - // ``fanout.dispatcher.rejected`` warn, NOT as a handler-level - // throw. The structured response is therefore an empty - // ``batchItemFailures`` — the record advances past the cursor. - // This test pins the current containment so a future change that - // accidentally rethrows past ``allSettled`` will flip it from - // "empty failures" to "one failure" and fail loudly here. + test('AccessDeniedException from resolveTokenSecretArn lands in infraRejections and flags the record for retry', async () => { + // Pre-issue-#64-review: this test asserted ``batchItemFailures: []`` + // because ``Promise.allSettled`` swallowed the rejection — that + // pinned a real BLOCKER (transient infra errors silently dropped). + // After the fix, the dispatcher's rejection lands in + // ``infraRejections`` and the handler escalates it to the partial- + // batch retry path. AccessDenied is technically a hard configuration + // failure (not transient), but treating it as retryable is correct + // — operators will see the record stuck in retry and the warn rate + // climbing on ``fanout.dispatcher.rejected`` until they fix the + // IAM policy. Silently dropping was the worse failure mode. 
const loggerModule = await import('../../src/handlers/shared/logger'); const warnSpy = jest.spyOn(loggerModule.logger, 'warn').mockImplementation(() => undefined); try { @@ -1149,16 +1443,19 @@ describe('fanout-task-events: partial-batch response (findings #1 + #5)', () => const result = await handler(event); - // Containment invariant: ``Promise.allSettled`` caught the - // rejection; the handler sees no throw. - expect(result).toEqual({ batchItemFailures: [] }); - // … but the rejection WAS observed by operators through the - // dispatcher-rejected warn (existing coverage path). + // Record is flagged for partial-batch retry — Lambda will replay + // this single eventID, leaving siblings alone. + expect(result.batchItemFailures).toEqual([{ itemIdentifier: poisonId }]); + + // The rejection is observable through the dispatcher-rejected + // warn so operators can alarm distinctly from the generic + // record-failed warn. const rejectedWarn = warnSpy.mock.calls.find( c => (c[1] as Record | undefined)?.event === 'fanout.dispatcher.rejected', ); expect(rejectedWarn).toBeDefined(); expect((rejectedWarn?.[1] as Record).channel).toBe('github'); + expect((rejectedWarn?.[1] as Record).retryable).toBe(true); } finally { warnSpy.mockRestore(); } diff --git a/cdk/test/handlers/shared/slack-blocks.test.ts b/cdk/test/handlers/shared/slack-blocks.test.ts index 194a18af..6fc8eaad 100644 --- a/cdk/test/handlers/shared/slack-blocks.test.ts +++ b/cdk/test/handlers/shared/slack-blocks.test.ts @@ -126,4 +126,80 @@ describe('renderSlackBlocks', () => { const msg = renderSlackBlocks('task_completed', task); expect(sectionText(msg.blocks[0])).toContain('2m 5s'); }); + + // ------------------------------------------------------------------- + // PR #79 test gap #32 — task_stranded + agent_error renderers + // ------------------------------------------------------------------- + + test('renders task_stranded message with prior_status from event metadata', () => { + // The reconciler stamps 
``code: STRANDED_NO_HEARTBEAT`` and + // ``prior_status`` on the event metadata (see + // reconcile-stranded-tasks.ts). The renderer must surface + // prior_status so operators can tell whether the task hung in + // HYDRATING vs RUNNING at a glance — without it, the reviewer's + // "generic Event: ..." UX regression would resurface. + const msg = renderSlackBlocks('task_stranded', baseTask, { + code: 'STRANDED_NO_HEARTBEAT', + prior_status: 'RUNNING', + age_seconds: 1800, + }); + expect(msg.text).toBe('Task stranded for org/repo'); + const text = sectionText(msg.blocks[0]); + expect(text).toContain(':warning:'); + expect(text).toContain('Task stranded'); + expect(text).toContain('org/repo'); + expect(text).toContain('last status: RUNNING'); + }); + + test('renders task_stranded message gracefully without metadata', () => { + // Missing prior_status (e.g. legacy event written before the + // reconciler started stamping it) must not crash; the renderer + // omits the parenthetical and produces a clean message. + const msg = renderSlackBlocks('task_stranded', baseTask); + const text = sectionText(msg.blocks[0]); + expect(text).toContain(':warning:'); + expect(text).toContain('Task stranded'); + expect(text).not.toContain('last status:'); + }); + + test('renders agent_error message with error_type and message_preview', () => { + // ``agent/src/progress_writer.py::write_agent_error`` carries + // ``error_type`` and ``message_preview`` on the event metadata. + // Pre-PR-#79 this fell to the default branch + // (``Event: agent_error for org/repo``) — a UX regression. 
+ const msg = renderSlackBlocks('agent_error', baseTask, { + error_type: 'TimeoutError', + message_preview: 'Tool call timed out after 30s', + }); + const text = sectionText(msg.blocks[0]); + expect(text).toContain(':rotating_light:'); + expect(text).toContain('Agent error'); + expect(text).toContain('org/repo'); + expect(text).toContain('TimeoutError'); + expect(text).toContain('Tool call timed out after 30s'); + }); + + test('renders agent_error message without metadata (legacy event shape)', () => { + // Defense-in-depth: an agent_error event with no metadata at all + // must still produce a sensible Slack message. The error_type and + // preview fields drop out cleanly without leaking ``undefined``. + const msg = renderSlackBlocks('agent_error', baseTask); + const text = sectionText(msg.blocks[0]); + expect(text).toContain(':rotating_light:'); + expect(text).toContain('Agent error'); + expect(text).not.toContain('undefined'); + expect(text).not.toContain('_Type:_'); + }); + + test('agent_error truncates long message_preview to keep Slack message readable', () => { + // The preview cap is 200 chars — protects channel UX from a + // pathological agent that emits a 4 KB error message. + const msg = renderSlackBlocks('agent_error', baseTask, { + error_type: 'BigError', + message_preview: 'X'.repeat(500), + }); + const text = sectionText(msg.blocks[0]); + expect(text.length).toBeLessThan(400); + expect(text).toContain('...'); + }); }); diff --git a/cdk/test/handlers/slack-notify.test.ts b/cdk/test/handlers/slack-notify.test.ts index f5a46b63..b7b10ccf 100644 --- a/cdk/test/handlers/slack-notify.test.ts +++ b/cdk/test/handlers/slack-notify.test.ts @@ -17,12 +17,15 @@ * SOFTWARE. */ -import type { DynamoDBStreamEvent, DynamoDBRecord } from 'aws-lambda'; +// Issue #64: SlackNotifyFn migrated off the direct DynamoDB Streams +// event-source mapping onto FanOutConsumer as a per-channel dispatcher. 
+// The tests here cover ``dispatchSlackEvent`` directly — the unit of +// behaviour the fan-out router invokes. End-to-end coverage through the +// router lives in ``fanout-task-events.test.ts``. const ddbSend = jest.fn(); -jest.mock('@aws-sdk/client-dynamodb', () => ({ DynamoDBClient: jest.fn(() => ({})) })); +const ddbClient = { send: ddbSend }; jest.mock('@aws-sdk/lib-dynamodb', () => ({ - DynamoDBDocumentClient: { from: jest.fn(() => ({ send: ddbSend })) }, GetCommand: jest.fn((input: unknown) => ({ _type: 'Get', input })), UpdateCommand: jest.fn((input: unknown) => ({ _type: 'Update', input })), })); @@ -38,31 +41,26 @@ const fetchMock = jest.fn(); process.env.TASK_TABLE_NAME = 'Tasks'; -import { handler } from '../../src/handlers/slack-notify'; +import type { DynamoDBDocumentClient } from '@aws-sdk/lib-dynamodb'; +import { dispatchSlackEvent, SlackApiError, type SlackDispatchEvent } from '../../src/handlers/slack-notify'; -function makeInsertRecord( +const ddb = ddbClient as unknown as DynamoDBDocumentClient; + +function mkEvent( taskId: string, eventType: string, metadata?: Record, -): DynamoDBRecord { +): SlackDispatchEvent { return { - eventID: `evt-${Math.random()}`, - eventName: 'INSERT', - dynamodb: { - NewImage: { - task_id: { S: taskId }, - event_type: { S: eventType }, - ...(metadata && { metadata: { S: JSON.stringify(metadata) } }), - }, - }, + task_id: taskId, + event_id: `evt-${taskId}-${eventType}`, + event_type: eventType, + timestamp: '2026-05-05T00:00:00Z', + metadata, }; } -function withRecords(records: DynamoDBRecord[]): DynamoDBStreamEvent { - return { Records: records }; -} - -describe('slack-notify handler', () => { +describe('dispatchSlackEvent', () => { beforeEach(() => { ddbSend.mockReset(); smSend.mockReset(); @@ -74,19 +72,23 @@ describe('slack-notify handler', () => { }); }); - test('skips non-slack tasks without touching DDB beyond the task read', async () => { + test('skips non-slack tasks without touching Slack', async () => { + 
// A Slack-subscribed event on a non-Slack task must still short- + // circuit cheaply — one DDB Get, no dedup write, no API call. + // This is the ``channel_source === 'slack'`` gate. ddbSend.mockResolvedValueOnce({ Item: { task_id: 't1', channel_source: 'api', channel_metadata: {} }, }); - await handler(withRecords([makeInsertRecord('t1', 'task_completed')])); + await dispatchSlackEvent(mkEvent('t1', 'task_completed'), ddb); - // Only the initial GetCommand ran — no dedup update, no Slack call. expect(ddbSend).toHaveBeenCalledTimes(1); expect(fetchMock).not.toHaveBeenCalled(); }); - test('dedup write runs only after channel_source is confirmed slack', async () => { + test('dedup write only runs after channel_source is confirmed slack', async () => { + // Order matters: we must not write the dedup marker on a task we + // are about to skip. ddbSend .mockResolvedValueOnce({ Item: { @@ -98,9 +100,8 @@ describe('slack-notify handler', () => { }) .mockResolvedValueOnce({}); // UpdateCommand for dedup - await handler(withRecords([makeInsertRecord('t1', 'task_completed')])); + await dispatchSlackEvent(mkEvent('t1', 'task_completed'), ddb); - // GetCommand first, then UpdateCommand (dedup). Order matters (item 17). expect(ddbSend.mock.calls[0][0]._type).toBe('Get'); expect(ddbSend.mock.calls[1][0]._type).toBe('Update'); expect(fetchMock).toHaveBeenCalled(); @@ -117,13 +118,43 @@ describe('slack-notify handler', () => { }) .mockRejectedValueOnce(Object.assign(new Error('exists'), { name: 'ConditionalCheckFailedException' })); - await handler(withRecords([makeInsertRecord('t1', 'task_failed')])); + await dispatchSlackEvent(mkEvent('t1', 'task_failed'), ddb); expect(fetchMock).not.toHaveBeenCalled(); }); - test('swallows Slack API errors without failing the batch', async () => { - ddbSend.mockResolvedValue({ + test.each([ + // Channel + message shape. + 'channel_not_found', + 'not_in_channel', + 'is_archived', + 'message_not_found', + // Auth. 
+ 'not_authed', + 'invalid_auth', + 'token_revoked', + 'token_expired', + 'account_inactive', + // Permission / scope (PR #79 review #8). + 'no_permission', + 'missing_scope', + 'restricted_action', + 'ekm_access_denied', + 'team_access_not_granted', + 'posting_to_general_channel_denied', + 'as_user_not_supported', + // Payload shape. + 'invalid_blocks', + 'invalid_blocks_format', + 'invalid_arguments', + 'msg_too_long', + 'too_many_attachments', + ])('throws a tagged SlackApiError on terminal Slack code %s (router swallows)', async (slackErrorCode) => { + // Pre-PR-#79-review the set was narrower; permission/scope + // codes (ekm_access_denied, missing_scope, etc.) used to be + // classified retryable and would burn 3 retries before DLQ on + // a misconfiguration that no retry can fix. + ddbSend.mockResolvedValueOnce({ Item: { task_id: 't1', channel_source: 'slack', @@ -132,49 +163,131 @@ describe('slack-notify handler', () => { }); fetchMock.mockResolvedValueOnce({ ok: true, - json: () => Promise.resolve({ ok: false, error: 'channel_not_found' }), + json: () => Promise.resolve({ ok: false, error: slackErrorCode }), }); await expect( - handler(withRecords([makeInsertRecord('t1', 'task_created')])), - ).resolves.toBeUndefined(); + dispatchSlackEvent(mkEvent('t1', 'task_created'), ddb), + ).rejects.toBeInstanceOf(SlackApiError); + }); + + test.each([ + 'ratelimited', + 'service_unavailable', + 'internal_error', + 'fatal_error', + 'request_timeout', + 'unknown_method', // anything not in TERMINAL_SLACK_API_ERRORS counts as retryable + ])('throws a plain Error (NOT SlackApiError) on retryable code %s', async (slackErrorCode) => { + // Post-issue-#64-review Cat 3 fix: retryable Slack errors must + // propagate as plain Error so the router classifies them as + // infra rejections and Lambda replays the record. Without this + // split, a transient ratelimited or service_unavailable would + // get permanently dropped under the SlackApiError swallow. 
+ ddbSend.mockResolvedValueOnce({ + Item: { + task_id: 't1', + channel_source: 'slack', + channel_metadata: { slack_team_id: 'T1', slack_channel_id: 'C1' }, + }, + }); + fetchMock.mockResolvedValueOnce({ + ok: true, + headers: { get: () => null }, + json: () => Promise.resolve({ ok: false, error: slackErrorCode }), + }); + + let caught: unknown; + try { + await dispatchSlackEvent(mkEvent('t1', 'task_created'), ddb); + } catch (err) { + caught = err; + } + expect(caught).toBeInstanceOf(Error); + expect(caught).not.toBeInstanceOf(SlackApiError); + expect((caught as Error).message).toContain(slackErrorCode); + }); + + test('logs Retry-After header on rate-limited Slack responses (PR #79 review #4)', async () => { + // Slack returns the Retry-After header (in seconds) on + // ``ratelimited`` so callers know when to retry. Surfacing it in + // the warn log means operators reading CloudWatch can see the + // expected recovery time instead of guessing from sustained warn + // rate. + ddbSend.mockResolvedValueOnce({ + Item: { + task_id: 't1', + channel_source: 'slack', + channel_metadata: { slack_team_id: 'T1', slack_channel_id: 'C1' }, + }, + }); + fetchMock.mockResolvedValueOnce({ + ok: true, + headers: { get: (name: string) => (name.toLowerCase() === 'retry-after' ? 
'30' : null) }, + json: () => Promise.resolve({ ok: false, error: 'ratelimited' }), + }); + + const loggerModule = await import('../../src/handlers/shared/logger'); + const warnSpy = jest.spyOn(loggerModule.logger, 'warn').mockImplementation(() => undefined); + try { + await expect( + dispatchSlackEvent(mkEvent('t1', 'task_created'), ddb), + ).rejects.toThrow(/ratelimited/); + + const retryWarn = warnSpy.mock.calls.find( + c => (c[1] as Record | undefined)?.event === 'fanout.slack.retryable_api_error', + ); + expect(retryWarn).toBeDefined(); + expect((retryWarn?.[1] as Record).retry_after_seconds).toBe('30'); + expect((retryWarn?.[1] as Record).slack_error_code).toBe('ratelimited'); + } finally { + warnSpy.mockRestore(); + } }); - test('rethrows infra errors so Lambda retries the batch (item 4)', async () => { - ddbSend.mockRejectedValueOnce(Object.assign(new Error('throttle'), { name: 'ProvisionedThroughputExceededException' })); + test('rethrows infra errors so the router records a dispatcher-rejected warn', async () => { + // DDB throttling and Secrets Manager outages must surface to the + // router — Promise.allSettled records them as rejections and batch + // telemetry reflects that the record didn't dispatch. 
+ ddbSend.mockRejectedValueOnce( + Object.assign(new Error('throttle'), { name: 'ProvisionedThroughputExceededException' }), + ); await expect( - handler(withRecords([makeInsertRecord('t1', 'task_completed')])), + dispatchSlackEvent(mkEvent('t1', 'task_completed'), ddb), ).rejects.toThrow('throttle'); }); - test('ignores non-INSERT stream events', async () => { - const modifyRecord: DynamoDBRecord = { - eventID: 'evt-modify', - eventName: 'MODIFY', - dynamodb: { NewImage: { task_id: { S: 't1' }, event_type: { S: 'task_completed' } } }, - }; - await handler(withRecords([modifyRecord])); - expect(ddbSend).not.toHaveBeenCalled(); + test('throws when TASK_TABLE_NAME env var is missing (PR #79 review #3)', async () => { + // Pre-fix: missing env returned silently, so the router counted + // Slack as "dispatched" and a broken stack quietly dropped every + // Slack notification. Post-fix: throw so the rejection lands in + // ``infraRejections`` and Lambda retries / DLQs. + const original = process.env.TASK_TABLE_NAME; + delete process.env.TASK_TABLE_NAME; + try { + await expect( + dispatchSlackEvent(mkEvent('t1', 'task_created'), ddb), + ).rejects.toThrow(/TASK_TABLE_NAME env var not set/); + // No DDB call attempted — the env-var guard fires first. + expect(ddbSend).not.toHaveBeenCalled(); + } finally { + process.env.TASK_TABLE_NAME = original; + } }); - test('ignores non-notifiable event types', async () => { - await handler(withRecords([makeInsertRecord('t1', 'agent_heartbeat')])); + test('ignores event types not in the Slack render set', async () => { + // Defence-in-depth: even if the fanout filter drifts and sends us + // an event the renderer doesn't know how to format, we must + // short-circuit without touching DDB or Slack. 
+ await dispatchSlackEvent(mkEvent('t1', 'agent_heartbeat'), ddb); expect(ddbSend).not.toHaveBeenCalled(); }); - test('logs and continues when event metadata JSON is malformed (item 20)', async () => { - const record: DynamoDBRecord = { - eventID: 'evt-bad-meta', - eventName: 'INSERT', - dynamodb: { - NewImage: { - task_id: { S: 't1' }, - event_type: { S: 'task_failed' }, - metadata: { S: 'not-json{' }, - }, - }, - }; + test('uses pre-parsed metadata without a JSON re-parse', async () => { + // The fan-out router hands us a parsed metadata map — the + // dispatcher must not insist on the old ``metadata: { S: ... }`` + // JSON string shape. ddbSend .mockResolvedValueOnce({ Item: { @@ -185,13 +298,477 @@ describe('slack-notify handler', () => { error_message: 'agent crashed', }, }) - .mockResolvedValueOnce({}); // dedup + .mockResolvedValueOnce({}); - await handler(withRecords([record])); + await dispatchSlackEvent( + mkEvent('t1', 'task_failed', { error: 'oom killed' }), + ddb, + ); - // Still posts to Slack — bad metadata is not fatal. expect(fetchMock).toHaveBeenCalled(); const postBody = JSON.parse((fetchMock.mock.calls[0][1] as { body: string }).body); expect(postBody.text).toContain('org/repo'); }); + + // --------------------------------------------------------------------- + // PR #79 test gap #31 — conditional UpdateItem race + dup-delete + // --------------------------------------------------------------------- + + test('task_created persist ConditionalCheckFailed → posts duplicate then deletes it', async () => { + // Race: a sibling retry already wrote ``slack_created_msg_ts``. + // Our POST landed in Slack first, the conditional UpdateItem + // failed, and we must clean up the duplicate root message via + // ``chat.delete``. Without this, the channel accumulates ghost + // task_created posts on every retry-after-success-write race. 
+ ddbSend + .mockResolvedValueOnce({ + Item: { + task_id: 't1', + channel_source: 'slack', + channel_metadata: { slack_team_id: 'T1', slack_channel_id: 'C1' }, + repo: 'org/repo', + }, + }) + .mockRejectedValueOnce( + Object.assign(new Error('cond fail'), { name: 'ConditionalCheckFailedException' }), + ); + // Default fetchMock returns ok=true with ts; fine for the post + + // best-effort reaction calls. The chat.delete call uses the same + // default which is also ``ok: true`` — perfect for the success path. + + await dispatchSlackEvent(mkEvent('t1', 'task_created'), ddb); + + // Find the chat.delete invocation by URL — the duplicate cleanup + // is the load-bearing assertion. Reaction add/remove calls fall + // through to the default mock and don't carry test signal here. + const deleteCall = fetchMock.mock.calls.find( + ([url]) => url === 'https://slack.com/api/chat.delete', + ); + expect(deleteCall).toBeDefined(); + const deleteBody = JSON.parse((deleteCall![1] as { body: string }).body); + expect(deleteBody.ts).toBe('1234.0001'); + }); + + test('session_started persist ConditionalCheckFailed → posts duplicate then deletes it', async () => { + // Same race as task_created but for session_started; uses + // ``slack_session_msg_ts`` as the conditional attribute. Without + // delete, terminal cleanup would orphan the duplicate session + // message (terminal cleanup deletes a single session_msg_ts, not + // the duplicate that was never persisted). 
+ ddbSend + .mockResolvedValueOnce({ + Item: { + task_id: 't1', + channel_source: 'slack', + channel_metadata: { + slack_team_id: 'T1', + slack_channel_id: 'C1', + slack_thread_ts: '999.000', // session_started threads under task_created + }, + repo: 'org/repo', + }, + }) + .mockRejectedValueOnce( + Object.assign(new Error('cond fail'), { name: 'ConditionalCheckFailedException' }), + ); + + await dispatchSlackEvent(mkEvent('t1', 'session_started'), ddb); + + const deleteCall = fetchMock.mock.calls.find( + ([url]) => url === 'https://slack.com/api/chat.delete', + ); + expect(deleteCall).toBeDefined(); + const deleteBody = JSON.parse((deleteCall![1] as { body: string }).body); + expect(deleteBody.ts).toBe('1234.0001'); + }); + + test('dup-delete failure emits fanout.slack.dup_delete_failed with error_id (PR #79 review #6)', async () => { + // The conditional persist hit a sibling retry; we tried to delete + // the duplicate Slack message but ``chat.delete`` failed. The + // duplicate is now permanent in the thread — operators need a + // dedicated alarmable signal so they can detect ghost-message + // accumulation. We override the default mock for chat.delete + // specifically by URL-routing the fetch implementation rather + // than relying on call-order, so reactions can fall through to + // the default without consuming our scripted responses. + ddbSend + .mockResolvedValueOnce({ + Item: { + task_id: 't1', + channel_source: 'slack', + channel_metadata: { slack_team_id: 'T1', slack_channel_id: 'C1' }, + repo: 'org/repo', + }, + }) + .mockRejectedValueOnce( + Object.assign(new Error('cond fail'), { name: 'ConditionalCheckFailedException' }), + ); + fetchMock.mockImplementation(async (url: string) => { + if (url === 'https://slack.com/api/chat.delete') { + return { + ok: true, + json: () => Promise.resolve({ ok: false, error: 'cant_delete_message' }), + }; + } + // Default: chat.postMessage / reactions.* succeed. 
+ return { + ok: true, + headers: { get: () => null }, + json: () => Promise.resolve({ ok: true, ts: '1234.0001' }), + }; + }); + + const loggerModule = await import('../../src/handlers/shared/logger'); + const errorSpy = jest.spyOn(loggerModule.logger, 'error').mockImplementation(() => undefined); + try { + await dispatchSlackEvent(mkEvent('t1', 'task_created'), ddb); + + const ghostError = errorSpy.mock.calls.find( + c => (c[1] as Record | undefined)?.event === 'fanout.slack.dup_delete_failed', + ); + expect(ghostError).toBeDefined(); + expect((ghostError?.[1] as Record).error_id).toBe('FANOUT_SLACK_DUP_DELETE_FAILED'); + expect((ghostError?.[1] as Record).duplicate_ts).toBe('1234.0001'); + expect((ghostError?.[1] as Record).event_type).toBe('task_created'); + } finally { + errorSpy.mockRestore(); + } + }); + + test('chat.delete returning message_not_found is treated as success (no dup_delete_failed)', async () => { + // ``message_not_found`` means the duplicate already got cleaned + // up by something else (e.g. a previous retry's delete). The + // dup_delete_failed alarm must NOT fire on this benign case, or + // operators will see false positives whenever the race resolves + // cleanly. 
+ ddbSend + .mockResolvedValueOnce({ + Item: { + task_id: 't1', + channel_source: 'slack', + channel_metadata: { slack_team_id: 'T1', slack_channel_id: 'C1' }, + repo: 'org/repo', + }, + }) + .mockRejectedValueOnce( + Object.assign(new Error('cond fail'), { name: 'ConditionalCheckFailedException' }), + ); + fetchMock.mockImplementation(async (url: string) => { + if (url === 'https://slack.com/api/chat.delete') { + return { + ok: true, + json: () => Promise.resolve({ ok: false, error: 'message_not_found' }), + }; + } + return { + ok: true, + headers: { get: () => null }, + json: () => Promise.resolve({ ok: true, ts: '1234.0001' }), + }; + }); + + const loggerModule = await import('../../src/handlers/shared/logger'); + const errorSpy = jest.spyOn(loggerModule.logger, 'error').mockImplementation(() => undefined); + try { + await dispatchSlackEvent(mkEvent('t1', 'task_created'), ddb); + + const ghostError = errorSpy.mock.calls.find( + c => (c[1] as Record | undefined)?.event === 'fanout.slack.dup_delete_failed', + ); + expect(ghostError).toBeUndefined(); + } finally { + errorSpy.mockRestore(); + } + }); + + // ------------------------------------------------------------------- + // PR #79 test gap #33 — agent_error dedup (review finding #4) + // ------------------------------------------------------------------- + + test('agent_error claims its own dedup attribute (slack_dispatched_agent_error)', async () => { + // Pre-PR-#79 review #4 fix: agent_error had no dedup, so a + // sibling-channel-failure retry could double-page operators. + // Post-fix: agent_error writes ``channel_metadata.slack_dispatched_agent_error`` + // via a conditional UpdateItem before posting. This test pins + // the *attribute name* in the UpdateExpression so a future + // refactor that renames the attribute breaks loudly here. 
+ ddbSend + .mockResolvedValueOnce({ + Item: { + task_id: 't1', + channel_source: 'slack', + channel_metadata: { slack_team_id: 'T1', slack_channel_id: 'C1' }, + repo: 'org/repo', + }, + }) + .mockResolvedValueOnce({}); // dedup UpdateItem succeeds + + await dispatchSlackEvent(mkEvent('t1', 'agent_error'), ddb); + + // Second DDB call is the dedup UpdateItem — pin its shape. + const updateInput = ddbSend.mock.calls[1][0]._type === 'Update' + ? ddbSend.mock.calls[1][0].input + : null; + expect(updateInput).toBeTruthy(); + expect(updateInput.UpdateExpression).toContain('slack_dispatched_agent_error'); + expect(updateInput.ConditionExpression).toContain( + 'attribute_not_exists(channel_metadata.slack_dispatched_agent_error)', + ); + }); + + test('agent_error retry hits the dedup guard and skips the post (sibling-channel-failure scenario)', async () => { + // The actual scenario PR #79 review #4 is about: GitHub fails on + // the first run, the record retries, and the Slack agent_error + // dispatcher fires AGAIN. Without the per-event-type dedup, we + // would post a duplicate :rotating_light: line. With it, the + // ConditionalCheckFailedException short-circuits before + // chat.postMessage is ever called. + ddbSend + .mockResolvedValueOnce({ + Item: { + task_id: 't1', + channel_source: 'slack', + channel_metadata: { + slack_team_id: 'T1', + slack_channel_id: 'C1', + slack_dispatched_agent_error: true, // already posted on the first try + }, + repo: 'org/repo', + }, + }) + .mockRejectedValueOnce( + Object.assign(new Error('cond fail'), { name: 'ConditionalCheckFailedException' }), + ); + + await dispatchSlackEvent(mkEvent('t1', 'agent_error'), ddb); + + // No fetch — the dedup guard short-circuited. 
+ expect(fetchMock).not.toHaveBeenCalled(); + }); + + test('terminal dedup attribute is per-class (any first terminal claims; subsequent terminals dedup)', async () => { + // Defense-in-depth for the SLACK_DEDUP_ATTRIBUTE map: a + // ``task_completed`` followed by a sibling-failure-retry's + // ``task_failed`` (e.g. orchestrator wrote both because of a + // late-arriving failure) must dedup against the same + // ``slack_notified_terminal`` slot. Otherwise a flaky retry + // could post both a ✅ and an ❌ for the same task. + ddbSend + .mockResolvedValueOnce({ + Item: { + task_id: 't1', + channel_source: 'slack', + channel_metadata: { + slack_team_id: 'T1', + slack_channel_id: 'C1', + slack_notified_terminal: true, // task_completed already posted + }, + repo: 'org/repo', + }, + }) + .mockRejectedValueOnce( + Object.assign(new Error('cond fail'), { name: 'ConditionalCheckFailedException' }), + ); + + // Simulate an orchestrator-emitted task_failed arriving after + // task_completed already claimed the terminal slot. + await dispatchSlackEvent(mkEvent('t1', 'task_failed'), ddb); + + expect(fetchMock).not.toHaveBeenCalled(); + }); + + test('agent_error and terminals use distinct dedup slots (do not collide)', async () => { + // An agent_error followed by a task_completed must both be + // delivered — they live in different slots + // (slack_dispatched_agent_error vs slack_notified_terminal). + // This test pins the separation so a future refactor can't + // accidentally collapse them and silently drop terminals after + // an agent_error. 
+ ddbSend + .mockResolvedValueOnce({ + Item: { + task_id: 't1', + channel_source: 'slack', + channel_metadata: { + slack_team_id: 'T1', + slack_channel_id: 'C1', + slack_dispatched_agent_error: true, // agent_error already posted + // slack_notified_terminal is NOT set + }, + repo: 'org/repo', + }, + }) + .mockResolvedValueOnce({}); // terminal dedup UpdateItem succeeds + + await dispatchSlackEvent(mkEvent('t1', 'task_completed'), ddb); + + // Reaches chat.postMessage — the agent_error slot did not + // shadow the terminal slot. + expect(fetchMock).toHaveBeenCalled(); + const postCall = fetchMock.mock.calls.find( + ([url]) => url === 'https://slack.com/api/chat.postMessage', + ); + expect(postCall).toBeDefined(); + }); + + // ------------------------------------------------------------------- + // PR #79 test gap #35 — task_stranded through terminal dedup + // ------------------------------------------------------------------- + + test('task_stranded posts and writes the terminal dedup marker on first arrival', async () => { + // task_stranded is one of the 5 terminals that share + // ``slack_notified_terminal``. The reconciler emits + // task_stranded + task_failed back-to-back when a heartbeat + // expires (handlers/reconcile-stranded-tasks.ts:170+); whichever + // arrives first claims the slot and posts a Slack message. + ddbSend + .mockResolvedValueOnce({ + Item: { + task_id: 't1', + channel_source: 'slack', + channel_metadata: { slack_team_id: 'T1', slack_channel_id: 'C1' }, + repo: 'org/repo', + }, + }) + .mockResolvedValueOnce({}); // dedup UpdateItem succeeds + + await dispatchSlackEvent( + mkEvent('t1', 'task_stranded', { code: 'STRANDED_NO_HEARTBEAT', prior_status: 'RUNNING' }), + ddb, + ); + + // Dedup wrote against the shared terminal slot. + const updateInput = ddbSend.mock.calls[1][0]._type === 'Update' + ? 
ddbSend.mock.calls[1][0].input + : null; + expect(updateInput).toBeTruthy(); + expect(updateInput.UpdateExpression).toContain('slack_notified_terminal'); + + // chat.postMessage fired with the stranded warning. + const postCall = fetchMock.mock.calls.find( + ([url]) => url === 'https://slack.com/api/chat.postMessage', + ); + expect(postCall).toBeDefined(); + const postBody = JSON.parse((postCall![1] as { body: string }).body); + expect(postBody.text).toContain('Task stranded'); + }); + + test('terminal cleanup re-reads TaskRecord so it sees msg_ts attrs persisted on earlier stream batches (orphan-message fix)', async () => { + // Fast-task orphan scenario observed live during PR #79 review: + // 1. task_created stream batch posts the rocket message and + // writes ``slack_created_msg_ts``. + // 2. task_completed stream batch runs ~30s later. Its initial + // GetItem races the prior UpdateItem and sees stale + // channel_metadata WITHOUT slack_created_msg_ts. + // 3. Without the re-read, the terminal cleanup branch sees + // ``channelMeta.slack_created_msg_ts === undefined`` and + // silently skips. The rocket message stays in the thread. + // Fix: a fresh GetItem inside the terminal-cleanup branch + // (after the dedup UpdateItem has linearized our view) sees the + // newly-written attribute and triggers the chat.delete. 
+ ddbSend + .mockResolvedValueOnce({ // dispatch-entry GetItem — STALE (no msg_ts) + Item: { + task_id: 't1', + channel_source: 'slack', + channel_metadata: { slack_team_id: 'T1', slack_channel_id: 'C1' }, + repo: 'org/repo', + }, + }) + .mockResolvedValueOnce({}) // dedup UpdateItem succeeds + .mockResolvedValueOnce({ // terminal cleanup re-read — FRESH + Item: { + task_id: 't1', + channel_source: 'slack', + channel_metadata: { + slack_team_id: 'T1', + slack_channel_id: 'C1', + slack_created_msg_ts: '1234.0001', // landed via the prior batch + }, + repo: 'org/repo', + }, + }); + + await dispatchSlackEvent(mkEvent('t1', 'task_completed'), ddb); + + // The chat.delete fires against the freshly-read msg_ts. + const deleteCall = fetchMock.mock.calls.find( + ([url]) => url === 'https://slack.com/api/chat.delete', + ); + expect(deleteCall).toBeDefined(); + const deleteBody = JSON.parse((deleteCall![1] as { body: string }).body); + expect(deleteBody.ts).toBe('1234.0001'); + }); + + test('terminal cleanup falls back to dispatch-entry snapshot when re-read fails', async () => { + // Defense-in-depth: a transient DDB failure on the re-read GetItem + // must NOT break terminal delivery. Falls back to the snapshot we + // already had, logs a warn, and continues. + ddbSend + .mockResolvedValueOnce({ // dispatch-entry GetItem — has msg_ts + Item: { + task_id: 't1', + channel_source: 'slack', + channel_metadata: { + slack_team_id: 'T1', + slack_channel_id: 'C1', + slack_created_msg_ts: '9999.0001', + }, + repo: 'org/repo', + }, + }) + .mockResolvedValueOnce({}) // dedup + .mockRejectedValueOnce(new Error('throttled')); // re-read fails + + const loggerModule = await import('../../src/handlers/shared/logger'); + const warnSpy = jest.spyOn(loggerModule.logger, 'warn').mockImplementation(() => undefined); + try { + await dispatchSlackEvent(mkEvent('t1', 'task_completed'), ddb); + + // Cleanup still ran with the original snapshot's msg_ts. 
+ const deleteCall = fetchMock.mock.calls.find( + ([url]) => url === 'https://slack.com/api/chat.delete', + ); + expect(deleteCall).toBeDefined(); + const deleteBody = JSON.parse((deleteCall![1] as { body: string }).body); + expect(deleteBody.ts).toBe('9999.0001'); + + // The fallback was observable. + const fallbackWarn = warnSpy.mock.calls.find( + c => (c[1] as Record | undefined)?.event === 'fanout.slack.cleanup_reread_failed', + ); + expect(fallbackWarn).toBeDefined(); + } finally { + warnSpy.mockRestore(); + } + }); + + test('task_stranded after a sibling task_failed dedups (no double-page on the reconciler twin)', async () => { + // Real-world scenario: the reconciler writes BOTH task_stranded + // and task_failed for a heartbeat-expired task (one for the + // operator signal, one to drive the FAILED status transition). + // If both fan out to Slack, the slot must dedup the second + // arrival so operators see exactly one alert per stranded task, + // not a paired ``Task stranded`` + ``Task failed`` storm. + ddbSend + .mockResolvedValueOnce({ + Item: { + task_id: 't1', + channel_source: 'slack', + channel_metadata: { + slack_team_id: 'T1', + slack_channel_id: 'C1', + slack_notified_terminal: true, // task_failed already claimed + }, + repo: 'org/repo', + }, + }) + .mockRejectedValueOnce( + Object.assign(new Error('cond fail'), { name: 'ConditionalCheckFailedException' }), + ); + + await dispatchSlackEvent(mkEvent('t1', 'task_stranded'), ddb); + + expect(fetchMock).not.toHaveBeenCalled(); + }); });