Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .github/workflows/deploy-translation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,19 @@ jobs:
${{ runner.os }}-bun-
- run: bun install --frozen-lockfile
- run: bun run ci:verify:translation
- name: Ensure translation queue exists
run: |
set +e
output="$(bunx wrangler queues create capgo-translation-refresh 2>&1)"
status=$?
printf '%s\n' "$output"
if [ "$status" -eq 0 ]; then
exit 0
fi
printf '%s\n' "$output" | grep -Eiq 'already exists|already.*capgo-translation-refresh'
env:
CLOUDFLARE_API_TOKEN: ${{ secrets.CLOUDFLARE_API_TOKEN }}
CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }}
- run: bun run deploy:translation
env:
CLOUDFLARE_API_TOKEN: ${{ secrets.CLOUDFLARE_API_TOKEN }}
Expand Down
154 changes: 137 additions & 17 deletions apps/translation-worker/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,35 @@ type WorkerService = {
fetch(input: RequestInfo | URL, init?: RequestInit): Promise<Response>
}

type TranslationQueueReason = 'miss' | 'stale'

type TranslationJob = {
url: string
locale: Locale
cacheVersion: string
reason: TranslationQueueReason
}

type QueueBinding<T> = {
send(message: T): Promise<void>
}

type QueueMessage<T> = {
body: T
}

type MessageBatch<T> = {
messages: QueueMessage<T>[]
}

interface Env {
AI: AiBinding
WEB: WorkerService
DOCS: WorkerService
TRANSLATION_QUEUE: QueueBinding<TranslationJob>
TRANSLATION_MODEL?: string
}

interface ExecutionContext {
waitUntil(promise: Promise<unknown>): void
}

