diff --git a/bun.lock b/bun.lock index 0caa6d3b46..4b570dc7fb 100644 --- a/bun.lock +++ b/bun.lock @@ -57,7 +57,7 @@ "@logsnag/node": "1.0.1", "@revenuecat/purchases-capacitor": "11.2.13", "@std/semver": "npm:@jsr/std__semver@1.0.8", - "@supabase/supabase-js": "2.93.3", + "@supabase/supabase-js": "2.100.1", "@tailwindcss/forms": "^0.5.11", "@types/semver": "^7.7.1", "@vuepic/vue-datepicker": "^12.1.0", @@ -86,7 +86,7 @@ "qrcode": "^1.5.4", "semver": "^7.7.4", "stripe": "^19.3.1", - "supabase": "^2.78.1", + "supabase": "^2.84.2", "turndown": "^7.2.2", "vite-plugin-devtools-json": "^1.0.0", "vue": "3.5.29", @@ -1262,17 +1262,19 @@ "@stylistic/eslint-plugin": ["@stylistic/eslint-plugin@5.6.1", "", { "dependencies": { "@eslint-community/eslint-utils": "^4.9.0", "@typescript-eslint/types": "^8.47.0", "eslint-visitor-keys": "^4.2.1", "espree": "^10.4.0", "estraverse": "^5.3.0", "picomatch": "^4.0.3" }, "peerDependencies": { "eslint": ">=9.0.0" } }, "sha512-JCs+MqoXfXrRPGbGmho/zGS/jMcn3ieKl/A8YImqib76C8kjgZwq5uUFzc30lJkMvcchuRn6/v8IApLxli3Jyw=="], - "@supabase/auth-js": ["@supabase/auth-js@2.93.3", "", { "dependencies": { "tslib": "2.8.1" } }, "sha512-JdnkHZPKexVGSNONtu89RHU4bxz3X9kxx+f5ZnR5osoCIX+vs/MckwWRPZEybAEvlJXt5xjomDb3IB876QCxWQ=="], + "@supabase/auth-js": ["@supabase/auth-js@2.100.1", "", { "dependencies": { "tslib": "2.8.1" } }, "sha512-c5FB4nrG7cs1mLSzFGuIVl2iR2YO5XkSJ96uF4zubYm8YDn71XOi2emE9sBm/avfGCj61jaRBLOvxEAVnpys0Q=="], - "@supabase/functions-js": ["@supabase/functions-js@2.93.3", "", { "dependencies": { "tslib": "2.8.1" } }, "sha512-qWO0gHNDm/5jRjROv/nv9L6sYabCWS1kzorOLUv3kqCwRvEJLYZga93ppJPrZwOgoZfXmJzvpjY8fODA4HQfBw=="], + "@supabase/functions-js": ["@supabase/functions-js@2.100.1", "", { "dependencies": { "tslib": "2.8.1" } }, "sha512-mo8QheoV4KR+wSubtyEWhZUxWnCM7YZ23TncccMAlbWAHb8YTDqRGRm9IalWCAswniKyud6buZCk9snRqI86KA=="], - "@supabase/postgrest-js": ["@supabase/postgrest-js@2.93.3", "", { "dependencies": { "tslib": "2.8.1" } }, "sha512-+iJ96g94skO2e4clsRSmEXg22NUOjh9BziapsJSAvnB1grOBf/BA8vGtCHjNOA+Z6lvKXL1jwBqcL9+fS1W/Lg=="], + "@supabase/phoenix": ["@supabase/phoenix@0.4.0", "", {}, "sha512-RHSx8bHS02xwfHdAbX5Lpbo6PXbgyf7lTaXTlwtFDPwOIw64NnVRwFAXGojHhjtVYI+PEPNSWwkL90f4agN3bw=="], - "@supabase/realtime-js": ["@supabase/realtime-js@2.93.3", "", { "dependencies": { "@types/phoenix": "^1.6.6", "@types/ws": "^8.18.1", "tslib": "2.8.1", "ws": "^8.18.2" } }, "sha512-gnYpcFzwy8IkezRP4CDbT5I8jOsiOjrWrqTY1B+7jIriXsnpifmlM6RRjLBm9oD7OwPG0/WksniGPdKW67sXOA=="], + "@supabase/postgrest-js": ["@supabase/postgrest-js@2.100.1", "", { "dependencies": { "tslib": "2.8.1" } }, "sha512-OIh4mOSo2LdqF2kox76OAPDtcSs+PwKABJOjc6plUV4/LXhFEsI2uwdEEIs7K7fd141qehWEVl/Y+Ts0fNvYsw=="], - "@supabase/storage-js": ["@supabase/storage-js@2.93.3", "", { "dependencies": { "iceberg-js": "^0.8.1", "tslib": "2.8.1" } }, "sha512-cw4qXiLrx3apglDM02Tx/w/stvFlrkKocC6vCvuFAz3JtVEl1zH8MUfDQDTH59kJAQVaVdbewrMWSoBob7REnA=="], + "@supabase/realtime-js": ["@supabase/realtime-js@2.100.1", "", { "dependencies": { "@supabase/phoenix": "^0.4.0", "@types/ws": "^8.18.1", "tslib": "2.8.1", "ws": "^8.18.2" } }, "sha512-FHuRWPX4qZQ4x+0Q+ZrKaBZnOiVGiwsgiAUJM98pYRib1yeaE/fOM1lZ1ozd+4gA8Udw23OyaD8SxKS5mT5NYw=="], - "@supabase/supabase-js": ["@supabase/supabase-js@2.93.3", "", { "dependencies": { "@supabase/auth-js": "2.93.3", "@supabase/functions-js": "2.93.3", "@supabase/postgrest-js": "2.93.3", "@supabase/realtime-js": "2.93.3", "@supabase/storage-js": "2.93.3" } }, "sha512-paUqEqdBI9ztr/4bbMoCgeJ6M8ZTm2fpfjSOlzarPuzYveKFM20ZfDZqUpi9CFfYagYj5Iv3m3ztUjaI9/tM1w=="], + "@supabase/storage-js": ["@supabase/storage-js@2.100.1", "", { "dependencies": { "iceberg-js": "^0.8.1", "tslib": "2.8.1" } }, "sha512-x9xpEIoWM4xKiAlwfWTgHPSN6N4Y0aS4FVU4F6ZPbq7Gayw08SrtC6/YH/gOr8CjXQr0HxXYXDop2xGTSjubYA=="], + + "@supabase/supabase-js": ["@supabase/supabase-js@2.100.1", "", { "dependencies": { "@supabase/auth-js": "2.100.1", "@supabase/functions-js": "2.100.1", "@supabase/postgrest-js": "2.100.1", "@supabase/realtime-js": "2.100.1", "@supabase/storage-js": "2.100.1" } }, "sha512-CAeFm5sfX8sbTzxoxRafhohreIzl9a7R6qHTck3MrgTqm5M5g/u0IHfEKYzI9w/17r8NINl8UZrw2i08wrO7Iw=="], "@surma/rollup-plugin-off-main-thread": ["@surma/rollup-plugin-off-main-thread@2.2.3", "", { "dependencies": { "ejs": "^3.1.6", "json5": "^2.2.0", "magic-string": "^0.25.0", "string.prototype.matchall": "^4.0.6" } }, "sha512-lR8q/9W7hZpMWweNiAKU7NQerBnzQQLvi8qnTDU/fxItPhtZVMbPV3lbCwjhIlNBe9Bbr5V+KHshvWmVSG9cxQ=="], @@ -1378,8 +1380,6 @@ "@types/pg": ["@types/pg@8.18.0", "", { "dependencies": { "@types/node": "*", "pg-protocol": "*", "pg-types": "^2.2.0" } }, "sha512-gT+oueVQkqnj6ajGJXblFR4iavIXWsGAFCk3dP4Kki5+a9R4NMt0JARdk6s8cUKcfUoqP5dAtDSLU8xYUTFV+Q=="], - "@types/phoenix": ["@types/phoenix@1.6.7", "", {}, "sha512-oN9ive//QSBkf19rfDv45M7eZPi0eEXylht2OLEXicu5b4KoQ1OzXIw+xDSGWxSxe1JmepRR/ZH283vsu518/Q=="], - "@types/qrcode": ["@types/qrcode@1.5.6", "", { "dependencies": { "@types/node": "*" } }, "sha512-te7NQcV2BOvdj2b1hCAHzAoMNuj65kNBMz0KBaxM6c3VGBOhU0dURQKOtH8CFNI/dsKkwlv32p26qYQTWoB5bw=="], "@types/qs": ["@types/qs@6.14.0", "", {}, "sha512-eOunJqu0K1923aExK6y8p6fsihYEn/BYuQ4g0CxAAgFc4b/ZLN4CrsRZ55srTdqoiLzU2B2evC+apEIxprEzkQ=="], @@ -1634,7 +1634,7 @@ "adm-zip": ["adm-zip@0.5.16", "", {}, "sha512-TGw5yVi4saajsSEgz25grObGHEUaDrniwvA2qwSC060KfqGPdglhvPMA2lPIoxs3PQIItj2iag35fONcQqgUaQ=="], - "agent-base": ["agent-base@7.1.4", "", {}, "sha512-MnA+YT8fwfJPgBx3m60MNqakm30XOkyIoH1y6huTQvC0PwZG7ki8NacLBcrPbNoo8vEZy7Jpuk7+jMO+CUovTQ=="], + "agent-base": ["agent-base@8.0.0", "", {}, "sha512-QT8i0hCz6C/KQ+KTAbSNwCHDGdmUJl2tp2ZpNlGSWCfhUNVbYG2WLE3MdZGBAgXPV4GAvjGMxo+C1hroyxmZEg=="], "ajv": ["ajv@6.14.0", "", { "dependencies": { "fast-deep-equal": "^3.1.1", "fast-json-stable-stringify": "^2.0.0", "json-schema-traverse": "^0.4.1", "uri-js": "^4.2.2" } }, "sha512-IWrosm/yrn43eiKqkfkHis7QioDleaXQHdDVPKg0FSwwd/DuvyX79TZnFOnYpB7dcsFAMmtFztZuXPDvSePkFw=="], @@ -2324,7 +2324,7 @@ "http-proxy-middleware": ["http-proxy-middleware@2.0.9", "", { "dependencies": { "@types/http-proxy": "^1.17.8", "http-proxy": "^1.18.1", "is-glob": "^4.0.1", "is-plain-obj": "^3.0.0", "micromatch": "^4.0.2" }, "peerDependencies": { "@types/express": "^4.17.13" }, "optionalPeers": ["@types/express"] }, "sha512-c1IyJYLYppU574+YI7R4QyX2ystMtVXZwIdzazUIPIJsHuWNd+mho2j+bKoHftndicGj9yh+xjd+l0yj7VeT1Q=="], - "https-proxy-agent": ["https-proxy-agent@7.0.6", "", { "dependencies": { "agent-base": "^7.1.2", "debug": "4" } }, "sha512-vK9P5/iUfdl95AI+JVyUuIcVtd4ofvtrOr3HNtM2yxC9bnMbEdp3x01OhQNnjb8IJYi38VlTE3mBXwcfvywuSw=="], + "https-proxy-agent": ["https-proxy-agent@8.0.0", "", { "dependencies": { "agent-base": "8.0.0", "debug": "^4.3.4" } }, "sha512-YYeW+iCnAS3xhvj2dvVoWgsbca3RfQy/IlaNHHOtDmU0jMqPI9euIq3Y9BJETdxk16h9NHHCKqp/KB9nIMStCQ=="], "human-signals": ["human-signals@2.1.0", "", {}, "sha512-B4FFZ6q/T2jhhksgkbEW3HBvWIfDW85snkQgawt07S7J5QXTk6BkNV+0yAeZrM5QpMAdYlocGoljn0sJ/WQkFw=="], @@ -3264,7 +3264,7 @@ "stylehacks": ["stylehacks@5.1.1", "", { "dependencies": { "browserslist": "^4.21.4", "postcss-selector-parser": "^6.0.4" }, "peerDependencies": { "postcss": "^8.2.15" } }, "sha512-sBpcd5Hx7G6seo7b1LkpttvTz7ikD0LlH5RmdcBNb6fFR0Fl7LQwHDFr300q4cwUqi+IYrFGmsIHieMBfnN/Bw=="], - "supabase": ["supabase@2.78.1", "", { "dependencies": { "bin-links": "^6.0.0", "https-proxy-agent": "^7.0.2", "node-fetch": "^3.3.2", "tar": "7.5.11" }, "bin": { "supabase": "bin/supabase" } }, "sha512-fCIE/LTTr1IGrrYLqYBI3w89QU1qW+mRVtUi/Dmrtj+oXtDX4E8VgfFlXZpoYsXy86cfE9RZXUJVAGgvdNfTPg=="], + "supabase": ["supabase@2.84.4", "", { "dependencies": { "bin-links": "^6.0.0", "https-proxy-agent": "^8.0.0", "node-fetch": "^3.3.2", "tar": "7.5.13" }, "bin": { "supabase": "bin/supabase" } }, "sha512-+WSe/7FFMuEOa1LJr1tZh12WDwW6lpKSmBjiEmf7m9j/ialf2oxeUMlsJCdYpST5kQ7PN0XDyvqnjE0tv/AB2w=="], "superjson": ["superjson@2.2.6", "", { "dependencies": { "copy-anything": "^4" } }, "sha512-H+ue8Zo4vJmV2nRjpx86P35lzwDT3nItnIsocgumgr0hHMQ+ZGq5vrERg9kJBo5AWGmxZDhzDo+WVIJqkB0cGA=="], @@ -4278,6 +4278,8 @@ "strip-literal/js-tokens": ["js-tokens@9.0.1", "", {}, "sha512-mxa9E9ITFOt0ban3j6L5MpjwegGz6lBQmM1IJkWeBZGcMxto50+eWdjC/52xDbS2vy0k7vIMK0Fe2wfL9OQSpQ=="], + "supabase/tar": ["tar@7.5.13", "", { "dependencies": { "@isaacs/fs-minipass": "^4.0.0", "chownr": "^3.0.0", "minipass": "^7.1.2", "minizlib": "^3.1.0", "yallist": "^5.0.0" } }, "sha512-tOG/7GyXpFevhXVh8jOPJrmtRpOTsYqUIkVdVooZYJS/z8WhfQUX8RJILmeuJNinGAMSu1veBr4asSHFt5/hng=="], + "svgo/commander": ["commander@7.2.0", "", {}, "sha512-QrWXB+ZQSVPmIWIhtEO9H+gwHaMGYiF5ChvoJ+K9ZGHG/sVsa6yiesAD1GC/x46sET00Xlwo1u49RVVVzvcSkw=="], "tempy/is-stream": ["is-stream@2.0.1", "", {}, "sha512-hFoiJiTl63nn+kstHGBtewWSKnQLpyb155KHheA1l39uvtO9nWIop1p3udqPcUd/xbF1VLMO4n7OI6p7RbngDg=="], diff --git a/cloudflare_workers/api/index.ts b/cloudflare_workers/api/index.ts index 3a8a900845..146633a856 100644 --- a/cloudflare_workers/api/index.ts +++ b/cloudflare_workers/api/index.ts @@ -41,18 +41,24 @@ import { app as organization } from '../../supabase/functions/_backend/public/or import { app as replication } from '../../supabase/functions/_backend/public/replication.ts' import { app as statistics } from '../../supabase/functions/_backend/public/statistics/index.ts' import { app as webhooks } from '../../supabase/functions/_backend/public/webhooks/index.ts' +import { app as credit_usage_alerts } from '../../supabase/functions/_backend/triggers/credit_usage_alerts.ts' import { app as cron_clean_orphan_images } from '../../supabase/functions/_backend/triggers/cron_clean_orphan_images.ts' import { app as cron_clear_versions } from '../../supabase/functions/_backend/triggers/cron_clear_versions.ts' import { app as cron_email } from '../../supabase/functions/_backend/triggers/cron_email.ts' +import { app as cron_reconcile_build_status } from '../../supabase/functions/_backend/triggers/cron_reconcile_build_status.ts' import { app as cron_stat_app } from '../../supabase/functions/_backend/triggers/cron_stat_app.ts' import { app as cron_stat_org } from '../../supabase/functions/_backend/triggers/cron_stat_org.ts' import { app as cron_sync_sub } from '../../supabase/functions/_backend/triggers/cron_sync_sub.ts' import { app as logsnag_insights } from '../../supabase/functions/_backend/triggers/logsnag_insights.ts' import { app as on_app_create } from '../../supabase/functions/_backend/triggers/on_app_create.ts' +import { app as on_app_delete } from '../../supabase/functions/_backend/triggers/on_app_delete.ts' +import { app as on_app_update } from '../../supabase/functions/_backend/triggers/on_app_update.ts' import { app as on_channel_update } from '../../supabase/functions/_backend/triggers/on_channel_update.ts' import { app as on_deploy_history_create } from '../../supabase/functions/_backend/triggers/on_deploy_history_create.ts' import { app as on_manifest_create } from '../../supabase/functions/_backend/triggers/on_manifest_create.ts' +import { app as on_org_update } from '../../supabase/functions/_backend/triggers/on_org_update.ts' import { app as on_organization_create } from '../../supabase/functions/_backend/triggers/on_organization_create.ts' +import { app as on_organization_delete } from '../../supabase/functions/_backend/triggers/on_organization_delete.ts' import { app as on_user_create } from '../../supabase/functions/_backend/triggers/on_user_create.ts' import { app as on_user_delete } from '../../supabase/functions/_backend/triggers/on_user_delete.ts' import { app as on_user_update } from '../../supabase/functions/_backend/triggers/on_user_update.ts' @@ -61,6 +67,8 @@ import { app as on_version_delete } from '../../supabase/functions/_backend/trig import { app as on_version_update } from '../../supabase/functions/_backend/triggers/on_version_update.ts' import { app as queue_consumer } from '../../supabase/functions/_backend/triggers/queue_consumer.ts' import { app as stripe_event } from '../../supabase/functions/_backend/triggers/stripe_event.ts' +import { app as webhook_delivery } from '../../supabase/functions/_backend/triggers/webhook_delivery.ts' +import { app as webhook_dispatcher } from '../../supabase/functions/_backend/triggers/webhook_dispatcher.ts' import { createAllCatch, createHono } from '../../supabase/functions/_backend/utils/hono.ts' import { version } from '../../supabase/functions/_backend/utils/version.ts' @@ -122,9 +130,15 @@ appTriggers.route('/ok', ok) appTriggers.route('/cron_email', cron_email) appTriggers.route('/cron_clear_versions', cron_clear_versions) appTriggers.route('/cron_clean_orphan_images', cron_clean_orphan_images) +appTriggers.route('/cron_reconcile_build_status', cron_reconcile_build_status) +appTriggers.route('/credit_usage_alerts', credit_usage_alerts) appTriggers.route('/logsnag_insights', logsnag_insights) appTriggers.route('/on_channel_update', on_channel_update) appTriggers.route('/on_app_create', on_app_create) +appTriggers.route('/on_app_delete', on_app_delete) +appTriggers.route('/on_app_update', on_app_update) +appTriggers.route('/on_org_update', on_org_update) +appTriggers.route('/on_organization_delete', on_organization_delete) appTriggers.route('/on_user_create', on_user_create) appTriggers.route('/on_user_update', on_user_update) appTriggers.route('/on_user_delete', on_user_delete) @@ -139,6 +153,8 @@ appTriggers.route('/cron_stat_app', cron_stat_app) appTriggers.route('/cron_stat_org', cron_stat_org) appTriggers.route('/cron_sync_sub', cron_sync_sub) appTriggers.route('/queue_consumer', queue_consumer) +appTriggers.route('/webhook_delivery', webhook_delivery) +appTriggers.route('/webhook_dispatcher', webhook_dispatcher) app.route('/triggers', appTriggers) app.route('/private', appPrivate) diff --git a/package.json b/package.json index 95f7220672..fd6e3a1ada 100644 --- a/package.json +++ b/package.json @@ -181,7 +181,7 @@ "@logsnag/node": "1.0.1", "@revenuecat/purchases-capacitor": "11.2.13", "@std/semver": "npm:@jsr/std__semver@1.0.8", - "@supabase/supabase-js": "2.93.3", + "@supabase/supabase-js": "2.100.1", "@tailwindcss/forms": "^0.5.11", "@types/semver": "^7.7.1", "@vuepic/vue-datepicker": "^12.1.0", @@ -210,7 +210,7 @@ "qrcode": "^1.5.4", "semver": "^7.7.4", "stripe": "^19.3.1", - "supabase": "^2.78.1", + "supabase": "^2.84.2", "turndown": "^7.2.2", "vite-plugin-devtools-json": "^1.0.0", "vue": "3.5.29", diff --git a/supabase/functions/_backend/private/download_link.ts b/supabase/functions/_backend/private/download_link.ts index 1f6ec2432b..82642206f9 100644 --- a/supabase/functions/_backend/private/download_link.ts +++ b/supabase/functions/_backend/private/download_link.ts @@ -60,8 +60,7 @@ app.post('/', middlewareAuth, async (c) => { const { data: manifest, error: getManifestError } = await supabase .from('manifest') .select('*') - .eq('app_id', body.app_id) - .eq('id', body.id) + .eq('app_version_id', bundle.id) if (getManifestError) { throw simpleError('cannot_get_manifest', 'Cannot get manifest', { getManifestError }) diff --git a/supabase/functions/_backend/triggers/webhook_delivery.ts b/supabase/functions/_backend/triggers/webhook_delivery.ts index 62b7c945fb..ff10419ef0 100644 --- a/supabase/functions/_backend/triggers/webhook_delivery.ts +++ b/supabase/functions/_backend/triggers/webhook_delivery.ts @@ -42,8 +42,12 @@ app.post('/', middlewareAPISecret, async (c) => { try { const body = await c.req.json() - // Extract delivery data from the queue message - const deliveryData: DeliveryMessage = body.payload || body + // queue_consumer posts the queue payload directly, while direct trigger calls may + // still send the full pgmq envelope. Only unwrap when the delivery envelope is + // not already present on the body. + const deliveryData: DeliveryMessage = body?.delivery_id && body?.webhook_id && body?.url + ? body + : (body.payload || body) cloudlog({ requestId: c.get('requestId'), diff --git a/supabase/migrations/20260327220305_add_webhook_queues_to_cron_tasks.sql b/supabase/migrations/20260327220305_add_webhook_queues_to_cron_tasks.sql new file mode 100644 index 0000000000..5c5dd0acae --- /dev/null +++ b/supabase/migrations/20260327220305_add_webhook_queues_to_cron_tasks.sql @@ -0,0 +1,48 @@ +-- Ensure webhook queues are drained by the table-driven cron scheduler. +-- +-- Webhooks were originally added to the legacy hard-coded process_all_cron_tasks +-- implementation, but the later cron_tasks migration rebuilt the high-frequency +-- queue list without carrying webhook_dispatcher/webhook_delivery forward. +-- Update the active cron_tasks row in place so existing environments start +-- processing webhook queues again. + +WITH updated_target AS ( + SELECT + ct.name, + ( + WITH current_target AS ( + SELECT COALESCE(ct.target::jsonb, '[]'::jsonb) AS target + ), + ordered_items AS ( + SELECT value, ordinality + FROM current_target, + jsonb_array_elements_text(current_target.target) WITH ORDINALITY AS existing_items(value, ordinality) + + UNION ALL + + SELECT 'webhook_dispatcher', 1000000 + FROM current_target + WHERE NOT current_target.target ? 'webhook_dispatcher' + + UNION ALL + + SELECT 'webhook_delivery', 1000001 + FROM current_target + WHERE NOT current_target.target ? 'webhook_delivery' + ) + SELECT + COALESCE( + jsonb_agg(value ORDER BY ordinality), + '["webhook_dispatcher","webhook_delivery"]'::jsonb + )::text + FROM ordered_items + ) AS normalized_target + FROM public.cron_tasks AS ct + WHERE ct.name = 'high_frequency_queues' +) +UPDATE public.cron_tasks AS ct +SET + target = updated_target.normalized_target, + updated_at = now() +FROM updated_target +WHERE ct.name = updated_target.name; diff --git a/supabase/tests/49_test_webhook_cron_registration.sql b/supabase/tests/49_test_webhook_cron_registration.sql new file mode 100644 index 0000000000..d04546a6b9 --- /dev/null +++ b/supabase/tests/49_test_webhook_cron_registration.sql @@ -0,0 +1,41 @@ +BEGIN; + +SELECT plan(2); + +SELECT tests.authenticate_as_service_role(); + +SELECT ok( + ( + SELECT count(*)::int + FROM public.cron_tasks + WHERE + name = 'high_frequency_queues' + AND enabled = TRUE + AND task_type = 'function_queue'::public.cron_task_type + AND target::jsonb ? 'webhook_dispatcher' + AND target::jsonb ? 'webhook_delivery' + ) = 1, + 'cron_tasks high_frequency_queues includes webhook dispatcher and delivery queues' +); + +SELECT ok( + ( + WITH queue_order AS ( + SELECT value, ordinality + FROM public.cron_tasks, + jsonb_array_elements_text(target::jsonb) WITH ORDINALITY AS queue_items(value, ordinality) + WHERE name = 'high_frequency_queues' + ) + SELECT + MAX(CASE WHEN value = 'webhook_dispatcher' THEN ordinality END) + < MAX(CASE WHEN value = 'webhook_delivery' THEN ordinality END) + FROM queue_order + ), + 'cron_tasks high_frequency_queues processes webhook dispatcher before delivery' +); + +SELECT tests.clear_authentication(); + +SELECT * FROM finish(); -- noqa: AM04 + +ROLLBACK; diff --git a/tests/webhook-queue-processing.test.ts b/tests/webhook-queue-processing.test.ts new file mode 100644 index 0000000000..6acd18ac9a --- /dev/null +++ b/tests/webhook-queue-processing.test.ts @@ -0,0 +1,222 @@ +import { randomUUID } from 'node:crypto' +import { Pool } from 'pg' +import { afterAll, beforeAll, describe, expect, it } from 'vitest' + +import { + BASE_URL, + getSupabaseClient, + headersInternal, + POSTGRES_URL, + TEST_EMAIL, + USER_ID, +} from './test-utils.ts' + +const BASE_URL_TRIGGER = `${BASE_URL}/triggers` +const pool = new Pool({ + connectionString: POSTGRES_URL, + max: 1, + idleTimeoutMillis: 2000, +}) + +const WEBHOOK_QUEUE_TEST_ORG_ID = randomUUID() +const webhookName = `Webhook Queue Test ${randomUUID()}` +const customerId = `cus_webhook_queue_${randomUUID().replace(/-/g, '').slice(0, 20)}` + +let createdWebhookId: string | null = null + +async function fetchQueueSync(queueName: string, maxRetries = 4) { + let lastError: Error | null = null + + for (let attempt = 0; attempt < maxRetries; attempt++) { + try { + const response = await fetch(`${BASE_URL_TRIGGER}/queue_consumer/sync`, { + method: 'POST', + headers: headersInternal, + body: JSON.stringify({ queue_name: queueName }), + }) + + if (response.status === 202) { + expect(await response.json()).toEqual({ status: 'ok' }) + return + } + + lastError = new Error(`queue_consumer/sync returned HTTP ${response.status} for ${queueName}`) + } + catch (error) { + lastError = error instanceof Error ? error : new Error(String(error)) + + if (attempt === maxRetries - 1) { + throw new Error(`queue_consumer/sync network failure for ${queueName}: ${lastError.message}`) + } + } + + if (attempt < maxRetries - 1) + await new Promise(resolve => setTimeout(resolve, 250 * (attempt + 1))) + } + + if (lastError) { + throw new Error(`queue_consumer/sync failed for ${queueName}: ${lastError.message}`) + } + + throw new Error(`queue_consumer/sync failed for ${queueName}`) +} + +async function waitForDeliveryRecord(webhookId: string, timeoutMs = 10000) { + const start = Date.now() + + while (Date.now() - start < timeoutMs) { + const { data, error } = await (getSupabaseClient() as any) + .from('webhook_deliveries') + .select('*') + .eq('webhook_id', webhookId) + .order('created_at', { ascending: false }) + .limit(1) + + if (error) + throw error + + if (data?.[0]) + return data[0] + + await new Promise(resolve => setTimeout(resolve, 250)) + } + + throw new Error(`Timed out waiting for delivery record for webhook ${webhookId}`) +} + +async function waitForWebhookDeliveryQueueMessage(deliveryId: string, timeoutMs = 10000) { + const start = Date.now() + + while (Date.now() - start < timeoutMs) { + const { rows } = await pool.query( + `SELECT count(*) AS count + FROM pgmq.q_webhook_delivery + WHERE message->'payload'->>'delivery_id' = $1`, + [deliveryId], + ) + + if (Number(rows[0]?.count ?? 0) > 0) + return + + await new Promise(resolve => setTimeout(resolve, 250)) + } + + throw new Error(`Timed out waiting for webhook_delivery queue message for delivery ${deliveryId}`) +} + +async function waitForDeliveryCompletion(deliveryId: string, timeoutMs = 15000) { + const start = Date.now() + let lastState: Record | null = null + + while (Date.now() - start < timeoutMs) { + const { data, error } = await (getSupabaseClient() as any) + .from('webhook_deliveries') + .select('*') + .eq('id', deliveryId) + .single() + + if (error) + throw error + + lastState = data + + if (data?.status && data.status !== 'pending') + return data + + await new Promise(resolve => setTimeout(resolve, 250)) + } + + throw new Error(`Timed out waiting for delivery ${deliveryId} to complete: ${JSON.stringify(lastState)}`) +} + +describe('webhook queue processing', () => { + beforeAll(async () => { + const { error: stripeError } = await getSupabaseClient().from('stripe_info').insert({ + customer_id: customerId, + status: 'succeeded', + product_id: 'prod_LQIregjtNduh4q', + subscription_id: `sub_${randomUUID()}`, + trial_at: new Date(Date.now() + 15 * 24 * 60 * 60 * 1000).toISOString(), + is_good_plan: true, + }) + if (stripeError) + throw stripeError + + const { error: orgError } = await getSupabaseClient().from('orgs').insert({ + id: WEBHOOK_QUEUE_TEST_ORG_ID, + name: webhookName, + management_email: TEST_EMAIL, + created_by: USER_ID, + customer_id: customerId, + }) + if (orgError) + throw orgError + + const { data: webhook, error: webhookError } = await (getSupabaseClient() as any) + .from('webhooks') + .insert({ + org_id: WEBHOOK_QUEUE_TEST_ORG_ID, + name: webhookName, + url: 'https://example.com/webhook', + events: ['apps'], + enabled: true, + created_by: USER_ID, + }) + .select() + .single() + + if (webhookError) + throw webhookError + + createdWebhookId = webhook.id + }) + + afterAll(async () => { + if (createdWebhookId) { + await (getSupabaseClient() as any).from('webhook_deliveries').delete().eq('webhook_id', createdWebhookId) + await (getSupabaseClient() as any).from('webhooks').delete().eq('id', createdWebhookId) + } + + await getSupabaseClient().from('orgs').delete().eq('id', WEBHOOK_QUEUE_TEST_ORG_ID) + await getSupabaseClient().from('stripe_info').delete().eq('customer_id', customerId) + await pool.end() + }) + + it('dispatches and delivers webhook queue messages end to end', { timeout: 30000 }, async () => { + if (!createdWebhookId) + throw new Error('Webhook was not created in setup') + + const queueMessage = { + function_name: 'webhook_dispatcher', + function_type: 'cloudflare', + payload: { + audit_log_id: 999999, + table_name: 'apps', + operation: 'UPDATE', + org_id: WEBHOOK_QUEUE_TEST_ORG_ID, + record_id: `app_${randomUUID()}`, + old_record: { name: 'Before' }, + new_record: { name: 'After' }, + changed_fields: ['name'], + user_id: USER_ID, + created_at: new Date().toISOString(), + }, + } + + await pool.query('SELECT pgmq.send($1, $2::jsonb)', ['webhook_dispatcher', JSON.stringify(queueMessage)]) + + await fetchQueueSync('webhook_dispatcher') + const createdDelivery = await waitForDeliveryRecord(createdWebhookId) + + expect(createdDelivery.event_type).toBe('apps.UPDATE') + expect(createdDelivery.status).toBe('pending') + + await waitForWebhookDeliveryQueueMessage(createdDelivery.id) + await fetchQueueSync('webhook_delivery') + const completedDelivery = await waitForDeliveryCompletion(createdDelivery.id) + + expect(completedDelivery.status).toBe('failed') + expect(completedDelivery.attempt_count).toBe(1) + expect(completedDelivery.response_body).toBeTruthy() + }) +})