diff --git a/.github/workflows/deploy-translation.yml b/.github/workflows/deploy-translation.yml index 539d44b1b..14c4723ee 100644 --- a/.github/workflows/deploy-translation.yml +++ b/.github/workflows/deploy-translation.yml @@ -41,6 +41,19 @@ jobs: ${{ runner.os }}-bun- - run: bun install --frozen-lockfile - run: bun run ci:verify:translation + - name: Ensure translation queue exists + run: | + set +e + output="$(bunx wrangler queues create capgo-translation-refresh 2>&1)" + status=$? + printf '%s\n' "$output" + if [ "$status" -eq 0 ]; then + exit 0 + fi + printf '%s\n' "$output" | grep -Eiq 'already exists|already.*capgo-translation-refresh' + env: + CLOUDFLARE_API_TOKEN: ${{ secrets.CLOUDFLARE_API_TOKEN }} + CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }} - run: bun run deploy:translation env: CLOUDFLARE_API_TOKEN: ${{ secrets.CLOUDFLARE_API_TOKEN }} diff --git a/apps/translation-worker/src/index.ts b/apps/translation-worker/src/index.ts index 626b9a1df..d0e2cee33 100644 --- a/apps/translation-worker/src/index.ts +++ b/apps/translation-worker/src/index.ts @@ -15,17 +15,35 @@ type WorkerService = { fetch(input: RequestInfo | URL, init?: RequestInit): Promise } +type TranslationQueueReason = 'miss' | 'stale' + +type TranslationJob = { + url: string + locale: Locale + cacheVersion: string + reason: TranslationQueueReason +} + +type QueueBinding = { + send(message: T): Promise +} + +type QueueMessage = { + body: T +} + +type MessageBatch = { + messages: QueueMessage[] +} + interface Env { AI: AiBinding WEB: WorkerService DOCS: WorkerService + TRANSLATION_QUEUE: QueueBinding TRANSLATION_MODEL?: string } -interface ExecutionContext { - waitUntil(promise: Promise): void -} - declare global { interface CacheStorage { readonly default: Cache @@ -56,6 +74,7 @@ const ALL_LOCALES = [DEFAULT_LOCALE, ...SUPPORTED_LOCALES] as const const DEFAULT_MODEL = '@cf/moonshotai/kimi-k2.6' const FRESH_MS = 24 * 60 * 60 * 1000 const CACHE_KEEP_SECONDS = 7 * 24 * 60 * 60 +const TRANSLATION_PENDING_SECONDS = 10 * 60 const TRANSLATION_CACHE_VERSION = '2026-05-01-kimi-k2.6-v1' const CLIENT_NO_STORE = 'no-store, max-age=0, must-revalidate' const MAX_HTML_BYTES = 1_500_000 @@ -166,6 +185,10 @@ function extractLocale(pathname: string): Locale | null { return SUPPORTED_LOCALES.includes(firstSegment as Locale) ? (firstSegment as Locale) : null } +function isSupportedLocale(value: string): value is Locale { + return SUPPORTED_LOCALES.includes(value as Locale) +} + function localizedPath(basePath: string, locale: string): string { const normalizedBasePath = normalizePathname(stripLocalePrefix(basePath)) if (locale === DEFAULT_LOCALE) return normalizedBasePath @@ -259,6 +282,14 @@ function cacheKeyFor(requestUrl: URL, locale: Locale): Request { return new Request(cacheUrl.toString(), { method: 'GET' }) } +function pendingKeyFor(requestUrl: URL, locale: Locale): Request { + const pendingUrl = new URL(requestUrl) + pendingUrl.pathname = localizedPath(pendingUrl.pathname, locale) + pendingUrl.search = '' + pendingUrl.searchParams.set('__capgo_translation_pending', TRANSLATION_CACHE_VERSION) + return new Request(pendingUrl.toString(), { method: 'GET' }) +} + function withResponseHeaders(response: Response, cacheState: 'MISS' | 'HIT' | 'STALE' | 'BYPASS', isHead = false): Response { const headers = new Headers(response.headers) headers.set('Cache-Control', CLIENT_NO_STORE) @@ -1047,7 +1078,94 @@ async function refreshCache(request: Request, env: Env, requestUrl: URL, locale: return response } -async function serveTranslated(request: Request, env: Env, ctx: ExecutionContext, requestUrl: URL, locale: Locale): Promise { +async function enqueueTranslation(env: Env, requestUrl: URL, locale: Locale, reason: TranslationQueueReason): Promise { + const pendingKey = pendingKeyFor(requestUrl, locale) + let markedPending = false + + try { + if (await caches.default.match(pendingKey)) return + + await caches.default.put( + pendingKey, + new Response(reason, { + headers: { + 'Cache-Control': `public, max-age=${TRANSLATION_PENDING_SECONDS}`, + 'X-Capgo-Translation-Pending': reason, + }, + }), + ) + markedPending = true + } catch (error) { + console.error('Failed to mark translated page as pending', { pathname: requestUrl.pathname, locale, reason, error }) + } + + try { + await env.TRANSLATION_QUEUE.send({ + url: requestUrl.toString(), + locale, + cacheVersion: TRANSLATION_CACHE_VERSION, + reason, + }) + } catch (error) { + if (markedPending) { + try { + await caches.default.delete(pendingKey) + } catch (deleteError) { + console.error('Failed to clear translated page pending marker after enqueue failure', { pathname: requestUrl.pathname, locale, reason, error: deleteError }) + } + } + throw error + } +} + +async function enqueueTranslationSafely(env: Env, requestUrl: URL, locale: Locale, reason: TranslationQueueReason): Promise { + try { + await enqueueTranslation(env, requestUrl, locale, reason) + } catch (error) { + console.error('Failed to enqueue translated page', { pathname: requestUrl.pathname, locale, reason, error }) + } +} + +async function processTranslationJob(job: TranslationJob, env: Env): Promise { + if (job.cacheVersion !== TRANSLATION_CACHE_VERSION || !isSupportedLocale(job.locale)) return + + const requestUrl = new URL(job.url) + requestUrl.pathname = localizedPath(requestUrl.pathname, job.locale) + const cacheKey = cacheKeyFor(requestUrl, job.locale) + const pendingKey = pendingKeyFor(requestUrl, job.locale) + let completed = false + + try { + const cachedResponse = await caches.default.match(cacheKey) + if (cachedResponse) { + const translatedAt = readTranslatedAt(cachedResponse) + if (Date.now() - translatedAt <= FRESH_MS) { + completed = true + return + } + } + + const renderRequest = new Request(requestUrl.toString(), { + method: 'GET', + headers: { + Accept: 'text/html', + 'Accept-Language': job.locale, + }, + }) + await refreshCache(renderRequest, env, requestUrl, job.locale, cacheKey) + completed = true + } finally { + if (completed) { + try { + await caches.default.delete(pendingKey) + } catch (error) { + console.error('Failed to clear translated page pending marker', { pathname: requestUrl.pathname, locale: job.locale, error }) + } + } + } +} + +async function serveTranslated(request: Request, env: Env, requestUrl: URL, locale: Locale): Promise { const cacheKey = cacheKeyFor(requestUrl, locale) const cachedResponse = await caches.default.match(cacheKey) const isHead = request.method === 'HEAD' @@ -1056,25 +1174,17 @@ async function serveTranslated(request: Request, env: Env, ctx: ExecutionContext const translatedAt = readTranslatedAt(cachedResponse) const isStale = Date.now() - translatedAt > FRESH_MS if (isStale) { - ctx.waitUntil( - refreshCache(request, env, requestUrl, locale, cacheKey).catch((error) => { - console.error('Failed to refresh translated page', { pathname: requestUrl.pathname, locale, error }) - }), - ) + await enqueueTranslationSafely(env, requestUrl, locale, 'stale') } return withResponseHeaders(cachedResponse, isStale ? 'STALE' : 'HIT', isHead) } - ctx.waitUntil( - refreshCache(request, env, requestUrl, locale, cacheKey).catch((error) => { - console.error('Failed to create translated page', { pathname: requestUrl.pathname, locale, error }) - }), - ) + await enqueueTranslationSafely(env, requestUrl, locale, 'miss') return temporaryEnglishRedirectResponse(requestUrl, isHead) } export default { - async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise { + async fetch(request: Request, env: Env): Promise { const requestUrl = new URL(request.url) const locale = extractLocale(requestUrl.pathname) @@ -1089,10 +1199,20 @@ export default { } try { - return await serveTranslated(request, env, ctx, requestUrl, locale) + return await serveTranslated(request, env, requestUrl, locale) } catch (error) { console.error('Translation worker failed', { pathname: requestUrl.pathname, locale, error }) return temporaryEnglishRedirectResponse(requestUrl, request.method === 'HEAD') } }, + async queue(batch: MessageBatch, env: Env): Promise { + for (const message of batch.messages) { + try { + await processTranslationJob(message.body, env) + } catch (error) { + console.error('Failed to process translated page queue job', { job: message.body, error }) + throw error + } + } + }, } diff --git a/apps/translation-worker/wrangler.jsonc b/apps/translation-worker/wrangler.jsonc index 275212987..2b3a0a40e 100644 --- a/apps/translation-worker/wrangler.jsonc +++ b/apps/translation-worker/wrangler.jsonc @@ -11,6 +11,24 @@ "vars": { "TRANSLATION_MODEL": "@cf/moonshotai/kimi-k2.6", }, + "queues": { + "producers": [ + { + "binding": "TRANSLATION_QUEUE", + "queue": "capgo-translation-refresh", + }, + ], + "consumers": [ + { + "queue": "capgo-translation-refresh", + "max_batch_size": 1, + "max_batch_timeout": 1, + "max_retries": 3, + "retry_delay": 60, + "max_concurrency": 2, + }, + ], + }, "services": [ { "binding": "WEB", @@ -45,6 +63,24 @@ ], "env": { "development": { + "queues": { + "producers": [ + { + "binding": "TRANSLATION_QUEUE", + "queue": "capgo-translation-refresh-development", + }, + ], + "consumers": [ + { + "queue": "capgo-translation-refresh-development", + "max_batch_size": 1, + "max_batch_timeout": 1, + "max_retries": 3, + "retry_delay": 60, + "max_concurrency": 2, + }, + ], + }, "services": [ { "binding": "WEB",