Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
cc81223
feat(fanout): migrate SlackNotifyFn to FanOutConsumer subscriber (#64)
May 12, 2026
7a93a4e
Merge branch 'main' into feat/migrate-slack-to-fanout
isadeks May 12, 2026
8615b56
Merge branch 'main' into feat/migrate-slack-to-fanout
krokoko May 13, 2026
e8437bb
fix(fanout): retry GitHub 403/429 instead of swallowing as terminal (…
May 13, 2026
2191c08
fix(fanout): guard Slack Secrets Manager grant on a prop (#79 review #2)
May 13, 2026
1d7c169
fix(fanout): throw on missing TASK_TABLE_NAME env var (#79 review #3)
May 13, 2026
e6e8a01
fix(fanout): match SlackApiError by name as well as instanceof (#79 r…
May 13, 2026
a29c157
fix(fanout): extend TERMINAL_SLACK_API_ERRORS with permission codes (…
May 13, 2026
fb88d4d
fix(fanout): promote Slack reaction/delete network errors to error lo…
May 13, 2026
1d92ad1
fix(fanout): emit fanout.slack.dup_delete_failed on ghost-message acc…
May 13, 2026
1a5cbe2
chore(fanout): tighten RouteOutcome arrays to ReadonlyArray (#79 revi…
May 13, 2026
b7bc393
refactor(fanout): make SlackDispatchEvent a type alias of FanOutEvent…
May 13, 2026
5c42d2c
fix(fanout): generalize Slack dedup to cover agent_error + log Retry-…
May 13, 2026
7b9b68f
test(fanout): conditional UpdateItem race + dup-delete coverage (#79 …
May 13, 2026
7a4fa20
test(fanout): cover task_stranded + agent_error renderers (#79 test g…
May 13, 2026
0e77b86
test(fanout): cover agent_error dedup + dedup-slot isolation (#79 tes…
May 13, 2026
b7225fd
test(fanout): add construct-level tests for FanOutConsumer (#79 test …
May 13, 2026
d4f6afe
test(fanout): cover task_stranded through terminal dedup (#79 test ga…
May 13, 2026
9ff9b45
fix(fanout): re-read TaskRecord before terminal cleanup to close orph…
May 13, 2026
29dbaa3
Merge branch 'main' into feat/migrate-slack-to-fanout
krokoko May 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions cdk/src/constructs/fanout-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import * as path from 'path';
import { Duration } from 'aws-cdk-lib';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as iam from 'aws-cdk-lib/aws-iam';
import { StartingPosition, Architecture, Runtime } from 'aws-cdk-lib/aws-lambda';
import { DynamoEventSource, SqsDlq } from 'aws-cdk-lib/aws-lambda-event-sources';
import * as lambda from 'aws-cdk-lib/aws-lambda-nodejs';
Expand Down Expand Up @@ -62,6 +63,18 @@ export interface FanOutConsumerProps {
*/
readonly githubTokenSecret?: sm.ISecret;

/**
* Secrets Manager ARN-prefix pattern for per-workspace Slack bot
* tokens. Required ONLY when the platform deploys SlackIntegration —
* the Slack dispatcher reads bot tokens at this scope. Matches the
* other "guarded by prop" grants (taskTable, repoTable,
* githubTokenSecret): a deployment without Slack onboarding gets no
* dangling IAM permission to ``bgagent/slack/*``. Typically passed
* as ``Stack.of(this).formatArn({ ..., resourceName:
* 'bgagent/slack/*' })``. Found in PR #79 review (#2 CRITICAL).
*/
readonly slackSecretArnPattern?: string;

/**
* Maximum batch size delivered to the Lambda per invocation.
*
Expand Down Expand Up @@ -134,6 +147,20 @@ export class FanOutConsumer extends Construct {
this.fn.addEnvironment('GITHUB_TOKEN_SECRET_ARN', props.githubTokenSecret.secretArn);
}

// Slack dispatcher reads per-workspace bot tokens from Secrets
// Manager (``bgagent/slack/<team_id>``). Scope the grant to the
// caller-provided prefix so the fan-out Lambda cannot read
// unrelated platform secrets — matches the policy the old
// standalone ``SlackNotifyFn`` held before issue #64. Guarded on
// ``slackSecretArnPattern`` so deployments without Slack
// onboarding don't get a dangling IAM grant (PR #79 review #2).
if (props.slackSecretArnPattern) {
this.fn.addToRolePolicy(new iam.PolicyStatement({
actions: ['secretsmanager:GetSecretValue'],
resources: [props.slackSecretArnPattern],
}));
}

this.fn.addEventSource(new DynamoEventSource(props.taskEventsTable, {
startingPosition: StartingPosition.LATEST,
batchSize: props.batchSize ?? 100,
Expand Down
44 changes: 11 additions & 33 deletions cdk/src/constructs/slack-integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ import * as apigw from 'aws-cdk-lib/aws-apigateway';
import * as cognito from 'aws-cdk-lib/aws-cognito';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as iam from 'aws-cdk-lib/aws-iam';
import { Runtime, Architecture, StartingPosition, FilterCriteria, FilterRule } from 'aws-cdk-lib/aws-lambda';
import * as lambdaEventSources from 'aws-cdk-lib/aws-lambda-event-sources';
import { Runtime, Architecture } from 'aws-cdk-lib/aws-lambda';
import * as lambda from 'aws-cdk-lib/aws-lambda-nodejs';
import * as secretsmanager from 'aws-cdk-lib/aws-secretsmanager';
import { NagSuppressions } from 'cdk-nag';
Expand Down Expand Up @@ -73,9 +72,14 @@ export interface SlackIntegrationProps {
* Creates:
* - SlackInstallationTable (per-workspace installation records)
* - SlackUserMappingTable (Slack user → platform user mappings)
* - Lambda handlers for OAuth, slash commands, events, notifications, and account linking
* - Lambda handlers for OAuth, slash commands, events, and account linking
* - API Gateway routes under /slack/*
* - DynamoDB Streams event source for outbound notifications
*
* Outbound Slack delivery (task lifecycle notifications) runs through
* ``FanOutConsumer`` as a per-channel dispatcher. Before issue #64 this
* construct also owned a ``SlackNotifyFn`` DynamoDB Streams consumer on
* ``TaskEventsTable``; that consumer was removed to keep the stream at
* the DynamoDB-documented one-reader-per-shard limit.
*/
export class SlackIntegration extends Construct {
/** The Slack installation table. */
Expand Down Expand Up @@ -330,34 +334,8 @@ export class SlackIntegration extends Construct {
});
this.userMappingTable.grantReadWriteData(slackLinkFn);

// --- Outbound Notification Handler (DynamoDB Streams trigger) ---
const slackNotifyFn = new lambda.NodejsFunction(this, 'SlackNotifyFn', {
entry: path.join(handlersDir, 'slack-notify.ts'),
handler: 'handler',
runtime: Runtime.NODEJS_24_X,
architecture: Architecture.ARM_64,
timeout: Duration.seconds(30),
environment: {
TASK_TABLE_NAME: props.taskTable.tableName,
},
bundling: commonBundling,
});
props.taskTable.grantReadWriteData(slackNotifyFn);
slackNotifyFn.addToRolePolicy(readSlackSecretsPolicy);

// DynamoDB Streams event source with filtering
slackNotifyFn.addEventSource(new lambdaEventSources.DynamoEventSource(props.taskEventsTable, {
startingPosition: StartingPosition.LATEST,
batchSize: 10,
maxBatchingWindow: Duration.seconds(0),
retryAttempts: 3,
bisectBatchOnError: true,
filters: [
FilterCriteria.filter({
eventName: FilterRule.isEqual('INSERT'),
}),
],
}));
// Outbound Slack delivery runs through FanOutConsumer — see the
// construct doc above for the reader-count rationale (issue #64).

// ═══════════════════════════════════════════════════════════════════════════
// API Gateway Routes
Expand Down Expand Up @@ -436,7 +414,7 @@ export class SlackIntegration extends Construct {
}

// Standard Lambda suppressions
const allFunctions = [oauthCallbackFn, slackEventsFn, slackCommandsFn, commandProcessorFn, slackLinkFn, slackNotifyFn, slackInteractionsFn];
const allFunctions = [oauthCallbackFn, slackEventsFn, slackCommandsFn, commandProcessorFn, slackLinkFn, slackInteractionsFn];
for (const fn of allFunctions) {
NagSuppressions.addResourceSuppressions(fn, [
{
Expand Down
Loading