declare global {
interface CacheStorage {
readonly default: Cache
Expand Down Expand Up @@ -56,6 +74,7 @@ const ALL_LOCALES = [DEFAULT_LOCALE, ...SUPPORTED_LOCALES] as const
const DEFAULT_MODEL = '@cf/moonshotai/kimi-k2.6'
const FRESH_MS = 24 * 60 * 60 * 1000
const CACHE_KEEP_SECONDS = 7 * 24 * 60 * 60
const TRANSLATION_PENDING_SECONDS = 10 * 60
const TRANSLATION_CACHE_VERSION = '2026-05-01-kimi-k2.6-v1'
const CLIENT_NO_STORE = 'no-store, max-age=0, must-revalidate'
const MAX_HTML_BYTES = 1_500_000
Expand Down Expand Up @@ -166,6 +185,10 @@ function extractLocale(pathname: string): Locale | null {
return SUPPORTED_LOCALES.includes(firstSegment as Locale) ? (firstSegment as Locale) : null
}

function isSupportedLocale(value: string): value is Locale {
return SUPPORTED_LOCALES.includes(value as Locale)
}

function localizedPath(basePath: string, locale: string): string {
const normalizedBasePath = normalizePathname(stripLocalePrefix(basePath))
if (locale === DEFAULT_LOCALE) return normalizedBasePath
Expand Down Expand Up @@ -259,6 +282,14 @@ function cacheKeyFor(requestUrl: URL, locale: Locale): Request {
return new Request(cacheUrl.toString(), { method: 'GET' })
}

function pendingKeyFor(requestUrl: URL, locale: Locale): Request {
const pendingUrl = new URL(requestUrl)
pendingUrl.pathname = localizedPath(pendingUrl.pathname, locale)
pendingUrl.search = ''
pendingUrl.searchParams.set('__capgo_translation_pending', TRANSLATION_CACHE_VERSION)
return new Request(pendingUrl.toString(), { method: 'GET' })
}

function withResponseHeaders(response: Response, cacheState: 'MISS' | 'HIT' | 'STALE' | 'BYPASS', isHead = false): Response {
const headers = new Headers(response.headers)
headers.set('Cache-Control', CLIENT_NO_STORE)
Expand Down Expand Up @@ -1047,7 +1078,94 @@ async function refreshCache(request: Request, env: Env, requestUrl: URL, locale:
return response
}

async function serveTranslated(request: Request, env: Env, ctx: ExecutionContext, requestUrl: URL, locale: Locale): Promise<Response> {
async function enqueueTranslation(env: Env, requestUrl: URL, locale: Locale, reason: TranslationQueueReason): Promise<void> {
const pendingKey = pendingKeyFor(requestUrl, locale)
let markedPending = false

try {
if (await caches.default.match(pendingKey)) return

await caches.default.put(
pendingKey,
new Response(reason, {
headers: {
'Cache-Control': `public, max-age=${TRANSLATION_PENDING_SECONDS}`,
'X-Capgo-Translation-Pending': reason,
},
}),
)
markedPending = true
} catch (error) {
console.error('Failed to mark translated page as pending', { pathname: requestUrl.pathname, locale, reason, error })
}

try {
await env.TRANSLATION_QUEUE.send({
url: requestUrl.toString(),
locale,
cacheVersion: TRANSLATION_CACHE_VERSION,
reason,
})
} catch (error) {
if (markedPending) {
try {
await caches.default.delete(pendingKey)
} catch (deleteError) {
console.error('Failed to clear translated page pending marker after enqueue failure', { pathname: requestUrl.pathname, locale, reason, error: deleteError })
}
}
throw error
}
}

async function enqueueTranslationSafely(env: Env, requestUrl: URL, locale: Locale, reason: TranslationQueueReason): Promise<void> {
try {
await enqueueTranslation(env, requestUrl, locale, reason)
} catch (error) {
console.error('Failed to enqueue translated page', { pathname: requestUrl.pathname, locale, reason, error })
}
}

async function processTranslationJob(job: TranslationJob, env: Env): Promise<void> {
if (job.cacheVersion !== TRANSLATION_CACHE_VERSION || !isSupportedLocale(job.locale)) return

const requestUrl = new URL(job.url)
requestUrl.pathname = localizedPath(requestUrl.pathname, job.locale)
const cacheKey = cacheKeyFor(requestUrl, job.locale)
const pendingKey = pendingKeyFor(requestUrl, job.locale)
let completed = false

try {
const cachedResponse = await caches.default.match(cacheKey)
if (cachedResponse) {
const translatedAt = readTranslatedAt(cachedResponse)
if (Date.now() - translatedAt <= FRESH_MS) {
completed = true
return
}
}

const renderRequest = new Request(requestUrl.toString(), {
method: 'GET',
headers: {
Accept: 'text/html',
'Accept-Language': job.locale,
},
})
await refreshCache(renderRequest, env, requestUrl, job.locale, cacheKey)
completed = true
} finally {
if (completed) {
try {
await caches.default.delete(pendingKey)
} catch (error) {
console.error('Failed to clear translated page pending marker', { pathname: requestUrl.pathname, locale: job.locale, error })
}
}
}
}

async function serveTranslated(request: Request, env: Env, requestUrl: URL, locale: Locale): Promise<Response> {
const cacheKey = cacheKeyFor(requestUrl, locale)
const cachedResponse = await caches.default.match(cacheKey)
const isHead = request.method === 'HEAD'
Expand All @@ -1056,25 +1174,17 @@ async function serveTranslated(request: Request, env: Env, ctx: ExecutionContext
const translatedAt = readTranslatedAt(cachedResponse)
const isStale = Date.now() - translatedAt > FRESH_MS
if (isStale) {
ctx.waitUntil(
refreshCache(request, env, requestUrl, locale, cacheKey).catch((error) => {
console.error('Failed to refresh translated page', { pathname: requestUrl.pathname, locale, error })
}),
)
await enqueueTranslationSafely(env, requestUrl, locale, 'stale')
}
return withResponseHeaders(cachedResponse, isStale ? 'STALE' : 'HIT', isHead)
}

ctx.waitUntil(
refreshCache(request, env, requestUrl, locale, cacheKey).catch((error) => {
console.error('Failed to create translated page', { pathname: requestUrl.pathname, locale, error })
}),
)
await enqueueTranslationSafely(env, requestUrl, locale, 'miss')
return temporaryEnglishRedirectResponse(requestUrl, isHead)
}

export default {
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
async fetch(request: Request, env: Env): Promise<Response> {
const requestUrl = new URL(request.url)
const locale = extractLocale(requestUrl.pathname)

Expand All @@ -1089,10 +1199,20 @@ export default {
}

try {
return await serveTranslated(request, env, ctx, requestUrl, locale)
return await serveTranslated(request, env, requestUrl, locale)
} catch (error) {
console.error('Translation worker failed', { pathname: requestUrl.pathname, locale, error })
return temporaryEnglishRedirectResponse(requestUrl, request.method === 'HEAD')
}
},
async queue(batch: MessageBatch<TranslationJob>, env: Env): Promise<void> {
for (const message of batch.messages) {
try {
await processTranslationJob(message.body, env)
} catch (error) {
console.error('Failed to process translated page queue job', { job: message.body, error })
throw error
}
}
},
}
36 changes: 36 additions & 0 deletions apps/translation-worker/wrangler.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,24 @@
"vars": {
"TRANSLATION_MODEL": "@cf/moonshotai/kimi-k2.6",
},
"queues": {
"producers": [
{
"binding": "TRANSLATION_QUEUE",
"queue": "capgo-translation-refresh",
},
],
"consumers": [
{
"queue": "capgo-translation-refresh",
"max_batch_size": 1,
"max_batch_timeout": 1,
"max_retries": 3,
"retry_delay": 60,
"max_concurrency": 2,
},
],
},
"services": [
{
"binding": "WEB",
Expand Down Expand Up @@ -45,6 +63,24 @@
],
"env": {
"development": {
"queues": {
"producers": [
{
"binding": "TRANSLATION_QUEUE",
"queue": "capgo-translation-refresh-development",
},
],
"consumers": [
{
"queue": "capgo-translation-refresh-development",
"max_batch_size": 1,
"max_batch_timeout": 1,
"max_retries": 3,
"retry_delay": 60,
"max_concurrency": 2,
},
],
},
"services": [
{
"binding": "WEB",
Expand Down
Loading