From 86f2cb10ac79a5b8e89963035026b4b9f321f78f Mon Sep 17 00:00:00 2001 From: WcaleNieWolny Date: Tue, 14 Oct 2025 13:33:31 +0200 Subject: [PATCH 1/8] feat: rename cron jobs for plan and stats --- cloudflare_workers/api/index.ts | 8 +- netlify/edge-functions/triggers.ts | 8 +- src/components.d.ts | 1 - .../functions/_backend/triggers/cron_plan.ts | 36 ------ .../_backend/triggers/cron_stat_app.ts | 113 ++++++++++++++++++ .../_backend/triggers/cron_stat_org.ts | 36 ++++++ .../functions/_backend/triggers/cron_stats.ts | 113 ------------------ supabase/functions/triggers/index.ts | 8 +- .../20251014105957_rename_plan_cron.sql | 61 ++++++++++ ...on_stats.test.ts => cron_stat_app.test.ts} | 8 +- ....test.ts => cron_stat_integration.test.ts} | 22 ++-- ...ron_plan.test.ts => cron_stat_org.test.ts} | 24 ++-- tests/error-cases.test.ts | 6 +- ...s => queue_cron_stat_org_function.test.ts} | 6 +- tests/queue_load.test.ts | 2 +- tests/test-utils.ts | 4 +- tests/trigger-error-cases.test.ts | 12 +- 17 files changed, 264 insertions(+), 204 deletions(-) delete mode 100644 supabase/functions/_backend/triggers/cron_plan.ts create mode 100644 supabase/functions/_backend/triggers/cron_stat_app.ts create mode 100644 supabase/functions/_backend/triggers/cron_stat_org.ts delete mode 100644 supabase/functions/_backend/triggers/cron_stats.ts create mode 100644 supabase/migrations/20251014105957_rename_plan_cron.sql rename tests/{cron_stats.test.ts => cron_stat_app.test.ts} (95%) rename tests/{cron_plan_integration.test.ts => cron_stat_integration.test.ts} (95%) rename tests/{cron_plan.test.ts => cron_stat_org.test.ts} (95%) rename tests/{queue_cron_plan_function.test.ts => queue_cron_stat_org_function.test.ts} (97%) diff --git a/cloudflare_workers/api/index.ts b/cloudflare_workers/api/index.ts index f52159926c..6d47119f45 100644 --- a/cloudflare_workers/api/index.ts +++ b/cloudflare_workers/api/index.ts @@ -25,8 +25,8 @@ import { app as clear_app_cache } from '../../supabase/functions/_backend/trigge import { app as clear_device_cache } from '../../supabase/functions/_backend/triggers/clear_device_cache.ts' import { app as cron_clear_versions } from '../../supabase/functions/_backend/triggers/cron_clear_versions.ts' import { app as cron_email } from '../../supabase/functions/_backend/triggers/cron_email.ts' -import { app as cron_plan } from '../../supabase/functions/_backend/triggers/cron_plan.ts' -import { app as cron_stats } from '../../supabase/functions/_backend/triggers/cron_stats.ts' +import { app as cron_stat_org } from '../../supabase/functions/_backend/triggers/cron_stat_org.ts' +import { app as cron_stat_app } from '../../supabase/functions/_backend/triggers/cron_stat_app.ts' import { app as logsnag_insights } from '../../supabase/functions/_backend/triggers/logsnag_insights.ts' import { app as on_app_create } from '../../supabase/functions/_backend/triggers/on_app_create.ts' import { app as on_channel_update } from '../../supabase/functions/_backend/triggers/on_channel_update.ts' @@ -96,8 +96,8 @@ appTriggers.route('/on_manifest_create', on_manifest_create) appTriggers.route('/on_deploy_history_create', on_deploy_history_create) appTriggers.route('/stripe_event', stripe_event) appTriggers.route('/on_organization_create', on_organization_create) -appTriggers.route('/cron_stats', cron_stats) -appTriggers.route('/cron_plan', cron_plan) +appTriggers.route('/cron_stat_app', cron_stat_app) +appTriggers.route('/cron_stat_org', cron_stat_org) appTriggers.route('/queue_consumer', queue_consumer) app.route('/triggers', appTriggers) diff --git a/netlify/edge-functions/triggers.ts b/netlify/edge-functions/triggers.ts index 4626917caf..6ff227b00e 100644 --- a/netlify/edge-functions/triggers.ts +++ b/netlify/edge-functions/triggers.ts @@ -9,8 +9,8 @@ import { Hono } from 'hono/tiny' import { app as clear_app_cache } from '../../supabase/functions/_backend/triggers/clear_app_cache.ts' import { app as clear_device_cache } from '../../supabase/functions/_backend/triggers/clear_device_cache.ts' import { app as cron_email } from '../../supabase/functions/_backend/triggers/cron_email.ts' -import { app as cron_plan } from '../../supabase/functions/_backend/triggers/cron_plan.ts' -import { app as cron_stats } from '../../supabase/functions/_backend/triggers/cron_stats.ts' +import { app as cron_stat_org } from '../../supabase/functions/_backend/triggers/cron_stat_org.ts' +import { app as cron_stat_app } from '../../supabase/functions/_backend/triggers/cron_stat_app.ts' import { app as logsnag_insights } from '../../supabase/functions/_backend/triggers/logsnag_insights.ts' import { app as on_channel_update } from '../../supabase/functions/_backend/triggers/on_channel_update.ts' import { app as on_deploy_history_create } from '../../supabase/functions/_backend/triggers/on_deploy_history_create.ts' @@ -50,8 +50,8 @@ appGlobal.route('/on_version_update', on_version_update) appGlobal.route('/on_version_delete', on_version_delete) appGlobal.route('/on_manifest_create', on_manifest_create) appGlobal.route('/stripe_event', stripe_event) -appGlobal.route('/cron_stats', cron_stats) -appGlobal.route('/cron_plan', cron_plan) +appGlobal.route('/cron_stat_app', cron_stat_app) +appGlobal.route('/cron_stat_org', cron_stat_org) appGlobal.route('/on_deploy_history_create', on_deploy_history_create) appGlobal.route('/queue_consumer', queue_consumer) diff --git a/src/components.d.ts b/src/components.d.ts index d837101048..9dabfccf7f 100644 --- a/src/components.d.ts +++ b/src/components.d.ts @@ -25,7 +25,6 @@ declare module 'vue' { DropdownProfile: typeof import('./components/dashboard/DropdownProfile.vue')['default'] FailedCard: typeof import('./components/FailedCard.vue')['default'] HistoryTable: typeof import('./components/tables/HistoryTable.vue')['default'] - IIonCopyOutline: typeof import('~icons/ion/copy-outline')['default'] InfoRow: typeof import('./components/package/InfoRow.vue')['default'] LangSelector: typeof import('./components/LangSelector.vue')['default'] LineChartStats: typeof import('./components/dashboard/LineChartStats.vue')['default'] diff --git a/supabase/functions/_backend/triggers/cron_plan.ts b/supabase/functions/_backend/triggers/cron_plan.ts deleted file mode 100644 index d67e3f0f0b..0000000000 --- a/supabase/functions/_backend/triggers/cron_plan.ts +++ /dev/null @@ -1,36 +0,0 @@ -import type { MiddlewareKeyVariables } from '../utils/hono.ts' -import { Hono } from 'hono/tiny' -import { BRES, middlewareAPISecret, parseBody, simpleError } from '../utils/hono.ts' -import { cloudlog } from '../utils/loggin.ts' -import { checkPlanOrg } from '../utils/plans.ts' -import { supabaseAdmin } from '../utils/supabase.ts' - -interface OrgToGet { - orgId?: string - customerId?: string -} - -export const app = new Hono() - -app.post('/', middlewareAPISecret, async (c) => { - const body = await parseBody(c) - cloudlog({ requestId: c.get('requestId'), message: 'post cron plan body', body }) - if (!body.orgId) - throw simpleError('no_orgId', 'No orgId', { body }) - - await checkPlanOrg(c, body.orgId) - - // Update plan_calculated_at timestamp if we have customerId - if (body.customerId) { - const supabase = supabaseAdmin(c) - await supabase - .from('stripe_info') - .update({ plan_calculated_at: new Date().toISOString() }) - .eq('customer_id', body.customerId) - .throwOnError() - - cloudlog({ requestId: c.get('requestId'), message: 'plan calculated timestamp updated', customerId: body.customerId }) - } - - return c.json(BRES) -}) diff --git a/supabase/functions/_backend/triggers/cron_stat_app.ts b/supabase/functions/_backend/triggers/cron_stat_app.ts new file mode 100644 index 0000000000..2d7ccbc4ae --- /dev/null +++ b/supabase/functions/_backend/triggers/cron_stat_app.ts @@ -0,0 +1,113 @@ +import type { MiddlewareKeyVariables } from '../utils/hono.ts' +import { Hono } from 'hono/tiny' +import { middlewareAPISecret, parseBody, quickError, simpleError, useCors } from '../utils/hono.ts' +import { cloudlog } from '../utils/loggin.ts' +import { readStatsBandwidth, readStatsMau, readStatsStorage, readStatsVersion } from '../utils/stats.ts' +import { supabaseAdmin } from '../utils/supabase.ts' + +interface DataToGet { + appId?: string + orgId?: string + todayOnly?: boolean +} + +export const app = new Hono() + +app.use('/', useCors) + +app.post('/', middlewareAPISecret, async (c) => { + const body = await parseBody(c) + cloudlog({ requestId: c.get('requestId'), message: 'post cron_stat_app body', body }) + if (!body.appId) + throw simpleError('no_appId', 'No appId', { body }) + if (!body.orgId) + throw simpleError('no_orgId', 'No orgId', { body }) + + const supabase = supabaseAdmin(c) + + const app = await supabase.from('apps') + .select('*') + .eq('app_id', body.appId) + .single() + if (!app.data) + throw quickError(404, 'app_not_found', 'App not found', { body }) + if (app.data.owner_org !== body.orgId) + throw quickError(401, 'app_not_found', 'This app is not owned by the organization', { body }) + + // get the period of the billing of the organization + const cycleInfoData = await supabase.rpc('get_cycle_info_org', { orgid: body.orgId }).single() + const cycleInfo = cycleInfoData.data + if (!cycleInfo?.subscription_anchor_start || !cycleInfo?.subscription_anchor_end) + throw simpleError('cannot_get_cycle_info', 'Cannot get cycle info', { cycleInfoData }) + + cloudlog({ requestId: c.get('requestId'), message: 'cycleInfo', cycleInfo }) + const startDate = cycleInfo.subscription_anchor_start + const endDate = cycleInfo.subscription_anchor_end + + // get mau + let mau = await readStatsMau(c, body.appId, startDate, endDate) + // get bandwidth + let bandwidth = await readStatsBandwidth(c, body.appId, startDate, endDate) + // get storage + let storage = await readStatsStorage(c, body.appId, startDate, endDate) + let versionUsage = await readStatsVersion(c, body.appId, startDate, endDate) + + if (body.todayOnly) { + // take only the last day + mau = mau.slice(-1) + bandwidth = bandwidth.slice(-1) + storage = storage.slice(-1) + versionUsage = versionUsage.slice(-1) + } + + cloudlog({ requestId: c.get('requestId'), message: 'mau', mauLength: mau.length, mauCount: mau.reduce((acc, curr) => acc + curr.mau, 0), mau: JSON.stringify(mau) }) + cloudlog({ requestId: c.get('requestId'), message: 'bandwidth', bandwidthLength: bandwidth.length, bandwidthCount: bandwidth.reduce((acc, curr) => acc + curr.bandwidth, 0), bandwidth: JSON.stringify(bandwidth) }) + cloudlog({ requestId: c.get('requestId'), message: 'storage', storageLength: storage.length, storageCount: storage.reduce((acc, curr) => acc + curr.storage, 0), storage: JSON.stringify(storage) }) + cloudlog({ requestId: c.get('requestId'), message: 'versionUsage', versionUsageLength: versionUsage.length, versionUsageCount: versionUsage.reduce((acc, curr) => acc + curr.get + curr.fail + curr.install + curr.uninstall, 0), versionUsage: JSON.stringify(versionUsage) }) + + // save to daily_mau, daily_bandwidth and daily_storage + await Promise.all([ + supabase.from('daily_mau') + .upsert(mau, { onConflict: 'app_id,date' }) + .eq('app_id', body.appId) + .throwOnError(), + supabase.from('daily_bandwidth') + .upsert(bandwidth, { onConflict: 'app_id,date' }) + .eq('app_id', body.appId) + .throwOnError(), + supabase.from('daily_storage') + .upsert(storage, { onConflict: 'app_id,date' }) + .eq('app_id', body.appId) + .throwOnError(), + supabase.from('daily_version') + .upsert(versionUsage, { onConflict: 'app_id,date,version_id' }) + .eq('app_id', body.appId) + .throwOnError(), + ]) + + cloudlog({ requestId: c.get('requestId'), message: 'stats saved', mauLength: mau.length, bandwidthLength: bandwidth.length, storageLength: storage.length, versionUsageLength: versionUsage.length }) + + await supabase.from('orgs') + .update({ stats_updated_at: new Date().toISOString() }) + .eq('id', body.orgId) + .throwOnError() + + // Get customer_id for the organization to queue plan processing + const { data: orgData, error: orgError } = await supabase + .from('orgs') + .select('customer_id') + .eq('id', body.orgId) + .single() + + if (!orgError && orgData?.customer_id) { + // Queue plan processing for this organization + await supabase.rpc('queue_cron_plan_for_org', { + org_id: body.orgId, + customer_id: orgData.customer_id, + }).throwOnError() + + cloudlog({ requestId: c.get('requestId'), message: 'plan processing queued for org', orgId: body.orgId, customerId: orgData.customer_id }) + } + + return c.json({ status: 'Stats saved', mau, bandwidth, storage, versionUsage }) +}) diff --git a/supabase/functions/_backend/triggers/cron_stat_org.ts b/supabase/functions/_backend/triggers/cron_stat_org.ts new file mode 100644 index 0000000000..5d8eb5fd1b --- /dev/null +++ b/supabase/functions/_backend/triggers/cron_stat_org.ts @@ -0,0 +1,36 @@ +import type { MiddlewareKeyVariables } from '../utils/hono.ts' +import { Hono } from 'hono/tiny' +import { BRES, middlewareAPISecret, parseBody, simpleError } from '../utils/hono.ts' +import { cloudlog } from '../utils/loggin.ts' +import { checkPlanOrg } from '../utils/plans.ts' +import { supabaseAdmin } from '../utils/supabase.ts' + +interface OrgToGet { + orgId?: string + customerId?: string +} + +export const app = new Hono() + +app.post('/', middlewareAPISecret, async (c) => { + const body = await parseBody(c) + cloudlog({ requestId: c.get('requestId'), message: 'post cron_stat_org body', body }) + if (!body.orgId) + throw simpleError('no_orgId', 'No orgId', { body }) + + await checkPlanOrg(c, body.orgId) + + // Update plan_calculated_at timestamp if we have customerId + if (body.customerId) { + const supabase = supabaseAdmin(c) + await supabase + .from('stripe_info') + .update({ plan_calculated_at: new Date().toISOString() }) + .eq('customer_id', body.customerId) + .throwOnError() + + cloudlog({ requestId: c.get('requestId'), message: 'plan calculated timestamp updated', customerId: body.customerId }) + } + + return c.json(BRES) +}) diff --git a/supabase/functions/_backend/triggers/cron_stats.ts b/supabase/functions/_backend/triggers/cron_stats.ts deleted file mode 100644 index e146b3ac19..0000000000 --- a/supabase/functions/_backend/triggers/cron_stats.ts +++ /dev/null @@ -1,113 +0,0 @@ -import type { MiddlewareKeyVariables } from '../utils/hono.ts' -import { Hono } from 'hono/tiny' -import { middlewareAPISecret, parseBody, quickError, simpleError, useCors } from '../utils/hono.ts' -import { cloudlog } from '../utils/loggin.ts' -import { readStatsBandwidth, readStatsMau, readStatsStorage, readStatsVersion } from '../utils/stats.ts' -import { supabaseAdmin } from '../utils/supabase.ts' - -interface DataToGet { - appId?: string - orgId?: string - todayOnly?: boolean -} - -export const app = new Hono() - -app.use('/', useCors) - -app.post('/', middlewareAPISecret, async (c) => { - const body = await parseBody(c) - cloudlog({ requestId: c.get('requestId'), message: 'post cron_stats body', body }) - if (!body.appId) - throw simpleError('no_appId', 'No appId', { body }) - if (!body.orgId) - throw simpleError('no_orgId', 'No orgId', { body }) - - const supabase = supabaseAdmin(c) - - const app = await supabase.from('apps') - .select('*') - .eq('app_id', body.appId) - .single() - if (!app.data) - throw quickError(404, 'app_not_found', 'App not found', { body }) - if (app.data.owner_org !== body.orgId) - throw quickError(401, 'app_not_found', 'This app is not owned by the organization', { body }) - - // get the period of the billing of the organization - const cycleInfoData = await supabase.rpc('get_cycle_info_org', { orgid: body.orgId }).single() - const cycleInfo = cycleInfoData.data - if (!cycleInfo?.subscription_anchor_start || !cycleInfo?.subscription_anchor_end) - throw simpleError('cannot_get_cycle_info', 'Cannot get cycle info', { cycleInfoData }) - - cloudlog({ requestId: c.get('requestId'), message: 'cycleInfo', cycleInfo }) - const startDate = cycleInfo.subscription_anchor_start - const endDate = cycleInfo.subscription_anchor_end - - // get mau - let mau = await readStatsMau(c, body.appId, startDate, endDate) - // get bandwidth - let bandwidth = await readStatsBandwidth(c, body.appId, startDate, endDate) - // get storage - let storage = await readStatsStorage(c, body.appId, startDate, endDate) - let versionUsage = await readStatsVersion(c, body.appId, startDate, endDate) - - if (body.todayOnly) { - // take only the last day - mau = mau.slice(-1) - bandwidth = bandwidth.slice(-1) - storage = storage.slice(-1) - versionUsage = versionUsage.slice(-1) - } - - cloudlog({ requestId: c.get('requestId'), message: 'mau', mauLength: mau.length, mauCount: mau.reduce((acc, curr) => acc + curr.mau, 0), mau: JSON.stringify(mau) }) - cloudlog({ requestId: c.get('requestId'), message: 'bandwidth', bandwidthLength: bandwidth.length, bandwidthCount: bandwidth.reduce((acc, curr) => acc + curr.bandwidth, 0), bandwidth: JSON.stringify(bandwidth) }) - cloudlog({ requestId: c.get('requestId'), message: 'storage', storageLength: storage.length, storageCount: storage.reduce((acc, curr) => acc + curr.storage, 0), storage: JSON.stringify(storage) }) - cloudlog({ requestId: c.get('requestId'), message: 'versionUsage', versionUsageLength: versionUsage.length, versionUsageCount: versionUsage.reduce((acc, curr) => acc + curr.get + curr.fail + curr.install + curr.uninstall, 0), versionUsage: JSON.stringify(versionUsage) }) - - // save to daily_mau, daily_bandwidth and daily_storage - await Promise.all([ - supabase.from('daily_mau') - .upsert(mau, { onConflict: 'app_id,date' }) - .eq('app_id', body.appId) - .throwOnError(), - supabase.from('daily_bandwidth') - .upsert(bandwidth, { onConflict: 'app_id,date' }) - .eq('app_id', body.appId) - .throwOnError(), - supabase.from('daily_storage') - .upsert(storage, { onConflict: 'app_id,date' }) - .eq('app_id', body.appId) - .throwOnError(), - supabase.from('daily_version') - .upsert(versionUsage, { onConflict: 'app_id,date,version_id' }) - .eq('app_id', body.appId) - .throwOnError(), - ]) - - cloudlog({ requestId: c.get('requestId'), message: 'stats saved', mauLength: mau.length, bandwidthLength: bandwidth.length, storageLength: storage.length, versionUsageLength: versionUsage.length }) - - await supabase.from('orgs') - .update({ stats_updated_at: new Date().toISOString() }) - .eq('id', body.orgId) - .throwOnError() - - // Get customer_id for the organization to queue plan processing - const { data: orgData, error: orgError } = await supabase - .from('orgs') - .select('customer_id') - .eq('id', body.orgId) - .single() - - if (!orgError && orgData?.customer_id) { - // Queue plan processing for this organization - await supabase.rpc('queue_cron_plan_for_org', { - org_id: body.orgId, - customer_id: orgData.customer_id, - }).throwOnError() - - cloudlog({ requestId: c.get('requestId'), message: 'plan processing queued for org', orgId: body.orgId, customerId: orgData.customer_id }) - } - - return c.json({ status: 'Stats saved', mau, bandwidth, storage, versionUsage }) -}) diff --git a/supabase/functions/triggers/index.ts b/supabase/functions/triggers/index.ts index 43c287d3d9..b24e6add02 100644 --- a/supabase/functions/triggers/index.ts +++ b/supabase/functions/triggers/index.ts @@ -2,8 +2,8 @@ import { app as clear_app_cache } from '../_backend/triggers/clear_app_cache.ts' import { app as clear_device_cache } from '../_backend/triggers/clear_device_cache.ts' import { app as cron_clear_versions } from '../_backend/triggers/cron_clear_versions.ts' import { app as cron_email } from '../_backend/triggers/cron_email.ts' -import { app as cron_plan } from '../_backend/triggers/cron_plan.ts' -import { app as cron_stats } from '../_backend/triggers/cron_stats.ts' +import { app as cron_stat_org } from '../_backend/triggers/cron_stat_org.ts' +import { app as cron_stat_app } from '../_backend/triggers/cron_stat_app.ts' import { app as logsnag_insights } from '../_backend/triggers/logsnag_insights.ts' import { app as on_app_create } from '../_backend/triggers/on_app_create.ts' import { app as on_app_delete } from '../_backend/triggers/on_app_delete.ts' @@ -42,8 +42,8 @@ appGlobal.route('/on_version_delete', on_version_delete) appGlobal.route('/on_manifest_create', on_manifest_create) appGlobal.route('/stripe_event', stripe_event) appGlobal.route('/on_organization_create', on_organization_create) -appGlobal.route('/cron_stats', cron_stats) -appGlobal.route('/cron_plan', cron_plan) +appGlobal.route('/cron_stat_app', cron_stat_app) +appGlobal.route('/cron_stat_org', cron_stat_org) appGlobal.route('/cron_clear_versions', cron_clear_versions) appGlobal.route('/on_organization_delete', on_organization_delete) appGlobal.route('/on_deploy_history_create', on_deploy_history_create) diff --git a/supabase/migrations/20251014105957_rename_plan_cron.sql b/supabase/migrations/20251014105957_rename_plan_cron.sql new file mode 100644 index 0000000000..a2bbd79d7b --- /dev/null +++ b/supabase/migrations/20251014105957_rename_plan_cron.sql @@ -0,0 +1,61 @@ +-- Simple renaming of cron_stats to cron_stat_app and cron_plan to cron_stat_org + +-- Unschedule existing cron jobs +SELECT cron.unschedule('process_cron_stats_queue'); +SELECT cron.unschedule('process_cron_stats_jobs'); +SELECT cron.unschedule('process_cron_plan_queue'); + +-- Rename the message queues +SELECT pgmq.drop_queue('cron_stats'); +SELECT pgmq.drop_queue('cron_plan'); +SELECT pgmq.create('cron_stat_app'); +SELECT pgmq.create('cron_stat_org'); + +-- Reschedule the cron jobs with new queue names +SELECT cron.schedule( + 'process_cron_stat_app_jobs', + '0 */6 * * *', + 'SELECT process_cron_stats_jobs();' +); + +SELECT cron.schedule( + 'process_cron_stat_app_queue', + '* * * * *', + 'SELECT public.process_function_queue(''cron_stat_app'')' +); + +SELECT cron.schedule( + 'process_cron_stat_org_queue', + '* * * * *', + 'SELECT public.process_function_queue(''cron_stat_org'')' +); + +-- Update the queue_cron_plan_for_org function to use the new queue name +CREATE OR REPLACE FUNCTION public.queue_cron_plan_for_org(org_id uuid, customer_id text) +RETURNS void +LANGUAGE plpgsql +SECURITY DEFINER +AS $$ +DECLARE + last_calculated timestamptz; +BEGIN + -- Check when plan was last calculated for this customer + SELECT plan_calculated_at INTO last_calculated + FROM public.stripe_info + WHERE stripe_info.customer_id = queue_cron_plan_for_org.customer_id; + + -- Only queue if plan wasn't calculated in the last hour + IF last_calculated IS NULL OR last_calculated < NOW() - INTERVAL '1 hour' THEN + PERFORM pgmq.send('cron_stat_org', + jsonb_build_object( + 'function_name', 'cron_stat_org', + 'function_type', 'cloudflare', + 'payload', jsonb_build_object( + 'orgId', org_id, + 'customerId', customer_id + ) + ) + ); + END IF; +END; +$$; \ No newline at end of file diff --git a/tests/cron_stats.test.ts b/tests/cron_stat_app.test.ts similarity index 95% rename from tests/cron_stats.test.ts rename to tests/cron_stat_app.test.ts index 08a87a4380..a5e6e446e0 100644 --- a/tests/cron_stats.test.ts +++ b/tests/cron_stat_app.test.ts @@ -2,14 +2,14 @@ import { randomUUID } from 'node:crypto' import { afterAll, beforeAll, describe, expect, it } from 'vitest' import { BASE_URL, ORG_ID, getSupabaseClient, resetAndSeedAppData, resetAndSeedAppDataStats, resetAppData, resetAppDataStats } from './test-utils.ts' -const appId = `com.cron.stats.${randomUUID()}` +const appId = `com.cron.${randomUUID().slice(0, 8)}` const triggerHeaders = { 'Content-Type': 'application/json', 'apisecret': 'testsecret', } -describe('[POST] /triggers/cron_stats', () => { +describe('[POST] /triggers/cron_stat_app', () => { beforeAll(async () => { await resetAndSeedAppData(appId) await resetAndSeedAppDataStats(appId) @@ -29,7 +29,7 @@ describe('[POST] /triggers/cron_stats', () => { }) it('updates stats_updated_at with a fresh timestamp', async () => { - const response = await fetch(`${BASE_URL}/triggers/cron_stats`, { + const response = await fetch(`${BASE_URL}/triggers/cron_stat_app`, { method: 'POST', headers: triggerHeaders, body: JSON.stringify({ @@ -72,7 +72,7 @@ describe('[POST] /triggers/cron_stats', () => { .eq('customer_id', 'cus_Pa0k8TO6HVln6A') // From seed data .throwOnError() - const response = await fetch(`${BASE_URL}/triggers/cron_stats`, { + const response = await fetch(`${BASE_URL}/triggers/cron_stat_app`, { method: 'POST', headers: triggerHeaders, body: JSON.stringify({ diff --git a/tests/cron_plan_integration.test.ts b/tests/cron_stat_integration.test.ts similarity index 95% rename from tests/cron_plan_integration.test.ts rename to tests/cron_stat_integration.test.ts index 94abf5b16b..279af1d374 100644 --- a/tests/cron_plan_integration.test.ts +++ b/tests/cron_stat_integration.test.ts @@ -2,14 +2,14 @@ import { randomUUID } from 'node:crypto' import { afterAll, beforeAll, describe, expect, it } from 'vitest' import { BASE_URL, ORG_ID, USER_ID, getSupabaseClient, resetAndSeedAppData, resetAndSeedAppDataStats, resetAppData, resetAppDataStats } from './test-utils.ts' -const appId = `com.cron.plan.integration.${randomUUID()}` +const appId = `com.cron.${randomUUID().slice(0, 8)}` const triggerHeaders = { 'Content-Type': 'application/json', 'apisecret': 'testsecret', } -describe('[Integration] cron_stats -> cron_plan flow', () => { +describe('[Integration] cron_stat_app -> cron_stat_org flow', () => { beforeAll(async () => { await resetAndSeedAppData(appId) await resetAndSeedAppDataStats(appId) @@ -36,7 +36,7 @@ describe('[Integration] cron_stats -> cron_plan flow', () => { await resetAppDataStats(appId) }) - it('cron_stats triggers plan processing and updates plan_calculated_at', async () => { + it('cron_stat_app triggers plan processing and updates plan_calculated_at', async () => { const supabase = getSupabaseClient() // First, get the actual customer_id for our test org @@ -72,8 +72,8 @@ describe('[Integration] cron_stats -> cron_plan flow', () => { expect(initialStripeInfo?.plan_calculated_at).toBeNull() - // Trigger cron_stats which should queue plan processing - const statsResponse = await fetch(`${BASE_URL}/triggers/cron_stats`, { + // Trigger cron_stat_app which should queue plan processing + const statsResponse = await fetch(`${BASE_URL}/triggers/cron_stat_app`, { method: 'POST', headers: triggerHeaders, body: JSON.stringify({ @@ -100,7 +100,7 @@ describe('[Integration] cron_stats -> cron_plan flow', () => { // The plan processing would normally be triggered by the queue processor // Manually trigger cron_plan to simulate queue processing - const planResponse = await fetch(`${BASE_URL}/triggers/cron_plan`, { + const planResponse = await fetch(`${BASE_URL}/triggers/cron_stat_org`, { method: 'POST', headers: triggerHeaders, body: JSON.stringify({ @@ -154,7 +154,7 @@ describe('[Integration] cron_stats -> cron_plan flow', () => { .eq('customer_id', orgData.customer_id) .throwOnError() - // Call the queue function directly (simulating what cron_stats does) + // Call the queue function directly (simulating what cron_stat_app does) const { error } = await supabase.rpc('queue_cron_plan_for_org', { org_id: ORG_ID, customer_id: orgData.customer_id @@ -212,7 +212,7 @@ describe('[Integration] cron_stats -> cron_plan flow', () => { expect(error).toBeNull() // Now manually trigger plan processing to simulate queue processing - const planResponse = await fetch(`${BASE_URL}/triggers/cron_plan`, { + const planResponse = await fetch(`${BASE_URL}/triggers/cron_stat_org`, { method: 'POST', headers: triggerHeaders, body: JSON.stringify({ @@ -240,7 +240,7 @@ describe('[Integration] cron_stats -> cron_plan flow', () => { }) it('handles missing customer_id gracefully', async () => { - // Trigger cron_stats for an org without customer_id + // Trigger cron_stat_app for an org without customer_id const supabase = getSupabaseClient() // Create a temporary org without customer_id @@ -277,8 +277,8 @@ describe('[Integration] cron_stats -> cron_plan flow', () => { }) .throwOnError() - // Trigger cron_stats - should not error even without customer_id - const statsResponse = await fetch(`${BASE_URL}/triggers/cron_stats`, { + // Trigger cron_stat_app - should not error even without customer_id + const statsResponse = await fetch(`${BASE_URL}/triggers/cron_stat_app`, { method: 'POST', headers: triggerHeaders, body: JSON.stringify({ diff --git a/tests/cron_plan.test.ts b/tests/cron_stat_org.test.ts similarity index 95% rename from tests/cron_plan.test.ts rename to tests/cron_stat_org.test.ts index 705e4628e6..640b6f48d0 100644 --- a/tests/cron_plan.test.ts +++ b/tests/cron_stat_org.test.ts @@ -112,9 +112,9 @@ afterAll(async () => { await resetAppDataStats(APPNAME) }) -describe('[POST] /triggers/cron_plan', () => { +describe('[POST] /triggers/cron_stat_org', () => { it('should return 400 when orgId is missing', async () => { - const response = await fetch(`${BASE_URL}/triggers/cron_plan`, { + const response = await fetch(`${BASE_URL}/triggers/cron_stat_org`, { method: 'POST', headers, body: JSON.stringify({}), @@ -150,7 +150,7 @@ describe('[POST] /triggers/cron_plan', () => { if (error) throw error - const response = await fetch(`${BASE_URL}/triggers/cron_plan`, { + const response = await fetch(`${BASE_URL}/triggers/cron_stat_org`, { method: 'POST', headers, body: JSON.stringify({ orgId: ORG_ID }), @@ -218,7 +218,7 @@ describe('[POST] /triggers/cron_plan', () => { if (error) throw error - const response = await fetch(`${BASE_URL}/triggers/cron_plan`, { + const response = await fetch(`${BASE_URL}/triggers/cron_stat_org`, { method: 'POST', headers, body: JSON.stringify({ orgId: ORG_ID }), @@ -295,7 +295,7 @@ describe('[POST] /triggers/cron_plan', () => { if (error) throw error - const response = await fetch(`${BASE_URL}/triggers/cron_plan`, { + const response = await fetch(`${BASE_URL}/triggers/cron_stat_org`, { method: 'POST', headers, body: JSON.stringify({ orgId: ORG_ID }), @@ -366,7 +366,7 @@ describe('[POST] /triggers/cron_plan', () => { expect(setMauError).toBeFalsy() // Run cron plan to set exceeded status - const response = await fetch(`${BASE_URL}/triggers/cron_plan`, { + const response = await fetch(`${BASE_URL}/triggers/cron_stat_org`, { method: 'POST', headers, body: JSON.stringify({ orgId: ORG_ID }), @@ -395,7 +395,7 @@ describe('[POST] /triggers/cron_plan', () => { expect(appMetricsCacheError).toBeFalsy() // Run cron plan again - const response2 = await fetch(`${BASE_URL}/triggers/cron_plan`, { + const response2 = await fetch(`${BASE_URL}/triggers/cron_stat_org`, { method: 'POST', headers, body: JSON.stringify({ orgId: ORG_ID }), @@ -420,7 +420,7 @@ describe('[POST] /triggers/cron_plan', () => { expect(setStorageError).toBeFalsy() // Run cron plan to set exceeded status - const response = await fetch(`${BASE_URL}/triggers/cron_plan`, { + const response = await fetch(`${BASE_URL}/triggers/cron_stat_org`, { method: 'POST', headers, body: JSON.stringify({ orgId: ORG_ID }), @@ -448,7 +448,7 @@ describe('[POST] /triggers/cron_plan', () => { expect(storageCacheError).toBeFalsy() // Run cron plan again - const response2 = await fetch(`${BASE_URL}/triggers/cron_plan`, { + const response2 = await fetch(`${BASE_URL}/triggers/cron_stat_org`, { method: 'POST', headers, body: JSON.stringify({ orgId: ORG_ID }), @@ -482,7 +482,7 @@ describe('[POST] /triggers/cron_plan', () => { expect(setBandwidthError).toBeFalsy() // Run cron plan to set exceeded status - const response = await fetch(`${BASE_URL}/triggers/cron_plan`, { + const response = await fetch(`${BASE_URL}/triggers/cron_stat_org`, { method: 'POST', headers, body: JSON.stringify({ orgId: ORG_ID }), @@ -511,7 +511,7 @@ describe('[POST] /triggers/cron_plan', () => { expect(appMetricsCacheError).toBeFalsy() // Run cron plan again - const response2 = await fetch(`${BASE_URL}/triggers/cron_plan`, { + const response2 = await fetch(`${BASE_URL}/triggers/cron_stat_org`, { method: 'POST', headers, body: JSON.stringify({ orgId: ORG_ID }), @@ -564,7 +564,7 @@ describe('[POST] /triggers/cron_plan', () => { // expect(deletedApp).toBeTruthy() // // Wait for the trigger to process by calling cron_plan - // const response = await fetch(`${BASE_URL}/triggers/cron_plan`, { + // const response = await fetch(`${BASE_URL}/triggers/cron_stat_org`, { // method: 'POST', // headers, // body: JSON.stringify({ orgId: ORG_ID }), diff --git a/tests/error-cases.test.ts b/tests/error-cases.test.ts index 8c8337a422..9015f50994 100644 --- a/tests/error-cases.test.ts +++ b/tests/error-cases.test.ts @@ -194,8 +194,8 @@ describe('server Error Cases (5xx)', () => { }) describe('trigger Endpoint Error Cases', () => { - it('should return 400 for cron_stats without appId', async () => { - const response = await fetch(`${BASE_URL}/triggers/cron_stats`, { + it('should return 400 for cron_stat_app without appId', async () => { + const response = await fetch(`${BASE_URL}/triggers/cron_stat_app`, { method: 'POST', headers: { 'Content-Type': 'application/json', @@ -209,7 +209,7 @@ describe('trigger Endpoint Error Cases', () => { }) it('should return 400 for cron_plan without orgId', async () => { - const response = await fetch(`${BASE_URL}/triggers/cron_plan`, { + const response = await fetch(`${BASE_URL}/triggers/cron_stat_org`, { method: 'POST', headers: { 'Content-Type': 'application/json', diff --git a/tests/queue_cron_plan_function.test.ts b/tests/queue_cron_stat_org_function.test.ts similarity index 97% rename from tests/queue_cron_plan_function.test.ts rename to tests/queue_cron_stat_org_function.test.ts index 12a42a6b3d..6ff06253dc 100644 --- a/tests/queue_cron_plan_function.test.ts +++ b/tests/queue_cron_stat_org_function.test.ts @@ -1,7 +1,7 @@ import { afterAll, beforeAll, describe, expect, it } from 'vitest' import { ORG_ID, getSupabaseClient, getCronPlanQueueCount, getLatestCronPlanMessage, cleanupPostgresClient } from './test-utils.ts' -describe('[Function] queue_cron_plan_for_org', () => { +describe('[Function] queue_cron_stat_org_for_org', () => { let testCustomerId: string | null = null beforeAll(async () => { @@ -66,7 +66,7 @@ describe('[Function] queue_cron_plan_for_org', () => { // Verify the queue record contains correct data const latestMessage = await getLatestCronPlanMessage() expect(latestMessage).toMatchObject({ - function_name: 'cron_plan', + function_name: 'cron_stat_org', function_type: 'cloudflare', payload: { orgId: ORG_ID, @@ -155,7 +155,7 @@ describe('[Function] queue_cron_plan_for_org', () => { // Verify the queue record contains correct data const latestMessage = await getLatestCronPlanMessage() expect(latestMessage).toMatchObject({ - function_name: 'cron_plan', + function_name: 'cron_stat_org', function_type: 'cloudflare', payload: { orgId: ORG_ID, diff --git a/tests/queue_load.test.ts b/tests/queue_load.test.ts index 85b98d7a85..32fb136838 100644 --- a/tests/queue_load.test.ts +++ b/tests/queue_load.test.ts @@ -243,7 +243,7 @@ describe('queue Load Test', () => { // fetch(`${BASE_URL_TRIGGER}/queue_consumer/sync`, { // method: 'POST', // headers: headersInternal, - // body: JSON.stringify({ queue_name: 'cron_stats' }), + // body: JSON.stringify({ queue_name: 'cron_stat_app' }), // }), // ) diff --git a/tests/test-utils.ts b/tests/test-utils.ts index 46d4772b9f..4797f37e5d 100644 --- a/tests/test-utils.ts +++ b/tests/test-utils.ts @@ -372,12 +372,12 @@ export async function executeSQL(query: string, params?: any[]): Promise { } export async function getCronPlanQueueCount(): Promise { - const result = await executeSQL('SELECT COUNT(*) as count FROM pgmq.q_cron_plan') + const result = await executeSQL('SELECT COUNT(*) as count FROM pgmq.q_cron_stat_org') return parseInt(result[0]?.count || '0') } export async function getLatestCronPlanMessage(): Promise { - const result = await executeSQL('SELECT message FROM pgmq.q_cron_plan ORDER BY msg_id DESC LIMIT 1') + const result = await executeSQL('SELECT message FROM pgmq.q_cron_stat_org ORDER BY msg_id DESC LIMIT 1') return result[0]?.message } diff --git a/tests/trigger-error-cases.test.ts b/tests/trigger-error-cases.test.ts index e90598a740..8e26ea3864 100644 --- a/tests/trigger-error-cases.test.ts +++ b/tests/trigger-error-cases.test.ts @@ -18,9 +18,9 @@ afterAll(async () => { await resetAppData(APPNAME) }) -describe('[POST] /triggers/cron_stats - Error Cases', () => { +describe('[POST] /triggers/cron_stat_app - Error Cases', () => { it('should return 400 when appId is missing', async () => { - const response = await fetch(`${BASE_URL}/triggers/cron_stats`, { + const response = await fetch(`${BASE_URL}/triggers/cron_stat_app`, { method: 'POST', headers: triggerHeaders, body: JSON.stringify({}), @@ -31,7 +31,7 @@ describe('[POST] /triggers/cron_stats - Error Cases', () => { }) it('should return 400 when org is missing', async () => { - const response = await fetch(`${BASE_URL}/triggers/cron_stats`, { + const response = await fetch(`${BASE_URL}/triggers/cron_stat_app`, { method: 'POST', headers: triggerHeaders, body: JSON.stringify({ @@ -44,7 +44,7 @@ describe('[POST] /triggers/cron_stats - Error Cases', () => { }) it('should return 400 when appId is not provided', async () => { - const response = await fetch(`${BASE_URL}/triggers/cron_stats`, { + const response = await fetch(`${BASE_URL}/triggers/cron_stat_app`, { method: 'POST', headers: triggerHeaders, body: JSON.stringify({ @@ -57,9 +57,9 @@ describe('[POST] /triggers/cron_stats - Error Cases', () => { }) }) -describe('[POST] /triggers/cron_plan - Error Cases', () => { +describe('[POST] /triggers/cron_stat_org - Error Cases', () => { it('should return 400 when orgId is missing', async () => { - const response = await fetch(`${BASE_URL}/triggers/cron_plan`, { + const response = await fetch(`${BASE_URL}/triggers/cron_stat_org`, { method: 'POST', headers: triggerHeaders, body: JSON.stringify({}), From a44f243b820422f723422707ab1b5a4becc4f57c Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Tue, 14 Oct 2025 11:34:35 +0000 Subject: [PATCH 2/8] [autofix.ci] apply automated fixes --- .../_backend/triggers/cron_stat_app.ts | 194 +++++++++--------- .../_backend/triggers/cron_stat_org.ts | 36 ++-- supabase/functions/triggers/index.ts | 2 +- 3 files changed, 116 insertions(+), 116 deletions(-) diff --git a/supabase/functions/_backend/triggers/cron_stat_app.ts b/supabase/functions/_backend/triggers/cron_stat_app.ts index 2d7ccbc4ae..7883fdc059 100644 --- a/supabase/functions/_backend/triggers/cron_stat_app.ts +++ b/supabase/functions/_backend/triggers/cron_stat_app.ts @@ -6,9 +6,9 @@ import { readStatsBandwidth, readStatsMau, readStatsStorage, readStatsVersion } import { supabaseAdmin } from '../utils/supabase.ts' interface DataToGet { - appId?: string - orgId?: string - todayOnly?: boolean + appId?: string + orgId?: string + todayOnly?: boolean } export const app = new Hono() @@ -16,98 +16,98 @@ export const app = new Hono() app.use('/', useCors) app.post('/', middlewareAPISecret, async (c) => { - const body = await parseBody(c) - cloudlog({ requestId: c.get('requestId'), message: 'post cron_stat_app body', body }) - if (!body.appId) - throw simpleError('no_appId', 'No appId', { body }) - if (!body.orgId) - throw simpleError('no_orgId', 'No orgId', { body }) - - const supabase = supabaseAdmin(c) - - const app = await supabase.from('apps') - .select('*') - .eq('app_id', body.appId) - .single() - if (!app.data) - throw quickError(404, 'app_not_found', 'App not found', { body }) - if (app.data.owner_org !== body.orgId) - throw quickError(401, 'app_not_found', 'This app is not owned by the organization', { body }) - - // get the period of the billing of the organization - const cycleInfoData = await supabase.rpc('get_cycle_info_org', { orgid: body.orgId }).single() - const cycleInfo = cycleInfoData.data - if (!cycleInfo?.subscription_anchor_start || !cycleInfo?.subscription_anchor_end) - throw simpleError('cannot_get_cycle_info', 'Cannot get cycle info', { cycleInfoData }) - - cloudlog({ requestId: c.get('requestId'), message: 'cycleInfo', cycleInfo }) - const startDate = cycleInfo.subscription_anchor_start - const endDate = cycleInfo.subscription_anchor_end - - // get mau - let mau = await readStatsMau(c, body.appId, startDate, endDate) - // get bandwidth - let bandwidth = await readStatsBandwidth(c, body.appId, startDate, endDate) - // get storage - let storage = await readStatsStorage(c, body.appId, startDate, endDate) - let versionUsage = await readStatsVersion(c, body.appId, startDate, endDate) - - if (body.todayOnly) { - // take only the last day - mau = mau.slice(-1) - bandwidth = bandwidth.slice(-1) - storage = storage.slice(-1) - versionUsage = versionUsage.slice(-1) - } - - cloudlog({ requestId: c.get('requestId'), message: 'mau', mauLength: mau.length, mauCount: mau.reduce((acc, curr) => acc + curr.mau, 0), mau: JSON.stringify(mau) }) - cloudlog({ requestId: c.get('requestId'), message: 'bandwidth', bandwidthLength: bandwidth.length, bandwidthCount: bandwidth.reduce((acc, curr) => acc + curr.bandwidth, 0), bandwidth: JSON.stringify(bandwidth) }) - cloudlog({ requestId: c.get('requestId'), message: 'storage', storageLength: storage.length, storageCount: storage.reduce((acc, curr) => acc + curr.storage, 0), storage: JSON.stringify(storage) }) - cloudlog({ requestId: c.get('requestId'), message: 'versionUsage', versionUsageLength: versionUsage.length, versionUsageCount: versionUsage.reduce((acc, curr) => acc + curr.get + curr.fail + curr.install + curr.uninstall, 0), versionUsage: JSON.stringify(versionUsage) }) - - // save to daily_mau, daily_bandwidth and daily_storage - await Promise.all([ - supabase.from('daily_mau') - .upsert(mau, { onConflict: 'app_id,date' }) - .eq('app_id', body.appId) - .throwOnError(), - supabase.from('daily_bandwidth') - .upsert(bandwidth, { onConflict: 'app_id,date' }) - .eq('app_id', body.appId) - .throwOnError(), - supabase.from('daily_storage') - .upsert(storage, { onConflict: 'app_id,date' }) - .eq('app_id', body.appId) - .throwOnError(), - supabase.from('daily_version') - .upsert(versionUsage, { onConflict: 'app_id,date,version_id' }) - .eq('app_id', body.appId) - .throwOnError(), - ]) - - cloudlog({ requestId: c.get('requestId'), message: 'stats saved', mauLength: mau.length, bandwidthLength: bandwidth.length, storageLength: storage.length, versionUsageLength: versionUsage.length }) - - await supabase.from('orgs') - .update({ stats_updated_at: new Date().toISOString() }) - .eq('id', body.orgId) - .throwOnError() - - // Get customer_id for the organization to queue plan processing - const { data: orgData, error: orgError } = await supabase - .from('orgs') - .select('customer_id') - .eq('id', body.orgId) - .single() - - if (!orgError && orgData?.customer_id) { - // Queue plan processing for this organization - await supabase.rpc('queue_cron_plan_for_org', { - org_id: body.orgId, - customer_id: orgData.customer_id, - }).throwOnError() - - cloudlog({ requestId: c.get('requestId'), message: 'plan processing queued for org', orgId: body.orgId, customerId: orgData.customer_id }) - } - - return c.json({ status: 'Stats saved', mau, bandwidth, storage, versionUsage }) + const body = await parseBody(c) + cloudlog({ requestId: c.get('requestId'), message: 'post cron_stat_app body', body }) + if (!body.appId) + throw simpleError('no_appId', 'No appId', { body }) + if (!body.orgId) + throw simpleError('no_orgId', 'No orgId', { body }) + + const supabase = supabaseAdmin(c) + + const app = await supabase.from('apps') + .select('*') + .eq('app_id', body.appId) + .single() + if (!app.data) + throw quickError(404, 'app_not_found', 'App not found', { body }) + if (app.data.owner_org !== body.orgId) + throw quickError(401, 'app_not_found', 'This app is not owned by the organization', { body }) + + // get the period of the billing of the organization + const cycleInfoData = await supabase.rpc('get_cycle_info_org', { orgid: body.orgId }).single() + const cycleInfo = cycleInfoData.data + if (!cycleInfo?.subscription_anchor_start || !cycleInfo?.subscription_anchor_end) + throw simpleError('cannot_get_cycle_info', 'Cannot get cycle info', { cycleInfoData }) + + cloudlog({ requestId: c.get('requestId'), message: 'cycleInfo', cycleInfo }) + const startDate = cycleInfo.subscription_anchor_start + const endDate = cycleInfo.subscription_anchor_end + + // get mau + let mau = await readStatsMau(c, body.appId, startDate, endDate) + // get bandwidth + let bandwidth = await readStatsBandwidth(c, body.appId, startDate, endDate) + // get storage + let storage = await readStatsStorage(c, body.appId, startDate, endDate) + let versionUsage = await readStatsVersion(c, body.appId, startDate, endDate) + + if (body.todayOnly) { + // take only the last day + mau = mau.slice(-1) + bandwidth = bandwidth.slice(-1) + storage = storage.slice(-1) + versionUsage = versionUsage.slice(-1) + } + + cloudlog({ requestId: c.get('requestId'), message: 'mau', mauLength: mau.length, mauCount: mau.reduce((acc, curr) => acc + curr.mau, 0), mau: JSON.stringify(mau) }) + cloudlog({ requestId: c.get('requestId'), message: 'bandwidth', bandwidthLength: bandwidth.length, bandwidthCount: bandwidth.reduce((acc, curr) => acc + curr.bandwidth, 0), bandwidth: JSON.stringify(bandwidth) }) + cloudlog({ requestId: c.get('requestId'), message: 'storage', storageLength: storage.length, storageCount: storage.reduce((acc, curr) => acc + curr.storage, 0), storage: JSON.stringify(storage) }) + cloudlog({ requestId: c.get('requestId'), message: 'versionUsage', versionUsageLength: versionUsage.length, versionUsageCount: versionUsage.reduce((acc, curr) => acc + curr.get + curr.fail + curr.install + curr.uninstall, 0), versionUsage: JSON.stringify(versionUsage) }) + + // save to daily_mau, daily_bandwidth and daily_storage + await Promise.all([ + supabase.from('daily_mau') + .upsert(mau, { onConflict: 'app_id,date' }) + .eq('app_id', body.appId) + .throwOnError(), + supabase.from('daily_bandwidth') + .upsert(bandwidth, { onConflict: 'app_id,date' }) + .eq('app_id', body.appId) + .throwOnError(), + supabase.from('daily_storage') + .upsert(storage, { onConflict: 'app_id,date' }) + .eq('app_id', body.appId) + .throwOnError(), + supabase.from('daily_version') + .upsert(versionUsage, { onConflict: 'app_id,date,version_id' }) + .eq('app_id', body.appId) + .throwOnError(), + ]) + + cloudlog({ requestId: c.get('requestId'), message: 'stats saved', mauLength: mau.length, bandwidthLength: bandwidth.length, storageLength: storage.length, versionUsageLength: versionUsage.length }) + + await supabase.from('orgs') + .update({ stats_updated_at: new Date().toISOString() }) + .eq('id', body.orgId) + .throwOnError() + + // Get customer_id for the organization to queue plan processing + const { data: orgData, error: orgError } = await supabase + .from('orgs') + .select('customer_id') + .eq('id', body.orgId) + .single() + + if (!orgError && orgData?.customer_id) { + // Queue plan processing for this organization + await supabase.rpc('queue_cron_plan_for_org', { + org_id: body.orgId, + customer_id: orgData.customer_id, + }).throwOnError() + + cloudlog({ requestId: c.get('requestId'), message: 'plan processing queued for org', orgId: body.orgId, customerId: orgData.customer_id }) + } + + return c.json({ status: 'Stats saved', mau, bandwidth, storage, versionUsage }) }) diff --git a/supabase/functions/_backend/triggers/cron_stat_org.ts b/supabase/functions/_backend/triggers/cron_stat_org.ts index 5d8eb5fd1b..a221cff1aa 100644 --- a/supabase/functions/_backend/triggers/cron_stat_org.ts +++ b/supabase/functions/_backend/triggers/cron_stat_org.ts @@ -6,31 +6,31 @@ import { checkPlanOrg } from '../utils/plans.ts' import { supabaseAdmin } from '../utils/supabase.ts' interface OrgToGet { - orgId?: string - customerId?: string + orgId?: string + customerId?: string } export const app = new Hono() app.post('/', middlewareAPISecret, async (c) => { - const body = await parseBody(c) - cloudlog({ requestId: c.get('requestId'), message: 'post cron_stat_org body', body }) - if (!body.orgId) - throw simpleError('no_orgId', 'No orgId', { body }) + const body = await parseBody(c) + cloudlog({ requestId: c.get('requestId'), message: 'post cron_stat_org body', body }) + if (!body.orgId) + throw simpleError('no_orgId', 'No orgId', { body }) - await checkPlanOrg(c, body.orgId) + await checkPlanOrg(c, body.orgId) - // Update plan_calculated_at timestamp if we have customerId - if (body.customerId) { - const supabase = supabaseAdmin(c) - await supabase - .from('stripe_info') - .update({ plan_calculated_at: new Date().toISOString() }) - .eq('customer_id', body.customerId) - .throwOnError() + // Update plan_calculated_at timestamp if we have customerId + if (body.customerId) { + const supabase = supabaseAdmin(c) + await supabase + .from('stripe_info') + .update({ plan_calculated_at: new Date().toISOString() }) + .eq('customer_id', body.customerId) + .throwOnError() - cloudlog({ requestId: c.get('requestId'), message: 'plan calculated timestamp updated', customerId: body.customerId }) - } + cloudlog({ requestId: c.get('requestId'), message: 'plan calculated timestamp updated', customerId: body.customerId }) + } - return c.json(BRES) + return c.json(BRES) }) diff --git a/supabase/functions/triggers/index.ts b/supabase/functions/triggers/index.ts index b24e6add02..97cd5fd91a 100644 --- a/supabase/functions/triggers/index.ts +++ b/supabase/functions/triggers/index.ts @@ -2,8 +2,8 @@ import { app as clear_app_cache } from '../_backend/triggers/clear_app_cache.ts' import { app as clear_device_cache } from '../_backend/triggers/clear_device_cache.ts' import { app as cron_clear_versions } from '../_backend/triggers/cron_clear_versions.ts' import { app as cron_email } from '../_backend/triggers/cron_email.ts' -import { app as cron_stat_org } from '../_backend/triggers/cron_stat_org.ts' import { app as cron_stat_app } from '../_backend/triggers/cron_stat_app.ts' +import { app as cron_stat_org } from '../_backend/triggers/cron_stat_org.ts' import { app as logsnag_insights } from '../_backend/triggers/logsnag_insights.ts' import { app as on_app_create } from '../_backend/triggers/on_app_create.ts' import { app as on_app_delete } from '../_backend/triggers/on_app_delete.ts' From cc76290db4a9a197abb1f8307f22827d1363d82d Mon Sep 17 00:00:00 2001 From: WcaleNieWolny Date: Tue, 14 Oct 2025 14:59:18 +0200 Subject: [PATCH 3/8] refactor: rename queue_cron_plan_for_org to queue_cron_stat_org_for_org - Rename function from queue_cron_plan_for_org to queue_cron_stat_org_for_org to align with cron_stat_org queue naming - Update original migration (20251007134349) and rename migration (20251014105957) - Update all test files to use new function name - Update cron_stat_app.ts to call renamed function - Update Supabase types to reflect new function signature - Function still sends to 'cron_stat_org' queue with same payload structure - All tests passing (434/434 application tests, 526/526 SQL tests) --- .../_backend/triggers/cron_stat_app.ts | 2 +- .../_backend/utils/supabase.types.ts | 334 +++++++++--------- ...007134349_cron_plan_from_stats_backend.sql | 14 +- .../20251014105957_rename_plan_cron.sql | 6 +- tests/cron_stat_app.test.ts | 2 +- tests/cron_stat_integration.test.ts | 4 +- tests/queue_cron_stat_org_function.test.ts | 10 +- 7 files changed, 186 insertions(+), 186 deletions(-) diff --git a/supabase/functions/_backend/triggers/cron_stat_app.ts b/supabase/functions/_backend/triggers/cron_stat_app.ts index 2d7ccbc4ae..d3e019d7ef 100644 --- a/supabase/functions/_backend/triggers/cron_stat_app.ts +++ b/supabase/functions/_backend/triggers/cron_stat_app.ts @@ -101,7 +101,7 @@ app.post('/', middlewareAPISecret, async (c) => { if (!orgError && orgData?.customer_id) { // Queue plan processing for this organization - await supabase.rpc('queue_cron_plan_for_org', { + await supabase.rpc('queue_cron_stat_org_for_org', { org_id: body.orgId, customer_id: orgData.customer_id, }).throwOnError() diff --git a/supabase/functions/_backend/utils/supabase.types.ts b/supabase/functions/_backend/utils/supabase.types.ts index 5cea3f2c0a..f289edbc3f 100644 --- a/supabase/functions/_backend/utils/supabase.types.ts +++ b/supabase/functions/_backend/utils/supabase.types.ts @@ -124,8 +124,8 @@ export type Database = { id: number link: string | null manifest: - | Database["public"]["CompositeTypes"]["manifest_entry"][] - | null + | Database["public"]["CompositeTypes"]["manifest_entry"][] + | null min_update_version: string | null name: string native_packages: Json[] | null @@ -146,8 +146,8 @@ export type Database = { id?: number link?: string | null manifest?: - | Database["public"]["CompositeTypes"]["manifest_entry"][] - | null + | Database["public"]["CompositeTypes"]["manifest_entry"][] + | null min_update_version?: string | null name: string native_packages?: Json[] | null @@ -168,8 +168,8 @@ export type Database = { id?: number link?: string | null manifest?: - | Database["public"]["CompositeTypes"]["manifest_entry"][] - | null + | Database["public"]["CompositeTypes"]["manifest_entry"][] + | null min_update_version?: string | null name?: string native_packages?: Json[] | null @@ -1413,19 +1413,19 @@ export type Database = { } check_min_rights: { Args: - | { - app_id: string - channel_id: number - min_right: Database["public"]["Enums"]["user_min_right"] - org_id: string - } - | { - app_id: string - channel_id: number - min_right: Database["public"]["Enums"]["user_min_right"] - org_id: string - user_id: string - } + | { + app_id: string + channel_id: number + min_right: Database["public"]["Enums"]["user_min_right"] + org_id: string + } + | { + app_id: string + channel_id: number + min_right: Database["public"]["Enums"]["user_min_right"] + org_id: string + user_id: string + } Returns: boolean } check_revert_to_builtin_version: { @@ -1504,8 +1504,8 @@ export type Database = { } exist_app_versions: { Args: - | { apikey: string; appid: string; name_version: string } - | { appid: string; name_version: string } + | { apikey: string; appid: string; name_version: string } + | { appid: string; name_version: string } Returns: boolean } find_best_plan_v3: { @@ -1532,8 +1532,8 @@ export type Database = { } get_app_metrics: { Args: - | { org_id: string } - | { p_end_date: string; p_org_id: string; p_start_date: string } + | { org_id: string } + | { p_end_date: string; p_org_id: string; p_start_date: string } Returns: { app_id: string bandwidth: number @@ -1587,8 +1587,8 @@ export type Database = { } get_global_metrics: { Args: - | { end_date: string; org_id: string; start_date: string } - | { org_id: string } + | { end_date: string; org_id: string; start_date: string } + | { org_id: string } Returns: { bandwidth: number date: string @@ -1602,8 +1602,8 @@ export type Database = { } get_identity: { Args: - | Record - | { keymode: Database["public"]["Enums"]["key_mode"][] } + | Record + | { keymode: Database["public"]["Enums"]["key_mode"][] } Returns: string } get_identity_apikey_only: { @@ -1695,8 +1695,8 @@ export type Database = { } get_plan_usage_percent_detailed: { Args: - | { cycle_end: string; cycle_start: string; orgid: string } - | { orgid: string } + | { cycle_end: string; cycle_start: string; orgid: string } + | { orgid: string } Returns: { bandwidth_percent: number mau_percent: number @@ -1717,8 +1717,8 @@ export type Database = { } get_total_metrics: { Args: - | { end_date: string; org_id: string; start_date: string } - | { org_id: string } + | { end_date: string; org_id: string; start_date: string } + | { org_id: string } Returns: { bandwidth: number fail: number @@ -1768,8 +1768,8 @@ export type Database = { id: number link: string | null manifest: - | Database["public"]["CompositeTypes"]["manifest_entry"][] - | null + | Database["public"]["CompositeTypes"]["manifest_entry"][] + | null min_update_version: string | null name: string native_packages: Json[] | null @@ -1846,22 +1846,22 @@ export type Database = { } is_allowed_capgkey: { Args: - | { - apikey: string - app_id: string - keymode: Database["public"]["Enums"]["key_mode"][] - } - | { - apikey: string - keymode: Database["public"]["Enums"]["key_mode"][] - } + | { + apikey: string + app_id: string + keymode: Database["public"]["Enums"]["key_mode"][] + } + | { + apikey: string + keymode: Database["public"]["Enums"]["key_mode"][] + } Returns: boolean } is_app_owner: { Args: - | { apikey: string; appid: string } - | { appid: string } - | { appid: string; userid: string } + | { apikey: string; appid: string } + | { appid: string } + | { appid: string; userid: string } Returns: boolean } is_bandwidth_exceeded_by_org: { @@ -1993,7 +1993,7 @@ export type Database = { Args: Record Returns: undefined } - queue_cron_plan_for_org: { + queue_cron_stat_org_for_org: { Args: { customer_id: string; org_id: string } Returns: undefined } @@ -2123,74 +2123,74 @@ export type Database = { key_mode: "read" | "write" | "all" | "upload" platform_os: "ios" | "android" stats_action: - | "delete" - | "reset" - | "set" - | "get" - | "set_fail" - | "update_fail" - | "download_fail" - | "windows_path_fail" - | "canonical_path_fail" - | "directory_path_fail" - | "unzip_fail" - | "low_mem_fail" - | "download_10" - | "download_20" - | "download_30" - | "download_40" - | "download_50" - | "download_60" - | "download_70" - | "download_80" - | "download_90" - | "download_complete" - | "decrypt_fail" - | "app_moved_to_foreground" - | "app_moved_to_background" - | "uninstall" - | "needPlanUpgrade" - | "missingBundle" - | "noNew" - | "disablePlatformIos" - | "disablePlatformAndroid" - | "disableAutoUpdateToMajor" - | "cannotUpdateViaPrivateChannel" - | "disableAutoUpdateToMinor" - | "disableAutoUpdateToPatch" - | "channelMisconfigured" - | "disableAutoUpdateMetadata" - | "disableAutoUpdateUnderNative" - | "disableDevBuild" - | "disableEmulator" - | "cannotGetBundle" - | "checksum_fail" - | "NoChannelOrOverride" - | "setChannel" - | "getChannel" - | "rateLimited" - | "disableAutoUpdate" - | "InvalidIp" - | "ping" + | "delete" + | "reset" + | "set" + | "get" + | "set_fail" + | "update_fail" + | "download_fail" + | "windows_path_fail" + | "canonical_path_fail" + | "directory_path_fail" + | "unzip_fail" + | "low_mem_fail" + | "download_10" + | "download_20" + | "download_30" + | "download_40" + | "download_50" + | "download_60" + | "download_70" + | "download_80" + | "download_90" + | "download_complete" + | "decrypt_fail" + | "app_moved_to_foreground" + | "app_moved_to_background" + | "uninstall" + | "needPlanUpgrade" + | "missingBundle" + | "noNew" + | "disablePlatformIos" + | "disablePlatformAndroid" + | "disableAutoUpdateToMajor" + | "cannotUpdateViaPrivateChannel" + | "disableAutoUpdateToMinor" + | "disableAutoUpdateToPatch" + | "channelMisconfigured" + | "disableAutoUpdateMetadata" + | "disableAutoUpdateUnderNative" + | "disableDevBuild" + | "disableEmulator" + | "cannotGetBundle" + | "checksum_fail" + | "NoChannelOrOverride" + | "setChannel" + | "getChannel" + | "rateLimited" + | "disableAutoUpdate" + | "InvalidIp" + | "ping" stripe_status: - | "created" - | "succeeded" - | "updated" - | "failed" - | "deleted" - | "canceled" + | "created" + | "succeeded" + | "updated" + | "failed" + | "deleted" + | "canceled" usage_mode: "last_saved" | "5min" | "day" | "cycle" user_min_right: - | "invite_read" - | "invite_upload" - | "invite_write" - | "invite_admin" - | "invite_super_admin" - | "read" - | "upload" - | "write" - | "admin" - | "super_admin" + | "invite_read" + | "invite_upload" + | "invite_write" + | "invite_admin" + | "invite_super_admin" + | "read" + | "upload" + | "write" + | "admin" + | "super_admin" user_role: "read" | "upload" | "write" | "admin" version_action: "get" | "fail" | "install" | "uninstall" } @@ -2235,116 +2235,116 @@ type DefaultSchema = DatabaseWithoutInternals[Extract] export type Tables< DefaultSchemaTableNameOrOptions extends - | keyof (DefaultSchema["Tables"] & DefaultSchema["Views"]) - | { schema: keyof DatabaseWithoutInternals }, + | keyof (DefaultSchema["Tables"] & DefaultSchema["Views"]) + | { schema: keyof DatabaseWithoutInternals }, TableName extends DefaultSchemaTableNameOrOptions extends { schema: keyof DatabaseWithoutInternals } - ? keyof (DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Tables"] & - DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Views"]) - : never = never, + ? keyof (DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Tables"] & + DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Views"]) + : never = never, > = DefaultSchemaTableNameOrOptions extends { schema: keyof DatabaseWithoutInternals } ? (DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Tables"] & - DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Views"])[TableName] extends { + DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Views"])[TableName] extends { Row: infer R } - ? R - : never + ? R + : never : DefaultSchemaTableNameOrOptions extends keyof (DefaultSchema["Tables"] & - DefaultSchema["Views"]) - ? (DefaultSchema["Tables"] & - DefaultSchema["Views"])[DefaultSchemaTableNameOrOptions] extends { - Row: infer R - } - ? R - : never - : never + DefaultSchema["Views"]) + ? (DefaultSchema["Tables"] & + DefaultSchema["Views"])[DefaultSchemaTableNameOrOptions] extends { + Row: infer R + } + ? R + : never + : never export type TablesInsert< DefaultSchemaTableNameOrOptions extends - | keyof DefaultSchema["Tables"] - | { schema: keyof DatabaseWithoutInternals }, + | keyof DefaultSchema["Tables"] + | { schema: keyof DatabaseWithoutInternals }, TableName extends DefaultSchemaTableNameOrOptions extends { schema: keyof DatabaseWithoutInternals } - ? keyof DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Tables"] - : never = never, + ? keyof DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Tables"] + : never = never, > = DefaultSchemaTableNameOrOptions extends { schema: keyof DatabaseWithoutInternals } ? DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Tables"][TableName] extends { - Insert: infer I - } - ? I - : never + Insert: infer I + } + ? I + : never : DefaultSchemaTableNameOrOptions extends keyof DefaultSchema["Tables"] - ? DefaultSchema["Tables"][DefaultSchemaTableNameOrOptions] extends { - Insert: infer I - } - ? I - : never - : never + ? DefaultSchema["Tables"][DefaultSchemaTableNameOrOptions] extends { + Insert: infer I + } + ? I + : never + : never export type TablesUpdate< DefaultSchemaTableNameOrOptions extends - | keyof DefaultSchema["Tables"] - | { schema: keyof DatabaseWithoutInternals }, + | keyof DefaultSchema["Tables"] + | { schema: keyof DatabaseWithoutInternals }, TableName extends DefaultSchemaTableNameOrOptions extends { schema: keyof DatabaseWithoutInternals } - ? keyof DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Tables"] - : never = never, + ? keyof DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Tables"] + : never = never, > = DefaultSchemaTableNameOrOptions extends { schema: keyof DatabaseWithoutInternals } ? DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Tables"][TableName] extends { - Update: infer U - } - ? U - : never + Update: infer U + } + ? U + : never : DefaultSchemaTableNameOrOptions extends keyof DefaultSchema["Tables"] - ? DefaultSchema["Tables"][DefaultSchemaTableNameOrOptions] extends { - Update: infer U - } - ? U - : never - : never + ? DefaultSchema["Tables"][DefaultSchemaTableNameOrOptions] extends { + Update: infer U + } + ? U + : never + : never export type Enums< DefaultSchemaEnumNameOrOptions extends - | keyof DefaultSchema["Enums"] - | { schema: keyof DatabaseWithoutInternals }, + | keyof DefaultSchema["Enums"] + | { schema: keyof DatabaseWithoutInternals }, EnumName extends DefaultSchemaEnumNameOrOptions extends { schema: keyof DatabaseWithoutInternals } - ? keyof DatabaseWithoutInternals[DefaultSchemaEnumNameOrOptions["schema"]]["Enums"] - : never = never, + ? keyof DatabaseWithoutInternals[DefaultSchemaEnumNameOrOptions["schema"]]["Enums"] + : never = never, > = DefaultSchemaEnumNameOrOptions extends { schema: keyof DatabaseWithoutInternals } ? DatabaseWithoutInternals[DefaultSchemaEnumNameOrOptions["schema"]]["Enums"][EnumName] : DefaultSchemaEnumNameOrOptions extends keyof DefaultSchema["Enums"] - ? DefaultSchema["Enums"][DefaultSchemaEnumNameOrOptions] - : never + ? DefaultSchema["Enums"][DefaultSchemaEnumNameOrOptions] + : never export type CompositeTypes< PublicCompositeTypeNameOrOptions extends - | keyof DefaultSchema["CompositeTypes"] - | { schema: keyof DatabaseWithoutInternals }, + | keyof DefaultSchema["CompositeTypes"] + | { schema: keyof DatabaseWithoutInternals }, CompositeTypeName extends PublicCompositeTypeNameOrOptions extends { schema: keyof DatabaseWithoutInternals } - ? keyof DatabaseWithoutInternals[PublicCompositeTypeNameOrOptions["schema"]]["CompositeTypes"] - : never = never, + ? keyof DatabaseWithoutInternals[PublicCompositeTypeNameOrOptions["schema"]]["CompositeTypes"] + : never = never, > = PublicCompositeTypeNameOrOptions extends { schema: keyof DatabaseWithoutInternals } ? DatabaseWithoutInternals[PublicCompositeTypeNameOrOptions["schema"]]["CompositeTypes"][CompositeTypeName] : PublicCompositeTypeNameOrOptions extends keyof DefaultSchema["CompositeTypes"] - ? DefaultSchema["CompositeTypes"][PublicCompositeTypeNameOrOptions] - : never + ? DefaultSchema["CompositeTypes"][PublicCompositeTypeNameOrOptions] + : never export const Constants = { graphql_public: { diff --git a/supabase/migrations/20251007134349_cron_plan_from_stats_backend.sql b/supabase/migrations/20251007134349_cron_plan_from_stats_backend.sql index 8562e51c5c..e24786b237 100644 --- a/supabase/migrations/20251007134349_cron_plan_from_stats_backend.sql +++ b/supabase/migrations/20251007134349_cron_plan_from_stats_backend.sql @@ -15,7 +15,7 @@ SELECT cron.schedule( ALTER TABLE public.stripe_info ADD COLUMN IF NOT EXISTS plan_calculated_at timestamp with time zone; -- Update the queue function to check if plan was calculated in the last hour -CREATE OR REPLACE FUNCTION public.queue_cron_plan_for_org(org_id uuid, customer_id text) +CREATE OR REPLACE FUNCTION public.queue_cron_stat_org_for_org(org_id uuid, customer_id text) RETURNS void LANGUAGE plpgsql SECURITY DEFINER @@ -27,7 +27,7 @@ BEGIN -- Check when plan was last calculated for this customer SELECT plan_calculated_at INTO last_calculated FROM public.stripe_info - WHERE stripe_info.customer_id = queue_cron_plan_for_org.customer_id; + WHERE stripe_info.customer_id = queue_cron_stat_org_for_org.customer_id; -- Only queue if plan wasn't calculated in the last hour IF last_calculated IS NULL OR last_calculated < NOW() - INTERVAL '1 hour' THEN @@ -46,10 +46,10 @@ END; $$; -ALTER FUNCTION public.queue_cron_plan_for_org(uuid, text) OWNER TO postgres; +ALTER FUNCTION public.queue_cron_stat_org_for_org(uuid, text) OWNER TO postgres; -- Revoke all permissions first, then grant only to service_role -REVOKE ALL ON FUNCTION public.queue_cron_plan_for_org(uuid, text) FROM PUBLIC; -REVOKE ALL ON FUNCTION public.queue_cron_plan_for_org(uuid, text) FROM anon; -REVOKE ALL ON FUNCTION public.queue_cron_plan_for_org(uuid, text) FROM authenticated; -GRANT ALL ON FUNCTION public.queue_cron_plan_for_org(uuid, text) TO service_role; \ No newline at end of file +REVOKE ALL ON FUNCTION public.queue_cron_stat_org_for_org(uuid, text) FROM PUBLIC; +REVOKE ALL ON FUNCTION public.queue_cron_stat_org_for_org(uuid, text) FROM anon; +REVOKE ALL ON FUNCTION public.queue_cron_stat_org_for_org(uuid, text) FROM authenticated; +GRANT ALL ON FUNCTION public.queue_cron_stat_org_for_org(uuid, text) TO service_role; \ No newline at end of file diff --git a/supabase/migrations/20251014105957_rename_plan_cron.sql b/supabase/migrations/20251014105957_rename_plan_cron.sql index a2bbd79d7b..101deb5cd2 100644 --- a/supabase/migrations/20251014105957_rename_plan_cron.sql +++ b/supabase/migrations/20251014105957_rename_plan_cron.sql @@ -30,8 +30,8 @@ SELECT cron.schedule( 'SELECT public.process_function_queue(''cron_stat_org'')' ); --- Update the queue_cron_plan_for_org function to use the new queue name -CREATE OR REPLACE FUNCTION public.queue_cron_plan_for_org(org_id uuid, customer_id text) +-- Update the queue_cron_stat_org_for_org function to use the new queue name +CREATE OR REPLACE FUNCTION public.queue_cron_stat_org_for_org(org_id uuid, customer_id text) RETURNS void LANGUAGE plpgsql SECURITY DEFINER @@ -42,7 +42,7 @@ BEGIN -- Check when plan was last calculated for this customer SELECT plan_calculated_at INTO last_calculated FROM public.stripe_info - WHERE stripe_info.customer_id = queue_cron_plan_for_org.customer_id; + WHERE stripe_info.customer_id = queue_cron_stat_org_for_org.customer_id; -- Only queue if plan wasn't calculated in the last hour IF last_calculated IS NULL OR last_calculated < NOW() - INTERVAL '1 hour' THEN diff --git a/tests/cron_stat_app.test.ts b/tests/cron_stat_app.test.ts index a5e6e446e0..3d71670c33 100644 --- a/tests/cron_stat_app.test.ts +++ b/tests/cron_stat_app.test.ts @@ -87,7 +87,7 @@ describe('[POST] /triggers/cron_stat_app', () => { // Verify that the queue function can be called (indicates plan processing was queued) // We can't easily check queue contents, but we can verify the function works - const { error: queueError } = await supabase.rpc('queue_cron_plan_for_org', { + const { error: queueError } = await supabase.rpc('queue_cron_stat_org_for_org', { org_id: ORG_ID, customer_id: 'cus_Pa0k8TO6HVln6A' }) diff --git a/tests/cron_stat_integration.test.ts b/tests/cron_stat_integration.test.ts index 279af1d374..6d1939d147 100644 --- a/tests/cron_stat_integration.test.ts +++ b/tests/cron_stat_integration.test.ts @@ -155,7 +155,7 @@ describe('[Integration] cron_stat_app -> cron_stat_org flow', () => { .throwOnError() // Call the queue function directly (simulating what cron_stat_app does) - const { error } = await supabase.rpc('queue_cron_plan_for_org', { + const { error } = await supabase.rpc('queue_cron_stat_org_for_org', { org_id: ORG_ID, customer_id: orgData.customer_id }) @@ -204,7 +204,7 @@ describe('[Integration] cron_stat_app -> cron_stat_org flow', () => { .throwOnError() // Call the queue function directly - const { error } = await supabase.rpc('queue_cron_plan_for_org', { + const { error } = await supabase.rpc('queue_cron_stat_org_for_org', { org_id: ORG_ID, customer_id: orgData.customer_id }) diff --git a/tests/queue_cron_stat_org_function.test.ts b/tests/queue_cron_stat_org_function.test.ts index 6ff06253dc..8ccdd2df33 100644 --- a/tests/queue_cron_stat_org_function.test.ts +++ b/tests/queue_cron_stat_org_function.test.ts @@ -52,7 +52,7 @@ describe('[Function] queue_cron_stat_org_for_org', () => { const initialCount = await getCronPlanQueueCount() // Call the function - const { error } = await supabase.rpc('queue_cron_plan_for_org', { + const { error } = await supabase.rpc('queue_cron_stat_org_for_org', { org_id: ORG_ID, customer_id: testCustomerId }) @@ -95,7 +95,7 @@ describe('[Function] queue_cron_stat_org_for_org', () => { const initialCount = await getCronPlanQueueCount() // Call the function - const { error } = await supabase.rpc('queue_cron_plan_for_org', { + const { error } = await supabase.rpc('queue_cron_stat_org_for_org', { org_id: ORG_ID, customer_id: testCustomerId }) @@ -141,7 +141,7 @@ describe('[Function] queue_cron_stat_org_for_org', () => { const initialCount = await getCronPlanQueueCount() // Call the function - const { error } = await supabase.rpc('queue_cron_plan_for_org', { + const { error } = await supabase.rpc('queue_cron_stat_org_for_org', { org_id: ORG_ID, customer_id: testCustomerId }) @@ -168,7 +168,7 @@ describe('[Function] queue_cron_stat_org_for_org', () => { const supabase = getSupabaseClient() // Call with non-existent customer_id - const { error } = await supabase.rpc('queue_cron_plan_for_org', { + const { error } = await supabase.rpc('queue_cron_stat_org_for_org', { org_id: ORG_ID, customer_id: 'non_existent_customer' }) @@ -187,7 +187,7 @@ describe('[Function] queue_cron_stat_org_for_org', () => { // The actual permission restriction is tested at the database level const supabase = getSupabaseClient() - const { error } = await supabase.rpc('queue_cron_plan_for_org', { + const { error } = await supabase.rpc('queue_cron_stat_org_for_org', { org_id: ORG_ID, customer_id: testCustomerId }) From 835513061281a2b3b3a7832fd9c7fe9bd970848f Mon Sep 17 00:00:00 2001 From: WcaleNieWolny Date: Tue, 14 Oct 2025 15:03:43 +0200 Subject: [PATCH 4/8] fix: coderabbit --- tests/cron_stat_integration.test.ts | 552 ++++++++++----------- tests/queue_cron_stat_org_function.test.ts | 2 +- 2 files changed, 277 insertions(+), 277 deletions(-) diff --git a/tests/cron_stat_integration.test.ts b/tests/cron_stat_integration.test.ts index 6d1939d147..32b790bcfb 100644 --- a/tests/cron_stat_integration.test.ts +++ b/tests/cron_stat_integration.test.ts @@ -1,297 +1,297 @@ import { randomUUID } from 'node:crypto' import { afterAll, beforeAll, describe, expect, it } from 'vitest' -import { BASE_URL, ORG_ID, USER_ID, getSupabaseClient, resetAndSeedAppData, resetAndSeedAppDataStats, resetAppData, resetAppDataStats } from './test-utils.ts' +import { BASE_URL, getSupabaseClient, ORG_ID, resetAndSeedAppData, resetAndSeedAppDataStats, resetAppData, resetAppDataStats, USER_ID } from './test-utils.ts' const appId = `com.cron.${randomUUID().slice(0, 8)}` const triggerHeaders = { - 'Content-Type': 'application/json', - 'apisecret': 'testsecret', + 'Content-Type': 'application/json', + 'apisecret': 'testsecret', } describe('[Integration] cron_stat_app -> cron_stat_org flow', () => { - beforeAll(async () => { - await resetAndSeedAppData(appId) - await resetAndSeedAppDataStats(appId) - - const supabase = getSupabaseClient() - - // Reset timestamps - await supabase - .from('orgs') - .update({ stats_updated_at: null }) - .eq('id', ORG_ID) - .throwOnError() - - // Reset plan calculated timestamp - await supabase - .from('stripe_info') - .update({ plan_calculated_at: null }) - .eq('customer_id', 'cus_Pa0k8TO6HVln6A') // From seed data - .throwOnError() + beforeAll(async () => { + await resetAndSeedAppData(appId) + await resetAndSeedAppDataStats(appId) + + const supabase = getSupabaseClient() + + // Reset timestamps + await supabase + .from('orgs') + .update({ stats_updated_at: null }) + .eq('id', ORG_ID) + .throwOnError() + + // Reset plan calculated timestamp + await supabase + .from('stripe_info') + .update({ plan_calculated_at: null }) + .eq('customer_id', 'cus_Pa0k8TO6HVln6A') // From seed data + .throwOnError() + }) + + afterAll(async () => { + await resetAppData(appId) + await resetAppDataStats(appId) + }) + + it('cron_stat_app triggers plan processing and updates plan_calculated_at', async () => { + const supabase = getSupabaseClient() + + // First, get the actual customer_id for our test org + const { data: orgData } = await supabase + .from('orgs') + .select('customer_id') + .eq('id', ORG_ID) + .single() + .throwOnError() + + console.log('Test org customer_id:', orgData?.customer_id) + + // Skip test if no customer_id (this org doesn't have stripe setup) + if (!orgData?.customer_id) { + console.log('Skipping test - org has no customer_id') + return + } + + // Reset plan_calculated_at to null for this customer + await supabase + .from('stripe_info') + .update({ plan_calculated_at: null }) + .eq('customer_id', orgData.customer_id) + .throwOnError() + + // Verify initial state - no plan_calculated_at + const { data: initialStripeInfo } = await supabase + .from('stripe_info') + .select('plan_calculated_at') + .eq('customer_id', orgData.customer_id) + .single() + .throwOnError() + + expect(initialStripeInfo?.plan_calculated_at).toBeNull() + + // Trigger cron_stat_app which should queue plan processing + const statsResponse = await fetch(`${BASE_URL}/triggers/cron_stat_app`, { + method: 'POST', + headers: triggerHeaders, + body: JSON.stringify({ + appId, + orgId: ORG_ID, + }), }) - afterAll(async () => { - await resetAppData(appId) - await resetAppDataStats(appId) + expect(statsResponse.status).toBe(200) + const statsJson = await statsResponse.json() as { status?: string } + expect(statsJson.status).toBe('Stats saved') + + // Verify stats_updated_at was set + const { data: org } = await supabase + .from('orgs') + .select('stats_updated_at') + .eq('id', ORG_ID) + .single() + .throwOnError() + + expect(org?.stats_updated_at).toBeTruthy() + + // Check that a plan job was queued (we can't easily test queue contents, but we can verify the function doesn't error) + // The plan processing would normally be triggered by the queue processor + + // Manually trigger cron_plan to simulate queue processing + const planResponse = await fetch(`${BASE_URL}/triggers/cron_stat_org`, { + method: 'POST', + headers: triggerHeaders, + body: JSON.stringify({ + orgId: ORG_ID, + customerId: orgData.customer_id, + }), }) - it('cron_stat_app triggers plan processing and updates plan_calculated_at', async () => { - const supabase = getSupabaseClient() - - // First, get the actual customer_id for our test org - const { data: orgData } = await supabase - .from('orgs') - .select('customer_id') - .eq('id', ORG_ID) - .single() - .throwOnError() - - console.log('Test org customer_id:', orgData?.customer_id) - - // Skip test if no customer_id (this org doesn't have stripe setup) - if (!orgData?.customer_id) { - console.log('Skipping test - org has no customer_id') - return - } - - // Reset plan_calculated_at to null for this customer - await supabase - .from('stripe_info') - .update({ plan_calculated_at: null }) - .eq('customer_id', orgData.customer_id) - .throwOnError() - - // Verify initial state - no plan_calculated_at - const { data: initialStripeInfo } = await supabase - .from('stripe_info') - .select('plan_calculated_at') - .eq('customer_id', orgData.customer_id) - .single() - .throwOnError() - - expect(initialStripeInfo?.plan_calculated_at).toBeNull() - - // Trigger cron_stat_app which should queue plan processing - const statsResponse = await fetch(`${BASE_URL}/triggers/cron_stat_app`, { - method: 'POST', - headers: triggerHeaders, - body: JSON.stringify({ - appId, - orgId: ORG_ID, - }), - }) - - expect(statsResponse.status).toBe(200) - const statsJson = await statsResponse.json() as { status?: string } - expect(statsJson.status).toBe('Stats saved') - - // Verify stats_updated_at was set - const { data: org } = await supabase - .from('orgs') - .select('stats_updated_at') - .eq('id', ORG_ID) - .single() - .throwOnError() - - expect(org?.stats_updated_at).toBeTruthy() - - // Check that a plan job was queued (we can't easily test queue contents, but we can verify the function doesn't error) - // The plan processing would normally be triggered by the queue processor - - // Manually trigger cron_plan to simulate queue processing - const planResponse = await fetch(`${BASE_URL}/triggers/cron_stat_org`, { - method: 'POST', - headers: triggerHeaders, - body: JSON.stringify({ - orgId: ORG_ID, - customerId: orgData.customer_id, - }), - }) - - expect(planResponse.status).toBe(200) - - // Verify plan_calculated_at was updated - const { data: updatedStripeInfo } = await supabase - .from('stripe_info') - .select('plan_calculated_at') - .eq('customer_id', orgData.customer_id) - .single() - .throwOnError() - - expect(updatedStripeInfo?.plan_calculated_at).toBeTruthy() - - const timestamp = updatedStripeInfo?.plan_calculated_at - const updatedAtMs = new Date(timestamp!).getTime() - expect(Number.isNaN(updatedAtMs)).toBe(false) - - const diffMs = Math.abs(Date.now() - updatedAtMs) - expect(diffMs).toBeLessThan(60_000) // Within last minute + expect(planResponse.status).toBe(200) + + // Verify plan_calculated_at was updated + const { data: updatedStripeInfo } = await supabase + .from('stripe_info') + .select('plan_calculated_at') + .eq('customer_id', orgData.customer_id) + .single() + .throwOnError() + + expect(updatedStripeInfo?.plan_calculated_at).toBeTruthy() + + const timestamp = updatedStripeInfo?.plan_calculated_at + const updatedAtMs = new Date(timestamp!).getTime() + expect(Number.isNaN(updatedAtMs)).toBe(false) + + const diffMs = Math.abs(Date.now() - updatedAtMs) + expect(diffMs).toBeLessThan(60_000) // Within last minute + }) + + it('rate limiting prevents duplicate plan processing within 1 hour', async () => { + const supabase = getSupabaseClient() + + // Get the actual customer_id for our test org + const { data: orgData } = await supabase + .from('orgs') + .select('customer_id') + .eq('id', ORG_ID) + .single() + .throwOnError() + + // Skip test if no customer_id + if (!orgData?.customer_id) { + console.log('Skipping test - org has no customer_id') + return + } + + // Set plan_calculated_at to 30 minutes ago (within 1 hour) + const thirtyMinutesAgo = new Date(Date.now() - 30 * 60 * 1000) + await supabase + .from('stripe_info') + .update({ plan_calculated_at: thirtyMinutesAgo.toISOString() }) + .eq('customer_id', orgData.customer_id) + .throwOnError() + + // Call the queue function directly (simulating what cron_stat_app does) + const { error } = await supabase.rpc('queue_cron_stat_org_for_org', { + org_id: ORG_ID, + customer_id: orgData.customer_id, }) - it('rate limiting prevents duplicate plan processing within 1 hour', async () => { - const supabase = getSupabaseClient() - - // Get the actual customer_id for our test org - const { data: orgData } = await supabase - .from('orgs') - .select('customer_id') - .eq('id', ORG_ID) - .single() - .throwOnError() - - // Skip test if no customer_id - if (!orgData?.customer_id) { - console.log('Skipping test - org has no customer_id') - return - } - - // Set plan_calculated_at to 30 minutes ago (within 1 hour) - const thirtyMinutesAgo = new Date(Date.now() - 30 * 60 * 1000) - await supabase - .from('stripe_info') - .update({ plan_calculated_at: thirtyMinutesAgo.toISOString() }) - .eq('customer_id', orgData.customer_id) - .throwOnError() - - // Call the queue function directly (simulating what cron_stat_app does) - const { error } = await supabase.rpc('queue_cron_stat_org_for_org', { - org_id: ORG_ID, - customer_id: orgData.customer_id - }) - - // Should not error (rate limiting should silently skip) - expect(error).toBeNull() - - // The timestamp should remain unchanged (not updated) - const { data: stripeInfo } = await supabase - .from('stripe_info') - .select('plan_calculated_at') - .eq('customer_id', orgData.customer_id) - .single() - .throwOnError() - - const actualTimestamp = new Date(stripeInfo?.plan_calculated_at!).getTime() - const expectedTimestamp = thirtyMinutesAgo.getTime() - - // Should be within 1 second of the original timestamp (accounting for precision) - expect(Math.abs(actualTimestamp - expectedTimestamp)).toBeLessThan(1000) + // Should not error (rate limiting should silently skip) + expect(error).toBeNull() + + // The timestamp should remain unchanged (not updated) + const { data: stripeInfo } = await supabase + .from('stripe_info') + .select('plan_calculated_at') + .eq('customer_id', orgData.customer_id) + .single() + .throwOnError() + + const actualTimestamp = new Date(stripeInfo?.plan_calculated_at ?? 0).getTime() + const expectedTimestamp = thirtyMinutesAgo.getTime() + + // Should be within 1 second of the original timestamp (accounting for precision) + expect(Math.abs(actualTimestamp - expectedTimestamp)).toBeLessThan(1000) + }) + + it('allows plan processing after 1 hour has passed', async () => { + const supabase = getSupabaseClient() + + // Get the actual customer_id for our test org + const { data: orgData } = await supabase + .from('orgs') + .select('customer_id') + .eq('id', ORG_ID) + .single() + .throwOnError() + + // Skip test if no customer_id + if (!orgData?.customer_id) { + console.log('Skipping test - org has no customer_id') + return + } + + // Set plan_calculated_at to 2 hours ago (outside 1 hour window) + const twoHoursAgo = new Date(Date.now() - 2 * 60 * 60 * 1000) + await supabase + .from('stripe_info') + .update({ plan_calculated_at: twoHoursAgo.toISOString() }) + .eq('customer_id', orgData.customer_id) + .throwOnError() + + // Call the queue function directly + const { error } = await supabase.rpc('queue_cron_stat_org_for_org', { + org_id: ORG_ID, + customer_id: orgData.customer_id, }) - it('allows plan processing after 1 hour has passed', async () => { - const supabase = getSupabaseClient() - - // Get the actual customer_id for our test org - const { data: orgData } = await supabase - .from('orgs') - .select('customer_id') - .eq('id', ORG_ID) - .single() - .throwOnError() - - // Skip test if no customer_id - if (!orgData?.customer_id) { - console.log('Skipping test - org has no customer_id') - return - } - - // Set plan_calculated_at to 2 hours ago (outside 1 hour window) - const twoHoursAgo = new Date(Date.now() - 2 * 60 * 60 * 1000) - await supabase - .from('stripe_info') - .update({ plan_calculated_at: twoHoursAgo.toISOString() }) - .eq('customer_id', orgData.customer_id) - .throwOnError() - - // Call the queue function directly - const { error } = await supabase.rpc('queue_cron_stat_org_for_org', { - org_id: ORG_ID, - customer_id: orgData.customer_id - }) - - expect(error).toBeNull() - - // Now manually trigger plan processing to simulate queue processing - const planResponse = await fetch(`${BASE_URL}/triggers/cron_stat_org`, { - method: 'POST', - headers: triggerHeaders, - body: JSON.stringify({ - orgId: ORG_ID, - customerId: orgData.customer_id, - }), - }) - - expect(planResponse.status).toBe(200) - - // Verify plan_calculated_at was updated to recent time - const { data: stripeInfo } = await supabase - .from('stripe_info') - .select('plan_calculated_at') - .eq('customer_id', orgData.customer_id) - .single() - .throwOnError() - - const timestamp = stripeInfo?.plan_calculated_at - const updatedAtMs = new Date(timestamp!).getTime() - const diffMs = Math.abs(Date.now() - updatedAtMs) - - // Should be updated to within the last minute - expect(diffMs).toBeLessThan(60_000) + expect(error).toBeNull() + + // Now manually trigger plan processing to simulate queue processing + const planResponse = await fetch(`${BASE_URL}/triggers/cron_stat_org`, { + method: 'POST', + headers: triggerHeaders, + body: JSON.stringify({ + orgId: ORG_ID, + customerId: orgData.customer_id, + }), }) - it('handles missing customer_id gracefully', async () => { - // Trigger cron_stat_app for an org without customer_id - const supabase = getSupabaseClient() - - // Create a temporary org without customer_id - const tempOrgId = randomUUID() - await supabase - .from('orgs') - .insert({ - id: tempOrgId, - name: 'Test Org No Customer', - management_email: 'test@example.com', - created_by: USER_ID, - }) - .throwOnError() - - // Create app for this org - const tempAppId = `com.test.nocustomer.${randomUUID()}` - await supabase - .from('apps') - .insert({ - app_id: tempAppId, - owner_org: tempOrgId, - name: 'Test App No Customer', - icon_url: 'https://example.com/icon.png', - }) - .throwOnError() - - // Create app version - await supabase - .from('app_versions') - .insert({ - app_id: tempAppId, - name: '1.0.0', - owner_org: tempOrgId, - }) - .throwOnError() - - // Trigger cron_stat_app - should not error even without customer_id - const statsResponse = await fetch(`${BASE_URL}/triggers/cron_stat_app`, { - method: 'POST', - headers: triggerHeaders, - body: JSON.stringify({ - appId: tempAppId, - orgId: tempOrgId, - }), - }) - - expect(statsResponse.status).toBe(200) - - // Clean up - await supabase.from('app_versions').delete().eq('app_id', tempAppId).throwOnError() - await supabase.from('apps').delete().eq('app_id', tempAppId).throwOnError() - await supabase.from('orgs').delete().eq('id', tempOrgId).throwOnError() + expect(planResponse.status).toBe(200) + + // Verify plan_calculated_at was updated to recent time + const { data: stripeInfo } = await supabase + .from('stripe_info') + .select('plan_calculated_at') + .eq('customer_id', orgData.customer_id) + .single() + .throwOnError() + + const timestamp = stripeInfo?.plan_calculated_at + const updatedAtMs = new Date(timestamp!).getTime() + const diffMs = Math.abs(Date.now() - updatedAtMs) + + // Should be updated to within the last minute + expect(diffMs).toBeLessThan(60_000) + }) + + it('handles missing customer_id gracefully', async () => { + // Trigger cron_stat_app for an org without customer_id + const supabase = getSupabaseClient() + + // Create a temporary org without customer_id + const tempOrgId = randomUUID() + await supabase + .from('orgs') + .insert({ + id: tempOrgId, + name: 'Test Org No Customer', + management_email: 'test@example.com', + created_by: USER_ID, + }) + .throwOnError() + + // Create app for this org + const tempAppId = `com.test.nocustomer.${randomUUID()}` + await supabase + .from('apps') + .insert({ + app_id: tempAppId, + owner_org: tempOrgId, + name: 'Test App No Customer', + icon_url: 'https://example.com/icon.png', + }) + .throwOnError() + + // Create app version + await supabase + .from('app_versions') + .insert({ + app_id: tempAppId, + name: '1.0.0', + owner_org: tempOrgId, + }) + .throwOnError() + + // Trigger cron_stat_app - should not error even without customer_id + const statsResponse = await fetch(`${BASE_URL}/triggers/cron_stat_app`, { + method: 'POST', + headers: triggerHeaders, + body: JSON.stringify({ + appId: tempAppId, + orgId: tempOrgId, + }), }) + + expect(statsResponse.status).toBe(200) + + // Clean up + await supabase.from('app_versions').delete().eq('app_id', tempAppId).throwOnError() + await supabase.from('apps').delete().eq('app_id', tempAppId).throwOnError() + await supabase.from('orgs').delete().eq('id', tempOrgId).throwOnError() + }) }) diff --git a/tests/queue_cron_stat_org_function.test.ts b/tests/queue_cron_stat_org_function.test.ts index 8ccdd2df33..c01ebe9793 100644 --- a/tests/queue_cron_stat_org_function.test.ts +++ b/tests/queue_cron_stat_org_function.test.ts @@ -1,7 +1,7 @@ import { afterAll, beforeAll, describe, expect, it } from 'vitest' import { ORG_ID, getSupabaseClient, getCronPlanQueueCount, getLatestCronPlanMessage, cleanupPostgresClient } from './test-utils.ts' -describe('[Function] queue_cron_stat_org_for_org', () => { +describe('[Function] queue_cron_plan_for_org', () => { let testCustomerId: string | null = null beforeAll(async () => { From f811def8e07649c06cc575e7e10386fc65b2d515 Mon Sep 17 00:00:00 2001 From: WcaleNieWolny Date: Tue, 14 Oct 2025 15:04:37 +0200 Subject: [PATCH 5/8] fix: typo --- supabase/functions/_backend/triggers/cron_stat_app.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/supabase/functions/_backend/triggers/cron_stat_app.ts b/supabase/functions/_backend/triggers/cron_stat_app.ts index 7883fdc059..8643bbce0f 100644 --- a/supabase/functions/_backend/triggers/cron_stat_app.ts +++ b/supabase/functions/_backend/triggers/cron_stat_app.ts @@ -101,7 +101,7 @@ app.post('/', middlewareAPISecret, async (c) => { if (!orgError && orgData?.customer_id) { // Queue plan processing for this organization - await supabase.rpc('queue_cron_plan_for_org', { + await supabase.rpc('queue_cron_stat_org_for_org', { org_id: body.orgId, customer_id: orgData.customer_id, }).throwOnError() From a3f679a8ef1bc577de7c7a5e375b838b4a9ff61b Mon Sep 17 00:00:00 2001 From: WcaleNieWolny Date: Tue, 14 Oct 2025 15:52:46 +0200 Subject: [PATCH 6/8] fix: coderabbit again --- tests/queue_cron_stat_org_function.test.ts | 25 +++++++++++----------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/tests/queue_cron_stat_org_function.test.ts b/tests/queue_cron_stat_org_function.test.ts index c01ebe9793..91139f8c72 100644 --- a/tests/queue_cron_stat_org_function.test.ts +++ b/tests/queue_cron_stat_org_function.test.ts @@ -1,5 +1,5 @@ import { afterAll, beforeAll, describe, expect, it } from 'vitest' -import { ORG_ID, getSupabaseClient, getCronPlanQueueCount, getLatestCronPlanMessage, cleanupPostgresClient } from './test-utils.ts' +import { cleanupPostgresClient, getCronPlanQueueCount, getLatestCronPlanMessage, getSupabaseClient, ORG_ID } from './test-utils.ts' describe('[Function] queue_cron_plan_for_org', () => { let testCustomerId: string | null = null @@ -16,7 +16,8 @@ describe('[Function] queue_cron_plan_for_org', () => { if (orgData?.customer_id) { testCustomerId = orgData.customer_id - } else { + } + else { // Fallback: get any existing stripe_info record const { data: stripeData } = await supabase .from('stripe_info') @@ -54,7 +55,7 @@ describe('[Function] queue_cron_plan_for_org', () => { // Call the function const { error } = await supabase.rpc('queue_cron_stat_org_for_org', { org_id: ORG_ID, - customer_id: testCustomerId + customer_id: testCustomerId, }) expect(error).toBeNull() @@ -70,8 +71,8 @@ describe('[Function] queue_cron_plan_for_org', () => { function_type: 'cloudflare', payload: { orgId: ORG_ID, - customerId: testCustomerId - } + customerId: testCustomerId, + }, }) }) @@ -97,7 +98,7 @@ describe('[Function] queue_cron_plan_for_org', () => { // Call the function const { error } = await supabase.rpc('queue_cron_stat_org_for_org', { org_id: ORG_ID, - customer_id: testCustomerId + customer_id: testCustomerId, }) expect(error).toBeNull() @@ -114,7 +115,7 @@ describe('[Function] queue_cron_plan_for_org', () => { .single() .throwOnError() - const actualTimestamp = new Date(stripeInfo?.plan_calculated_at!).getTime() + const actualTimestamp = new Date(stripeInfo?.plan_calculated_at ?? 0).getTime() const expectedTimestamp = thirtyMinutesAgo.getTime() // Should be within 1 second of the original timestamp (rate limiting prevented update) @@ -143,7 +144,7 @@ describe('[Function] queue_cron_plan_for_org', () => { // Call the function const { error } = await supabase.rpc('queue_cron_stat_org_for_org', { org_id: ORG_ID, - customer_id: testCustomerId + customer_id: testCustomerId, }) expect(error).toBeNull() @@ -159,8 +160,8 @@ describe('[Function] queue_cron_plan_for_org', () => { function_type: 'cloudflare', payload: { orgId: ORG_ID, - customerId: testCustomerId - } + customerId: testCustomerId, + }, }) }) @@ -170,7 +171,7 @@ describe('[Function] queue_cron_plan_for_org', () => { // Call with non-existent customer_id const { error } = await supabase.rpc('queue_cron_stat_org_for_org', { org_id: ORG_ID, - customer_id: 'non_existent_customer' + customer_id: 'non_existent_customer', }) expect(error).toBeNull() @@ -189,7 +190,7 @@ describe('[Function] queue_cron_plan_for_org', () => { const { error } = await supabase.rpc('queue_cron_stat_org_for_org', { org_id: ORG_ID, - customer_id: testCustomerId + customer_id: testCustomerId, }) expect(error).toBeNull() From e8e05060c12c57d6462c876aaad79ed1095ff7b5 Mon Sep 17 00:00:00 2001 From: WcaleNieWolny Date: Tue, 14 Oct 2025 15:59:59 +0200 Subject: [PATCH 7/8] fix: coderabbit again --- tests/queue_cron_stat_org_function.test.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/queue_cron_stat_org_function.test.ts b/tests/queue_cron_stat_org_function.test.ts index 91139f8c72..795ec5871b 100644 --- a/tests/queue_cron_stat_org_function.test.ts +++ b/tests/queue_cron_stat_org_function.test.ts @@ -1,7 +1,8 @@ import { afterAll, beforeAll, describe, expect, it } from 'vitest' import { cleanupPostgresClient, getCronPlanQueueCount, getLatestCronPlanMessage, getSupabaseClient, ORG_ID } from './test-utils.ts' -describe('[Function] queue_cron_plan_for_org', () => { + +describe('[Function] queue_cron_stat_org_for_org', () => { let testCustomerId: string | null = null beforeAll(async () => { From 0a643fbefe337069ea6c21ce7dfac0a06c415692 Mon Sep 17 00:00:00 2001 From: WcaleNieWolny Date: Tue, 14 Oct 2025 17:15:48 +0200 Subject: [PATCH 8/8] fix: undo chaning old migration --- ...20251007134349_cron_plan_from_stats_backend.sql | 14 +++++++------- .../migrations/20251014105957_rename_plan_cron.sql | 14 +++++++++++++- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/supabase/migrations/20251007134349_cron_plan_from_stats_backend.sql b/supabase/migrations/20251007134349_cron_plan_from_stats_backend.sql index e24786b237..8562e51c5c 100644 --- a/supabase/migrations/20251007134349_cron_plan_from_stats_backend.sql +++ b/supabase/migrations/20251007134349_cron_plan_from_stats_backend.sql @@ -15,7 +15,7 @@ SELECT cron.schedule( ALTER TABLE public.stripe_info ADD COLUMN IF NOT EXISTS plan_calculated_at timestamp with time zone; -- Update the queue function to check if plan was calculated in the last hour -CREATE OR REPLACE FUNCTION public.queue_cron_stat_org_for_org(org_id uuid, customer_id text) +CREATE OR REPLACE FUNCTION public.queue_cron_plan_for_org(org_id uuid, customer_id text) RETURNS void LANGUAGE plpgsql SECURITY DEFINER @@ -27,7 +27,7 @@ BEGIN -- Check when plan was last calculated for this customer SELECT plan_calculated_at INTO last_calculated FROM public.stripe_info - WHERE stripe_info.customer_id = queue_cron_stat_org_for_org.customer_id; + WHERE stripe_info.customer_id = queue_cron_plan_for_org.customer_id; -- Only queue if plan wasn't calculated in the last hour IF last_calculated IS NULL OR last_calculated < NOW() - INTERVAL '1 hour' THEN @@ -46,10 +46,10 @@ END; $$; -ALTER FUNCTION public.queue_cron_stat_org_for_org(uuid, text) OWNER TO postgres; +ALTER FUNCTION public.queue_cron_plan_for_org(uuid, text) OWNER TO postgres; -- Revoke all permissions first, then grant only to service_role -REVOKE ALL ON FUNCTION public.queue_cron_stat_org_for_org(uuid, text) FROM PUBLIC; -REVOKE ALL ON FUNCTION public.queue_cron_stat_org_for_org(uuid, text) FROM anon; -REVOKE ALL ON FUNCTION public.queue_cron_stat_org_for_org(uuid, text) FROM authenticated; -GRANT ALL ON FUNCTION public.queue_cron_stat_org_for_org(uuid, text) TO service_role; \ No newline at end of file +REVOKE ALL ON FUNCTION public.queue_cron_plan_for_org(uuid, text) FROM PUBLIC; +REVOKE ALL ON FUNCTION public.queue_cron_plan_for_org(uuid, text) FROM anon; +REVOKE ALL ON FUNCTION public.queue_cron_plan_for_org(uuid, text) FROM authenticated; +GRANT ALL ON FUNCTION public.queue_cron_plan_for_org(uuid, text) TO service_role; \ No newline at end of file diff --git a/supabase/migrations/20251014105957_rename_plan_cron.sql b/supabase/migrations/20251014105957_rename_plan_cron.sql index 101deb5cd2..65b878dbe2 100644 --- a/supabase/migrations/20251014105957_rename_plan_cron.sql +++ b/supabase/migrations/20251014105957_rename_plan_cron.sql @@ -35,6 +35,7 @@ CREATE OR REPLACE FUNCTION public.queue_cron_stat_org_for_org(org_id uuid, custo RETURNS void LANGUAGE plpgsql SECURITY DEFINER +SET search_path = '' AS $$ DECLARE last_calculated timestamptz; @@ -58,4 +59,15 @@ BEGIN ); END IF; END; -$$; \ No newline at end of file +$$; + +ALTER FUNCTION public.queue_cron_stat_org_for_org(uuid, text) OWNER TO postgres; + +-- Revoke all permissions first, then grant only to service_role +REVOKE ALL ON FUNCTION public.queue_cron_stat_org_for_org(uuid, text) FROM PUBLIC; +REVOKE ALL ON FUNCTION public.queue_cron_stat_org_for_org(uuid, text) FROM anon; +REVOKE ALL ON FUNCTION public.queue_cron_stat_org_for_org(uuid, text) FROM authenticated; +GRANT ALL ON FUNCTION public.queue_cron_stat_org_for_org(uuid, text) TO service_role; + +-- Drop the old function that is no longer needed +DROP FUNCTION IF EXISTS public.queue_cron_plan_for_org(uuid, text); \ No newline at end of file