From c40dc076c5251d5303f3a297f943ae8e6c7a9f1c Mon Sep 17 00:00:00 2001 From: Martin Donadieu Date: Mon, 18 May 2026 17:14:18 +0200 Subject: [PATCH 1/2] fix(backend): retry manifest size lookup --- scripts/backfill_manifest_file_sizes.mjs | 408 ++++++++++++++++++ .../_backend/triggers/on_manifest_create.ts | 45 +- .../_backend/triggers/queue_consumer.ts | 86 +++- supabase/functions/_backend/utils/s3.ts | 167 ++++--- tests/backend-alert-resilience.unit.test.ts | 9 + .../queue-consumer-message-shape.unit.test.ts | 23 + 6 files changed, 658 insertions(+), 80 deletions(-) create mode 100644 scripts/backfill_manifest_file_sizes.mjs diff --git a/scripts/backfill_manifest_file_sizes.mjs b/scripts/backfill_manifest_file_sizes.mjs new file mode 100644 index 0000000000..1d76984cd6 --- /dev/null +++ b/scripts/backfill_manifest_file_sizes.mjs @@ -0,0 +1,408 @@ +#!/usr/bin/env bun + +/** + * Backfill manifest.file_size from trusted object storage metadata. + * + * Dry-run by default: + * bun scripts/backfill_manifest_file_sizes.mjs --app-version-id=180988804 + * + * Apply updates: + * bun scripts/backfill_manifest_file_sizes.mjs --app-version-id=180988804 --apply + * bun scripts/backfill_manifest_file_sizes.mjs --all --apply + */ + +import { mkdirSync, writeFileSync } from 'node:fs' +import { dirname, resolve } from 'node:path' +import process from 'node:process' +import { fileURLToPath } from 'node:url' +import { S3Client } from '@bradenmacdonald/s3-lite-client' +import { config } from 'dotenv' +import pg from 'pg' + +const __dirname = dirname(fileURLToPath(import.meta.url)) + +for (const envPath of [ + '../.env', + '../.env.local', + '../internal/cloudflare/.env.prod', + '../internal/cloudflare/.env.local', +]) { + config({ path: resolve(__dirname, envPath) }) +} + +const DB_URL_ENV_KEYS = [ + 'MAIN_SUPABASE_DB_URL', + 'DATABASE_URL', + 'POSTGRES_URL', + 'SUPABASE_DB_URL', + 'SUPABASE_DB_DIRECT_URL', + 'DIRECT_URL', +] + +function hasFlag(name) { + return process.argv.includes(name) +} + +function getArgValue(name) { + const prefix = `${name}=` + const match = process.argv.find(arg => arg.startsWith(prefix)) + if (match) + return match.slice(prefix.length) + + const index = process.argv.indexOf(name) + if (index !== -1) + return process.argv[index + 1] + + return undefined +} + +function getNumberArg(name, fallback) { + const value = getArgValue(name) + if (value === undefined) + return fallback + const parsed = Number.parseInt(value, 10) + if (!Number.isFinite(parsed) || parsed <= 0) + throw new Error(`${name} must be a positive integer`) + return parsed +} + +function getDatabaseUrl() { + for (const key of DB_URL_ENV_KEYS) { + const value = process.env[key] + if (value) + return value + } + throw new Error(`Missing Postgres URL. Set one of: ${DB_URL_ENV_KEYS.join(', ')}`) +} + +function getRequiredEnv(name) { + const value = process.env[name] + if (!value) + throw new Error(`Missing env var: ${name}`) + return value +} + +function getStorageEndpoint() { + const endpoint = getRequiredEnv('S3_ENDPOINT') + return endpoint.includes('://') ? endpoint : `https://${endpoint}` +} + +function parseObjectSizeFromHeaders(contentRange, contentLength) { + if (contentRange && contentRange.includes('/')) { + const total = Number.parseInt(contentRange.split('/').at(1) ?? '0', 10) + if (Number.isFinite(total) && total > 0) + return total + } + + if (contentLength) { + const len = Number.parseInt(contentLength, 10) + if (Number.isFinite(len) && len > 0) + return len + } + + return 0 +} + +function serializeError(error) { + if (error instanceof Error) { + return { + message: error.message, + name: error.name, + status: error.status ?? error.statusCode ?? error.code, + } + } + return { message: String(error) } +} + +async function getObjectSize(s3, s3Path) { + try { + const stat = await s3.statObject(s3Path, { + headers: { 'Accept-Encoding': 'identity' }, + }) + const size = Number.isFinite(stat.size) ? stat.size : 0 + if (size > 0) + return { method: 'head', size } + } + catch (error) { + const rangeResult = await getObjectSizeWithRange(s3, s3Path, 'head_error') + if (rangeResult.size > 0) + return rangeResult + return { ...rangeResult, error: serializeError(error) } + } + + return await getObjectSizeWithRange(s3, s3Path, 'missing_head_size') +} + +async function getObjectSizeWithRange(s3, s3Path, reason) { + try { + const url = await s3.getPresignedUrl('GET', s3Path, { + parameters: { 'x-id': 'GetObject' }, + }) + const response = await fetch(url, { + method: 'GET', + headers: { + 'Accept-Encoding': 'identity', + 'Range': 'bytes=0-0', + }, + }) + const contentRange = response.headers.get('content-range') || response.headers.get('Content-Range') + const contentLength = response.headers.get('content-length') || response.headers.get('Content-Length') + const size = parseObjectSizeFromHeaders(contentRange, contentLength) + response.body?.cancel() + return { + contentLength, + contentRange, + method: 'range', + reason, + size, + status: response.status, + statusText: response.statusText, + } + } + catch (error) { + return { + error: serializeError(error), + method: 'range', + reason, + size: 0, + } + } +} + +async function mapWithConcurrency(items, concurrency, mapper) { + const results = [] + let cursor = 0 + + async function worker() { + while (cursor < items.length) { + const index = cursor++ + results[index] = await mapper(items[index], index) + } + } + + const workerCount = Math.min(Math.max(1, concurrency), items.length) + await Promise.all(Array.from({ length: workerCount }, () => worker())) + return results +} + +function buildCandidateQuery({ afterId, appId, appVersionId, includeDeleted, limit }) { + const params = [afterId] + const where = [ + 'm.id > $1', + '(m.file_size IS NULL OR m.file_size <= 0)', + 'm.s3_path IS NOT NULL', + ] + + if (!includeDeleted) + where.push('av.deleted = false') + + if (appVersionId) { + params.push(appVersionId) + where.push(`m.app_version_id = $${params.length}`) + } + + if (appId) { + params.push(appId) + where.push(`av.app_id = $${params.length}`) + } + + params.push(limit) + + return { + params, + sql: ` + SELECT + m.id, + m.app_version_id, + m.file_name, + m.s3_path, + m.file_size, + av.app_id, + av.name AS version_name, + av.deleted + FROM public.manifest m + INNER JOIN public.app_versions av ON av.id = m.app_version_id + WHERE ${where.join(' AND ')} + ORDER BY m.id + LIMIT $${params.length} + `, + } +} + +async function main() { + if (hasFlag('--help') || hasFlag('-h')) { + console.log(`Usage: + bun scripts/backfill_manifest_file_sizes.mjs --app-version-id= [--apply] + bun scripts/backfill_manifest_file_sizes.mjs --app-id= [--limit=1000] [--apply] + bun scripts/backfill_manifest_file_sizes.mjs --all --apply + +Options: + --apply Update manifest.file_size. Without this, dry-run only. + --all Scan all manifest rows with missing size. + --app-version-id Restrict to one bundle id. + --app-id Restrict to one app id. + --limit Max rows to scan without --all. Default: 500. + --batch-size DB page size. Default: 500. + --concurrency Storage HEAD/RANGE concurrency. Default: 20. + --include-deleted Include deleted bundles. + --verbose Print every checked row. +`) + return + } + + const apply = hasFlag('--apply') + const all = hasFlag('--all') + const includeDeleted = hasFlag('--include-deleted') + const verbose = hasFlag('--verbose') + const appVersionIdRaw = getArgValue('--app-version-id') + const appVersionId = appVersionIdRaw ? Number.parseInt(appVersionIdRaw, 10) : null + const appId = getArgValue('--app-id') ?? null + const limit = all ? Number.POSITIVE_INFINITY : getNumberArg('--limit', 500) + const batchSize = getNumberArg('--batch-size', 500) + const concurrency = getNumberArg('--concurrency', 20) + + if (!all && !appVersionId && !appId) + throw new Error('Pass --app-version-id, --app-id, or --all') + if (appVersionIdRaw && (!Number.isFinite(appVersionId) || appVersionId <= 0)) + throw new Error('--app-version-id must be a positive integer') + + const pool = new pg.Pool({ + connectionString: getDatabaseUrl(), + ssl: { rejectUnauthorized: false }, + }) + const s3 = new S3Client({ + accessKey: getRequiredEnv('S3_ACCESS_KEY_ID'), + bucket: getRequiredEnv('S3_BUCKET'), + endPoint: getStorageEndpoint(), + pathStyle: true, + region: process.env.S3_REGION || 'auto', + secretKey: getRequiredEnv('S3_SECRET_ACCESS_KEY'), + }) + + const report = { + apply, + appId, + appVersionId, + checked: 0, + errors: [], + fixed: 0, + includeDeleted, + missingSize: 0, + scannedAt: new Date().toISOString(), + unchanged: 0, + } + + let afterId = 0 + let remaining = limit + + try { + while (remaining > 0) { + const pageLimit = all ? batchSize : Math.min(batchSize, remaining) + const query = buildCandidateQuery({ + afterId, + appId, + appVersionId, + includeDeleted, + limit: pageLimit, + }) + const candidates = (await pool.query(query.sql, query.params)).rows + if (candidates.length === 0) + break + + afterId = candidates[candidates.length - 1].id + remaining -= candidates.length + + const results = await mapWithConcurrency(candidates, concurrency, async (row) => { + const storage = await getObjectSize(s3, row.s3_path) + if (verbose) { + console.log(`${row.id} ${row.file_name} size=${storage.size} method=${storage.method}${storage.status ? ` status=${storage.status}` : ''}`) + } + + if (storage.size <= 0) { + return { + row, + storage, + updated: false, + } + } + + if (!apply) { + return { + row, + storage, + updated: false, + } + } + + const update = await pool.query( + ` + UPDATE public.manifest + SET file_size = $1 + WHERE id = $2 + AND (file_size IS NULL OR file_size <= 0) + `, + [storage.size, row.id], + ) + + return { + row, + storage, + updated: update.rowCount > 0, + } + }) + + for (const result of results) { + report.checked += 1 + if (result.storage.size > 0) { + if (result.updated) + report.fixed += 1 + else + report.unchanged += 1 + } + else { + report.missingSize += 1 + report.errors.push({ + app_id: result.row.app_id, + app_version_id: result.row.app_version_id, + error: result.storage.error, + file_name: result.row.file_name, + id: result.row.id, + method: result.storage.method, + reason: result.storage.reason, + s3_path: result.row.s3_path, + status: result.storage.status, + version_name: result.row.version_name, + }) + } + } + + console.log(`Checked ${report.checked}, ${apply ? 'fixed' : 'fixable'} ${apply ? report.fixed : report.unchanged}, missing ${report.missingSize}`) + + if (!all && remaining <= 0) + break + } + } + finally { + await pool.end() + } + + const outputDir = resolve(__dirname, '../tmp/manifest_file_size_backfill') + mkdirSync(outputDir, { recursive: true }) + const outputPath = resolve(outputDir, `manifest-file-size-backfill-${Date.now()}.json`) + writeFileSync(outputPath, `${JSON.stringify(report, null, 2)}\n`) + + console.log('\nSummary') + console.log(` Mode: ${apply ? 'apply' : 'dry-run'}`) + console.log(` Checked: ${report.checked}`) + console.log(` ${apply ? 'Fixed' : 'Fixable'}: ${apply ? report.fixed : report.unchanged}`) + console.log(` Missing size: ${report.missingSize}`) + console.log(` Report: ${outputPath}`) + + if (report.missingSize > 0) + process.exitCode = 1 +} + +main().catch((error) => { + console.error(error) + process.exit(1) +}) diff --git a/supabase/functions/_backend/triggers/on_manifest_create.ts b/supabase/functions/_backend/triggers/on_manifest_create.ts index 6ba5791a07..fae8526f2c 100644 --- a/supabase/functions/_backend/triggers/on_manifest_create.ts +++ b/supabase/functions/_backend/triggers/on_manifest_create.ts @@ -3,7 +3,7 @@ import type { MiddlewareKeyVariables } from '../utils/hono.ts' import type { RetryableResult } from '../utils/retry.ts' import type { Database } from '../utils/supabase.types.ts' import { Hono } from 'hono/tiny' -import { BRES, middlewareAPISecret, simpleError, triggerValidator } from '../utils/hono.ts' +import { BRES, middlewareAPISecret, quickError, simpleError, triggerValidator } from '../utils/hono.ts' import { cloudlog, cloudlogErr } from '../utils/logging.ts' import { isRetryablePostgrestResult, retryWithBackoff } from '../utils/retry.ts' import { s3 } from '../utils/s3.ts' @@ -14,8 +14,17 @@ const SIZE_RETRY_DELAY_MS = 500 const MANIFEST_UPDATE_RETRY_ATTEMPTS = 3 const MANIFEST_UPDATE_RETRY_DELAY_MS = 300 -async function getManifestSizeWithRetry(c: Context, s3Path: string): Promise<{ size: number, lastError?: unknown }> { - const { result, lastError } = await retryWithBackoff( +function getQueueLogMetadata(c: Context) { + return { + queueName: c.req.header('x-capgo-queue-name') ?? null, + queueMsgId: c.req.header('x-capgo-queue-msg-id') ?? null, + queueReadCount: c.req.header('x-capgo-queue-read-count') ?? null, + cfId: c.req.header('x-capgo-cf-id') ?? null, + } +} + +async function getManifestSizeWithRetry(c: Context, s3Path: string): Promise<{ size: number, lastError?: unknown, attempts: number }> { + const { result, lastError, attempts } = await retryWithBackoff( () => s3.getSize(c, s3Path), { attempts: SIZE_RETRY_ATTEMPTS, @@ -24,7 +33,11 @@ async function getManifestSizeWithRetry(c: Context, s3Path: string): Promise<{ s }, ) - return { size: typeof result === 'number' ? result : 0, lastError } + return { attempts, size: typeof result === 'number' ? result : 0, lastError } +} + +function shouldRetryManifestSizeLookup(size: number, currentFileSize: number | null | undefined): boolean { + return size <= 0 && !(currentFileSize && currentFileSize > 0) } async function runManifestUpdateWithRetry( @@ -67,21 +80,23 @@ async function runManifestUpdateWithRetry( } async function updateManifestSize(c: Context, record: Database['public']['Tables']['manifest']['Row']) { + const queue = getQueueLogMetadata(c) if (!record.s3_path) { - cloudlog({ requestId: c.get('requestId'), message: 'No s3 path', id: record.id }) + cloudlog({ requestId: c.get('requestId'), message: 'No s3 path', id: record.id, app_version_id: record.app_version_id, file_name: record.file_name, queue }) throw simpleError('no_s3_path', 'No s3 path', { record }) } - const { size, lastError } = await getManifestSizeWithRetry(c, record.s3_path) + const { size, lastError, attempts } = await getManifestSizeWithRetry(c, record.s3_path) if (lastError) { - cloudlogErr({ requestId: c.get('requestId'), message: 'getSize failed after retries', id: record.id, s3_path: record.s3_path, error: lastError }) + cloudlogErr({ requestId: c.get('requestId'), message: 'getSize failed after retries', id: record.id, app_version_id: record.app_version_id, file_name: record.file_name, s3_path: record.s3_path, attempts, queue, error: lastError }) } - if (size === 0) { - if (record.file_size && record.file_size > 0) { - cloudlog({ requestId: c.get('requestId'), message: 'getSize returned 0, keeping existing file_size', id: record.id, s3_path: record.s3_path, file_size: record.file_size }) - return c.json(BRES) - } - cloudlog({ requestId: c.get('requestId'), message: 'getSize returned 0 after retries, skipping update', id: record.id, s3_path: record.s3_path }) + if (shouldRetryManifestSizeLookup(size, record.file_size)) { + cloudlogErr({ requestId: c.get('requestId'), message: 'getSize returned 0 after retries', id: record.id, app_version_id: record.app_version_id, file_name: record.file_name, s3_path: record.s3_path, attempts, queue }) + // Return non-2xx so queue_consumer keeps the message and applies its 5-read retry budget. + throw quickError(503, 'manifest_size_not_found', 'Manifest file size metadata was not found', { attempts, file_name: record.file_name, id: record.id, queue, s3_path: record.s3_path }, lastError, { alert: false }) + } + if (size <= 0) { + cloudlog({ requestId: c.get('requestId'), message: 'getSize returned 0, keeping existing file_size', id: record.id, app_version_id: record.app_version_id, file_name: record.file_name, s3_path: record.s3_path, file_size: record.file_size, attempts, queue }) return c.json(BRES) } @@ -90,9 +105,10 @@ async function updateManifestSize(c: Context, record: Database['public']['Tables .from('manifest') .update({ file_size: size }) .eq('id', record.id)) + cloudlog({ requestId: c.get('requestId'), message: 'manifest file_size updated', id: record.id, app_version_id: record.app_version_id, file_name: record.file_name, s3_path: record.s3_path, size, attempts, queue }) } catch (updateError) { - cloudlog({ requestId: c.get('requestId'), message: 'error update manifest size', error: updateError }) + cloudlog({ requestId: c.get('requestId'), message: 'error update manifest size', id: record.id, app_version_id: record.app_version_id, file_name: record.file_name, s3_path: record.s3_path, size, attempts, queue, error: updateError }) throw simpleError('manifest_update_failed', 'Failed to update manifest file_size', { record, updateError }) } @@ -116,4 +132,5 @@ app.post('/', middlewareAPISecret, triggerValidator('manifest', 'INSERT'), (c) = export const onManifestCreateTestUtils = { isRetryablePostgrestResult, runManifestUpdateWithRetry, + shouldRetryManifestSizeLookup, } diff --git a/supabase/functions/_backend/triggers/queue_consumer.ts b/supabase/functions/_backend/triggers/queue_consumer.ts index 556ba8da02..f3f3630688 100644 --- a/supabase/functions/_backend/triggers/queue_consumer.ts +++ b/supabase/functions/_backend/triggers/queue_consumer.ts @@ -12,6 +12,9 @@ import { backgroundTask, getEnv } from '../utils/utils.ts' // Define constants const DEFAULT_BATCH_SIZE = 950 // Default batch size for queue reads limit of CF is 1000 fetches so we take a safe margin +const MANIFEST_QUEUE_BATCH_SIZE = 100 +const DEFAULT_QUEUE_HTTP_CONCURRENCY = 25 +const MANIFEST_QUEUE_HTTP_CONCURRENCY = 10 export const MAX_QUEUE_READS = 5 const DISCORD_IGNORED_ERROR_CODES = new Set(['version_not_found', 'no_channel']) @@ -40,6 +43,12 @@ interface Message { export const messagesArraySchema = messageSchema.array() +interface QueueMessageMetadata { + queueName: string + msgId: number + readCount: number +} + interface FailureDetail { function_name: string function_type: string @@ -163,6 +172,38 @@ function generateUUID(): string { return crypto.randomUUID() } +function getQueueBatchSize(queueName: string, requestedBatchSize: number): number { + if (queueName === 'on_manifest_create') + return Math.min(requestedBatchSize, MANIFEST_QUEUE_BATCH_SIZE) + return requestedBatchSize +} + +function getQueueHttpConcurrency(queueName: string): number { + if (queueName === 'on_manifest_create') + return MANIFEST_QUEUE_HTTP_CONCURRENCY + return DEFAULT_QUEUE_HTTP_CONCURRENCY +} + +async function mapWithConcurrency( + items: T[], + concurrency: number, + mapper: (item: T, index: number) => Promise, +): Promise { + const results: R[] = [] + let cursor = 0 + + async function worker() { + while (cursor < items.length) { + const index = cursor++ + results[index] = await mapper(items[index]!, index) + } + } + + const workerCount = Math.min(Math.max(1, concurrency), items.length) + await Promise.all(Array.from({ length: workerCount }, () => worker())) + return results +} + async function processQueue(c: Context, db: ReturnType, queueName: string, batchSize: number = DEFAULT_BATCH_SIZE) { const messages = await readQueue(c, db, queueName, batchSize) @@ -172,20 +213,21 @@ async function processQueue(c: Context, db: ReturnType, queu } const [messagesToProcess, messagesToSkip] = messages.reduce((acc, message) => { - acc[message.read_ct <= 5 ? 0 : 1].push(message) + acc[message.read_ct <= MAX_QUEUE_READS ? 0 : 1].push(message) return acc }, [[], []] as [typeof messages, typeof messages]) - cloudlog(`[${queueName}] Processing ${messagesToProcess.length} messages and skipping ${messagesToSkip.length} messages.`) + const processConcurrency = getQueueHttpConcurrency(queueName) + cloudlog(`[${queueName}] Processing ${messagesToProcess.length} messages and skipping ${messagesToSkip.length} messages with concurrency ${processConcurrency}.`) - // Archive messages that have been read 5 or more times + // Archive messages after the configured retry budget is exhausted. if (messagesToSkip.length > 0) { - cloudlog(`[${queueName}] Archiving ${messagesToSkip.length} messages that have been read 5 or more times.`) + cloudlog(`[${queueName}] Archiving ${messagesToSkip.length} messages that exceeded the retry budget.`) await archive_queue_messages(c, db, queueName, messagesToSkip.map(msg => msg.msg_id)) } - // Process messages that have been read less than 5 times - const results = await Promise.all(messagesToProcess.map(async (message) => { + // Process messages that are still within the retry budget. + const results = await mapWithConcurrency(messagesToProcess, processConcurrency, async (message) => { const function_name = message.message?.function_name ?? 'unknown' const function_type = message.message?.function_type ?? 'supabase' const body = extractMessageBody(message) @@ -197,7 +239,11 @@ async function processQueue(c: Context, db: ReturnType, queu }) } const cfId = generateUUID() - const httpResponse = await http_post_helper(c, function_name, function_type, body, cfId) + const httpResponse = await http_post_helper(c, function_name, function_type, body, cfId, { + msgId: message.msg_id, + queueName, + readCount: message.read_ct, + }) const errorDetails = await extractErrorDetails(httpResponse) return { @@ -207,7 +253,7 @@ async function processQueue(c: Context, db: ReturnType, queu payloadSize: JSON.stringify(body).length, ...message, } - })) + }) // Update all messages with their CF IDs const cfIdUpdates = results.map(result => ({ @@ -347,6 +393,7 @@ async function extractErrorDetails(response: Response): Promise<{ bodyPreview: string | null }> { if (response.status < 400) { + response.body?.cancel() return { bodyPreview: null, errorCode: null, @@ -447,12 +494,18 @@ export async function http_post_helper( function_type: string | null | undefined, body: any, cfId: string, + metadata?: QueueMessageMetadata, ): Promise { - const headers = { + const headers: Record = { 'Content-Type': 'application/json', 'apisecret': getEnv(c, 'API_SECRET'), 'x-capgo-cf-id': cfId, } + if (metadata) { + headers['x-capgo-queue-name'] = metadata.queueName + headers['x-capgo-queue-msg-id'] = String(metadata.msgId) + headers['x-capgo-queue-read-count'] = String(metadata.readCount) + } let url: string const cfPpUrl = getEnv(c, 'CLOUDFLARE_PP_FUNCTION_URL') @@ -602,8 +655,17 @@ app.post('/sync', async (c) => { } } - // Compute finalBatchSize: use provided batchSize capped with DEFAULT_BATCH_SIZE, or fall back to DEFAULT_BATCH_SIZE - const finalBatchSize = batchSize !== undefined ? Math.min(batchSize, DEFAULT_BATCH_SIZE) : DEFAULT_BATCH_SIZE + // Compute finalBatchSize: use provided batchSize capped with DEFAULT_BATCH_SIZE, or fall back to DEFAULT_BATCH_SIZE. + const requestedBatchSize = batchSize !== undefined ? Math.min(batchSize, DEFAULT_BATCH_SIZE) : DEFAULT_BATCH_SIZE + const finalBatchSize = getQueueBatchSize(queueName, requestedBatchSize) + if (finalBatchSize !== requestedBatchSize) { + cloudlog({ + requestId: c.get('requestId'), + message: `[Sync Request] Queue batch size capped for ${queueName}.`, + requestedBatchSize, + finalBatchSize, + }) + } await backgroundTask(c, (async () => { cloudlog({ requestId: c.get('requestId'), message: `[Background Queue Sync] Starting background execution for queue: ${queueName} with batch size: ${finalBatchSize}` }) @@ -627,5 +689,7 @@ export const __queueConsumerTestUtils__ = { extractErrorDetails, extractMessageBody, getActionableQueueFailures, + getQueueBatchSize, + getQueueHttpConcurrency, sanitizeDiscordResponseBody, } diff --git a/supabase/functions/_backend/utils/s3.ts b/supabase/functions/_backend/utils/s3.ts index 567459de9d..12d849b14b 100644 --- a/supabase/functions/_backend/utils/s3.ts +++ b/supabase/functions/_backend/utils/s3.ts @@ -1,7 +1,7 @@ import type { Context } from 'hono' import type { Database } from '../utils/supabase.types.ts' import { S3Client } from '@bradenmacdonald/s3-lite-client' -import { cloudlog } from './logging.ts' +import { cloudlog, cloudlogErr, serializeError } from './logging.ts' import { getEnv } from './utils.ts' function firstForwardedHeaderValue(value: string | undefined): string | undefined { @@ -175,77 +175,134 @@ async function getSignedUrl(c: Context, fileId: string, expirySeconds: number) { return url } -async function getSize(c: Context, fileId: string) { - const client = initS3(c) - try { - // Ask Cloudflare/R2 for the raw object (no brotli/gzip) so Content-Length is preserved. - const file = await client.statObject(fileId, { - headers: { 'Accept-Encoding': 'identity' }, - }) +function parseObjectSizeFromHeaders(contentRange: string | null, contentLength: string | null): number { + if (contentRange && contentRange.includes('/')) { + const total = Number.parseInt(contentRange.split('/').at(1) ?? '0', 10) + if (Number.isFinite(total) && total > 0) + return total + } - let size = Number.isFinite(file.size) ? file.size : 0 - cloudlog({ requestId: c.get('requestId'), message: 'getSize head result', fileId, headSize: size, headRawSize: file.size }) + if (contentLength) { + const len = Number.parseInt(contentLength, 10) + if (Number.isFinite(len) && len > 0) + return len + } - // Fallback: some proxied HEAD responses still omit Content-Length (size becomes NaN) - if (!size) { - try { - const url = await client.getPresignedUrl('GET', fileId, { - parameters: { 'x-id': 'GetObject' }, - }) - const res = await fetch(url, { - method: 'GET', - headers: { - 'Range': 'bytes=0-0', // minimal range; forces Content-Range with total length - 'Accept-Encoding': 'identity', - }, - }) + return 0 +} - const contentRange = res.headers.get('content-range') || res.headers.get('Content-Range') - const contentLength = res.headers.get('content-length') || res.headers.get('Content-Length') +function serializeStorageError(error: unknown) { + const serialized = serializeError(error) + const status = error && typeof error === 'object' + ? ((error as { status?: unknown, statusCode?: unknown, code?: unknown }).status + ?? (error as { statusCode?: unknown }).statusCode + ?? (error as { code?: unknown }).code) + : undefined - cloudlog({ - requestId: c.get('requestId'), - message: 'getSize fallback headers', - fileId, - status: res.status, - contentRange, - contentLength, - }) + return { + ...serialized, + status, + } +} - if (contentRange && contentRange.includes('/')) { - const total = Number.parseInt(contentRange.split('/').at(1) ?? '0', 10) - if (Number.isFinite(total) && total > 0) - size = total - } +async function getSizeFromRangeFallback( + c: Context, + client: ReturnType, + fileId: string, + reason: 'head_error' | 'missing_head_size', +): Promise { + try { + const url = await client.getPresignedUrl('GET', fileId, { + parameters: { 'x-id': 'GetObject' }, + }) + const res = await fetch(url, { + method: 'GET', + headers: { + 'Range': 'bytes=0-0', // minimal range; forces Content-Range with total length + 'Accept-Encoding': 'identity', + }, + }) - if (!size && contentLength) { - const len = Number.parseInt(contentLength, 10) - if (Number.isFinite(len) && len > 0) - size = len - } + const contentRange = res.headers.get('content-range') || res.headers.get('Content-Range') + const contentLength = res.headers.get('content-length') || res.headers.get('Content-Length') + const size = parseObjectSizeFromHeaders(contentRange, contentLength) + res.body?.cancel() - cloudlog({ requestId: c.get('requestId'), message: 'getSize fallback parsed', fileId, sizeAfterFallback: size }) - } - catch (fallbackError) { - cloudlog({ requestId: c.get('requestId'), message: 'getSize fallback failed', fileId, fallbackError }) - } - } + cloudlog({ + requestId: c.get('requestId'), + message: 'getSize range fallback result', + fileId, + reason, + status: res.status, + statusText: res.statusText, + contentRange, + contentLength, + size, + }) + return size + } + catch (fallbackError) { + cloudlogErr({ + requestId: c.get('requestId'), + message: 'getSize range fallback failed', + fileId, + reason, + error: serializeStorageError(fallbackError), + bucket: getEnv(c, 'S3_BUCKET'), + endpoint: getEnv(c, 'S3_ENDPOINT'), + }) + return 0 + } +} +async function getSize(c: Context, fileId: string) { + const client = initS3(c) + let size = 0 + let headError: unknown + let usedFallback = false + try { + // Ask Cloudflare/R2 for the raw object (no brotli/gzip) so Content-Length is preserved. + const file = await client.statObject(fileId, { + headers: { 'Accept-Encoding': 'identity' }, + }) + size = Number.isFinite(file.size) ? file.size : 0 cloudlog({ requestId: c.get('requestId'), - message: 'getSize', - file, + message: 'getSize head result', fileId, + headSize: size, + headRawSize: file.size, bucket: getEnv(c, 'S3_BUCKET'), endpoint: getEnv(c, 'S3_ENDPOINT'), - finalSize: size, }) - return size } catch (error) { - cloudlog({ requestId: c.get('requestId'), message: 'getSize', error }) - return 0 + headError = error + cloudlogErr({ + requestId: c.get('requestId'), + message: 'getSize head failed', + fileId, + error: serializeStorageError(error), + bucket: getEnv(c, 'S3_BUCKET'), + endpoint: getEnv(c, 'S3_ENDPOINT'), + }) + } + + if (!size) { + usedFallback = true + size = await getSizeFromRangeFallback(c, client, fileId, headError ? 'head_error' : 'missing_head_size') } + + cloudlog({ + requestId: c.get('requestId'), + message: 'getSize final', + fileId, + bucket: getEnv(c, 'S3_BUCKET'), + endpoint: getEnv(c, 'S3_ENDPOINT'), + finalSize: size, + usedFallback, + }) + return size } async function getObject(c: Context, fileId: string): Promise { diff --git a/tests/backend-alert-resilience.unit.test.ts b/tests/backend-alert-resilience.unit.test.ts index 1f58344587..6ad6c5f1c9 100644 --- a/tests/backend-alert-resilience.unit.test.ts +++ b/tests/backend-alert-resilience.unit.test.ts @@ -371,6 +371,15 @@ describe('backend alert resilience helpers', () => { expect(attempts).toBe(2) }) + it.concurrent('marks missing manifest storage size as queue-retryable unless a trusted size already exists', async () => { + const { onManifestCreateTestUtils } = await import('../supabase/functions/_backend/triggers/on_manifest_create.ts') + + expect(onManifestCreateTestUtils.shouldRetryManifestSizeLookup(0, 0)).toBe(true) + expect(onManifestCreateTestUtils.shouldRetryManifestSizeLookup(0, null)).toBe(true) + expect(onManifestCreateTestUtils.shouldRetryManifestSizeLookup(0, 128)).toBe(false) + expect(onManifestCreateTestUtils.shouldRetryManifestSizeLookup(128, 0)).toBe(false) + }) + it.concurrent('returns empty strings when env bindings are missing from the context', () => { const context = { req: { diff --git a/tests/queue-consumer-message-shape.unit.test.ts b/tests/queue-consumer-message-shape.unit.test.ts index 816e14e95a..3a191c0e12 100644 --- a/tests/queue-consumer-message-shape.unit.test.ts +++ b/tests/queue-consumer-message-shape.unit.test.ts @@ -73,6 +73,29 @@ describe('queue_consumer legacy message compatibility', () => { ])).toEqual([]) }) + it.concurrent('keeps manifest size lookup failures retrying until the queue budget is exhausted', () => { + expect(__queueConsumerTestUtils__.getActionableQueueFailures([ + { + cf_id: 'cf-manifest', + error_code: 'manifest_size_not_found', + function_name: 'on_manifest_create', + function_type: 'supabase', + msg_id: 10, + payload_size: 10, + read_count: MAX_QUEUE_READS - 1, + status: 503, + status_text: 'Service Unavailable', + }, + ])).toEqual([]) + }) + + it.concurrent('caps manifest queue batches and concurrency to avoid storage bursts', () => { + expect(__queueConsumerTestUtils__.getQueueBatchSize('on_manifest_create', 950)).toBe(100) + expect(__queueConsumerTestUtils__.getQueueBatchSize('cron_email', 950)).toBe(950) + expect(__queueConsumerTestUtils__.getQueueHttpConcurrency('on_manifest_create')).toBe(10) + expect(__queueConsumerTestUtils__.getQueueHttpConcurrency('cron_email')).toBe(25) + }) + it.concurrent('alerts Discord after retry budget is exhausted', () => { const failure = { cf_id: 'cf-1', From 1120a7a59d2b21fcfffc490607268a640de5acd9 Mon Sep 17 00:00:00 2001 From: Martin Donadieu Date: Mon, 18 May 2026 17:45:02 +0200 Subject: [PATCH 2/2] fix(backend): address manifest review comments --- scripts/backfill_manifest_file_sizes.mjs | 28 +++++++++++++------ .../_backend/triggers/queue_consumer.ts | 18 ++++++++++-- supabase/functions/_backend/utils/s3.ts | 24 ++++++++++++++-- 3 files changed, 58 insertions(+), 12 deletions(-) diff --git a/scripts/backfill_manifest_file_sizes.mjs b/scripts/backfill_manifest_file_sizes.mjs index 1d76984cd6..988fefd0f0 100644 --- a/scripts/backfill_manifest_file_sizes.mjs +++ b/scripts/backfill_manifest_file_sizes.mjs @@ -27,7 +27,7 @@ for (const envPath of [ '../internal/cloudflare/.env.prod', '../internal/cloudflare/.env.local', ]) { - config({ path: resolve(__dirname, envPath) }) + config({ path: resolve(__dirname, envPath), override: true, quiet: true }) } const DB_URL_ENV_KEYS = [ @@ -46,12 +46,20 @@ function hasFlag(name) { function getArgValue(name) { const prefix = `${name}=` const match = process.argv.find(arg => arg.startsWith(prefix)) - if (match) - return match.slice(prefix.length) + if (match) { + const value = match.slice(prefix.length) + if (!value) + throw new Error(`${name} requires a value`) + return value + } const index = process.argv.indexOf(name) - if (index !== -1) - return process.argv[index + 1] + if (index !== -1) { + const value = process.argv[index + 1] + if (!value || value.startsWith('--')) + throw new Error(`${name} requires a value`) + return value + } return undefined } @@ -147,8 +155,12 @@ async function getObjectSizeWithRange(s3, s3Path, reason) { }) const contentRange = response.headers.get('content-range') || response.headers.get('Content-Range') const contentLength = response.headers.get('content-length') || response.headers.get('Content-Length') - const size = parseObjectSizeFromHeaders(contentRange, contentLength) - response.body?.cancel() + const size = response.status === 206 && contentRange + ? parseObjectSizeFromHeaders(contentRange, null) + : response.status === 200 + ? parseObjectSizeFromHeaders(null, contentLength) + : 0 + await response.body?.cancel() return { contentLength, contentRange, @@ -176,7 +188,7 @@ async function mapWithConcurrency(items, concurrency, mapper) { async function worker() { while (cursor < items.length) { const index = cursor++ - results[index] = await mapper(items[index], index) + results[index] = await mapper(items[index]) } } diff --git a/supabase/functions/_backend/triggers/queue_consumer.ts b/supabase/functions/_backend/triggers/queue_consumer.ts index f3f3630688..743394de0c 100644 --- a/supabase/functions/_backend/triggers/queue_consumer.ts +++ b/supabase/functions/_backend/triggers/queue_consumer.ts @@ -218,11 +218,25 @@ async function processQueue(c: Context, db: ReturnType, queu }, [[], []] as [typeof messages, typeof messages]) const processConcurrency = getQueueHttpConcurrency(queueName) - cloudlog(`[${queueName}] Processing ${messagesToProcess.length} messages and skipping ${messagesToSkip.length} messages with concurrency ${processConcurrency}.`) + cloudlog({ + requestId: c.get('requestId'), + message: `[${queueName}] Processing queue batch.`, + queueName, + processingCount: messagesToProcess.length, + skippedCount: messagesToSkip.length, + concurrency: processConcurrency, + retryBudget: MAX_QUEUE_READS, + }) // Archive messages after the configured retry budget is exhausted. if (messagesToSkip.length > 0) { - cloudlog(`[${queueName}] Archiving ${messagesToSkip.length} messages that exceeded the retry budget.`) + cloudlog({ + requestId: c.get('requestId'), + message: `[${queueName}] Archiving messages that exceeded the retry budget.`, + queueName, + archiveCount: messagesToSkip.length, + retryBudget: MAX_QUEUE_READS, + }) await archive_queue_messages(c, db, queueName, messagesToSkip.map(msg => msg.msg_id)) } diff --git a/supabase/functions/_backend/utils/s3.ts b/supabase/functions/_backend/utils/s3.ts index 12d849b14b..5e865704da 100644 --- a/supabase/functions/_backend/utils/s3.ts +++ b/supabase/functions/_backend/utils/s3.ts @@ -225,8 +225,28 @@ async function getSizeFromRangeFallback( const contentRange = res.headers.get('content-range') || res.headers.get('Content-Range') const contentLength = res.headers.get('content-length') || res.headers.get('Content-Length') - const size = parseObjectSizeFromHeaders(contentRange, contentLength) - res.body?.cancel() + if (!res.ok) { + await res.body?.cancel() + cloudlog({ + requestId: c.get('requestId'), + message: 'getSize range fallback returned non-success response', + fileId, + reason, + status: res.status, + statusText: res.statusText, + contentRange, + contentLength, + size: 0, + }) + return 0 + } + + const size = res.status === 206 && contentRange + ? parseObjectSizeFromHeaders(contentRange, null) + : res.status === 200 + ? parseObjectSizeFromHeaders(null, contentLength) + : 0 + await res.body?.cancel() cloudlog({ requestId: c.get('requestId'),