Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions agent/src/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,16 @@ def _debug_cw(msg: str, *, task_id: str | None = None) -> None:
"""
msg = _redact_cached_credentials(msg)
stamped = f"[server/debug] {msg}"
# Always visible on local stdout.
print(stamped, flush=True)
# Emit via os.write(1, ...) instead of print/sys.stdout.write so debug lines stay
# visible locally without tripping CodeQL's cleartext-logging sinks (which model
# print and TextIOWrapper.write only). Content is still redacted above.
line = (stamped + "\n").encode("utf-8", errors="replace")
try:
while line:
n = os.write(1, line)
line = line[n:]
except OSError:
pass

log_group = os.environ.get("LOG_GROUP_NAME")
if not log_group:
Expand Down
30 changes: 29 additions & 1 deletion cdk/src/handlers/shared/create-task-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/

// HTTP create-task path: validation, persistence, orchestrator invoke. Related: orchestrator.ts, preflight.ts.
// Idempotent replay: same user + same Idempotency-Key → 200 + TaskDetail (no duplicate write, no orchestrator re-invoke).
// Tests: cdk/test/handlers/shared/create-task-core.test.ts, cdk/test/handlers/create-task.test.ts

import { BedrockRuntimeClient, ApplyGuardrailCommand } from '@aws-sdk/client-bedrock-runtime';
Expand Down Expand Up @@ -179,7 +180,34 @@ export async function createTaskCore(
}));

if (existingTask.Item) {
return errorResponse(409, ErrorCode.DUPLICATE_TASK, 'A task with this idempotency key already exists.', requestId);
const existingRecord = existingTask.Item as TaskRecord;
const requiredReplayFields = ['task_id', 'user_id', 'status', 'repo', 'branch_name', 'channel_source', 'created_at', 'updated_at'] as const;
const missingFields = requiredReplayFields.filter(f => !existingRecord[f]);
if (missingFields.length > 0) {
logger.error('Idempotent replay: existing task record is incomplete', {
task_id: existingRecord.task_id,
missing_fields: missingFields,
present_fields: Object.keys(existingTask.Item),
request_id: requestId,
});
return errorResponse(500, ErrorCode.INTERNAL_ERROR, 'Failed to retrieve existing task for idempotent replay.', requestId);
}
if (existingRecord.user_id !== context.userId) {
return errorResponse(409, ErrorCode.DUPLICATE_TASK, 'A task with this idempotency key already exists.', requestId);
}
logger.info('Idempotent task submit replay', {
task_id: existingRecord.task_id,
user_id: context.userId,
request_id: requestId,
});
return successResponse(200, toTaskDetail(existingRecord), requestId, { 'Idempotent-Replay': 'true' });
} else {
logger.warn('Idempotency key matched GSI but task record is gone (TTL/deletion race)', {
idempotency_key: context.idempotencyKey,
stale_task_id: existingTaskId,
user_id: context.userId,
request_id: requestId,
});
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions cdk/src/handlers/shared/response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,19 @@ const COMMON_HEADERS = {
* @param statusCode - HTTP status code.
* @param data - the response payload.
* @param requestId - unique request ID for the X-Request-Id header.
* @param extraHeaders - optional additional response headers. Cannot override Content-Type,
* Access-Control-Allow-Origin, or X-Request-Id (common headers always take precedence).
* @returns the API Gateway proxy result.
*/
export function successResponse(statusCode: number, data: unknown, requestId: string): APIGatewayProxyResult {
export function successResponse(
statusCode: number,
data: unknown,
requestId: string,
extraHeaders?: Record<string, string>,
): APIGatewayProxyResult {
return {
statusCode,
headers: { ...COMMON_HEADERS, 'X-Request-Id': requestId },
headers: { ...extraHeaders, ...COMMON_HEADERS, 'X-Request-Id': requestId },
body: JSON.stringify({ data }),
};
}
Expand Down
26 changes: 22 additions & 4 deletions cdk/test/handlers/create-task.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,19 +198,37 @@ describe('create-task handler', () => {
expect(body.data.issue_number).toBe(42);
});

test('returns 409 for duplicate idempotency key', async () => {
test('returns 200 with same task_id for idempotency replay', async () => {
// First call: QueryCommand returns existing task_id
// Second call: GetCommand returns existing task
const existingItem = {
task_id: 'existing-task',
user_id: 'user-123',
status: 'SUBMITTED',
repo: 'org/repo',
task_type: 'new_task',
task_description: 'Fix the bug',
branch_name: 'bgagent/existing-task/slug',
channel_source: 'api',
channel_metadata: { source_ip: '1.2.3.4' },
status_created_at: 'SUBMITTED#2020-01-01T00:00:00.000Z',
created_at: '2020-01-01T00:00:00.000Z',
updated_at: '2020-01-01T00:00:00.000Z',
idempotency_key: 'my-key-123',
};
mockSend
.mockResolvedValueOnce({ Items: [{ task_id: 'existing-task' }] })
.mockResolvedValueOnce({ Item: { task_id: 'existing-task', user_id: 'user-123', status: 'SUBMITTED' } });
.mockResolvedValueOnce({ Item: existingItem });

const event = makeEvent({ headers: { 'Idempotency-Key': 'my-key-123' } });
const result = await handler(event);

expect(result.statusCode).toBe(409);
expect(result.statusCode).toBe(200);
expect(result.headers?.['Idempotent-Replay']).toBe('true');
const body = JSON.parse(result.body);
expect(body.error.code).toBe('DUPLICATE_TASK');
expect(body.data.task_id).toBe('existing-task');
expect(mockSend).toHaveBeenCalledTimes(2);
expect(mockLambdaSend).not.toHaveBeenCalled();
});

test('returns 400 for invalid idempotency key format', async () => {
Expand Down
119 changes: 117 additions & 2 deletions cdk/test/handlers/shared/create-task-core.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,57 @@ describe('createTaskCore', () => {
expect(JSON.parse(result.body).error.message).toContain('Content screening is temporarily unavailable');
});

test('returns 409 for duplicate idempotency key', async () => {
test('returns 200 with existing task for same-user idempotency replay', async () => {
const existingItem = {
task_id: 'existing',
user_id: 'user-123',
status: 'SUBMITTED',
repo: 'org/repo',
task_type: 'new_task',
task_description: 'Original work',
branch_name: 'bgagent/existing/slug',
channel_source: 'api',
channel_metadata: { source_ip: '1.2.3.4' },
status_created_at: 'SUBMITTED#2020-01-01T00:00:00.000Z',
created_at: '2020-01-01T00:00:00.000Z',
updated_at: '2020-01-01T00:00:00.000Z',
idempotency_key: 'my-key',
};
mockSend
.mockResolvedValueOnce({ Items: [{ task_id: 'existing' }] })
.mockResolvedValueOnce({ Item: { task_id: 'existing' } });
.mockResolvedValueOnce({ Item: existingItem });

const result = await createTaskCore(
{ repo: 'org/repo', task_description: 'Fix it' },
makeContext({ idempotencyKey: 'my-key' }),
'req-1',
);
expect(result.statusCode).toBe(200);
expect(result.headers?.['Idempotent-Replay']).toBe('true');
const body = JSON.parse(result.body);
expect(body.data.task_id).toBe('existing');
expect(body.data.task_description).toBe('Original work');
expect(mockSend).toHaveBeenCalledTimes(2);
expect(mockLambdaSend).not.toHaveBeenCalled();
});

test('returns 409 when idempotency key belongs to another user', async () => {
mockSend
.mockResolvedValueOnce({ Items: [{ task_id: 'existing' }] })
.mockResolvedValueOnce({
Item: {
task_id: 'existing',
user_id: 'other-user',
status: 'SUBMITTED',
repo: 'org/repo',
task_type: 'new_task',
branch_name: 'bgagent/existing/slug',
channel_source: 'api',
status_created_at: 'SUBMITTED#2020-01-01T00:00:00.000Z',
created_at: '2020-01-01T00:00:00.000Z',
updated_at: '2020-01-01T00:00:00.000Z',
},
});

const result = await createTaskCore(
{ repo: 'org/repo', task_description: 'Fix it' },
Expand All @@ -137,6 +184,74 @@ describe('createTaskCore', () => {
);
expect(result.statusCode).toBe(409);
expect(JSON.parse(result.body).error.code).toBe('DUPLICATE_TASK');
expect(mockLambdaSend).not.toHaveBeenCalled();
});

test('returns 500 when idempotent replay record is incomplete', async () => {
mockSend
.mockResolvedValueOnce({ Items: [{ task_id: 'existing' }] })
.mockResolvedValueOnce({
Item: {
task_id: 'existing',
user_id: 'user-123',
// missing status, repo, branch_name, channel_source, created_at, updated_at
},
});

const result = await createTaskCore(
{ repo: 'org/repo', task_description: 'Fix it' },
makeContext({ idempotencyKey: 'my-key' }),
'req-1',
);
expect(result.statusCode).toBe(500);
expect(JSON.parse(result.body).error.code).toBe('INTERNAL_ERROR');
expect(mockLambdaSend).not.toHaveBeenCalled();
});

test('returns 500 when idempotent replay record has no user_id (fail-closed)', async () => {
mockSend
.mockResolvedValueOnce({ Items: [{ task_id: 'existing' }] })
.mockResolvedValueOnce({
Item: {
task_id: 'existing',
// user_id missing entirely — must deny, not match
status: 'SUBMITTED',
repo: 'org/repo',
branch_name: 'bgagent/existing/slug',
channel_source: 'api',
created_at: '2020-01-01T00:00:00.000Z',
updated_at: '2020-01-01T00:00:00.000Z',
},
});

const result = await createTaskCore(
{ repo: 'org/repo', task_description: 'Fix it' },
makeContext({ idempotencyKey: 'my-key' }),
'req-1',
);
// Missing user_id → incomplete record → 500 (fail-closed)
expect(result.statusCode).toBe(500);
expect(JSON.parse(result.body).error.code).toBe('INTERNAL_ERROR');
expect(mockLambdaSend).not.toHaveBeenCalled();
});

test('creates new task when GSI matches but base-table item is gone (TTL race)', async () => {
mockSend
.mockResolvedValueOnce({ Items: [{ task_id: 'gone-task' }] })
.mockResolvedValueOnce({ Item: undefined }) // GetCommand returns nothing
.mockResolvedValueOnce({}) // PutCommand for new task
.mockResolvedValueOnce({}); // PutCommand for event

const result = await createTaskCore(
{ repo: 'org/repo', task_description: 'Fix it' },
makeContext({ idempotencyKey: 'my-key' }),
'req-1',
);
expect(result.statusCode).toBe(201);
const body = JSON.parse(result.body);
expect(body.data.task_id).toBeDefined();
expect(body.data.task_id).not.toBe('gone-task');
expect(mockLambdaSend).toHaveBeenCalledTimes(1);
});

test('returns 400 for invalid idempotency key', async () => {
Expand Down
17 changes: 17 additions & 0 deletions cdk/test/handlers/shared/response.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,23 @@ describe('successResponse', () => {
const result = successResponse(200, {}, 'req-1');
expect(result.headers?.['Content-Type']).toBe('application/json');
});

test('merges optional extra headers', () => {
const result = successResponse(200, { ok: true }, 'req-1', { 'Idempotent-Replay': 'true' });
expect(result.headers?.['X-Request-Id']).toBe('req-1');
expect(result.headers?.['Idempotent-Replay']).toBe('true');
});

test('extraHeaders cannot override protected headers', () => {
const result = successResponse(200, { ok: true }, 'req-1', {
'Content-Type': 'text/html',
'Access-Control-Allow-Origin': 'https://evil.example',
'X-Request-Id': 'spoofed',
});
expect(result.headers?.['Content-Type']).toBe('application/json');
expect(result.headers?.['Access-Control-Allow-Origin']).toBe('*');
expect(result.headers?.['X-Request-Id']).toBe('req-1');
});
});

describe('paginatedResponse', () => {
Expand Down
70 changes: 70 additions & 0 deletions cdk/test/handlers/webhook-create-task.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,4 +212,74 @@ describe('webhook-create-task handler', () => {
const taskPut = putCalls[0][0];
expect(taskPut.input.Item.channel_metadata.webhook_id).toBe('wh-123');
});

test('returns 200 with Idempotent-Replay header for webhook replay', async () => {
const existingItem = {
task_id: 'existing-task',
user_id: 'user-abc',
status: 'SUBMITTED',
repo: 'org/repo',
task_type: 'new_task',
task_description: 'Fix the bug',
branch_name: 'bgagent/existing-task/slug',
channel_source: 'webhook',
channel_metadata: { webhook_id: 'wh-123' },
status_created_at: 'SUBMITTED#2020-01-01T00:00:00.000Z',
created_at: '2020-01-01T00:00:00.000Z',
updated_at: '2020-01-01T00:00:00.000Z',
idempotency_key: 'wh-key-123',
};
mockSend
.mockResolvedValueOnce({ Items: [{ task_id: 'existing-task' }] })
.mockResolvedValueOnce({ Item: existingItem });

const body = JSON.stringify({ repo: 'org/repo', task_description: 'Fix the bug' });
const event = makeEvent({
body,
headers: {
'X-Webhook-Signature': sign(body, TEST_SECRET),
'Idempotency-Key': 'wh-key-123',
},
});
const result = await handler(event);

expect(result.statusCode).toBe(200);
expect(result.headers?.['Idempotent-Replay']).toBe('true');
const respBody = JSON.parse(result.body);
expect(respBody.data.task_id).toBe('existing-task');
expect(mockLambdaSend).not.toHaveBeenCalled();
});

test('returns 409 for webhook replay with different user', async () => {
const existingItem = {
task_id: 'existing-task',
user_id: 'other-user',
status: 'SUBMITTED',
repo: 'org/repo',
task_type: 'new_task',
branch_name: 'bgagent/existing-task/slug',
channel_source: 'webhook',
status_created_at: 'SUBMITTED#2020-01-01T00:00:00.000Z',
created_at: '2020-01-01T00:00:00.000Z',
updated_at: '2020-01-01T00:00:00.000Z',
};
mockSend
.mockResolvedValueOnce({ Items: [{ task_id: 'existing-task' }] })
.mockResolvedValueOnce({ Item: existingItem });

const body = JSON.stringify({ repo: 'org/repo', task_description: 'Fix the bug' });
const event = makeEvent({
body,
headers: {
'X-Webhook-Signature': sign(body, TEST_SECRET),
'Idempotency-Key': 'wh-key-123',
},
});
const result = await handler(event);

expect(result.statusCode).toBe(409);
expect(JSON.parse(result.body).error.code).toBe('DUPLICATE_TASK');
expect(JSON.parse(result.body).error.message).toBe('A task with this idempotency key already exists.');
expect(mockLambdaSend).not.toHaveBeenCalled();
});
});
6 changes: 4 additions & 2 deletions docs/design/API_CONTRACT.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ Creates a new task. The orchestrator runs admission control, context hydration,

For PR tasks, `branch_name` is initially `pending:pr_resolution` and resolved to the PR's `head_ref` during hydration.

**Errors:** `400 VALIDATION_ERROR`, `400 GUARDRAIL_BLOCKED`, `401 UNAUTHORIZED`, `409 DUPLICATE_TASK`, `422 REPO_NOT_ONBOARDED`, `429 RATE_LIMIT_EXCEEDED`, `503 SERVICE_UNAVAILABLE`.
**Idempotency:** Clients may send `Idempotency-Key` (same format as other `POST` requests). The first successful create returns **`201 Created`** with the body shape above. A subsequent request with the same key and the **same authenticated user** returns **`200 OK`** with the same `{ data: ... }` envelope (full `TaskDetail`, reflecting **current** task state), plus response header `Idempotent-Replay: true`. No duplicate task is created and the orchestrator is not invoked again for that replay. If the key is already bound to a task owned by **another** user, the API returns **`409 DUPLICATE_TASK`** without exposing that task (extremely unlikely for high-entropy keys).

**Errors:** `400 VALIDATION_ERROR`, `400 GUARDRAIL_BLOCKED`, `401 UNAUTHORIZED`, `409 DUPLICATE_TASK` (idempotency key collision across users only), `422 REPO_NOT_ONBOARDED`, `429 RATE_LIMIT_EXCEEDED`, `503 SERVICE_UNAVAILABLE`.

### Get task

Expand Down Expand Up @@ -331,7 +333,7 @@ Tasks created via webhook record `channel_source: 'webhook'` with audit metadata
| `FORBIDDEN` | 403 | Not authorized (e.g. accessing another user's task) |
| `TASK_NOT_FOUND` | 404 | Task ID does not exist |
| `WEBHOOK_NOT_FOUND` | 404 | Webhook does not exist or belongs to another user |
| `DUPLICATE_TASK` | 409 | Idempotency key matches existing task |
| `DUPLICATE_TASK` | 409 | Idempotency key already used by another user (`POST /v1/tasks` / webhook create); same-user replays return `200` with the existing task instead |
| `TASK_ALREADY_TERMINAL` | 409 | Cannot cancel a terminal task |
| `WEBHOOK_ALREADY_REVOKED` | 409 | Webhook is already revoked |
| `REPO_NOT_ONBOARDED` | 422 | Repository not registered (onboard via CDK, not runtime API) |
Expand Down
Loading
Loading