diff --git a/supabase/functions/_backend/files/files.ts b/supabase/functions/_backend/files/files.ts index 6a6bdcfa5a..45caa3c327 100644 --- a/supabase/functions/_backend/files/files.ts +++ b/supabase/functions/_backend/files/files.ts @@ -155,6 +155,20 @@ async function recoverUploadOffsetFromDurableObject( } } +function retryableUploadUnavailableResponse(): Response { + return new Response(JSON.stringify({ + error: 'upload_retryable', + message: 'Upload worker moved during this request. Retry the upload request.', + }), { + status: 503, + headers: { + 'Content-Type': 'application/json', + 'Retry-After': '1', + 'Tus-Resumable': TUS_VERSION, + }, + }) +} + async function fetchUploadHandlerWithRetry( c: Context, handler: DurableObjectStub, @@ -201,9 +215,10 @@ async function fetchUploadHandlerWithRetry( } catch (error) { lastError = error + const isRetryableDurableObjectError = isRetryableDurableObjectFetchError(error) const shouldRetry = canRetryRequest && attempt < maxAttempts - && isRetryableDurableObjectFetchError(error) + && isRetryableDurableObjectError cloudlogErr({ requestId: c.get('requestId'), @@ -217,6 +232,15 @@ async function fetchUploadHandlerWithRetry( }) if (!shouldRetry) { + if (!canRetryRequest && isRetryableDurableObjectError) { + cloudlog({ + requestId: c.get('requestId'), + message: 'upload handler - durable object fetch failed for streaming request, returning retryable response', + attempt, + fileId: c.get('fileId'), + }) + return retryableUploadUnavailableResponse() + } throw error } @@ -378,7 +402,7 @@ async function getHandler(c: Context): Promise { } catch (error) { cloudlogErr({ requestId: c.get('requestId'), message: 'getHandler files get failed', fileId, error }) - throw quickError(503, 'upstream_unavailable', 'File storage temporarily unavailable', { fileId }) + throw quickError(503, 'upstream_unavailable', 'File storage temporarily unavailable', { fileId }, error, { alert: false }) } if (object == null) { cloudlog({ requestId: c.get('requestId'), message: 'getHandler files object is null' }) @@ -878,4 +902,5 @@ app.route('/ok', ok) export const filesTestUtils = { fetchUploadHandlerWithRetry, isRetryableDurableObjectFetchError, + retryableUploadUnavailableResponse, } diff --git a/supabase/functions/_backend/public/build/request.ts b/supabase/functions/_backend/public/build/request.ts index 0ed567b2ee..038eb16ca9 100644 --- a/supabase/functions/_backend/public/build/request.ts +++ b/supabase/functions/_backend/public/build/request.ts @@ -33,6 +33,10 @@ interface BuilderJobResponse { status: string } +function throwBuilderUnavailable(message: string, moreInfo: Record = {}, cause?: unknown): never { + throw quickError(503, 'service_unavailable', message, moreInfo, cause, { alert: false }) +} + /** * Construct the JSON body forwarded to the builder's POST /jobs endpoint. * Extracted for testability โ€” the handler calls this, and unit tests assert the shape. @@ -171,7 +175,7 @@ export async function requestBuild( builder_url_configured: !!builderUrl, builder_api_key_configured: !!builderApiKey, }) - throw quickError(503, 'service_unavailable', 'Build service unavailable (builder not configured)') + throwBuilderUnavailable('Build service unavailable (builder not configured)') } try { @@ -227,10 +231,16 @@ export async function requestBuild( app_id, platform, }) - throw quickError(503, 'service_unavailable', 'Build service unavailable (builder error)') + throwBuilderUnavailable('Build service unavailable (builder error)', { + status: builderResponse.status, + statusText: builderResponse.statusText, + }) } } catch (error) { + if (error && typeof error === 'object' && 'status' in error && (error as { status?: unknown }).status === 503) { + throw error + } cloudlogErr({ requestId: c.get('requestId'), message: 'Builder API fetch failed', @@ -242,7 +252,7 @@ export async function requestBuild( app_id, platform, }) - throw quickError(503, 'service_unavailable', 'Build service unavailable (builder call failed)') + throwBuilderUnavailable('Build service unavailable (builder call failed)', {}, error) } const upload_expires_at = new Date(Date.now() + 60 * 60 * 1000) @@ -255,7 +265,7 @@ export async function requestBuild( builder_url: builderUrl, builder_api_key_present: !!builderApiKey, }) - throw quickError(503, 'service_unavailable', 'Build service unavailable (upload URL missing)') + throwBuilderUnavailable('Build service unavailable (upload URL missing)') } const upload_url = `${getEnv(c, 'PUBLIC_URL') || 'https://api.capgo.app'}/build/upload/${builderJob.jobId}` diff --git a/supabase/functions/_backend/triggers/queue_consumer.ts b/supabase/functions/_backend/triggers/queue_consumer.ts index a0cbcc91a8..5234d1efb9 100644 --- a/supabase/functions/_backend/triggers/queue_consumer.ts +++ b/supabase/functions/_backend/triggers/queue_consumer.ts @@ -11,6 +11,7 @@ import { backgroundTask, getEnv } from '../utils/utils.ts' // Define constants const DEFAULT_BATCH_SIZE = 950 // Default batch size for queue reads limit of CF is 1000 fetches so we take a safe margin +export const MAX_QUEUE_READS = 5 const DISCORD_IGNORED_ERROR_CODES = new Set(['version_not_found', 'no_channel']) // Zod schema for a message object @@ -37,6 +38,20 @@ interface Message { export const messagesArraySchema = z.array(messageSchema) +interface FailureDetail { + function_name: string + function_type: string + msg_id: number + read_count: number + status: number + status_text: string + error_code?: string + error_message?: string + response_body?: string + payload_size: number + cf_id: string +} + function extractMessageBody(message: Message): Record { if (message.message?.payload !== undefined) return (message.message.payload ?? {}) as Record @@ -45,6 +60,102 @@ function extractMessageBody(message: Message): Record { return legacyBody } +function getActionableQueueFailures(failureDetails: FailureDetail[]): FailureDetail[] { + return failureDetails.filter((detail) => { + if (detail.read_count < MAX_QUEUE_READS) + return false + return !detail.error_code || !DISCORD_IGNORED_ERROR_CODES.has(detail.error_code) + }) +} + +function truncateDiscordField(value: string, maxLength = 1024): string { + if (value.length <= maxLength) + return value + return `${value.slice(0, maxLength - 15)}... (truncated)` +} + +function isAsciiLetterOrDigit(char: string): boolean { + if (!char) + return false + const code = char.charCodeAt(0) + return (code >= 48 && code <= 57) + || (code >= 65 && code <= 90) + || (code >= 97 && code <= 122) +} + +function isEmailLocalChar(char: string): boolean { + return isAsciiLetterOrDigit(char) + || char === '.' + || char === '_' + || char === '%' + || char === '+' + || char === '-' +} + +function isEmailDomainChar(char: string): boolean { + return isAsciiLetterOrDigit(char) + || char === '.' + || char === '-' +} + +function isLikelyEmail(value: string): boolean { + const atIndex = value.indexOf('@') + if (atIndex <= 0 || atIndex !== value.lastIndexOf('@') || atIndex === value.length - 1) + return false + + const domainPart = value.slice(atIndex + 1) + const lastDotIndex = domainPart.lastIndexOf('.') + if (lastDotIndex <= 0 || lastDotIndex === domainPart.length - 1) + return false + + const tld = domainPart.slice(lastDotIndex + 1) + return tld.length >= 2 +} + +function redactEmailLikeSubstrings(value: string): string { + let result = '' + let cursor = 0 + let searchIndex = 0 + + while (searchIndex < value.length) { + const atIndex = value.indexOf('@', searchIndex) + if (atIndex === -1) + break + + let start = atIndex + while (start > cursor && isEmailLocalChar(value[start - 1])) + start-- + + let end = atIndex + 1 + while (end < value.length && isEmailDomainChar(value[end])) + end++ + + const candidate = value.slice(start, end) + if (isLikelyEmail(candidate)) { + result += value.slice(cursor, start) + result += '[REDACTED_EMAIL]' + cursor = end + searchIndex = end + continue + } + + searchIndex = atIndex + 1 + } + + if (cursor === 0) + return value + + return `${result}${value.slice(cursor)}` +} + +function sanitizeDiscordResponseBody(value: string): string { + return redactEmailLikeSubstrings(value) + .replace(/\b(Bearer\s+)[\w.~+/-]+=*/gi, '$1[REDACTED_TOKEN]') + .replace(/((?:api[-_]?key|token|authorization|password|secret|access[-_]?token|refresh[-_]?token)["']?\s*[:=]\s*["']?)([^"',\s}]+)/gi, '$1[REDACTED]') + .replace(/\b[\dA-F]{32,}\b/gi, '[REDACTED_TOKEN]') + .replace(/\b[\w+/=-]{40,}\b/g, '[REDACTED_TOKEN]') +} + // Helper function to generate UUID v4 function generateUUID(): string { return crypto.randomUUID() @@ -85,12 +196,13 @@ async function processQueue(c: Context, db: ReturnType, queu } const cfId = generateUUID() const httpResponse = await http_post_helper(c, function_name, function_type, body, cfId) - const errorCode = await extractErrorCode(httpResponse) + const errorDetails = await extractErrorDetails(httpResponse) return { httpResponse, - errorCode, + errorDetails, cfId, + payloadSize: JSON.stringify(body).length, ...message, } })) @@ -128,12 +240,14 @@ async function processQueue(c: Context, db: ReturnType, queu read_count: msg.read_ct, status: msg.httpResponse.status, status_text: msg.httpResponse.statusText, - error_code: msg.errorCode ?? undefined, - payload_size: msg.message?.payload ? JSON.stringify(msg.message.payload).length : 0, + error_code: msg.errorDetails.errorCode ?? undefined, + error_message: msg.errorDetails.errorMessage ?? undefined, + response_body: msg.errorDetails.bodyPreview ?? undefined, + payload_size: msg.payloadSize, cf_id: msg.cfId, })) - const actionableFailures = failureDetails.filter(detail => !detail.error_code || !DISCORD_IGNORED_ERROR_CODES.has(detail.error_code)) + const actionableFailures = getActionableQueueFailures(failureDetails) const groupedByFunction = actionableFailures.reduce((acc, detail) => { const key = detail.function_name @@ -163,10 +277,12 @@ async function processQueue(c: Context, db: ReturnType, queu }, { name: '๐Ÿ” Detailed Failures', - value: actionableFailures.slice(0, 10).map((detail) => { + value: truncateDiscordField(actionableFailures.slice(0, 10).map((detail) => { const cfLogUrl = `https://dash.cloudflare.com/${getEnv(c, 'CF_ACCOUNT_ANALYTICS_ID')}/workers/services/view/capgo_api-prod/production/observability/logs?workers-observability-view=%22invocations%22&filters=%5B%7B%22key%22%3A%22%24workers.event.request.headers.x-capgo-cf-id%22%2C%22type%22%3A%22string%22%2C%22value%22%3A%22${detail.cf_id}%22%2C%22operation%22%3A%22eq%22%7D%5D` - return `**${detail.function_name}** | Status: ${detail.status} | Read: ${detail.read_count}/5 | [CF Logs](${cfLogUrl})` - }).join('\n'), + const errorInfo = detail.error_code ? ` | Error: ${detail.error_code}` : '' + const messageInfo = detail.error_message ? ` | ${truncateDiscordField(detail.error_message.replace(/\s+/g, ' ').trim(), 180)}` : '' + return `**${detail.function_name}** | Status: ${detail.status} | Read: ${detail.read_count}/${MAX_QUEUE_READS}${errorInfo}${messageInfo} | [CF Logs](${cfLogUrl})` + }).join('\n')), inline: false, }, { @@ -181,7 +297,7 @@ async function processQueue(c: Context, db: ReturnType, queu }, { name: 'โš ๏ธ Retry Analysis', - value: `**Will Retry:** ${actionableFailures.filter(d => d.read_count < 5).length}\n**Will Archive:** ${actionableFailures.filter(d => d.read_count >= 5).length}`, + value: `**Retry Budget Exhausted:** ${actionableFailures.length}\n**Will Archive:** ${actionableFailures.length}`, inline: true, }, { @@ -189,6 +305,13 @@ async function processQueue(c: Context, db: ReturnType, queu value: `**Avg Size:** ${Math.round(actionableFailures.reduce((sum, d) => sum + d.payload_size, 0) / actionableFailures.length)} bytes\n**Max Size:** ${Math.max(...actionableFailures.map(d => d.payload_size))} bytes`, inline: true, }, + { + name: '๐Ÿงพ Sanitized Response Body', + value: truncateDiscordField(actionableFailures + .map(detail => detail.response_body ? `**${detail.function_name}:** ${sanitizeDiscordResponseBody(detail.response_body)}` : `**${detail.function_name}:** (empty)`) + .join('\n')), + inline: false, + }, ], footer: { text: `Queue: ${queueName} | Environment: ${getEnv(c, 'ENVIRONMENT') ?? 'unknown'}`, @@ -198,7 +321,12 @@ async function processQueue(c: Context, db: ReturnType, queu }) } else { - cloudlog({ requestId: c.get('requestId'), message: `[${queueName}] Suppressed Discord alert for ignored error codes.`, ignoredErrors: Array.from(DISCORD_IGNORED_ERROR_CODES) }) + cloudlog({ + requestId: c.get('requestId'), + message: `[${queueName}] Suppressed Discord alert for retryable or ignored queue failures.`, + retryingFailures: failureDetails.filter(detail => detail.read_count < MAX_QUEUE_READS).length, + ignoredErrors: Array.from(DISCORD_IGNORED_ERROR_CODES), + }) } // set visibility timeout to random number to prevent Auto DDOS } @@ -211,19 +339,31 @@ async function processQueue(c: Context, db: ReturnType, queu } } -async function extractErrorCode(response: Response): Promise { +async function extractErrorDetails(response: Response): Promise<{ + errorCode: string | null + errorMessage: string | null + bodyPreview: string | null +}> { if (response.status < 400) { - return null + return { + bodyPreview: null, + errorCode: null, + errorMessage: null, + } } const cloned = response.clone() const contentType = cloned.headers.get('content-type') ?? '' let payload: any = null + let bodyPreview: string | null = null try { if (contentType.includes('application/json')) { - payload = await cloned.json() + const text = await cloned.text() + bodyPreview = text.slice(0, 500) + payload = text ? JSON.parse(text) : null } else { const text = await cloned.text() + bodyPreview = text ? text.slice(0, 500) : null if (text) { try { payload = JSON.parse(text) @@ -239,11 +379,18 @@ async function extractErrorCode(response: Response): Promise { } if (payload && typeof payload === 'object') { const errorCode = payload.error ?? payload.errorCode - if (typeof errorCode === 'string') { - return errorCode + const errorMessage = payload.message ?? payload.errorMessage + return { + bodyPreview, + errorCode: typeof errorCode === 'string' ? errorCode : null, + errorMessage: typeof errorMessage === 'string' ? errorMessage : null, } } - return null + return { + bodyPreview, + errorCode: null, + errorMessage: null, + } } // Reads messages from the queue and logs them @@ -475,5 +622,8 @@ app.post('/sync', async (c) => { }) export const __queueConsumerTestUtils__ = { + extractErrorDetails, extractMessageBody, + getActionableQueueFailures, + sanitizeDiscordResponseBody, } diff --git a/supabase/functions/_backend/utils/hono.ts b/supabase/functions/_backend/utils/hono.ts index e5dfb3e1b6..1d2b721ea9 100644 --- a/supabase/functions/_backend/utils/hono.ts +++ b/supabase/functions/_backend/utils/hono.ts @@ -293,13 +293,18 @@ export function simpleErrorWithStatus(c: Context, status: ContentfulStatusCode, return c.json(res, status) } -export function quickError(status: number, errorCode: string, message: string, moreInfo: any = {}, cause?: any): never { +export interface QuickErrorOptions { + alert?: boolean +} + +export function quickError(status: number, errorCode: string, message: string, moreInfo: any = {}, cause?: any, options: QuickErrorOptions = {}): never { // Store error details in cause so onError can extract them const errorDetails = { error: errorCode, message, moreInfo, originalCause: cause, + suppressDiscordAlert: options.alert === false, } // Throw a simple HTTPException - onError will create the response with X-Request-Id header throw new HTTPException(status as any, { diff --git a/supabase/functions/_backend/utils/on_error.ts b/supabase/functions/_backend/utils/on_error.ts index 4bb78307b2..d0ae4a6507 100644 --- a/supabase/functions/_backend/utils/on_error.ts +++ b/supabase/functions/_backend/utils/on_error.ts @@ -101,6 +101,9 @@ export function onError(functionName: string) { moreInfo: res.moreInfo, stack: serializeError(e)?.stack ?? 'N/A', }) + const suppressDiscordAlert = e.cause + && typeof e.cause === 'object' + && (e.cause as { suppressDiscordAlert?: unknown }).suppressDiscordAlert === true if (e.status === 429) { const rateLimitResetAt = typeof res.moreInfo?.rateLimitResetAt === 'number' ? res.moreInfo.rateLimitResetAt : undefined let retryAfterSeconds = typeof res.moreInfo?.retryAfterSeconds === 'number' ? res.moreInfo.retryAfterSeconds : undefined @@ -115,7 +118,7 @@ export function onError(functionName: string) { } return c.json({ error: 'too_many_requests', message: 'You are being rate limited' }, e.status) } - if (e.status >= 500) { + if (e.status >= 500 && !suppressDiscordAlert) { await backgroundTask(c, sendDiscordAlert500(c, functionName, body, e)) void backgroundTask(c, capturePosthogException(c, { error: e, diff --git a/tests/backend-alert-resilience.unit.test.ts b/tests/backend-alert-resilience.unit.test.ts index 8d1e4e8001..1f58344587 100644 --- a/tests/backend-alert-resilience.unit.test.ts +++ b/tests/backend-alert-resilience.unit.test.ts @@ -242,7 +242,7 @@ describe('backend alert resilience helpers', () => { expect(handler.fetch).toHaveBeenCalledTimes(2) }) - it('does not retry retryable durable object resets for streaming upload bodies', async () => { + it('returns retryable response for streaming upload bodies when a durable object moves', async () => { const { filesTestUtils } = await import('../supabase/functions/_backend/files/files.ts') const handler = { @@ -259,15 +259,22 @@ describe('backend alert resilience helpers', () => { ) }) - await expect(filesTestUtils.fetchUploadHandlerWithRetry( + const response = await filesTestUtils.fetchUploadHandlerWithRetry( createTestContext(), handler, new Request('http://localhost/files/upload/attachments/test.zip', { method: 'PATCH', body: 'chunk-data', }), - )).rejects.toThrow('cannot access storage because object has moved to a different machine') + ) + expect(response.status).toBe(503) + expect(response.headers.get('Retry-After')).toBe('1') + expect(response.headers.get('Tus-Resumable')).toBe('1.0.0') + await expect(response.json()).resolves.toEqual({ + error: 'upload_retryable', + message: 'Upload worker moved during this request. Retry the upload request.', + }) expect(handler.fetch).toHaveBeenCalledTimes(1) }) diff --git a/tests/cli-new-encryption.test.ts b/tests/cli-new-encryption.test.ts index 291d5feec6..1aa7c89edd 100644 --- a/tests/cli-new-encryption.test.ts +++ b/tests/cli-new-encryption.test.ts @@ -200,7 +200,7 @@ describe.concurrent('tests CLI encryption encrypt/upload/download/decrypt', () = await resetAppData(APPNAME) await resetAppDataStats(APPNAME) } - }) + }, 60000) it.concurrent('test upload bundle with custom key data', async () => { const id = randomUUID() @@ -233,7 +233,7 @@ describe.concurrent('tests CLI encryption encrypt/upload/download/decrypt', () = await resetAppData(APPNAME) await resetAppDataStats(APPNAME) } - }) + }, 60000) it.concurrent('test upload bundle with custom key path', async () => { const id = randomUUID() @@ -271,7 +271,7 @@ describe.concurrent('tests CLI encryption encrypt/upload/download/decrypt', () = await resetAppData(APPNAME) await resetAppDataStats(APPNAME) } - }) + }, 60000) }) describe.concurrent('tests CLI upload no encryption', () => { @@ -335,5 +335,5 @@ describe.concurrent('tests CLI upload no encryption', () => { await resetAppData(APPNAME) await resetAppDataStats(APPNAME) } - }) + }, 60000) }) diff --git a/tests/cli.test.ts b/tests/cli.test.ts index e6bfdaee07..c5d2fc3cf3 100644 --- a/tests/cli.test.ts +++ b/tests/cli.test.ts @@ -99,7 +99,7 @@ describe('tests CLI upload', () => { ignoreCompatibilityCheck: true, }) expect(result.success).toBe(true) - }, 30000) + }, 60000) it('should not upload same hash twice', async () => { const appName = `com.cli_duplicate_${randomUUID()}` @@ -132,10 +132,10 @@ describe('tests CLI upload', () => { resetAppDataStats(appName), ]) } - }, 30000) + }, 60000) }) -describe.concurrent('tests CLI upload options in parallel', () => { +describe('tests CLI upload options in parallel', () => { const sharedId = randomUUID() const SHARED_APPNAME = `com.cli_shared_${sharedId}` @@ -197,9 +197,9 @@ describe.concurrent('tests CLI upload options in parallel', () => { }) expect(result.success).toBe(false) expect(result.error).toContain('notifyAppReady') - }, 30000) + }, 60000) - it.concurrent('test --min-update-version', async () => { + it('test --min-update-version', async () => { const semver = getSemver() const result = await uploadBundleSDK(SHARED_APPNAME, semver, 'production', { ignoreCompatibilityCheck: true, @@ -217,9 +217,9 @@ describe.concurrent('tests CLI upload options in parallel', () => { expect(error).toBeNull() expect(data?.min_update_version).toBe('1.0.0') - }, 30000) + }, 60000) - it.concurrent('cannot upload with wrong api key', async () => { + it('cannot upload with wrong api key', async () => { const testApiKey = randomUUID() const semver = getSemver() const result = await uploadBundleSDK(SHARED_APPNAME, semver, 'production', { @@ -228,7 +228,7 @@ describe.concurrent('tests CLI upload options in parallel', () => { }) expect(result.success).toBe(false) expect(result.error).toContain('Invalid API key') - }, 30000) + }, 60000) it.concurrent('should test upload with org-limited API key', async () => { const app = apiTestApps.get('org-limited') @@ -255,7 +255,7 @@ describe.concurrent('tests CLI upload options in parallel', () => { if (createdApikeyId !== null) await deleteScopedApiKey(createdApikeyId) } - }, 30000) + }, 60000) it.concurrent('should fail upload with wrong org-limited API key', async () => { const app = apiTestApps.get('wrong-org') @@ -288,5 +288,5 @@ describe.concurrent('tests CLI upload options in parallel', () => { if (createdApikeyId !== null) await deleteScopedApiKey(createdApikeyId) } - }, 30000) + }, 60000) }) diff --git a/tests/on-error-posthog.unit.test.ts b/tests/on-error-posthog.unit.test.ts index 2627cb2e4b..49960a3241 100644 --- a/tests/on-error-posthog.unit.test.ts +++ b/tests/on-error-posthog.unit.test.ts @@ -95,6 +95,33 @@ describe('onError PostHog capture', () => { }) }) + it('skips crash reporting for expected operational HTTP exceptions', async () => { + const { onError } = await import('../supabase/functions/_backend/utils/on_error.ts') + + const error = new HTTPException(503, { + cause: { + error: 'service_unavailable', + message: 'Build service unavailable', + moreInfo: {}, + suppressDiscordAlert: true, + }, + }) + + const response = await onError('api')(error, createContext()) + + expect(backgroundTaskMock).not.toHaveBeenCalled() + expect(sendDiscordAlert500Mock).not.toHaveBeenCalled() + expect(capturePosthogExceptionMock).not.toHaveBeenCalled() + expect(response).toEqual({ + body: { + error: 'service_unavailable', + message: 'Build service unavailable', + moreInfo: {}, + }, + status: 503, + }) + }) + it('skips PostHog capture for client HTTP exceptions', async () => { const { onError } = await import('../supabase/functions/_backend/utils/on_error.ts') diff --git a/tests/plan-usage-org-rpc-access.test.ts b/tests/plan-usage-org-rpc-access.test.ts index fdfc40a7b1..608d0adba2 100644 --- a/tests/plan-usage-org-rpc-access.test.ts +++ b/tests/plan-usage-org-rpc-access.test.ts @@ -8,6 +8,7 @@ import { getSupabaseClient, POSTGRES_URL } from './test-utils.ts' const SUPABASE_URL = (env.SUPABASE_URL ?? '').replace(/\/$/, '') const SUPABASE_ANON_KEY = env.SUPABASE_ANON_KEY ?? '' +const USE_CLOUDFLARE_WORKERS = env.USE_CLOUDFLARE_WORKERS === 'true' if (!SUPABASE_URL) throw new Error('SUPABASE_URL is required for plan usage RPC authorization tests') @@ -69,131 +70,131 @@ async function createAuthenticatedClient(email: string, password: string) { return client } -beforeAll(async () => { - const { data: ownerAuth, error: ownerAuthError } = await serviceRoleSupabase.auth.admin.createUser({ - email: OWNER_EMAIL, - password: TEST_PASSWORD, - email_confirm: true, - }) - if (ownerAuthError) - throw ownerAuthError - - const { data: attackerAuth, error: attackerAuthError } = await serviceRoleSupabase.auth.admin.createUser({ - email: ATTACKER_EMAIL, - password: TEST_PASSWORD, - email_confirm: true, - }) - if (attackerAuthError) - throw attackerAuthError - - ownerUserId = ownerAuth.user.id - attackerUserId = attackerAuth.user.id - - const { error: usersError } = await serviceRoleSupabase.from('users').insert([ - { - id: ownerUserId, +describe.skipIf(USE_CLOUDFLARE_WORKERS)('plan usage org RPC authorization', () => { + beforeAll(async () => { + const { data: ownerAuth, error: ownerAuthError } = await serviceRoleSupabase.auth.admin.createUser({ email: OWNER_EMAIL, - }, - { - id: attackerUserId, + password: TEST_PASSWORD, + email_confirm: true, + }) + if (ownerAuthError) + throw ownerAuthError + + const { data: attackerAuth, error: attackerAuthError } = await serviceRoleSupabase.auth.admin.createUser({ email: ATTACKER_EMAIL, - }, - ]) - if (usersError) - throw usersError - - const { data: planRow, error: planError } = await serviceRoleSupabase - .from('plans') - .select('name,stripe_id') - .order('created_at', { ascending: true }) - .limit(1) - .single() - if (planError) - throw planError - planName = planRow.name - - const { error: stripeError } = await serviceRoleSupabase.from('stripe_info').insert({ - customer_id: CUSTOMER_ID, - product_id: planRow.stripe_id, - status: 'succeeded', - subscription_anchor_start: new Date(Date.now() - 10 * 24 * 60 * 60 * 1000).toISOString(), - subscription_anchor_end: new Date(Date.now() + 20 * 24 * 60 * 60 * 1000).toISOString(), - }) - if (stripeError) - throw stripeError - - const { data: orgRow, error: orgError } = await serviceRoleSupabase - .from('orgs') - .insert({ - created_by: ownerUserId, - name: 'Plan Access Test Org', - management_email: OWNER_EMAIL, + password: TEST_PASSWORD, + email_confirm: true, + }) + if (attackerAuthError) + throw attackerAuthError + + ownerUserId = ownerAuth.user.id + attackerUserId = attackerAuth.user.id + + const { error: usersError } = await serviceRoleSupabase.from('users').insert([ + { + id: ownerUserId, + email: OWNER_EMAIL, + }, + { + id: attackerUserId, + email: ATTACKER_EMAIL, + }, + ]) + if (usersError) + throw usersError + + const { data: planRow, error: planError } = await serviceRoleSupabase + .from('plans') + .select('name,stripe_id') + .order('created_at', { ascending: true }) + .limit(1) + .single() + if (planError) + throw planError + planName = planRow.name + + const { error: stripeError } = await serviceRoleSupabase.from('stripe_info').insert({ customer_id: CUSTOMER_ID, + product_id: planRow.stripe_id, + status: 'succeeded', + subscription_anchor_start: new Date(Date.now() - 10 * 24 * 60 * 60 * 1000).toISOString(), + subscription_anchor_end: new Date(Date.now() + 20 * 24 * 60 * 60 * 1000).toISOString(), }) - .select('id') - .single() - if (orgError) - throw orgError - orgId = orgRow.id - - const { error: orgUserError } = await serviceRoleSupabase.from('org_users').insert({ - org_id: orgId, - user_id: ownerUserId, - user_right: 'super_admin', - }) - if (orgUserError) - throw orgUserError + if (stripeError) + throw stripeError + + const { data: orgRow, error: orgError } = await serviceRoleSupabase + .from('orgs') + .insert({ + created_by: ownerUserId, + name: 'Plan Access Test Org', + management_email: OWNER_EMAIL, + customer_id: CUSTOMER_ID, + }) + .select('id') + .single() + if (orgError) + throw orgError + orgId = orgRow.id + + const { error: orgUserError } = await serviceRoleSupabase.from('org_users').insert({ + org_id: orgId, + user_id: ownerUserId, + user_right: 'super_admin', + }) + if (orgUserError) + throw orgUserError - ownerSupabase = await createAuthenticatedClient(OWNER_EMAIL, TEST_PASSWORD) - attackerSupabase = await createAuthenticatedClient(ATTACKER_EMAIL, TEST_PASSWORD) + ownerSupabase = await createAuthenticatedClient(OWNER_EMAIL, TEST_PASSWORD) + attackerSupabase = await createAuthenticatedClient(ATTACKER_EMAIL, TEST_PASSWORD) - const { data: cycleData, error: cycleError } = await ownerSupabase.rpc('get_cycle_info_org', { - orgid: orgId, - }) - if (cycleError) - throw cycleError - if (!cycleData?.[0]?.subscription_anchor_start || !cycleData[0]?.subscription_anchor_end) - throw new Error('Expected get_cycle_info_org to return a billing cycle for the test org') - - const cycleStart = cycleData[0].subscription_anchor_start.slice(0, 10) - const cycleEnd = cycleData[0].subscription_anchor_end.slice(0, 10) - - const { error: cacheError } = await serviceRoleSupabase.from('app_metrics_cache').insert({ - org_id: orgId, - start_date: cycleStart, - end_date: cycleEnd, - response: [], - cached_at: new Date().toISOString(), + const { data: cycleData, error: cycleError } = await ownerSupabase.rpc('get_cycle_info_org', { + orgid: orgId, + }) + if (cycleError) + throw cycleError + if (!cycleData?.[0]?.subscription_anchor_start || !cycleData[0]?.subscription_anchor_end) + throw new Error('Expected get_cycle_info_org to return a billing cycle for the test org') + + const cycleStart = cycleData[0].subscription_anchor_start.slice(0, 10) + const cycleEnd = cycleData[0].subscription_anchor_end.slice(0, 10) + + const { error: cacheError } = await serviceRoleSupabase.from('app_metrics_cache').insert({ + org_id: orgId, + start_date: cycleStart, + end_date: cycleEnd, + response: [], + cached_at: new Date().toISOString(), + }) + if (cacheError) + throw cacheError }) - if (cacheError) - throw cacheError -}) -afterAll(async () => { - if (orgId) - await serviceRoleSupabase.from('app_metrics_cache').delete().eq('org_id', orgId) + afterAll(async () => { + if (orgId) + await serviceRoleSupabase.from('app_metrics_cache').delete().eq('org_id', orgId) - if (orgId) { - await serviceRoleSupabase.from('org_users').delete().eq('org_id', orgId) - await serviceRoleSupabase.from('orgs').delete().eq('id', orgId) - } + if (orgId) { + await serviceRoleSupabase.from('org_users').delete().eq('org_id', orgId) + await serviceRoleSupabase.from('orgs').delete().eq('id', orgId) + } - await serviceRoleSupabase.from('stripe_info').delete().eq('customer_id', CUSTOMER_ID) + await serviceRoleSupabase.from('stripe_info').delete().eq('customer_id', CUSTOMER_ID) - if (ownerUserId) - await serviceRoleSupabase.from('users').delete().eq('id', ownerUserId) - if (attackerUserId) - await serviceRoleSupabase.from('users').delete().eq('id', attackerUserId) + if (ownerUserId) + await serviceRoleSupabase.from('users').delete().eq('id', ownerUserId) + if (attackerUserId) + await serviceRoleSupabase.from('users').delete().eq('id', attackerUserId) - if (ownerUserId) - await serviceRoleSupabase.auth.admin.deleteUser(ownerUserId) - if (attackerUserId) - await serviceRoleSupabase.auth.admin.deleteUser(attackerUserId) + if (ownerUserId) + await serviceRoleSupabase.auth.admin.deleteUser(ownerUserId) + if (attackerUserId) + await serviceRoleSupabase.auth.admin.deleteUser(attackerUserId) - await pgPool.end() -}) + await pgPool.end() + }) -describe('plan usage org RPC authorization', () => { it.concurrent('allows authorized org members to read plan usage RPCs', async () => { const { data: planNameData, error: planNameError } = await ownerSupabase.rpc('get_current_plan_name_org', { orgid: orgId, diff --git a/tests/queue-consumer-message-shape.unit.test.ts b/tests/queue-consumer-message-shape.unit.test.ts index 5b57384c4d..25f1a11984 100644 --- a/tests/queue-consumer-message-shape.unit.test.ts +++ b/tests/queue-consumer-message-shape.unit.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it } from 'vitest' -import { __queueConsumerTestUtils__, messagesArraySchema } from '../supabase/functions/_backend/triggers/queue_consumer.ts' +import { __queueConsumerTestUtils__, MAX_QUEUE_READS, messagesArraySchema } from '../supabase/functions/_backend/triggers/queue_consumer.ts' describe('queue_consumer legacy message compatibility', () => { it.concurrent('uses the payload envelope when it is present', () => { @@ -56,4 +56,86 @@ describe('queue_consumer legacy message compatibility', () => { expect(__queueConsumerTestUtils__.extractMessageBody(message!)).toEqual({}) }) + + it.concurrent('does not alert Discord while failed messages still have retries left', () => { + expect(__queueConsumerTestUtils__.getActionableQueueFailures([ + { + cf_id: 'cf-1', + function_name: 'on_version_update', + function_type: 'supabase', + msg_id: 1, + payload_size: 10, + read_count: 1, + status: 502, + status_text: 'Bad Gateway', + }, + ])).toEqual([]) + }) + + it.concurrent('alerts Discord after retry budget is exhausted', () => { + const failure = { + cf_id: 'cf-1', + error_code: 'internal_error', + function_name: 'on_version_update', + function_type: 'supabase', + msg_id: 1, + payload_size: 10, + read_count: MAX_QUEUE_READS, + status: 500, + status_text: 'Internal Server Error', + } + + expect(__queueConsumerTestUtils__.getActionableQueueFailures([failure])).toEqual([failure]) + }) + + it.concurrent('keeps ignored queue errors out of Discord after retries are exhausted', () => { + expect(__queueConsumerTestUtils__.getActionableQueueFailures([ + { + cf_id: 'cf-1', + error_code: 'version_not_found', + function_name: 'on_version_update', + function_type: 'supabase', + msg_id: 1, + payload_size: 10, + read_count: MAX_QUEUE_READS, + status: 400, + status_text: 'Bad Request', + }, + ])).toEqual([]) + }) + + it.concurrent('redacts sensitive data before queue failures are sent to Discord', () => { + const sanitized = __queueConsumerTestUtils__.sanitizeDiscordResponseBody(JSON.stringify({ + authorization: 'Bearer abcdefghijklmnopqrstuvwxyz1234567890', + email: 'alice@capgo.app', + stack: 'Error: builder unavailable', + token: 'super-secret-token-value', + traceId: 'ABCDEF0123456789ABCDEF0123456789', + })) + + expect(sanitized).toContain('[REDACTED_EMAIL]') + expect(sanitized).toContain('[REDACTED_TOKEN]') + expect(sanitized).toContain('[REDACTED]') + expect(sanitized).not.toContain('alice@capgo.app') + expect(sanitized).not.toContain('super-secret-token-value') + expect(sanitized).toContain('builder unavailable') + }) + + it.concurrent('keeps message-only JSON error details actionable', async () => { + const response = new Response(JSON.stringify({ + message: 'builder unavailable', + }), { + headers: { + 'content-type': 'application/json', + }, + status: 503, + statusText: 'Service Unavailable', + }) + + await expect(__queueConsumerTestUtils__.extractErrorDetails(response)).resolves.toEqual({ + bodyPreview: '{"message":"builder unavailable"}', + errorCode: null, + errorMessage: 'builder unavailable', + }) + }) })