From 2f64944fcadcc9230548ddd412d3d28f5c748e8a Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Tue, 7 Apr 2026 20:31:57 -0700 Subject: [PATCH 1/4] refactor(polling): consolidate polling services into provider handler pattern MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Eliminate self-POST anti-pattern and extract shared boilerplate from 4 polling services into a clean handler registry mirroring lib/webhooks/providers/. - Add processPolledWebhookEvent() to processor.ts for direct in-process webhook execution, removing HTTP round-trips that caused Lambda 403/timeout errors - Extract shared utilities (markWebhookFailed/Success, fetchActiveWebhooks, runWithConcurrency, resolveOAuthCredential, updateWebhookProviderConfig) - Create PollingProviderHandler interface with per-provider implementations - Consolidate 4 identical route files into single dynamic [provider] route - Standardize concurrency to 10 across all providers - No infra changes needed — Helm cron paths resolve via dynamic route Co-Authored-By: Claude Opus 4.6 --- .../poll/{rss => [provider]}/route.ts | 38 +- apps/sim/app/api/webhooks/poll/gmail/route.ts | 68 -- apps/sim/app/api/webhooks/poll/imap/route.ts | 68 -- .../app/api/webhooks/poll/outlook/route.ts | 68 -- .../sim/lib/webhooks/gmail-polling-service.ts | 791 ------------------ apps/sim/lib/webhooks/polling/gmail.ts | 553 ++++++++++++ .../imap.ts} | 416 +++------ apps/sim/lib/webhooks/polling/index.ts | 9 + apps/sim/lib/webhooks/polling/orchestrator.ts | 46 + .../outlook.ts} | 456 +++------- apps/sim/lib/webhooks/polling/registry.ts | 19 + apps/sim/lib/webhooks/polling/rss.ts | 307 +++++++ apps/sim/lib/webhooks/polling/types.ts | 58 ++ apps/sim/lib/webhooks/polling/utils.ts | 240 ++++++ apps/sim/lib/webhooks/processor.ts | 216 ++++- apps/sim/lib/webhooks/rss-polling-service.ts | 442 ---------- 16 files changed, 1697 insertions(+), 2098 deletions(-) rename apps/sim/app/api/webhooks/poll/{rss => 
[provider]}/route.ts (55%) delete mode 100644 apps/sim/app/api/webhooks/poll/gmail/route.ts delete mode 100644 apps/sim/app/api/webhooks/poll/imap/route.ts delete mode 100644 apps/sim/app/api/webhooks/poll/outlook/route.ts delete mode 100644 apps/sim/lib/webhooks/gmail-polling-service.ts create mode 100644 apps/sim/lib/webhooks/polling/gmail.ts rename apps/sim/lib/webhooks/{imap-polling-service.ts => polling/imap.ts} (57%) create mode 100644 apps/sim/lib/webhooks/polling/index.ts create mode 100644 apps/sim/lib/webhooks/polling/orchestrator.ts rename apps/sim/lib/webhooks/{outlook-polling-service.ts => polling/outlook.ts} (50%) create mode 100644 apps/sim/lib/webhooks/polling/registry.ts create mode 100644 apps/sim/lib/webhooks/polling/rss.ts create mode 100644 apps/sim/lib/webhooks/polling/types.ts create mode 100644 apps/sim/lib/webhooks/polling/utils.ts delete mode 100644 apps/sim/lib/webhooks/rss-polling-service.ts diff --git a/apps/sim/app/api/webhooks/poll/rss/route.ts b/apps/sim/app/api/webhooks/poll/[provider]/route.ts similarity index 55% rename from apps/sim/app/api/webhooks/poll/rss/route.ts rename to apps/sim/app/api/webhooks/poll/[provider]/route.ts index cd221abe394..425a375c62d 100644 --- a/apps/sim/app/api/webhooks/poll/rss/route.ts +++ b/apps/sim/app/api/webhooks/poll/[provider]/route.ts @@ -3,31 +3,33 @@ import { type NextRequest, NextResponse } from 'next/server' import { verifyCronAuth } from '@/lib/auth/internal' import { acquireLock, releaseLock } from '@/lib/core/config/redis' import { generateShortId } from '@/lib/core/utils/uuid' -import { pollRssWebhooks } from '@/lib/webhooks/rss-polling-service' +import { pollProvider, VALID_POLLING_PROVIDERS } from '@/lib/webhooks/polling' -const logger = createLogger('RssPollingAPI') +const logger = createLogger('PollingAPI') export const dynamic = 'force-dynamic' -export const maxDuration = 180 // Allow up to 3 minutes for polling to complete +export const maxDuration = 180 -const LOCK_KEY = 
'rss-polling-lock' -const LOCK_TTL_SECONDS = 180 // Same as maxDuration (3 min) - -export async function GET(request: NextRequest) { +export async function GET( + request: NextRequest, + { params }: { params: Promise<{ provider: string }> } +) { + const { provider } = await params const requestId = generateShortId() - logger.info(`RSS webhook polling triggered (${requestId})`) + if (!VALID_POLLING_PROVIDERS.has(provider)) { + return NextResponse.json({ error: `Unknown polling provider: ${provider}` }, { status: 404 }) + } + + const LOCK_KEY = `${provider}-polling-lock` let lockValue: string | undefined try { - const authError = verifyCronAuth(request, 'RSS webhook polling') - if (authError) { - return authError - } + const authError = verifyCronAuth(request, `${provider} webhook polling`) + if (authError) return authError lockValue = requestId - const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS) - + const locked = await acquireLock(LOCK_KEY, lockValue, 180) if (!locked) { return NextResponse.json( { @@ -40,21 +42,21 @@ export async function GET(request: NextRequest) { ) } - const results = await pollRssWebhooks() + const results = await pollProvider(provider) return NextResponse.json({ success: true, - message: 'RSS polling completed', + message: `${provider} polling completed`, requestId, status: 'completed', ...results, }) } catch (error) { - logger.error(`Error during RSS polling (${requestId}):`, error) + logger.error(`Error during ${provider} polling (${requestId}):`, error) return NextResponse.json( { success: false, - message: 'RSS polling failed', + message: `${provider} polling failed`, error: error instanceof Error ? 
error.message : 'Unknown error', requestId, }, diff --git a/apps/sim/app/api/webhooks/poll/gmail/route.ts b/apps/sim/app/api/webhooks/poll/gmail/route.ts deleted file mode 100644 index 5eabf8b9b63..00000000000 --- a/apps/sim/app/api/webhooks/poll/gmail/route.ts +++ /dev/null @@ -1,68 +0,0 @@ -import { createLogger } from '@sim/logger' -import { type NextRequest, NextResponse } from 'next/server' -import { verifyCronAuth } from '@/lib/auth/internal' -import { acquireLock, releaseLock } from '@/lib/core/config/redis' -import { generateShortId } from '@/lib/core/utils/uuid' -import { pollGmailWebhooks } from '@/lib/webhooks/gmail-polling-service' - -const logger = createLogger('GmailPollingAPI') - -export const dynamic = 'force-dynamic' -export const maxDuration = 180 // Allow up to 3 minutes for polling to complete - -const LOCK_KEY = 'gmail-polling-lock' -const LOCK_TTL_SECONDS = 180 // Same as maxDuration (3 min) - -export async function GET(request: NextRequest) { - const requestId = generateShortId() - logger.info(`Gmail webhook polling triggered (${requestId})`) - - let lockValue: string | undefined - - try { - const authError = verifyCronAuth(request, 'Gmail webhook polling') - if (authError) { - return authError - } - - lockValue = requestId // unique value to identify the holder - const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS) - - if (!locked) { - return NextResponse.json( - { - success: true, - message: 'Polling already in progress – skipped', - requestId, - status: 'skip', - }, - { status: 202 } - ) - } - - const results = await pollGmailWebhooks() - - return NextResponse.json({ - success: true, - message: 'Gmail polling completed', - requestId, - status: 'completed', - ...results, - }) - } catch (error) { - logger.error(`Error during Gmail polling (${requestId}):`, error) - return NextResponse.json( - { - success: false, - message: 'Gmail polling failed', - error: error instanceof Error ? 
error.message : 'Unknown error', - requestId, - }, - { status: 500 } - ) - } finally { - if (lockValue) { - await releaseLock(LOCK_KEY, lockValue).catch(() => {}) - } - } -} diff --git a/apps/sim/app/api/webhooks/poll/imap/route.ts b/apps/sim/app/api/webhooks/poll/imap/route.ts deleted file mode 100644 index 29826e04bff..00000000000 --- a/apps/sim/app/api/webhooks/poll/imap/route.ts +++ /dev/null @@ -1,68 +0,0 @@ -import { createLogger } from '@sim/logger' -import { type NextRequest, NextResponse } from 'next/server' -import { verifyCronAuth } from '@/lib/auth/internal' -import { acquireLock, releaseLock } from '@/lib/core/config/redis' -import { generateShortId } from '@/lib/core/utils/uuid' -import { pollImapWebhooks } from '@/lib/webhooks/imap-polling-service' - -const logger = createLogger('ImapPollingAPI') - -export const dynamic = 'force-dynamic' -export const maxDuration = 180 // Allow up to 3 minutes for polling to complete - -const LOCK_KEY = 'imap-polling-lock' -const LOCK_TTL_SECONDS = 180 // Same as maxDuration (3 min) - -export async function GET(request: NextRequest) { - const requestId = generateShortId() - logger.info(`IMAP webhook polling triggered (${requestId})`) - - let lockValue: string | undefined - - try { - const authError = verifyCronAuth(request, 'IMAP webhook polling') - if (authError) { - return authError - } - - lockValue = requestId // unique value to identify the holder - const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS) - - if (!locked) { - return NextResponse.json( - { - success: true, - message: 'Polling already in progress – skipped', - requestId, - status: 'skip', - }, - { status: 202 } - ) - } - - const results = await pollImapWebhooks() - - return NextResponse.json({ - success: true, - message: 'IMAP polling completed', - requestId, - status: 'completed', - ...results, - }) - } catch (error) { - logger.error(`Error during IMAP polling (${requestId}):`, error) - return NextResponse.json( - { - success: 
false, - message: 'IMAP polling failed', - error: error instanceof Error ? error.message : 'Unknown error', - requestId, - }, - { status: 500 } - ) - } finally { - if (lockValue) { - await releaseLock(LOCK_KEY, lockValue).catch(() => {}) - } - } -} diff --git a/apps/sim/app/api/webhooks/poll/outlook/route.ts b/apps/sim/app/api/webhooks/poll/outlook/route.ts deleted file mode 100644 index 2a72c34f86a..00000000000 --- a/apps/sim/app/api/webhooks/poll/outlook/route.ts +++ /dev/null @@ -1,68 +0,0 @@ -import { createLogger } from '@sim/logger' -import { type NextRequest, NextResponse } from 'next/server' -import { verifyCronAuth } from '@/lib/auth/internal' -import { acquireLock, releaseLock } from '@/lib/core/config/redis' -import { generateShortId } from '@/lib/core/utils/uuid' -import { pollOutlookWebhooks } from '@/lib/webhooks/outlook-polling-service' - -const logger = createLogger('OutlookPollingAPI') - -export const dynamic = 'force-dynamic' -export const maxDuration = 180 // Allow up to 3 minutes for polling to complete - -const LOCK_KEY = 'outlook-polling-lock' -const LOCK_TTL_SECONDS = 180 // Same as maxDuration (3 min) - -export async function GET(request: NextRequest) { - const requestId = generateShortId() - logger.info(`Outlook webhook polling triggered (${requestId})`) - - let lockValue: string | undefined - - try { - const authError = verifyCronAuth(request, 'Outlook webhook polling') - if (authError) { - return authError - } - - lockValue = requestId // unique value to identify the holder - const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS) - - if (!locked) { - return NextResponse.json( - { - success: true, - message: 'Polling already in progress – skipped', - requestId, - status: 'skip', - }, - { status: 202 } - ) - } - - const results = await pollOutlookWebhooks() - - return NextResponse.json({ - success: true, - message: 'Outlook polling completed', - requestId, - status: 'completed', - ...results, - }) - } catch (error) { - 
logger.error(`Error during Outlook polling (${requestId}):`, error) - return NextResponse.json( - { - success: false, - message: 'Outlook polling failed', - error: error instanceof Error ? error.message : 'Unknown error', - requestId, - }, - { status: 500 } - ) - } finally { - if (lockValue) { - await releaseLock(LOCK_KEY, lockValue).catch(() => {}) - } - } -} diff --git a/apps/sim/lib/webhooks/gmail-polling-service.ts b/apps/sim/lib/webhooks/gmail-polling-service.ts deleted file mode 100644 index 6c916cc3cf8..00000000000 --- a/apps/sim/lib/webhooks/gmail-polling-service.ts +++ /dev/null @@ -1,791 +0,0 @@ -import { db } from '@sim/db' -import { - account, - credentialSet, - webhook, - workflow, - workflowDeploymentVersion, -} from '@sim/db/schema' -import { createLogger } from '@sim/logger' -import { and, eq, isNull, or, sql } from 'drizzle-orm' -import { isOrganizationOnTeamOrEnterprisePlan } from '@/lib/billing' -import { pollingIdempotency } from '@/lib/core/idempotency/service' -import { getInternalApiBaseUrl } from '@/lib/core/utils/urls' -import { generateShortId } from '@/lib/core/utils/uuid' -import { - getOAuthToken, - refreshAccessTokenIfNeeded, - resolveOAuthAccountId, -} from '@/app/api/auth/oauth/utils' -import type { GmailAttachment } from '@/tools/gmail/types' -import { downloadAttachments, extractAttachmentInfo } from '@/tools/gmail/utils' -import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants' - -const logger = createLogger('GmailPollingService') - -interface GmailWebhookConfig { - labelIds: string[] - labelFilterBehavior: 'INCLUDE' | 'EXCLUDE' - markAsRead: boolean - searchQuery?: string - maxEmailsPerPoll?: number - lastCheckedTimestamp?: string - historyId?: string - includeAttachments?: boolean - includeRawEmail?: boolean -} - -interface GmailEmail { - id: string - threadId: string - historyId?: string - labelIds?: string[] - payload?: any - snippet?: string - internalDate?: string -} - -export interface SimplifiedEmail { - id: string 
- threadId: string - subject: string - from: string - to: string - cc: string - date: string | null - bodyText: string - bodyHtml: string - labels: string[] - hasAttachments: boolean - attachments: GmailAttachment[] -} - -export interface GmailWebhookPayload { - email: SimplifiedEmail - timestamp: string - rawEmail?: GmailEmail // Only included when includeRawEmail is true -} - -async function markWebhookFailed(webhookId: string) { - try { - const result = await db - .update(webhook) - .set({ - failedCount: sql`COALESCE(${webhook.failedCount}, 0) + 1`, - lastFailedAt: new Date(), - updatedAt: new Date(), - }) - .where(eq(webhook.id, webhookId)) - .returning({ failedCount: webhook.failedCount }) - - const newFailedCount = result[0]?.failedCount || 0 - const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES - - if (shouldDisable) { - await db - .update(webhook) - .set({ - isActive: false, - updatedAt: new Date(), - }) - .where(eq(webhook.id, webhookId)) - - logger.warn( - `Webhook ${webhookId} auto-disabled after ${MAX_CONSECUTIVE_FAILURES} consecutive failures` - ) - } - } catch (err) { - logger.error(`Failed to mark webhook ${webhookId} as failed:`, err) - } -} - -async function markWebhookSuccess(webhookId: string) { - try { - await db - .update(webhook) - .set({ - failedCount: 0, // Reset on success - updatedAt: new Date(), - }) - .where(eq(webhook.id, webhookId)) - } catch (err) { - logger.error(`Failed to mark webhook ${webhookId} as successful:`, err) - } -} - -export async function pollGmailWebhooks() { - logger.info('Starting Gmail webhook polling') - - try { - const activeWebhooksResult = await db - .select({ webhook }) - .from(webhook) - .innerJoin(workflow, eq(webhook.workflowId, workflow.id)) - .leftJoin( - workflowDeploymentVersion, - and( - eq(workflowDeploymentVersion.workflowId, workflow.id), - eq(workflowDeploymentVersion.isActive, true) - ) - ) - .where( - and( - eq(webhook.provider, 'gmail'), - eq(webhook.isActive, true), - 
eq(workflow.isDeployed, true), - or( - eq(webhook.deploymentVersionId, workflowDeploymentVersion.id), - and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId)) - ) - ) - ) - - const activeWebhooks = activeWebhooksResult.map((r) => r.webhook) - - if (!activeWebhooks.length) { - logger.info('No active Gmail webhooks found') - return { total: 0, successful: 0, failed: 0, details: [] } - } - - logger.info(`Found ${activeWebhooks.length} active Gmail webhooks`) - - // Limit the number of webhooks processed in parallel to avoid - // exhausting Postgres or Gmail API connections when many users exist. - const CONCURRENCY = 10 - - const running: Promise[] = [] - let successCount = 0 - let failureCount = 0 - - const enqueue = async (webhookData: (typeof activeWebhooks)[number]) => { - const webhookId = webhookData.id - const requestId = generateShortId() - - try { - const metadata = webhookData.providerConfig as any - const credentialId: string | undefined = metadata?.credentialId - const userId: string | undefined = metadata?.userId - const credentialSetId: string | undefined = webhookData.credentialSetId ?? undefined - - if (!credentialId && !userId) { - logger.error(`[${requestId}] Missing credential info for webhook ${webhookId}`) - await markWebhookFailed(webhookId) - failureCount++ - return - } - - if (credentialSetId) { - const [cs] = await db - .select({ organizationId: credentialSet.organizationId }) - .from(credentialSet) - .where(eq(credentialSet.id, credentialSetId)) - .limit(1) - - if (cs?.organizationId) { - const hasAccess = await isOrganizationOnTeamOrEnterprisePlan(cs.organizationId) - if (!hasAccess) { - logger.error( - `[${requestId}] Polling Group plan restriction: Your current plan does not support Polling Groups. 
Upgrade to Team or Enterprise to use this feature.`, - { - webhookId, - credentialSetId, - organizationId: cs.organizationId, - } - ) - await markWebhookFailed(webhookId) - failureCount++ - return - } - } - } - - let accessToken: string | null = null - - if (credentialId) { - const resolved = await resolveOAuthAccountId(credentialId) - if (!resolved) { - logger.error( - `[${requestId}] Failed to resolve OAuth account for credential ${credentialId}, webhook ${webhookId}` - ) - await markWebhookFailed(webhookId) - failureCount++ - return - } - const rows = await db - .select() - .from(account) - .where(eq(account.id, resolved.accountId)) - .limit(1) - if (rows.length === 0) { - logger.error( - `[${requestId}] Credential ${credentialId} not found for webhook ${webhookId}` - ) - await markWebhookFailed(webhookId) - failureCount++ - return - } - const ownerUserId = rows[0].userId - accessToken = await refreshAccessTokenIfNeeded(resolved.accountId, ownerUserId, requestId) - } else if (userId) { - // Legacy fallback for webhooks without credentialId - accessToken = await getOAuthToken(userId, 'google-email') - } - - if (!accessToken) { - logger.error(`[${requestId}] Failed to get Gmail access token for webhook ${webhookId}`) - await markWebhookFailed(webhookId) - failureCount++ - return - } - - const config = webhookData.providerConfig as unknown as GmailWebhookConfig - - const now = new Date() - - const fetchResult = await fetchNewEmails(accessToken, config, requestId) - - const { emails, latestHistoryId } = fetchResult - - if (!emails || !emails.length) { - await updateWebhookLastChecked( - webhookId, - now.toISOString(), - latestHistoryId || config.historyId - ) - await markWebhookSuccess(webhookId) - logger.info(`[${requestId}] No new emails found for webhook ${webhookId}`) - successCount++ - return - } - - logger.info(`[${requestId}] Found ${emails.length} new emails for webhook ${webhookId}`) - - logger.info(`[${requestId}] Processing ${emails.length} emails for 
webhook ${webhookId}`) - - const emailsToProcess = emails - - const { processedCount, failedCount } = await processEmails( - emailsToProcess, - webhookData, - config, - accessToken, - requestId - ) - - await updateWebhookLastChecked( - webhookId, - now.toISOString(), - latestHistoryId || config.historyId - ) - - if (failedCount > 0 && processedCount === 0) { - await markWebhookFailed(webhookId) - failureCount++ - logger.warn( - `[${requestId}] All ${failedCount} emails failed to process for webhook ${webhookId}` - ) - } else { - await markWebhookSuccess(webhookId) - successCount++ - logger.info( - `[${requestId}] Successfully processed ${processedCount} emails for webhook ${webhookId}${failedCount > 0 ? ` (${failedCount} failed)` : ''}` - ) - } - } catch (error) { - logger.error(`[${requestId}] Error processing Gmail webhook ${webhookId}:`, error) - await markWebhookFailed(webhookId) - failureCount++ - } - } - - for (const webhookData of activeWebhooks) { - const promise: Promise = enqueue(webhookData) - .catch((err) => { - logger.error('Unexpected error in webhook processing:', err) - failureCount++ - }) - .finally(() => { - const idx = running.indexOf(promise) - if (idx !== -1) running.splice(idx, 1) - }) - - running.push(promise) - - if (running.length >= CONCURRENCY) { - await Promise.race(running) - } - } - - await Promise.allSettled(running) - - const summary = { - total: activeWebhooks.length, - successful: successCount, - failed: failureCount, - details: [], - } - - logger.info('Gmail polling completed', { - total: summary.total, - successful: summary.successful, - failed: summary.failed, - }) - - return summary - } catch (error) { - const errorMessage = error instanceof Error ? 
error.message : 'Unknown error' - logger.error('Error in Gmail polling service:', errorMessage) - throw error - } -} - -async function fetchNewEmails(accessToken: string, config: GmailWebhookConfig, requestId: string) { - try { - const useHistoryApi = !!config.historyId - let emails = [] - let latestHistoryId = config.historyId - - if (useHistoryApi) { - const historyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId=${config.historyId}` - - const historyResponse = await fetch(historyUrl, { - headers: { - Authorization: `Bearer ${accessToken}`, - }, - }) - - if (!historyResponse.ok) { - const errorData = await historyResponse.json() - logger.error(`[${requestId}] Gmail history API error:`, { - status: historyResponse.status, - statusText: historyResponse.statusText, - error: errorData, - }) - - logger.info(`[${requestId}] Falling back to search API after history API failure`) - const searchResult = await searchEmails(accessToken, config, requestId) - return { - emails: searchResult.emails, - latestHistoryId: searchResult.latestHistoryId, - } - } - - const historyData = await historyResponse.json() - - if (!historyData.history || !historyData.history.length) { - return { emails: [], latestHistoryId } - } - - if (historyData.historyId) { - latestHistoryId = historyData.historyId - } - - const messageIds = new Set() - - for (const history of historyData.history) { - if (history.messagesAdded) { - for (const messageAdded of history.messagesAdded) { - messageIds.add(messageAdded.message.id) - } - } - } - - if (messageIds.size === 0) { - return { emails: [], latestHistoryId } - } - - const sortedIds = [...messageIds].sort().reverse() - - const idsToFetch = sortedIds.slice(0, config.maxEmailsPerPoll || 25) - logger.info(`[${requestId}] Processing ${idsToFetch.length} emails from history API`) - - const emailPromises = idsToFetch.map(async (messageId) => { - return getEmailDetails(accessToken, messageId) - }) - - const emailResults = await 
Promise.allSettled(emailPromises) - const rejected = emailResults.filter((r) => r.status === 'rejected') - if (rejected.length > 0) { - logger.warn(`[${requestId}] Failed to fetch ${rejected.length} email details`) - } - emails = emailResults - .filter( - (result): result is PromiseFulfilledResult => result.status === 'fulfilled' - ) - .map((result) => result.value) - - emails = filterEmailsByLabels(emails, config) - } else { - const searchResult = await searchEmails(accessToken, config, requestId) - return searchResult - } - - return { emails, latestHistoryId } - } catch (error) { - const errorMessage = error instanceof Error ? error.message : 'Unknown error' - logger.error(`[${requestId}] Error fetching new emails:`, errorMessage) - throw error - } -} - -/** - * Builds a Gmail search query from label and search configuration - */ -function buildGmailSearchQuery(config: { - labelIds?: string[] - labelFilterBehavior?: 'INCLUDE' | 'EXCLUDE' - searchQuery?: string -}): string { - let labelQuery = '' - if (config.labelIds && config.labelIds.length > 0) { - const labelParts = config.labelIds.map((label) => `label:${label}`).join(' OR ') - labelQuery = - config.labelFilterBehavior === 'INCLUDE' - ? config.labelIds.length > 1 - ? `(${labelParts})` - : labelParts - : config.labelIds.length > 1 - ? 
`-(${labelParts})` - : `-${labelParts}` - } - - let searchQueryPart = '' - if (config.searchQuery?.trim()) { - searchQueryPart = config.searchQuery.trim() - if (searchQueryPart.includes(' OR ') || searchQueryPart.includes(' AND ')) { - searchQueryPart = `(${searchQueryPart})` - } - } - - let baseQuery = '' - if (labelQuery && searchQueryPart) { - baseQuery = `${labelQuery} ${searchQueryPart}` - } else if (searchQueryPart) { - baseQuery = searchQueryPart - } else if (labelQuery) { - baseQuery = labelQuery - } else { - baseQuery = 'in:inbox' - } - - return baseQuery -} - -async function searchEmails(accessToken: string, config: GmailWebhookConfig, requestId: string) { - try { - const baseQuery = buildGmailSearchQuery(config) - - let timeConstraint = '' - - if (config.lastCheckedTimestamp) { - const lastCheckedTime = new Date(config.lastCheckedTimestamp) - const now = new Date() - const minutesSinceLastCheck = (now.getTime() - lastCheckedTime.getTime()) / (60 * 1000) - - if (minutesSinceLastCheck < 60) { - const bufferSeconds = Math.max(1 * 60 * 2, 180) - - const cutoffTime = new Date(lastCheckedTime.getTime() - bufferSeconds * 1000) - - const timestamp = Math.floor(cutoffTime.getTime() / 1000) - - timeConstraint = ` after:${timestamp}` - } else if (minutesSinceLastCheck < 24 * 60) { - const hours = Math.ceil(minutesSinceLastCheck / 60) + 1 // Round up and add 1 hour buffer - timeConstraint = ` newer_than:${hours}h` - } else { - const days = Math.min(Math.ceil(minutesSinceLastCheck / (24 * 60)), 7) + 1 - timeConstraint = ` newer_than:${days}d` - } - } else { - timeConstraint = ' newer_than:1d' - } - - const query = `${baseQuery}${timeConstraint}` - - const searchUrl = `https://gmail.googleapis.com/gmail/v1/users/me/messages?q=${encodeURIComponent(query)}&maxResults=${config.maxEmailsPerPoll || 25}` - - const searchResponse = await fetch(searchUrl, { - headers: { - Authorization: `Bearer ${accessToken}`, - }, - }) - - if (!searchResponse.ok) { - const errorData = await 
searchResponse.json() - logger.error(`[${requestId}] Gmail search API error:`, { - status: searchResponse.status, - statusText: searchResponse.statusText, - query: query, - error: errorData, - }) - throw new Error( - `Gmail API error: ${searchResponse.status} ${searchResponse.statusText} - ${JSON.stringify(errorData)}` - ) - } - - const searchData = await searchResponse.json() - - if (!searchData.messages || !searchData.messages.length) { - logger.info(`[${requestId}] No emails found matching query: ${query}`) - return { emails: [], latestHistoryId: config.historyId } - } - - const idsToFetch = searchData.messages.slice(0, config.maxEmailsPerPoll || 25) - let latestHistoryId = config.historyId - - logger.info( - `[${requestId}] Processing ${idsToFetch.length} emails from search API (total matches: ${searchData.messages.length})` - ) - - const emailPromises = idsToFetch.map(async (message: { id: string }) => { - return getEmailDetails(accessToken, message.id) - }) - - const emailResults = await Promise.allSettled(emailPromises) - const rejected = emailResults.filter((r) => r.status === 'rejected') - if (rejected.length > 0) { - logger.warn(`[${requestId}] Failed to fetch ${rejected.length} email details`) - } - const emails = emailResults - .filter( - (result): result is PromiseFulfilledResult => result.status === 'fulfilled' - ) - .map((result) => result.value) - - if (emails.length > 0 && emails[0].historyId) { - latestHistoryId = emails[0].historyId - } - - return { emails, latestHistoryId } - } catch (error) { - const errorMessage = error instanceof Error ? 
error.message : 'Unknown error' - logger.error(`[${requestId}] Error searching emails:`, errorMessage) - throw error - } -} - -async function getEmailDetails(accessToken: string, messageId: string): Promise { - const messageUrl = `https://gmail.googleapis.com/gmail/v1/users/me/messages/${messageId}?format=full` - - const messageResponse = await fetch(messageUrl, { - headers: { - Authorization: `Bearer ${accessToken}`, - }, - }) - - if (!messageResponse.ok) { - const errorData = await messageResponse.json().catch(() => ({})) - throw new Error( - `Failed to fetch email details for message ${messageId}: ${messageResponse.status} ${messageResponse.statusText} - ${JSON.stringify(errorData)}` - ) - } - - return await messageResponse.json() -} - -function filterEmailsByLabels(emails: GmailEmail[], config: GmailWebhookConfig): GmailEmail[] { - if (!config.labelIds.length) { - return emails - } - - return emails.filter((email) => { - const emailLabels = email.labelIds || [] - const hasMatchingLabel = config.labelIds.some((configLabel) => - emailLabels.includes(configLabel) - ) - - return config.labelFilterBehavior === 'INCLUDE' - ? 
hasMatchingLabel // Include emails with matching labels - : !hasMatchingLabel // Exclude emails with matching labels - }) -} - -async function processEmails( - emails: any[], - webhookData: any, - config: GmailWebhookConfig, - accessToken: string, - requestId: string -) { - let processedCount = 0 - let failedCount = 0 - - for (const email of emails) { - try { - await pollingIdempotency.executeWithIdempotency( - 'gmail', - `${webhookData.id}:${email.id}`, - async () => { - const headers: Record = {} - if (email.payload?.headers) { - for (const header of email.payload.headers) { - headers[header.name.toLowerCase()] = header.value - } - } - - let textContent = '' - let htmlContent = '' - - const extractContent = (part: any) => { - if (!part) return - - if (part.mimeType === 'text/plain' && part.body?.data) { - textContent = Buffer.from(part.body.data, 'base64').toString('utf-8') - } else if (part.mimeType === 'text/html' && part.body?.data) { - htmlContent = Buffer.from(part.body.data, 'base64').toString('utf-8') - } - - if (part.parts && Array.isArray(part.parts)) { - for (const subPart of part.parts) { - extractContent(subPart) - } - } - } - - if (email.payload) { - extractContent(email.payload) - } - - let date: string | null = null - if (headers.date) { - try { - date = new Date(headers.date).toISOString() - } catch (_e) { - // Keep date as null if parsing fails - } - } else if (email.internalDate) { - date = new Date(Number.parseInt(email.internalDate)).toISOString() - } - - let attachments: GmailAttachment[] = [] - const hasAttachments = email.payload - ? 
extractAttachmentInfo(email.payload).length > 0 - : false - - if (config.includeAttachments && hasAttachments && email.payload) { - try { - const attachmentInfo = extractAttachmentInfo(email.payload) - attachments = await downloadAttachments(email.id, attachmentInfo, accessToken) - } catch (error) { - logger.error( - `[${requestId}] Error downloading attachments for email ${email.id}:`, - error - ) - } - } - - const simplifiedEmail: SimplifiedEmail = { - id: email.id, - threadId: email.threadId, - subject: headers.subject || '[No Subject]', - from: headers.from || '', - to: headers.to || '', - cc: headers.cc || '', - date: date, - bodyText: textContent, - bodyHtml: htmlContent, - labels: email.labelIds || [], - hasAttachments, - attachments, - } - - const payload: GmailWebhookPayload = { - email: simplifiedEmail, - timestamp: new Date().toISOString(), - ...(config.includeRawEmail ? { rawEmail: email } : {}), - } - - const webhookUrl = `${getInternalApiBaseUrl()}/api/webhooks/trigger/${webhookData.path}` - - const response = await fetch(webhookUrl, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'User-Agent': 'Sim/1.0', - }, - body: JSON.stringify(payload), - }) - - if (!response.ok) { - const errorText = await response.text() - logger.error( - `[${requestId}] Failed to trigger webhook for email ${email.id}:`, - response.status, - errorText - ) - throw new Error(`Webhook request failed: ${response.status} - ${errorText}`) - } - - if (config.markAsRead) { - await markEmailAsRead(accessToken, email.id) - } - - return { - emailId: email.id, - webhookStatus: response.status, - processed: true, - } - } - ) - - logger.info( - `[${requestId}] Successfully processed email ${email.id} for webhook ${webhookData.id}` - ) - processedCount++ - } catch (error) { - const errorMessage = error instanceof Error ? 
error.message : 'Unknown error' - logger.error(`[${requestId}] Error processing email ${email.id}:`, errorMessage) - failedCount++ - } - } - - return { processedCount, failedCount } -} - -async function markEmailAsRead(accessToken: string, messageId: string) { - const modifyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/messages/${messageId}/modify` - - try { - const response = await fetch(modifyUrl, { - method: 'POST', - headers: { - Authorization: `Bearer ${accessToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - removeLabelIds: ['UNREAD'], - }), - }) - - if (!response.ok) { - await response.body?.cancel().catch(() => {}) - throw new Error( - `Failed to mark email ${messageId} as read: ${response.status} ${response.statusText}` - ) - } - } catch (error) { - logger.error(`Error marking email ${messageId} as read:`, error) - throw error - } -} - -async function updateWebhookLastChecked(webhookId: string, timestamp: string, historyId?: string) { - try { - const result = await db.select().from(webhook).where(eq(webhook.id, webhookId)) - const existingConfig = (result[0]?.providerConfig as Record) || {} - await db - .update(webhook) - .set({ - providerConfig: { - ...existingConfig, - lastCheckedTimestamp: timestamp, - ...(historyId ? 
{ historyId } : {}), - } as any, - updatedAt: new Date(), - }) - .where(eq(webhook.id, webhookId)) - } catch (error) { - logger.error(`Error updating webhook ${webhookId} last checked timestamp:`, error) - } -} diff --git a/apps/sim/lib/webhooks/polling/gmail.ts b/apps/sim/lib/webhooks/polling/gmail.ts new file mode 100644 index 00000000000..32d6d25a343 --- /dev/null +++ b/apps/sim/lib/webhooks/polling/gmail.ts @@ -0,0 +1,553 @@ +import { pollingIdempotency } from '@/lib/core/idempotency/service' +import type { PollingProviderHandler, PollWebhookContext } from '@/lib/webhooks/polling/types' +import { + markWebhookFailed, + markWebhookSuccess, + resolveOAuthCredential, + updateWebhookProviderConfig, +} from '@/lib/webhooks/polling/utils' +import { processPolledWebhookEvent } from '@/lib/webhooks/processor' +import type { GmailAttachment } from '@/tools/gmail/types' +import { downloadAttachments, extractAttachmentInfo } from '@/tools/gmail/utils' + +interface GmailWebhookConfig { + labelIds: string[] + labelFilterBehavior: 'INCLUDE' | 'EXCLUDE' + markAsRead: boolean + searchQuery?: string + maxEmailsPerPoll?: number + lastCheckedTimestamp?: string + historyId?: string + includeAttachments?: boolean + includeRawEmail?: boolean +} + +interface GmailEmail { + id: string + threadId: string + historyId?: string + labelIds?: string[] + payload?: Record + snippet?: string + internalDate?: string +} + +export interface SimplifiedEmail { + id: string + threadId: string + subject: string + from: string + to: string + cc: string + date: string | null + bodyText: string + bodyHtml: string + labels: string[] + hasAttachments: boolean + attachments: GmailAttachment[] +} + +export interface GmailWebhookPayload { + email: SimplifiedEmail + timestamp: string + rawEmail?: GmailEmail +} + +export const gmailPollingHandler: PollingProviderHandler = { + provider: 'gmail', + label: 'Gmail', + + async pollWebhook(ctx: PollWebhookContext): Promise<'success' | 'failure'> { + const { 
webhookData, workflowData, requestId, logger } = ctx + const webhookId = webhookData.id + + try { + const accessToken = await resolveOAuthCredential( + webhookData, + 'google-email', + requestId, + logger + ) + + const config = webhookData.providerConfig as unknown as GmailWebhookConfig + const now = new Date() + + const { emails, latestHistoryId } = await fetchNewEmails( + accessToken, + config, + requestId, + logger + ) + + if (!emails || !emails.length) { + await updateWebhookProviderConfig( + webhookId, + { + lastCheckedTimestamp: now.toISOString(), + ...(latestHistoryId || config.historyId + ? { historyId: latestHistoryId || config.historyId } + : {}), + }, + logger + ) + await markWebhookSuccess(webhookId, logger) + logger.info(`[${requestId}] No new emails found for webhook ${webhookId}`) + return 'success' + } + + logger.info(`[${requestId}] Found ${emails.length} new emails for webhook ${webhookId}`) + + const { processedCount, failedCount } = await processEmails( + emails, + webhookData, + workflowData, + config, + accessToken, + requestId, + logger + ) + + await updateWebhookProviderConfig( + webhookId, + { + lastCheckedTimestamp: now.toISOString(), + ...(latestHistoryId || config.historyId + ? { historyId: latestHistoryId || config.historyId } + : {}), + }, + logger + ) + + if (failedCount > 0 && processedCount === 0) { + await markWebhookFailed(webhookId, logger) + logger.warn( + `[${requestId}] All ${failedCount} emails failed to process for webhook ${webhookId}` + ) + return 'failure' + } + + await markWebhookSuccess(webhookId, logger) + logger.info( + `[${requestId}] Successfully processed ${processedCount} emails for webhook ${webhookId}${failedCount > 0 ? 
` (${failedCount} failed)` : ''}` + ) + return 'success' + } catch (error) { + logger.error(`[${requestId}] Error processing Gmail webhook ${webhookId}:`, error) + await markWebhookFailed(webhookId, logger) + return 'failure' + } + }, +} + +async function fetchNewEmails( + accessToken: string, + config: GmailWebhookConfig, + requestId: string, + logger: ReturnType +) { + try { + const useHistoryApi = !!config.historyId + let emails: GmailEmail[] = [] + let latestHistoryId = config.historyId + + if (useHistoryApi) { + const historyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId=${config.historyId}` + + const historyResponse = await fetch(historyUrl, { + headers: { Authorization: `Bearer ${accessToken}` }, + }) + + if (!historyResponse.ok) { + const errorData = await historyResponse.json() + logger.error(`[${requestId}] Gmail history API error:`, { + status: historyResponse.status, + statusText: historyResponse.statusText, + error: errorData, + }) + + logger.info(`[${requestId}] Falling back to search API after history API failure`) + return searchEmails(accessToken, config, requestId, logger) + } + + const historyData = await historyResponse.json() + + if (!historyData.history || !historyData.history.length) { + return { emails: [], latestHistoryId } + } + + if (historyData.historyId) { + latestHistoryId = historyData.historyId + } + + const messageIds = new Set() + for (const history of historyData.history) { + if (history.messagesAdded) { + for (const messageAdded of history.messagesAdded) { + messageIds.add(messageAdded.message.id) + } + } + } + + if (messageIds.size === 0) { + return { emails: [], latestHistoryId } + } + + const sortedIds = [...messageIds].sort().reverse() + const idsToFetch = sortedIds.slice(0, config.maxEmailsPerPoll || 25) + logger.info(`[${requestId}] Processing ${idsToFetch.length} emails from history API`) + + const emailResults = await Promise.allSettled( + idsToFetch.map((messageId) => 
getEmailDetails(accessToken, messageId)) + ) + const rejected = emailResults.filter((r) => r.status === 'rejected') + if (rejected.length > 0) { + logger.warn(`[${requestId}] Failed to fetch ${rejected.length} email details`) + } + emails = emailResults + .filter( + (result): result is PromiseFulfilledResult => result.status === 'fulfilled' + ) + .map((result) => result.value) + + emails = filterEmailsByLabels(emails, config) + } else { + return searchEmails(accessToken, config, requestId, logger) + } + + return { emails, latestHistoryId } + } catch (error) { + const errorMessage = error instanceof Error ? error.message : 'Unknown error' + logger.error(`[${requestId}] Error fetching new emails:`, errorMessage) + throw error + } +} + +function buildGmailSearchQuery(config: { + labelIds?: string[] + labelFilterBehavior?: 'INCLUDE' | 'EXCLUDE' + searchQuery?: string +}): string { + let labelQuery = '' + if (config.labelIds && config.labelIds.length > 0) { + const labelParts = config.labelIds.map((label) => `label:${label}`).join(' OR ') + labelQuery = + config.labelFilterBehavior === 'INCLUDE' + ? config.labelIds.length > 1 + ? `(${labelParts})` + : labelParts + : config.labelIds.length > 1 + ? 
`-(${labelParts})` + : `-${labelParts}` + } + + let searchQueryPart = '' + if (config.searchQuery?.trim()) { + searchQueryPart = config.searchQuery.trim() + if (searchQueryPart.includes(' OR ') || searchQueryPart.includes(' AND ')) { + searchQueryPart = `(${searchQueryPart})` + } + } + + let baseQuery = '' + if (labelQuery && searchQueryPart) { + baseQuery = `${labelQuery} ${searchQueryPart}` + } else if (searchQueryPart) { + baseQuery = searchQueryPart + } else if (labelQuery) { + baseQuery = labelQuery + } else { + baseQuery = 'in:inbox' + } + + return baseQuery +} + +async function searchEmails( + accessToken: string, + config: GmailWebhookConfig, + requestId: string, + logger: ReturnType +) { + try { + const baseQuery = buildGmailSearchQuery(config) + let timeConstraint = '' + + if (config.lastCheckedTimestamp) { + const lastCheckedTime = new Date(config.lastCheckedTimestamp) + const now = new Date() + const minutesSinceLastCheck = (now.getTime() - lastCheckedTime.getTime()) / (60 * 1000) + + if (minutesSinceLastCheck < 60) { + const bufferSeconds = Math.max(1 * 60 * 2, 180) + const cutoffTime = new Date(lastCheckedTime.getTime() - bufferSeconds * 1000) + const timestamp = Math.floor(cutoffTime.getTime() / 1000) + timeConstraint = ` after:${timestamp}` + } else if (minutesSinceLastCheck < 24 * 60) { + const hours = Math.ceil(minutesSinceLastCheck / 60) + 1 + timeConstraint = ` newer_than:${hours}h` + } else { + const days = Math.min(Math.ceil(minutesSinceLastCheck / (24 * 60)), 7) + 1 + timeConstraint = ` newer_than:${days}d` + } + } else { + timeConstraint = ' newer_than:1d' + } + + const query = `${baseQuery}${timeConstraint}` + const searchUrl = `https://gmail.googleapis.com/gmail/v1/users/me/messages?q=${encodeURIComponent(query)}&maxResults=${config.maxEmailsPerPoll || 25}` + + const searchResponse = await fetch(searchUrl, { + headers: { Authorization: `Bearer ${accessToken}` }, + }) + + if (!searchResponse.ok) { + const errorData = await 
searchResponse.json() + logger.error(`[${requestId}] Gmail search API error:`, { + status: searchResponse.status, + statusText: searchResponse.statusText, + query, + error: errorData, + }) + throw new Error( + `Gmail API error: ${searchResponse.status} ${searchResponse.statusText} - ${JSON.stringify(errorData)}` + ) + } + + const searchData = await searchResponse.json() + + if (!searchData.messages || !searchData.messages.length) { + logger.info(`[${requestId}] No emails found matching query: ${query}`) + return { emails: [], latestHistoryId: config.historyId } + } + + const idsToFetch = searchData.messages.slice(0, config.maxEmailsPerPoll || 25) + let latestHistoryId = config.historyId + + logger.info( + `[${requestId}] Processing ${idsToFetch.length} emails from search API (total matches: ${searchData.messages.length})` + ) + + const emailResults = await Promise.allSettled( + idsToFetch.map((message: { id: string }) => getEmailDetails(accessToken, message.id)) + ) + const rejected = emailResults.filter((r) => r.status === 'rejected') + if (rejected.length > 0) { + logger.warn(`[${requestId}] Failed to fetch ${rejected.length} email details`) + } + const emails = emailResults + .filter( + (result): result is PromiseFulfilledResult => result.status === 'fulfilled' + ) + .map((result) => result.value) + + if (emails.length > 0 && emails[0].historyId) { + latestHistoryId = emails[0].historyId + } + + return { emails, latestHistoryId } + } catch (error) { + const errorMessage = error instanceof Error ? 
error.message : 'Unknown error' + logger.error(`[${requestId}] Error searching emails:`, errorMessage) + throw error + } +} + +async function getEmailDetails(accessToken: string, messageId: string): Promise { + const messageUrl = `https://gmail.googleapis.com/gmail/v1/users/me/messages/${messageId}?format=full` + + const messageResponse = await fetch(messageUrl, { + headers: { Authorization: `Bearer ${accessToken}` }, + }) + + if (!messageResponse.ok) { + const errorData = await messageResponse.json().catch(() => ({})) + throw new Error( + `Failed to fetch email details for message ${messageId}: ${messageResponse.status} ${messageResponse.statusText} - ${JSON.stringify(errorData)}` + ) + } + + return await messageResponse.json() +} + +function filterEmailsByLabels(emails: GmailEmail[], config: GmailWebhookConfig): GmailEmail[] { + if (!config.labelIds.length) { + return emails + } + + return emails.filter((email) => { + const emailLabels = email.labelIds || [] + const hasMatchingLabel = config.labelIds.some((configLabel) => + emailLabels.includes(configLabel) + ) + return config.labelFilterBehavior === 'INCLUDE' ? 
hasMatchingLabel : !hasMatchingLabel + }) +} + +async function processEmails( + emails: GmailEmail[], + webhookData: PollWebhookContext['webhookData'], + workflowData: PollWebhookContext['workflowData'], + config: GmailWebhookConfig, + accessToken: string, + requestId: string, + logger: ReturnType +) { + let processedCount = 0 + let failedCount = 0 + + for (const email of emails) { + try { + await pollingIdempotency.executeWithIdempotency( + 'gmail', + `${webhookData.id}:${email.id}`, + async () => { + const headers: Record = {} + const payload = email.payload as Record | undefined + if (payload?.headers && Array.isArray(payload.headers)) { + for (const header of payload.headers as { name: string; value: string }[]) { + headers[header.name.toLowerCase()] = header.value + } + } + + let textContent = '' + let htmlContent = '' + + const extractContent = (part: Record) => { + if (!part) return + + if (part.mimeType === 'text/plain') { + const body = part.body as { data?: string } | undefined + if (body?.data) { + textContent = Buffer.from(body.data, 'base64').toString('utf-8') + } + } else if (part.mimeType === 'text/html') { + const body = part.body as { data?: string } | undefined + if (body?.data) { + htmlContent = Buffer.from(body.data, 'base64').toString('utf-8') + } + } + + if (part.parts && Array.isArray(part.parts)) { + for (const subPart of part.parts) { + extractContent(subPart as Record) + } + } + } + + if (payload) { + extractContent(payload) + } + + let date: string | null = null + if (headers.date) { + try { + date = new Date(headers.date).toISOString() + } catch (_e) { + // Keep date as null if parsing fails + } + } else if (email.internalDate) { + date = new Date(Number.parseInt(email.internalDate)).toISOString() + } + + let attachments: GmailAttachment[] = [] + const hasAttachments = payload ? 
extractAttachmentInfo(payload).length > 0 : false + + if (config.includeAttachments && hasAttachments && payload) { + try { + const attachmentInfo = extractAttachmentInfo(payload) + attachments = await downloadAttachments(email.id, attachmentInfo, accessToken) + } catch (error) { + logger.error( + `[${requestId}] Error downloading attachments for email ${email.id}:`, + error + ) + } + } + + const simplifiedEmail: SimplifiedEmail = { + id: email.id, + threadId: email.threadId, + subject: headers.subject || '[No Subject]', + from: headers.from || '', + to: headers.to || '', + cc: headers.cc || '', + date, + bodyText: textContent, + bodyHtml: htmlContent, + labels: email.labelIds || [], + hasAttachments, + attachments, + } + + const webhookPayload: GmailWebhookPayload = { + email: simplifiedEmail, + timestamp: new Date().toISOString(), + ...(config.includeRawEmail ? { rawEmail: email } : {}), + } + + const result = await processPolledWebhookEvent( + webhookData, + workflowData, + webhookPayload as unknown as Record, + requestId + ) + + if (!result.success) { + logger.error( + `[${requestId}] Failed to process webhook for email ${email.id}:`, + result.statusCode, + result.error + ) + throw new Error(`Webhook processing failed (${result.statusCode}): ${result.error}`) + } + + if (config.markAsRead) { + await markEmailAsRead(accessToken, email.id, logger) + } + + return { emailId: email.id, processed: true } + } + ) + + logger.info( + `[${requestId}] Successfully processed email ${email.id} for webhook ${webhookData.id}` + ) + processedCount++ + } catch (error) { + const errorMessage = error instanceof Error ? 
error.message : 'Unknown error' + logger.error(`[${requestId}] Error processing email ${email.id}:`, errorMessage) + failedCount++ + } + } + + return { processedCount, failedCount } +} + +async function markEmailAsRead( + accessToken: string, + messageId: string, + logger: ReturnType +) { + const modifyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/messages/${messageId}/modify` + + try { + const response = await fetch(modifyUrl, { + method: 'POST', + headers: { + Authorization: `Bearer ${accessToken}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ removeLabelIds: ['UNREAD'] }), + }) + + if (!response.ok) { + await response.body?.cancel().catch(() => {}) + throw new Error( + `Failed to mark email ${messageId} as read: ${response.status} ${response.statusText}` + ) + } + } catch (error) { + logger.error(`Error marking email ${messageId} as read:`, error) + throw error + } +} diff --git a/apps/sim/lib/webhooks/imap-polling-service.ts b/apps/sim/lib/webhooks/polling/imap.ts similarity index 57% rename from apps/sim/lib/webhooks/imap-polling-service.ts rename to apps/sim/lib/webhooks/polling/imap.ts index 73219f72353..e0763b7b0ab 100644 --- a/apps/sim/lib/webhooks/imap-polling-service.ts +++ b/apps/sim/lib/webhooks/polling/imap.ts @@ -1,19 +1,14 @@ -import { db } from '@sim/db' -import { webhook, workflow, workflowDeploymentVersion } from '@sim/db/schema' -import { createLogger } from '@sim/logger' -import type { InferSelectModel } from 'drizzle-orm' -import { and, eq, isNull, or, sql } from 'drizzle-orm' import type { FetchMessageObject, MailboxLockObject } from 'imapflow' import { ImapFlow } from 'imapflow' import { pollingIdempotency } from '@/lib/core/idempotency/service' import { validateDatabaseHost } from '@/lib/core/security/input-validation.server' -import { getInternalApiBaseUrl } from '@/lib/core/utils/urls' -import { generateShortId } from '@/lib/core/utils/uuid' -import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants' - 
-const logger = createLogger('ImapPollingService') - -type WebhookRecord = InferSelectModel +import type { PollingProviderHandler, PollWebhookContext } from '@/lib/webhooks/polling/types' +import { + markWebhookFailed, + markWebhookSuccess, + updateWebhookProviderConfig, +} from '@/lib/webhooks/polling/utils' +import { processPolledWebhookEvent } from '@/lib/webhooks/processor' interface ImapWebhookConfig { host: string @@ -21,13 +16,13 @@ interface ImapWebhookConfig { secure: boolean username: string password: string - mailbox: string | string[] // Can be single mailbox or array of mailboxes + mailbox: string | string[] searchCriteria: string markAsRead: boolean includeAttachments: boolean lastProcessedUid?: number - lastProcessedUidByMailbox?: Record // Track UID per mailbox for multi-mailbox - lastCheckedTimestamp?: string // ISO timestamp of last successful poll + lastProcessedUidByMailbox?: Record + lastCheckedTimestamp?: string maxEmailsPerPoll?: number } @@ -69,206 +64,112 @@ export interface ImapWebhookPayload { timestamp: string } -async function markWebhookFailed(webhookId: string) { - try { - const result = await db - .update(webhook) - .set({ - failedCount: sql`COALESCE(${webhook.failedCount}, 0) + 1`, - lastFailedAt: new Date(), - updatedAt: new Date(), - }) - .where(eq(webhook.id, webhookId)) - .returning({ failedCount: webhook.failedCount }) - - const newFailedCount = result[0]?.failedCount || 0 - const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES - - if (shouldDisable) { - await db - .update(webhook) - .set({ - isActive: false, - updatedAt: new Date(), - }) - .where(eq(webhook.id, webhookId)) - - logger.warn( - `Webhook ${webhookId} auto-disabled after ${MAX_CONSECUTIVE_FAILURES} consecutive failures` - ) - } - } catch (err) { - logger.error(`Failed to mark webhook ${webhookId} as failed:`, err) - } -} - -async function markWebhookSuccess(webhookId: string) { - try { - await db - .update(webhook) - .set({ - failedCount: 0, - updatedAt: 
new Date(), - }) - .where(eq(webhook.id, webhookId)) - } catch (err) { - logger.error(`Failed to mark webhook ${webhookId} as successful:`, err) - } -} - -export async function pollImapWebhooks() { - logger.info('Starting IMAP webhook polling') - - try { - const activeWebhooksResult = await db - .select({ webhook }) - .from(webhook) - .innerJoin(workflow, eq(webhook.workflowId, workflow.id)) - .leftJoin( - workflowDeploymentVersion, - and( - eq(workflowDeploymentVersion.workflowId, workflow.id), - eq(workflowDeploymentVersion.isActive, true) - ) - ) - .where( - and( - eq(webhook.provider, 'imap'), - eq(webhook.isActive, true), - eq(workflow.isDeployed, true), - or( - eq(webhook.deploymentVersionId, workflowDeploymentVersion.id), - and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId)) - ) - ) - ) - - const activeWebhooks = activeWebhooksResult.map((r) => r.webhook) - - if (!activeWebhooks.length) { - logger.info('No active IMAP webhooks found') - return { total: 0, successful: 0, failed: 0, details: [] } - } - - logger.info(`Found ${activeWebhooks.length} active IMAP webhooks`) +export const imapPollingHandler: PollingProviderHandler = { + provider: 'imap', + label: 'IMAP', - const CONCURRENCY = 5 + async pollWebhook(ctx: PollWebhookContext): Promise<'success' | 'failure'> { + const { webhookData, workflowData, requestId, logger } = ctx + const webhookId = webhookData.id - const running: Promise[] = [] - let successCount = 0 - let failureCount = 0 + try { + const config = webhookData.providerConfig as unknown as ImapWebhookConfig - const enqueue = async (webhookData: (typeof activeWebhooks)[number]) => { - const webhookId = webhookData.id - const requestId = generateShortId() + if (!config.host || !config.username || !config.password) { + logger.error(`[${requestId}] Missing IMAP credentials for webhook ${webhookId}`) + await markWebhookFailed(webhookId, logger) + return 'failure' + } - try { - const config = webhookData.providerConfig as 
unknown as ImapWebhookConfig + const hostValidation = await validateDatabaseHost(config.host, 'host') + if (!hostValidation.isValid) { + logger.error( + `[${requestId}] IMAP host validation failed for webhook ${webhookId}: ${hostValidation.error}` + ) + await markWebhookFailed(webhookId, logger) + return 'failure' + } - if (!config.host || !config.username || !config.password) { - logger.error(`[${requestId}] Missing IMAP credentials for webhook ${webhookId}`) - await markWebhookFailed(webhookId) - failureCount++ - return - } + const { emails, latestUidByMailbox } = await fetchNewEmails( + config, + requestId, + hostValidation.resolvedIP!, + logger + ) + const pollTimestamp = new Date().toISOString() - const hostValidation = await validateDatabaseHost(config.host, 'host') - if (!hostValidation.isValid) { - logger.error( - `[${requestId}] IMAP host validation failed for webhook ${webhookId}: ${hostValidation.error}` - ) - await markWebhookFailed(webhookId) - failureCount++ - return - } + if (!emails || !emails.length) { + await updateImapState(webhookId, latestUidByMailbox, pollTimestamp, config, logger) + await markWebhookSuccess(webhookId, logger) + logger.info(`[${requestId}] No new emails found for webhook ${webhookId}`) + return 'success' + } - const fetchResult = await fetchNewEmails(config, requestId, hostValidation.resolvedIP!) 
- const { emails, latestUidByMailbox } = fetchResult - const pollTimestamp = new Date().toISOString() + logger.info(`[${requestId}] Found ${emails.length} new emails for webhook ${webhookId}`) - if (!emails || !emails.length) { - await updateWebhookLastProcessedUids(webhookId, latestUidByMailbox, pollTimestamp) - await markWebhookSuccess(webhookId) - logger.info(`[${requestId}] No new emails found for webhook ${webhookId}`) - successCount++ - return - } + const { processedCount, failedCount } = await processEmails( + emails, + webhookData, + workflowData, + config, + requestId, + hostValidation.resolvedIP!, + logger + ) - logger.info(`[${requestId}] Found ${emails.length} new emails for webhook ${webhookId}`) + await updateImapState(webhookId, latestUidByMailbox, pollTimestamp, config, logger) - const { processedCount, failedCount: emailFailedCount } = await processEmails( - emails, - webhookData, - config, - requestId, - hostValidation.resolvedIP! + if (failedCount > 0 && processedCount === 0) { + await markWebhookFailed(webhookId, logger) + logger.warn( + `[${requestId}] All ${failedCount} emails failed to process for webhook ${webhookId}` ) - - await updateWebhookLastProcessedUids(webhookId, latestUidByMailbox, pollTimestamp) - - if (emailFailedCount > 0 && processedCount === 0) { - await markWebhookFailed(webhookId) - failureCount++ - logger.warn( - `[${requestId}] All ${emailFailedCount} emails failed to process for webhook ${webhookId}` - ) - } else { - await markWebhookSuccess(webhookId) - successCount++ - logger.info( - `[${requestId}] Successfully processed ${processedCount} emails for webhook ${webhookId}${emailFailedCount > 0 ? 
` (${emailFailedCount} failed)` : ''}` - ) - } - } catch (error) { - logger.error(`[${requestId}] Error processing IMAP webhook ${webhookId}:`, error) - await markWebhookFailed(webhookId) - failureCount++ + return 'failure' } - } - - for (const webhookData of activeWebhooks) { - const promise: Promise = enqueue(webhookData) - .catch((err) => { - logger.error('Unexpected error in webhook processing:', err) - failureCount++ - }) - .finally(() => { - // Self-remove from running array when completed - const idx = running.indexOf(promise) - if (idx !== -1) running.splice(idx, 1) - }) - - running.push(promise) - - if (running.length >= CONCURRENCY) { - await Promise.race(running) - } - } - await Promise.allSettled(running) - - const summary = { - total: activeWebhooks.length, - successful: successCount, - failed: failureCount, - details: [], + await markWebhookSuccess(webhookId, logger) + logger.info( + `[${requestId}] Successfully processed ${processedCount} emails for webhook ${webhookId}${failedCount > 0 ? ` (${failedCount} failed)` : ''}` + ) + return 'success' + } catch (error) { + logger.error(`[${requestId}] Error processing IMAP webhook ${webhookId}:`, error) + await markWebhookFailed(webhookId, logger) + return 'failure' } + }, +} - logger.info('IMAP polling completed', { - total: summary.total, - successful: summary.successful, - failed: summary.failed, - }) +async function updateImapState( + webhookId: string, + uidByMailbox: Record, + timestamp: string, + config: ImapWebhookConfig, + logger: ReturnType +) { + const existingUidByMailbox = config.lastProcessedUidByMailbox || {} + const mergedUidByMailbox = { ...existingUidByMailbox } - return summary - } catch (error) { - const errorMessage = error instanceof Error ? 
error.message : 'Unknown error' - logger.error('Error in IMAP polling service:', errorMessage) - throw error + for (const [mailbox, uid] of Object.entries(uidByMailbox)) { + mergedUidByMailbox[mailbox] = Math.max(uid, mergedUidByMailbox[mailbox] || 0) } + + await updateWebhookProviderConfig( + webhookId, + { + lastProcessedUidByMailbox: mergedUidByMailbox, + lastCheckedTimestamp: timestamp, + }, + logger + ) } -async function fetchNewEmails(config: ImapWebhookConfig, requestId: string, resolvedIP: string) { +async function fetchNewEmails( + config: ImapWebhookConfig, + requestId: string, + resolvedIP: string, + logger: ReturnType +) { const client = new ImapFlow({ host: resolvedIP, servername: config.host, @@ -278,15 +179,13 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string, reso user: config.username, pass: config.password, }, - tls: { - rejectUnauthorized: true, - }, + tls: { rejectUnauthorized: true }, logger: false, }) const emails: Array<{ uid: number - mailboxPath: string // Track which mailbox this email came from + mailboxPath: string envelope: FetchMessageObject['envelope'] bodyStructure: FetchMessageObject['bodyStructure'] source?: Buffer @@ -305,13 +204,12 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string, reso if (totalEmailsCollected >= maxEmails) break try { - const mailbox = await client.mailboxOpen(mailboxPath) + await client.mailboxOpen(mailboxPath) - // Parse search criteria - expects JSON object from UI - let searchCriteria: any = { unseen: true } + let searchCriteria: Record = { unseen: true } if (config.searchCriteria) { if (typeof config.searchCriteria === 'object') { - searchCriteria = config.searchCriteria + searchCriteria = config.searchCriteria as unknown as Record } else if (typeof config.searchCriteria === 'string') { try { searchCriteria = JSON.parse(config.searchCriteria) @@ -327,15 +225,11 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string, reso 
searchCriteria = { ...searchCriteria, uid: `${lastUidForMailbox + 1}:*` } } - // Add time-based filtering similar to Gmail - // If lastCheckedTimestamp exists, use it with 1 minute buffer - // If first poll (no timestamp), default to last 24 hours to avoid processing ALL unseen emails if (config.lastCheckedTimestamp) { const lastChecked = new Date(config.lastCheckedTimestamp) const bufferTime = new Date(lastChecked.getTime() - 60000) searchCriteria = { ...searchCriteria, since: bufferTime } } else { - // First poll: only get emails from last 24 hours to avoid overwhelming first run const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000) searchCriteria = { ...searchCriteria, since: oneDayAgo } } @@ -344,15 +238,13 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string, reso try { const searchResult = await client.search(searchCriteria, { uid: true }) messageUids = searchResult === false ? [] : searchResult - } catch (searchError) { + } catch { continue } - if (messageUids.length === 0) { - continue - } + if (messageUids.length === 0) continue - messageUids.sort((a, b) => a - b) // Sort ascending to process oldest first + messageUids.sort((a, b) => a - b) const remainingSlots = maxEmails - totalEmailsCollected const uidsToProcess = messageUids.slice(0, remainingSlots) @@ -365,12 +257,7 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string, reso for await (const msg of client.fetch( uidsToProcess, - { - uid: true, - envelope: true, - bodyStructure: true, - source: true, - }, + { uid: true, envelope: true, bodyStructure: true, source: true }, { uid: true } )) { emails.push({ @@ -388,7 +275,6 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string, reso } await client.logout() - return { emails, latestUidByMailbox } } catch (error) { try { @@ -400,9 +286,6 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string, reso } } -/** - * Get the list of mailboxes to check based on 
config - */ function getMailboxesToCheck(config: ImapWebhookConfig): string[] { if (!config.mailbox || (Array.isArray(config.mailbox) && config.mailbox.length === 0)) { return ['INBOX'] @@ -488,7 +371,6 @@ function extractAttachmentsFromSource( bodyStructure: FetchMessageObject['bodyStructure'] ): ImapAttachment[] { const attachments: ImapAttachment[] = [] - if (!bodyStructure) return attachments const content = source.toString('utf-8') @@ -534,24 +416,13 @@ function extractAttachmentsFromSource( return attachments } -/** - * Checks if a body structure contains attachments by examining disposition - */ function hasAttachmentsInBodyStructure(structure: FetchMessageObject['bodyStructure']): boolean { if (!structure) return false - - if (structure.disposition === 'attachment') { - return true - } - - if (structure.disposition === 'inline' && structure.dispositionParameters?.filename) { - return true - } - + if (structure.disposition === 'attachment') return true + if (structure.disposition === 'inline' && structure.dispositionParameters?.filename) return true if (structure.childNodes && Array.isArray(structure.childNodes)) { return structure.childNodes.some((child) => hasAttachmentsInBodyStructure(child)) } - return false } @@ -563,10 +434,12 @@ async function processEmails( bodyStructure: FetchMessageObject['bodyStructure'] source?: Buffer }>, - webhookData: WebhookRecord, + webhookData: PollWebhookContext['webhookData'], + workflowData: PollWebhookContext['workflowData'], config: ImapWebhookConfig, requestId: string, - resolvedIP: string + resolvedIP: string, + logger: ReturnType ) { let processedCount = 0 let failedCount = 0 @@ -580,9 +453,7 @@ async function processEmails( user: config.username, pass: config.password, }, - tls: { - rejectUnauthorized: true, - }, + tls: { rejectUnauthorized: true }, logger: false, }) @@ -644,25 +515,20 @@ async function processEmails( timestamp: new Date().toISOString(), } - const webhookUrl = 
`${getInternalApiBaseUrl()}/api/webhooks/trigger/${webhookData.path}` - - const response = await fetch(webhookUrl, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'User-Agent': 'Sim/1.0', - }, - body: JSON.stringify(payload), - }) + const result = await processPolledWebhookEvent( + webhookData, + workflowData, + payload as unknown as Record, + requestId + ) - if (!response.ok) { - const errorText = await response.text() + if (!result.success) { logger.error( - `[${requestId}] Failed to trigger webhook for email ${email.uid}:`, - response.status, - errorText + `[${requestId}] Failed to process webhook for email ${email.uid}:`, + result.statusCode, + result.error ) - throw new Error(`Webhook request failed: ${response.status} - ${errorText}`) + throw new Error(`Webhook processing failed (${result.statusCode}): ${result.error}`) } if (config.markAsRead) { @@ -684,11 +550,7 @@ async function processEmails( } } - return { - emailUid: email.uid, - webhookStatus: response.status, - processed: true, - } + return { emailUid: email.uid, processed: true } } ) @@ -717,31 +579,3 @@ async function processEmails( return { processedCount, failedCount } } - -async function updateWebhookLastProcessedUids( - webhookId: string, - uidByMailbox: Record, - timestamp: string -) { - const result = await db.select().from(webhook).where(eq(webhook.id, webhookId)) - const existingConfig = (result[0]?.providerConfig as Record) || {} - - const existingUidByMailbox = existingConfig.lastProcessedUidByMailbox || {} - const mergedUidByMailbox = { ...existingUidByMailbox } - - for (const [mailbox, uid] of Object.entries(uidByMailbox)) { - mergedUidByMailbox[mailbox] = Math.max(uid, mergedUidByMailbox[mailbox] || 0) - } - - await db - .update(webhook) - .set({ - providerConfig: { - ...existingConfig, - lastProcessedUidByMailbox: mergedUidByMailbox, - lastCheckedTimestamp: timestamp, - } as any, - updatedAt: new Date(), - }) - .where(eq(webhook.id, webhookId)) -} diff --git 
a/apps/sim/lib/webhooks/polling/index.ts b/apps/sim/lib/webhooks/polling/index.ts new file mode 100644 index 00000000000..4d0ba2df0de --- /dev/null +++ b/apps/sim/lib/webhooks/polling/index.ts @@ -0,0 +1,9 @@ +export { pollProvider } from '@/lib/webhooks/polling/orchestrator' +export { getPollingHandler, VALID_POLLING_PROVIDERS } from '@/lib/webhooks/polling/registry' +export type { + PollingProviderHandler, + PollSummary, + PollWebhookContext, + WebhookRecord, + WorkflowRecord, +} from '@/lib/webhooks/polling/types' diff --git a/apps/sim/lib/webhooks/polling/orchestrator.ts b/apps/sim/lib/webhooks/polling/orchestrator.ts new file mode 100644 index 00000000000..508ff91972f --- /dev/null +++ b/apps/sim/lib/webhooks/polling/orchestrator.ts @@ -0,0 +1,46 @@ +import { createLogger } from '@sim/logger' +import { generateShortId } from '@/lib/core/utils/uuid' +import { getPollingHandler } from '@/lib/webhooks/polling/registry' +import type { PollSummary, WebhookRecord, WorkflowRecord } from '@/lib/webhooks/polling/types' +import { fetchActiveWebhooks, runWithConcurrency } from '@/lib/webhooks/polling/utils' + +/** Poll all active webhooks for a given provider. 
/**
 * Poll all active webhooks for a given provider.
 *
 * Resolves the provider's handler from the registry, loads its active
 * webhooks, and runs `handler.pollWebhook()` for each one through
 * `runWithConcurrency`, giving every webhook its own short request id.
 *
 * @param providerName - Registry key of the provider to poll (e.g. 'gmail', 'rss').
 * @returns Totals for this run: webhooks seen, successful polls, failed polls.
 * @throws Error if no handler is registered for `providerName`.
 */
export async function pollProvider(providerName: string): Promise<PollSummary> {
  const handler = getPollingHandler(providerName)
  if (!handler) {
    throw new Error(`Unknown polling provider: ${providerName}`)
  }

  // One logger per run, named after the provider so log lines stay attributable.
  const logger = createLogger(`${handler.label}PollingService`)
  logger.info(`Starting ${handler.label} webhook polling`)

  const activeWebhooks = await fetchActiveWebhooks(handler.provider)
  if (!activeWebhooks.length) {
    logger.info(`No active ${handler.label} webhooks found`)
    return { total: 0, successful: 0, failed: 0 }
  }

  logger.info(`Found ${activeWebhooks.length} active ${handler.label} webhooks`)

  const { successCount, failureCount } = await runWithConcurrency(
    activeWebhooks,
    async (entry) => {
      const requestId = generateShortId()
      return handler.pollWebhook({
        webhookData: entry.webhook as WebhookRecord,
        workflowData: entry.workflow as WorkflowRecord,
        requestId,
        logger,
      })
    },
    logger
  )

  const summary: PollSummary = {
    total: activeWebhooks.length,
    successful: successCount,
    failed: failureCount,
  }
  logger.info(`${handler.label} polling completed`, summary)
  return summary
}
from '@/lib/core/utils/urls' -import { generateShortId } from '@/lib/core/utils/uuid' +import type { PollingProviderHandler, PollWebhookContext } from '@/lib/webhooks/polling/types' import { - getOAuthToken, - refreshAccessTokenIfNeeded, - resolveOAuthAccountId, -} from '@/app/api/auth/oauth/utils' -import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants' - -const logger = createLogger('OutlookPollingService') - -async function markWebhookFailed(webhookId: string) { - try { - const result = await db - .update(webhook) - .set({ - failedCount: sql`COALESCE(${webhook.failedCount}, 0) + 1`, - lastFailedAt: new Date(), - updatedAt: new Date(), - }) - .where(eq(webhook.id, webhookId)) - .returning({ failedCount: webhook.failedCount }) - - const newFailedCount = result[0]?.failedCount || 0 - const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES - - if (shouldDisable) { - await db - .update(webhook) - .set({ - isActive: false, - updatedAt: new Date(), - }) - .where(eq(webhook.id, webhookId)) - - logger.warn( - `Webhook ${webhookId} auto-disabled after ${MAX_CONSECUTIVE_FAILURES} consecutive failures` - ) - } - } catch (err) { - logger.error(`Failed to mark webhook ${webhookId} as failed:`, err) - } -} - -async function markWebhookSuccess(webhookId: string) { - try { - await db - .update(webhook) - .set({ - failedCount: 0, // Reset on success - updatedAt: new Date(), - }) - .where(eq(webhook.id, webhookId)) - } catch (err) { - logger.error(`Failed to mark webhook ${webhookId} as successful:`, err) - } -} + markWebhookFailed, + markWebhookSuccess, + resolveOAuthCredential, + updateWebhookProviderConfig, +} from '@/lib/webhooks/polling/utils' +import { processPolledWebhookEvent } from '@/lib/webhooks/processor' interface OutlookWebhookConfig { credentialId: string - folderIds?: string[] // e.g., ['inbox', 'sent'] + folderIds?: string[] folderFilterBehavior?: 'INCLUDE' | 'EXCLUDE' markAsRead?: boolean maxEmailsPerPoll?: number @@ -145,10 +85,6 @@ export 
/**
 * Polling handler for Outlook webhooks.
 *
 * `pollWebhook()` handles a single webhook: it resolves an access token,
 * fetches new emails, hands them to `processOutlookEmails()`, records the poll
 * time in the webhook's provider config, and marks the webhook as succeeded or
 * failed. The poll is reported as a failure only when every fetched email
 * failed to process, or when an error is thrown along the way.
 */
export const outlookPollingHandler: PollingProviderHandler = {
  provider: 'outlook',
  label: 'Outlook',

  async pollWebhook(ctx: PollWebhookContext): Promise<'success' | 'failure'> {
    const { webhookData, workflowData, requestId, logger } = ctx
    const webhookId = webhookData.id

    // Persists the moment this poll started as the webhook's last-checked time.
    const recordLastChecked = (polledAt: Date) =>
      updateWebhookProviderConfig(
        webhookId,
        { lastCheckedTimestamp: polledAt.toISOString() },
        logger
      )

    try {
      logger.info(`[${requestId}] Processing Outlook webhook: ${webhookId}`)

      const accessToken = await resolveOAuthCredential(webhookData, 'outlook', requestId, logger)
      const config = webhookData.providerConfig as unknown as OutlookWebhookConfig
      // Captured before the fetch so the stored timestamp reflects the start of the poll.
      const now = new Date()

      const { emails } = await fetchNewOutlookEmails(accessToken, config, requestId, logger)

      if (!emails?.length) {
        await recordLastChecked(now)
        await markWebhookSuccess(webhookId, logger)
        logger.info(`[${requestId}] No new emails found for webhook ${webhookId}`)
        return 'success'
      }

      logger.info(`[${requestId}] Found ${emails.length} emails for webhook ${webhookId}`)

      const outcome = await processOutlookEmails(
        emails,
        webhookData,
        workflowData,
        config,
        accessToken,
        requestId,
        logger
      )
      const { processedCount, failedCount } = outcome

      await recordLastChecked(now)

      const everyEmailFailed = failedCount > 0 && processedCount === 0
      if (everyEmailFailed) {
        await markWebhookFailed(webhookId, logger)
        logger.warn(
          `[${requestId}] All ${failedCount} emails failed to process for webhook ${webhookId}`
        )
        return 'failure'
      }

      await markWebhookSuccess(webhookId, logger)
      const failedSuffix = failedCount > 0 ? ` (${failedCount} failed)` : ''
      logger.info(
        `[${requestId}] Successfully processed ${processedCount} emails for webhook ${webhookId}${failedSuffix}`
      )
      return 'success'
    } catch (error) {
      logger.error(`[${requestId}] Error processing Outlook webhook ${webhookId}:`, error)
      await markWebhookFailed(webhookId, logger)
      return 'failure'
    }
  },
}
(wellKnownFolders.length === 0) { - return resolvedIds - } + if (wellKnownFolders.length === 0) return resolvedIds const resolutions = await Promise.all( wellKnownFolders.map(async (folderName) => { - const actualId = await resolveWellKnownFolderId(accessToken, folderName, requestId) + const actualId = await resolveWellKnownFolderId(accessToken, folderName, requestId, logger) return { folderName, actualId } }) ) @@ -516,7 +310,6 @@ async function resolveWellKnownFolderIds( logger.info( `[${requestId}] Resolved ${resolvedIds.size}/${wellKnownFolders.length} well-known folders` ) - return resolvedIds } @@ -525,16 +318,12 @@ function filterEmailsByFolder( config: OutlookWebhookConfig, resolvedFolderIds?: Map ): OutlookEmail[] { - if (!config.folderIds || !config.folderIds.length) { - return emails - } + if (!config.folderIds || !config.folderIds.length) return emails const actualFolderIds = config.folderIds.map((configFolder) => { if (resolvedFolderIds && isWellKnownFolderName(configFolder)) { const resolvedId = resolvedFolderIds.get(configFolder.toLowerCase()) - if (resolvedId) { - return resolvedId - } + if (resolvedId) return resolvedId } return configFolder }) @@ -544,17 +333,18 @@ function filterEmailsByFolder( const hasMatchingFolder = actualFolderIds.some( (folderId) => emailFolderId.toLowerCase() === folderId.toLowerCase() ) - return config.folderFilterBehavior === 'INCLUDE' ? 
hasMatchingFolder : !hasMatchingFolder }) } async function processOutlookEmails( emails: OutlookEmail[], - webhookData: any, + webhookData: PollWebhookContext['webhookData'], + workflowData: PollWebhookContext['workflowData'], config: OutlookWebhookConfig, accessToken: string, - requestId: string + requestId: string, + logger: ReturnType ) { let processedCount = 0 let failedCount = 0 @@ -568,7 +358,12 @@ async function processOutlookEmails( let attachments: OutlookAttachment[] = [] if (config.includeAttachments && email.hasAttachments) { try { - attachments = await downloadOutlookAttachments(accessToken, email.id, requestId) + attachments = await downloadOutlookAttachments( + accessToken, + email.id, + requestId, + logger + ) } catch (error) { logger.error( `[${requestId}] Error downloading attachments for email ${email.id}:`, @@ -588,12 +383,8 @@ async function processOutlookEmails( bodyText: (() => { const content = email.body?.content || '' const type = (email.body?.contentType || '').toLowerCase() - if (!content) { - return email.bodyPreview || '' - } - if (type === 'text' || type === 'text/plain') { - return content - } + if (!content) return email.bodyPreview || '' + if (type === 'text' || type === 'text/plain') return content return convertHtmlToPlainText(content) })(), bodyHtml: email.body?.content || '', @@ -618,36 +409,27 @@ async function processOutlookEmails( `[${requestId}] Processing email: ${email.subject} from ${email.from?.emailAddress?.address}` ) - const webhookUrl = `${getInternalApiBaseUrl()}/api/webhooks/trigger/${webhookData.path}` - - const response = await fetch(webhookUrl, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'User-Agent': 'Sim/1.0', - }, - body: JSON.stringify(payload), - }) + const result = await processPolledWebhookEvent( + webhookData, + workflowData, + payload as unknown as Record, + requestId + ) - if (!response.ok) { - const errorText = await response.text() + if (!result.success) { logger.error( 
- `[${requestId}] Failed to trigger webhook for email ${email.id}:`, - response.status, - errorText + `[${requestId}] Failed to process webhook for email ${email.id}:`, + result.statusCode, + result.error ) - throw new Error(`Webhook request failed: ${response.status} - ${errorText}`) + throw new Error(`Webhook processing failed (${result.statusCode}): ${result.error}`) } if (config.markAsRead) { - await markOutlookEmailAsRead(accessToken, email.id) + await markOutlookEmailAsRead(accessToken, email.id, logger) } - return { - emailId: email.id, - webhookStatus: response.status, - processed: true, - } + return { emailId: email.id, processed: true } } ) @@ -667,7 +449,8 @@ async function processOutlookEmails( async function downloadOutlookAttachments( accessToken: string, messageId: string, - requestId: string + requestId: string, + logger: ReturnType ): Promise { const attachments: OutlookAttachment[] = [] @@ -722,7 +505,11 @@ async function downloadOutlookAttachments( return attachments } -async function markOutlookEmailAsRead(accessToken: string, messageId: string) { +async function markOutlookEmailAsRead( + accessToken: string, + messageId: string, + logger: ReturnType +) { try { const response = await fetch(`https://graph.microsoft.com/v1.0/me/messages/${messageId}`, { method: 'PATCH', @@ -730,9 +517,7 @@ async function markOutlookEmailAsRead(accessToken: string, messageId: string) { Authorization: `Bearer ${accessToken}`, 'Content-Type': 'application/json', }, - body: JSON.stringify({ - isRead: true, - }), + body: JSON.stringify({ isRead: true }), }) if (!response.ok) { @@ -746,34 +531,3 @@ async function markOutlookEmailAsRead(accessToken: string, messageId: string) { logger.error(`Error marking email ${messageId} as read:`, error) } } - -async function updateWebhookLastChecked(webhookId: string, timestamp: string) { - try { - const currentWebhook = await db - .select({ providerConfig: webhook.providerConfig }) - .from(webhook) - .where(eq(webhook.id, 
/** Registry of polling handlers, keyed by provider name. */
const POLLING_HANDLERS: Record<string, PollingProviderHandler> = {
  gmail: gmailPollingHandler,
  imap: imapPollingHandler,
  outlook: outlookPollingHandler,
  rss: rssPollingHandler,
}

/** Provider names that have a registered polling handler. */
export const VALID_POLLING_PROVIDERS = new Set(Object.keys(POLLING_HANDLERS))
/**
 * Look up the polling handler for a provider.
 *
 * Only keys defined directly on the registry match. Names that merely exist on
 * `Object.prototype` (for example `constructor` or `toString`) are treated as
 * unknown providers instead of returning an inherited member.
 *
 * @param provider - Provider name to resolve (e.g. 'gmail', 'rss').
 * @returns The registered handler, or `undefined` when the provider is unknown.
 */
export function getPollingHandler(provider: string): PollingProviderHandler | undefined {
  return Object.prototype.hasOwnProperty.call(POLLING_HANDLERS, provider)
    ? POLLING_HANDLERS[provider]
    : undefined
}
workflowData, requestId, logger } = ctx + const webhookId = webhookData.id + + try { + const config = webhookData.providerConfig as unknown as RssWebhookConfig + + if (!config?.feedUrl) { + logger.error(`[${requestId}] Missing feedUrl for webhook ${webhookId}`) + await markWebhookFailed(webhookId, logger) + return 'failure' + } + + const now = new Date() + const { feed, items: newItems } = await fetchNewRssItems(config, requestId, logger) + + if (!newItems.length) { + await updateRssState(webhookId, now.toISOString(), [], config, logger) + await markWebhookSuccess(webhookId, logger) + logger.info(`[${requestId}] No new items found for webhook ${webhookId}`) + return 'success' + } + + logger.info(`[${requestId}] Found ${newItems.length} new items for webhook ${webhookId}`) + + const { processedCount, failedCount } = await processRssItems( + newItems, + feed, + webhookData, + workflowData, + requestId, + logger + ) + + const newGuids = newItems + .map((item) => item.guid || item.link || '') + .filter((guid) => guid.length > 0) + + await updateRssState(webhookId, now.toISOString(), newGuids, config, logger) + + if (failedCount > 0 && processedCount === 0) { + await markWebhookFailed(webhookId, logger) + logger.warn( + `[${requestId}] All ${failedCount} items failed to process for webhook ${webhookId}` + ) + return 'failure' + } + + await markWebhookSuccess(webhookId, logger) + logger.info( + `[${requestId}] Successfully processed ${processedCount} items for webhook ${webhookId}${failedCount > 0 ? 
/**
 * Persist the RSS polling state for a webhook after a poll.
 *
 * Stores the poll timestamp together with the list of seen item GUIDs: the
 * GUIDs from this poll first, followed by the previously stored ones, capped
 * at `MAX_GUIDS_TO_TRACK` entries.
 *
 * @param webhookId - Webhook whose provider config is updated.
 * @param timestamp - ISO timestamp to store as `lastCheckedTimestamp`.
 * @param newGuids - GUIDs of the items seen in this poll.
 * @param config - Current RSS config; supplies the previously seen GUIDs.
 * @param logger - Logger forwarded to `updateWebhookProviderConfig`.
 */
async function updateRssState(
  webhookId: string,
  timestamp: string,
  newGuids: string[],
  config: RssWebhookConfig,
  logger: PollWebhookContext['logger']
): Promise<void> {
  const existingGuids = config.lastSeenGuids || []

  // A Set keeps first-seen order while dropping repeats, so duplicate GUIDs do
  // not use up slots in the capped list and evict distinct entries early.
  const allGuids = Array.from(new Set([...newGuids, ...existingGuids])).slice(
    0,
    MAX_GUIDS_TO_TRACK
  )

  await updateWebhookProviderConfig(
    webhookId,
    {
      lastCheckedTimestamp: timestamp,
      lastSeenGuids: allGuids,
    },
    logger
  )
}
/**
 * Deliver each RSS item to the workflow as a polled webhook event.
 *
 * Items are handled one at a time. Each delivery is wrapped in
 * `pollingIdempotency.executeWithIdempotency`, keyed by webhook id plus the
 * item's identity. A failure on one item is logged and counted, and the loop
 * continues with the next item.
 *
 * @param items - Items to deliver, in the order given.
 * @param feed - Feed the items belong to; its title, link and description go into each payload.
 * @param webhookData - Webhook record the events are delivered for.
 * @param workflowData - Workflow record the webhook belongs to.
 * @param requestId - Correlation id used in log lines.
 * @param logger - Logger for per-item progress and errors.
 * @returns Counts of items delivered successfully and items that failed.
 */
async function processRssItems(
  items: RssItem[],
  feed: RssFeed,
  webhookData: PollWebhookContext['webhookData'],
  workflowData: PollWebhookContext['workflowData'],
  requestId: string,
  logger: PollWebhookContext['logger']
): Promise<{ processedCount: number; failedCount: number }> {
  let processedCount = 0
  let failedCount = 0

  for (const item of items) {
    try {
      // Identity used for the idempotency key: guid, then link, then title+date.
      // NOTE(review): items lacking all of these share the key "undefined-undefined" — confirm this is acceptable.
      const itemGuid = item.guid || item.link || `${item.title}-${item.pubDate}`

      await pollingIdempotency.executeWithIdempotency(
        'rss',
        `${webhookData.id}:${itemGuid}`,
        async () => {
          const payload: RssWebhookPayload = {
            title: item.title,
            link: item.link,
            pubDate: item.pubDate,
            item: {
              title: item.title,
              link: item.link,
              pubDate: item.pubDate,
              guid: item.guid,
              description: item.description,
              content: item.content,
              contentSnippet: item.contentSnippet,
              author: item.author || item.creator,
              categories: item.categories,
              enclosure: item.enclosure,
              isoDate: item.isoDate,
            },
            feed: {
              title: feed.title,
              link: feed.link,
              description: feed.description,
            },
            timestamp: new Date().toISOString(),
          }

          const result = await processPolledWebhookEvent(
            webhookData,
            workflowData,
            payload as unknown as Record<string, unknown>,
            requestId
          )

          if (!result.success) {
            logger.error(
              `[${requestId}] Failed to process webhook for item ${itemGuid}:`,
              result.statusCode,
              result.error
            )
            // Throwing sends this item to the catch below, where it is counted as failed.
            throw new Error(`Webhook processing failed (${result.statusCode}): ${result.error}`)
          }

          return { itemGuid, processed: true }
        }
      )

      logger.info(
        `[${requestId}] Successfully processed item ${item.title || itemGuid} for webhook ${webhookData.id}`
      )
      processedCount++
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : 'Unknown error'
      logger.error(`[${requestId}] Error processing item:`, errorMessage)
      failedCount++
    }
  }

  return { processedCount, failedCount }
}
*/ +export interface WorkflowRecord { + id: string + userId: string + workspaceId: string + [key: string]: unknown +} + +/** + * Strategy interface for provider-specific polling behavior. + * Mirrors `WebhookProviderHandler` from `providers/types.ts`. + * + * Each provider implements `pollWebhook()` — the full inner loop for one webhook: + * validate config, resolve credentials, fetch new items, process each via + * `processPolledWebhookEvent()` (wrapped in `pollingIdempotency`), update state. + */ +export interface PollingProviderHandler { + /** Provider name used in DB queries (e.g. 'gmail', 'rss'). */ + readonly provider: string + + /** Display label for log messages (e.g. 'Gmail', 'RSS'). */ + readonly label: string + + /** + * Process a single webhook entry. + * Return 'success' (even if 0 new items) or 'failure'. + */ + pollWebhook(ctx: PollWebhookContext): Promise<'success' | 'failure'> +} diff --git a/apps/sim/lib/webhooks/polling/utils.ts b/apps/sim/lib/webhooks/polling/utils.ts new file mode 100644 index 00000000000..3578c97c28b --- /dev/null +++ b/apps/sim/lib/webhooks/polling/utils.ts @@ -0,0 +1,240 @@ +import { db } from '@sim/db' +import { + account, + credentialSet, + webhook, + workflow, + workflowDeploymentVersion, +} from '@sim/db/schema' +import type { Logger } from '@sim/logger' +import { and, eq, isNull, or, sql } from 'drizzle-orm' +import { isOrganizationOnTeamOrEnterprisePlan } from '@/lib/billing' +import type { WebhookRecord, WorkflowRecord } from '@/lib/webhooks/polling/types' +import { + getOAuthToken, + refreshAccessTokenIfNeeded, + resolveOAuthAccountId, +} from '@/app/api/auth/oauth/utils' +import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants' + +/** Concurrency limit for parallel webhook processing. Standardized across all providers. */ +export const CONCURRENCY = 10 + +/** Increment the webhook's failure count. Auto-disables after MAX_CONSECUTIVE_FAILURES. 
*/ +export async function markWebhookFailed(webhookId: string, logger: Logger): Promise { + try { + const result = await db + .update(webhook) + .set({ + failedCount: sql`COALESCE(${webhook.failedCount}, 0) + 1`, + lastFailedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(webhook.id, webhookId)) + .returning({ failedCount: webhook.failedCount }) + + const newFailedCount = result[0]?.failedCount || 0 + if (newFailedCount >= MAX_CONSECUTIVE_FAILURES) { + await db + .update(webhook) + .set({ + isActive: false, + updatedAt: new Date(), + }) + .where(eq(webhook.id, webhookId)) + + logger.warn( + `Webhook ${webhookId} auto-disabled after ${MAX_CONSECUTIVE_FAILURES} consecutive failures` + ) + } + } catch (err) { + logger.error(`Failed to mark webhook ${webhookId} as failed:`, err) + } +} + +/** Reset the webhook's failure count on successful poll. */ +export async function markWebhookSuccess(webhookId: string, logger: Logger): Promise { + try { + await db + .update(webhook) + .set({ + failedCount: 0, + updatedAt: new Date(), + }) + .where(eq(webhook.id, webhookId)) + } catch (err) { + logger.error(`Failed to mark webhook ${webhookId} as successful:`, err) + } +} + +/** Fetch all active webhooks for a provider, joined with their workflow. 
*/ +export async function fetchActiveWebhooks( + provider: string +): Promise<{ webhook: WebhookRecord; workflow: WorkflowRecord }[]> { + const rows = await db + .select({ webhook, workflow }) + .from(webhook) + .innerJoin(workflow, eq(webhook.workflowId, workflow.id)) + .leftJoin( + workflowDeploymentVersion, + and( + eq(workflowDeploymentVersion.workflowId, workflow.id), + eq(workflowDeploymentVersion.isActive, true) + ) + ) + .where( + and( + eq(webhook.provider, provider), + eq(webhook.isActive, true), + eq(workflow.isDeployed, true), + or( + eq(webhook.deploymentVersionId, workflowDeploymentVersion.id), + and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId)) + ) + ) + ) + + return rows as unknown as { webhook: WebhookRecord; workflow: WorkflowRecord }[] +} + +/** + * Run an async function over entries with bounded concurrency. + * Returns aggregate success/failure counts. + */ +export async function runWithConcurrency( + entries: { webhook: WebhookRecord; workflow: WorkflowRecord }[], + processFn: (entry: { + webhook: WebhookRecord + workflow: WorkflowRecord + }) => Promise<'success' | 'failure'>, + logger: Logger +): Promise<{ successCount: number; failureCount: number }> { + const running: Promise[] = [] + let successCount = 0 + let failureCount = 0 + + for (const entry of entries) { + const promise: Promise = processFn(entry) + .then((result) => { + if (result === 'success') { + successCount++ + } else { + failureCount++ + } + }) + .catch((err) => { + logger.error('Unexpected error in webhook processing:', err) + failureCount++ + }) + .finally(() => { + const idx = running.indexOf(promise) + if (idx !== -1) running.splice(idx, 1) + }) + + running.push(promise) + + if (running.length >= CONCURRENCY) { + await Promise.race(running) + } + } + + await Promise.allSettled(running) + + return { successCount, failureCount } +} + +/** + * Read-merge-write pattern for updating provider-specific config fields. 
+ * Each provider passes its specific state updates (historyId, lastSeenGuids, etc.). + */ +export async function updateWebhookProviderConfig( + webhookId: string, + configUpdates: Record, + logger: Logger +): Promise { + try { + const result = await db.select().from(webhook).where(eq(webhook.id, webhookId)) + const existingConfig = (result[0]?.providerConfig as Record) || {} + + await db + .update(webhook) + .set({ + providerConfig: { + ...existingConfig, + ...configUpdates, + } as Record, + updatedAt: new Date(), + }) + .where(eq(webhook.id, webhookId)) + } catch (err) { + logger.error(`Failed to update webhook ${webhookId} config:`, err) + } +} + +/** + * Resolve OAuth credentials for a webhook. Shared by Gmail and Outlook. + * Returns the access token or throws on failure. + */ +export async function resolveOAuthCredential( + webhookData: WebhookRecord, + oauthProvider: string, + requestId: string, + logger: Logger +): Promise { + const metadata = webhookData.providerConfig as Record | null + const credentialId = metadata?.credentialId as string | undefined + const userId = metadata?.userId as string | undefined + const credentialSetId = (webhookData.credentialSetId as string | undefined) ?? undefined + + if (!credentialId && !userId) { + throw new Error(`Missing credential info for webhook ${webhookData.id}`) + } + + if (credentialSetId) { + const [cs] = await db + .select({ organizationId: credentialSet.organizationId }) + .from(credentialSet) + .where(eq(credentialSet.id, credentialSetId)) + .limit(1) + + if (cs?.organizationId) { + const hasAccess = await isOrganizationOnTeamOrEnterprisePlan(cs.organizationId) + if (!hasAccess) { + logger.error( + `[${requestId}] Polling Group plan restriction: Your current plan does not support Polling Groups. 
Upgrade to Team or Enterprise to use this feature.`, + { + webhookId: webhookData.id, + credentialSetId, + organizationId: cs.organizationId, + } + ) + throw new Error('Polling Group plan restriction') + } + } + } + + let accessToken: string | null = null + + if (credentialId) { + const resolved = await resolveOAuthAccountId(credentialId) + if (!resolved) { + throw new Error( + `Failed to resolve OAuth account for credential ${credentialId}, webhook ${webhookData.id}` + ) + } + const rows = await db.select().from(account).where(eq(account.id, resolved.accountId)).limit(1) + if (!rows.length) { + throw new Error(`Credential ${credentialId} not found for webhook ${webhookData.id}`) + } + const ownerUserId = rows[0].userId + accessToken = await refreshAccessTokenIfNeeded(resolved.accountId, ownerUserId, requestId) + } else if (userId) { + accessToken = await getOAuthToken(userId, oauthProvider) + } + + if (!accessToken) { + throw new Error(`Failed to get ${oauthProvider} access token for webhook ${webhookData.id}`) + } + + return accessToken +} diff --git a/apps/sim/lib/webhooks/processor.ts b/apps/sim/lib/webhooks/processor.ts index eb1503c0029..64877fb6664 100644 --- a/apps/sim/lib/webhooks/processor.ts +++ b/apps/sim/lib/webhooks/processor.ts @@ -4,12 +4,13 @@ import { createLogger } from '@sim/logger' import { and, eq, isNull, or } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { isOrganizationOnTeamOrEnterprisePlan } from '@/lib/billing/core/subscription' +import { tryAdmit } from '@/lib/core/admission/gate' import { getInlineJobQueue, getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs' import type { AsyncExecutionCorrelation } from '@/lib/core/async-jobs/types' import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq' import { isProd } from '@/lib/core/config/feature-flags' import { generateId } from '@/lib/core/utils/uuid' -import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch' 
+import { DispatchQueueFullError, enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch' import { getEffectiveDecryptedEnv } from '@/lib/environment/utils' import { preprocessExecution } from '@/lib/execution/preprocessing' import { @@ -18,6 +19,7 @@ import { requiresPendingWebhookVerification, } from '@/lib/webhooks/pending-verification' import { getProviderHandler } from '@/lib/webhooks/providers' +import { blockExistsInDeployment } from '@/lib/workflows/persistence/utils' import { executeWebhookJob } from '@/background/webhook-execution' import { resolveEnvVarReferences } from '@/executor/utils/reference-validation' import { isPollingWebhookProvider } from '@/triggers/constants' @@ -672,3 +674,215 @@ export async function queueWebhookExecution( return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) } } + +export interface PolledWebhookEventResult { + success: boolean + error?: string + statusCode?: number +} + +interface PolledWebhookRecord { + id: string + path: string + provider: string | null + blockId: string | null + providerConfig: unknown + credentialSetId: string | null + workflowId: string +} + +interface PolledWorkflowRecord { + id: string + userId: string + workspaceId: string +} + +/** + * Processes a polled webhook event directly, bypassing the HTTP trigger route. + * Used by polling services (Gmail, Outlook, IMAP, RSS) to avoid the self-POST + * anti-pattern where they would otherwise POST back to /api/webhooks/trigger/{path}. + * + * Performs only the steps actually needed for polling providers: + * admission control, preprocessing, block existence check, and queue execution. 
+ */ +export async function processPolledWebhookEvent( + foundWebhook: PolledWebhookRecord, + foundWorkflow: PolledWorkflowRecord, + body: Record, + requestId: string +): Promise { + if (!foundWebhook.provider) { + return { success: false, error: 'Webhook has no provider', statusCode: 400 } + } + const provider = foundWebhook.provider + + const ticket = tryAdmit() + if (!ticket) { + logger.warn(`[${requestId}] Admission gate rejected polled webhook event`) + return { success: false, error: 'Server at capacity', statusCode: 429 } + } + + try { + const preprocessResult = await checkWebhookPreprocessing(foundWorkflow, foundWebhook, requestId) + if (preprocessResult.error) { + return { success: false, error: 'Preprocessing failed', statusCode: 500 } + } + + if (foundWebhook.blockId) { + const blockExists = await blockExistsInDeployment(foundWorkflow.id, foundWebhook.blockId) + if (!blockExists) { + logger.info( + `[${requestId}] Trigger block ${foundWebhook.blockId} not found in deployment for workflow ${foundWorkflow.id}` + ) + return { success: false, error: 'Trigger block not found in deployment', statusCode: 404 } + } + } + + const providerConfig = (foundWebhook.providerConfig as Record) || {} + const credentialId = providerConfig.credentialId as string | undefined + const credentialSetId = foundWebhook.credentialSetId as string | undefined + + if (credentialSetId) { + const billingCheck = await verifyCredentialSetBilling(credentialSetId) + if (!billingCheck.valid) { + logger.warn(`[${requestId}] Credential set billing check failed: ${billingCheck.error}`) + return { success: false, error: billingCheck.error, statusCode: 403 } + } + } + + const actorUserId = preprocessResult.actorUserId + if (!actorUserId) { + logger.error(`[${requestId}] No actorUserId provided for webhook ${foundWebhook.id}`) + return { success: false, error: 'Unable to resolve billing account', statusCode: 500 } + } + + const executionId = preprocessResult.executionId ?? 
generateId() + const correlation = + preprocessResult.correlation ?? + ({ + executionId, + requestId, + source: 'webhook' as const, + workflowId: foundWorkflow.id, + webhookId: foundWebhook.id, + path: foundWebhook.path, + provider, + triggerType: 'webhook', + } satisfies AsyncExecutionCorrelation) + + const payload = { + webhookId: foundWebhook.id, + workflowId: foundWorkflow.id, + userId: actorUserId, + executionId, + requestId, + correlation, + provider, + body, + headers: { 'content-type': 'application/json' } as Record, + path: foundWebhook.path, + blockId: foundWebhook.blockId ?? undefined, + workspaceId: foundWorkflow.workspaceId, + ...(credentialId ? { credentialId } : {}), + } + + if (isPollingWebhookProvider(payload.provider) && !shouldExecuteInline()) { + const jobId = isBullMQEnabled() + ? await enqueueWorkspaceDispatch({ + id: executionId, + workspaceId: foundWorkflow.workspaceId, + lane: 'runtime', + queueName: 'webhook-execution', + bullmqJobName: 'webhook-execution', + bullmqPayload: createBullMQJobData(payload, { + workflowId: foundWorkflow.id, + userId: actorUserId, + correlation, + }), + metadata: { + workflowId: foundWorkflow.id, + userId: actorUserId, + correlation, + }, + }) + : await (await getJobQueue()).enqueue('webhook-execution', payload, { + metadata: { + workflowId: foundWorkflow.id, + workspaceId: foundWorkflow.workspaceId, + userId: actorUserId, + correlation, + }, + }) + logger.info( + `[${requestId}] Queued polling webhook execution task ${jobId} for ${provider} webhook via job queue` + ) + } else { + const jobQueue = await getInlineJobQueue() + const jobId = isBullMQEnabled() + ? 
await enqueueWorkspaceDispatch({ + id: executionId, + workspaceId: foundWorkflow.workspaceId, + lane: 'runtime', + queueName: 'webhook-execution', + bullmqJobName: 'webhook-execution', + bullmqPayload: createBullMQJobData(payload, { + workflowId: foundWorkflow.id, + userId: actorUserId, + correlation, + }), + metadata: { + workflowId: foundWorkflow.id, + userId: actorUserId, + correlation, + }, + }) + : await jobQueue.enqueue('webhook-execution', payload, { + metadata: { + workflowId: foundWorkflow.id, + workspaceId: foundWorkflow.workspaceId, + userId: actorUserId, + correlation, + }, + }) + logger.info(`[${requestId}] Queued ${provider} webhook execution ${jobId} via inline backend`) + + if (!isBullMQEnabled()) { + void (async () => { + try { + await jobQueue.startJob(jobId) + const output = await executeWebhookJob(payload) + await jobQueue.completeJob(jobId, output) + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error) + logger.error(`[${requestId}] Webhook execution failed`, { + jobId, + error: errorMessage, + }) + try { + await jobQueue.markJobFailed(jobId, errorMessage) + } catch (markFailedError) { + logger.error(`[${requestId}] Failed to mark job as failed`, { + jobId, + error: + markFailedError instanceof Error + ? 
markFailedError.message + : String(markFailedError), + }) + } + } + })() + } + } + + return { success: true } + } catch (error: unknown) { + if (error instanceof DispatchQueueFullError) { + logger.warn(`[${requestId}] Dispatch queue full for polled webhook: ${error.message}`) + return { success: false, error: 'Service temporarily at capacity', statusCode: 503 } + } + logger.error(`[${requestId}] Failed to process polled webhook event:`, error) + return { success: false, error: 'Internal server error', statusCode: 500 } + } finally { + ticket.release() + } +} diff --git a/apps/sim/lib/webhooks/rss-polling-service.ts b/apps/sim/lib/webhooks/rss-polling-service.ts deleted file mode 100644 index e6727a2f7ca..00000000000 --- a/apps/sim/lib/webhooks/rss-polling-service.ts +++ /dev/null @@ -1,442 +0,0 @@ -import { db } from '@sim/db' -import { webhook, workflow, workflowDeploymentVersion } from '@sim/db/schema' -import { createLogger } from '@sim/logger' -import { and, eq, isNull, or, sql } from 'drizzle-orm' -import Parser from 'rss-parser' -import { pollingIdempotency } from '@/lib/core/idempotency/service' -import { - secureFetchWithPinnedIP, - validateUrlWithDNS, -} from '@/lib/core/security/input-validation.server' -import { getInternalApiBaseUrl } from '@/lib/core/utils/urls' -import { generateShortId } from '@/lib/core/utils/uuid' -import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants' - -const logger = createLogger('RssPollingService') -const MAX_GUIDS_TO_TRACK = 100 // Track recent guids to prevent duplicates - -interface RssWebhookConfig { - feedUrl: string - lastCheckedTimestamp?: string - lastSeenGuids?: string[] - etag?: string - lastModified?: string -} - -interface RssItem { - title?: string - link?: string - pubDate?: string - guid?: string - description?: string - content?: string - contentSnippet?: string - author?: string - creator?: string - categories?: string[] - enclosure?: { - url: string - type?: string - length?: string | number - } - 
isoDate?: string - [key: string]: any -} - -interface RssFeed { - title?: string - link?: string - description?: string - items: RssItem[] -} - -export interface RssWebhookPayload { - title?: string - link?: string - pubDate?: string - item: RssItem - feed: { - title?: string - link?: string - description?: string - } - timestamp: string -} - -const parser = new Parser({ - timeout: 30000, - headers: { - 'User-Agent': 'Sim/1.0 RSS Poller', - }, -}) - -async function markWebhookFailed(webhookId: string) { - try { - const result = await db - .update(webhook) - .set({ - failedCount: sql`COALESCE(${webhook.failedCount}, 0) + 1`, - lastFailedAt: new Date(), - updatedAt: new Date(), - }) - .where(eq(webhook.id, webhookId)) - .returning({ failedCount: webhook.failedCount }) - - const newFailedCount = result[0]?.failedCount || 0 - const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES - - if (shouldDisable) { - await db - .update(webhook) - .set({ - isActive: false, - updatedAt: new Date(), - }) - .where(eq(webhook.id, webhookId)) - - logger.warn( - `Webhook ${webhookId} auto-disabled after ${MAX_CONSECUTIVE_FAILURES} consecutive failures` - ) - } - } catch (err) { - logger.error(`Failed to mark webhook ${webhookId} as failed:`, err) - } -} - -async function markWebhookSuccess(webhookId: string) { - try { - await db - .update(webhook) - .set({ - failedCount: 0, - updatedAt: new Date(), - }) - .where(eq(webhook.id, webhookId)) - } catch (err) { - logger.error(`Failed to mark webhook ${webhookId} as successful:`, err) - } -} - -export async function pollRssWebhooks() { - logger.info('Starting RSS webhook polling') - - try { - const activeWebhooksResult = await db - .select({ webhook }) - .from(webhook) - .innerJoin(workflow, eq(webhook.workflowId, workflow.id)) - .leftJoin( - workflowDeploymentVersion, - and( - eq(workflowDeploymentVersion.workflowId, workflow.id), - eq(workflowDeploymentVersion.isActive, true) - ) - ) - .where( - and( - eq(webhook.provider, 'rss'), 
- eq(webhook.isActive, true), - eq(workflow.isDeployed, true), - or( - eq(webhook.deploymentVersionId, workflowDeploymentVersion.id), - and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId)) - ) - ) - ) - - const activeWebhooks = activeWebhooksResult.map((r) => r.webhook) - - if (!activeWebhooks.length) { - logger.info('No active RSS webhooks found') - return { total: 0, successful: 0, failed: 0, details: [] } - } - - logger.info(`Found ${activeWebhooks.length} active RSS webhooks`) - - const CONCURRENCY = 10 - const running: Promise[] = [] - let successCount = 0 - let failureCount = 0 - - const enqueue = async (webhookData: (typeof activeWebhooks)[number]) => { - const webhookId = webhookData.id - const requestId = generateShortId() - - try { - const config = webhookData.providerConfig as unknown as RssWebhookConfig - - if (!config?.feedUrl) { - logger.error(`[${requestId}] Missing feedUrl for webhook ${webhookId}`) - await markWebhookFailed(webhookId) - failureCount++ - return - } - - const now = new Date() - - const { feed, items: newItems } = await fetchNewRssItems(config, requestId) - - if (!newItems.length) { - await updateWebhookConfig(webhookId, now.toISOString(), []) - await markWebhookSuccess(webhookId) - logger.info(`[${requestId}] No new items found for webhook ${webhookId}`) - successCount++ - return - } - - logger.info(`[${requestId}] Found ${newItems.length} new items for webhook ${webhookId}`) - - const { processedCount, failedCount: itemFailedCount } = await processRssItems( - newItems, - feed, - webhookData, - requestId - ) - - const newGuids = newItems - .map((item) => item.guid || item.link || '') - .filter((guid) => guid.length > 0) - - await updateWebhookConfig(webhookId, now.toISOString(), newGuids) - - if (itemFailedCount > 0 && processedCount === 0) { - await markWebhookFailed(webhookId) - failureCount++ - logger.warn( - `[${requestId}] All ${itemFailedCount} items failed to process for webhook ${webhookId}` - ) - } 
else { - await markWebhookSuccess(webhookId) - successCount++ - logger.info( - `[${requestId}] Successfully processed ${processedCount} items for webhook ${webhookId}${itemFailedCount > 0 ? ` (${itemFailedCount} failed)` : ''}` - ) - } - } catch (error) { - logger.error(`[${requestId}] Error processing RSS webhook ${webhookId}:`, error) - await markWebhookFailed(webhookId) - failureCount++ - } - } - - for (const webhookData of activeWebhooks) { - const promise = enqueue(webhookData) - .then(() => {}) - .catch((err) => { - logger.error('Unexpected error in webhook processing:', err) - failureCount++ - }) - - running.push(promise) - - if (running.length >= CONCURRENCY) { - const completedIdx = await Promise.race(running.map((p, i) => p.then(() => i))) - running.splice(completedIdx, 1) - } - } - - await Promise.allSettled(running) - - const summary = { - total: activeWebhooks.length, - successful: successCount, - failed: failureCount, - details: [], - } - - logger.info('RSS polling completed', { - total: summary.total, - successful: summary.successful, - failed: summary.failed, - }) - - return summary - } catch (error) { - const errorMessage = error instanceof Error ? 
error.message : 'Unknown error' - logger.error('Error in RSS polling service:', errorMessage) - throw error - } -} - -async function fetchNewRssItems( - config: RssWebhookConfig, - requestId: string -): Promise<{ feed: RssFeed; items: RssItem[] }> { - try { - const urlValidation = await validateUrlWithDNS(config.feedUrl, 'feedUrl') - if (!urlValidation.isValid) { - logger.error(`[${requestId}] Invalid RSS feed URL: ${urlValidation.error}`) - throw new Error(`Invalid RSS feed URL: ${urlValidation.error}`) - } - - const response = await secureFetchWithPinnedIP(config.feedUrl, urlValidation.resolvedIP!, { - headers: { - 'User-Agent': 'Sim/1.0 RSS Poller', - Accept: 'application/rss+xml, application/xml, text/xml, */*', - }, - timeout: 30000, - }) - - if (!response.ok) { - await response.text().catch(() => {}) - throw new Error(`Failed to fetch RSS feed: ${response.status} ${response.statusText}`) - } - - const xmlContent = await response.text() - - const feed = await parser.parseString(xmlContent) - - if (!feed.items || !feed.items.length) { - return { feed: feed as RssFeed, items: [] } - } - - const lastCheckedTime = config.lastCheckedTimestamp - ? new Date(config.lastCheckedTimestamp) - : null - const lastSeenGuids = new Set(config.lastSeenGuids || []) - - const newItems = feed.items.filter((item) => { - const itemGuid = item.guid || item.link || '' - - if (itemGuid && lastSeenGuids.has(itemGuid)) { - return false - } - - if (lastCheckedTime && item.isoDate) { - const itemDate = new Date(item.isoDate) - if (itemDate <= lastCheckedTime) { - return false - } - } - - return true - }) - - newItems.sort((a, b) => { - const dateA = a.isoDate ? new Date(a.isoDate).getTime() : 0 - const dateB = b.isoDate ? 
new Date(b.isoDate).getTime() : 0 - return dateB - dateA - }) - - const limitedItems = newItems.slice(0, 25) - - logger.info( - `[${requestId}] Found ${newItems.length} new items (processing ${limitedItems.length})` - ) - - return { feed: feed as RssFeed, items: limitedItems as RssItem[] } - } catch (error) { - const errorMessage = error instanceof Error ? error.message : 'Unknown error' - logger.error(`[${requestId}] Error fetching RSS feed:`, errorMessage) - throw error - } -} - -async function processRssItems( - items: RssItem[], - feed: RssFeed, - webhookData: any, - requestId: string -): Promise<{ processedCount: number; failedCount: number }> { - let processedCount = 0 - let failedCount = 0 - - for (const item of items) { - try { - const itemGuid = item.guid || item.link || `${item.title}-${item.pubDate}` - - await pollingIdempotency.executeWithIdempotency( - 'rss', - `${webhookData.id}:${itemGuid}`, - async () => { - const payload: RssWebhookPayload = { - title: item.title, - link: item.link, - pubDate: item.pubDate, - item: { - title: item.title, - link: item.link, - pubDate: item.pubDate, - guid: item.guid, - description: item.description, - content: item.content, - contentSnippet: item.contentSnippet, - author: item.author || item.creator, - categories: item.categories, - enclosure: item.enclosure, - isoDate: item.isoDate, - }, - feed: { - title: feed.title, - link: feed.link, - description: feed.description, - }, - timestamp: new Date().toISOString(), - } - - const webhookUrl = `${getInternalApiBaseUrl()}/api/webhooks/trigger/${webhookData.path}` - - const response = await fetch(webhookUrl, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'User-Agent': 'Sim/1.0', - }, - body: JSON.stringify(payload), - }) - - if (!response.ok) { - const errorText = await response.text() - logger.error( - `[${requestId}] Failed to trigger webhook for item ${itemGuid}:`, - response.status, - errorText - ) - throw new Error(`Webhook request failed: 
${response.status} - ${errorText}`) - } - - return { - itemGuid, - webhookStatus: response.status, - processed: true, - } - } - ) - - logger.info( - `[${requestId}] Successfully processed item ${item.title || itemGuid} for webhook ${webhookData.id}` - ) - processedCount++ - } catch (error) { - const errorMessage = error instanceof Error ? error.message : 'Unknown error' - logger.error(`[${requestId}] Error processing item:`, errorMessage) - failedCount++ - } - } - - return { processedCount, failedCount } -} - -async function updateWebhookConfig(webhookId: string, timestamp: string, newGuids: string[]) { - try { - const result = await db.select().from(webhook).where(eq(webhook.id, webhookId)) - const existingConfig = (result[0]?.providerConfig as Record) || {} - - const existingGuids = existingConfig.lastSeenGuids || [] - const allGuids = [...newGuids, ...existingGuids].slice(0, MAX_GUIDS_TO_TRACK) - - await db - .update(webhook) - .set({ - providerConfig: { - ...existingConfig, - lastCheckedTimestamp: timestamp, - lastSeenGuids: allGuids, - } as any, - updatedAt: new Date(), - }) - .where(eq(webhook.id, webhookId)) - } catch (err) { - logger.error(`Failed to update webhook ${webhookId} config:`, err) - } -} From 21f49cce9499f703e59939514d407a879ef51359 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Tue, 7 Apr 2026 20:40:23 -0700 Subject: [PATCH 2/4] polish(polling): extract lock TTL constant and remove unnecessary type casts - Widen processPolledWebhookEvent body param to accept object, eliminating `as unknown as Record` double casts in all 4 handlers - Extract LOCK_TTL_SECONDS constant in route, tying maxDuration and lock TTL to a single value Co-Authored-By: Claude Opus 4.6 --- apps/sim/app/api/webhooks/poll/[provider]/route.ts | 7 +++++-- apps/sim/lib/webhooks/polling/gmail.ts | 2 +- apps/sim/lib/webhooks/polling/imap.ts | 2 +- apps/sim/lib/webhooks/polling/outlook.ts | 2 +- apps/sim/lib/webhooks/polling/rss.ts | 2 +- apps/sim/lib/webhooks/processor.ts | 2 +- 
6 files changed, 10 insertions(+), 7 deletions(-) diff --git a/apps/sim/app/api/webhooks/poll/[provider]/route.ts b/apps/sim/app/api/webhooks/poll/[provider]/route.ts index 425a375c62d..dd0a230e95f 100644 --- a/apps/sim/app/api/webhooks/poll/[provider]/route.ts +++ b/apps/sim/app/api/webhooks/poll/[provider]/route.ts @@ -7,8 +7,11 @@ import { pollProvider, VALID_POLLING_PROVIDERS } from '@/lib/webhooks/polling' const logger = createLogger('PollingAPI') +/** Lock TTL in seconds — matches maxDuration so the lock auto-expires if the function times out. */ +const LOCK_TTL_SECONDS = 180 + export const dynamic = 'force-dynamic' -export const maxDuration = 180 +export const maxDuration = LOCK_TTL_SECONDS export async function GET( request: NextRequest, @@ -29,7 +32,7 @@ export async function GET( if (authError) return authError lockValue = requestId - const locked = await acquireLock(LOCK_KEY, lockValue, 180) + const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS) if (!locked) { return NextResponse.json( { diff --git a/apps/sim/lib/webhooks/polling/gmail.ts b/apps/sim/lib/webhooks/polling/gmail.ts index 32d6d25a343..7db8587d2c2 100644 --- a/apps/sim/lib/webhooks/polling/gmail.ts +++ b/apps/sim/lib/webhooks/polling/gmail.ts @@ -488,7 +488,7 @@ async function processEmails( const result = await processPolledWebhookEvent( webhookData, workflowData, - webhookPayload as unknown as Record, + webhookPayload, requestId ) diff --git a/apps/sim/lib/webhooks/polling/imap.ts b/apps/sim/lib/webhooks/polling/imap.ts index e0763b7b0ab..e5822aa8882 100644 --- a/apps/sim/lib/webhooks/polling/imap.ts +++ b/apps/sim/lib/webhooks/polling/imap.ts @@ -518,7 +518,7 @@ async function processEmails( const result = await processPolledWebhookEvent( webhookData, workflowData, - payload as unknown as Record, + payload, requestId ) diff --git a/apps/sim/lib/webhooks/polling/outlook.ts b/apps/sim/lib/webhooks/polling/outlook.ts index 0eb4e248cb4..f5600148a2c 100644 --- 
a/apps/sim/lib/webhooks/polling/outlook.ts +++ b/apps/sim/lib/webhooks/polling/outlook.ts @@ -412,7 +412,7 @@ async function processOutlookEmails( const result = await processPolledWebhookEvent( webhookData, workflowData, - payload as unknown as Record, + payload, requestId ) diff --git a/apps/sim/lib/webhooks/polling/rss.ts b/apps/sim/lib/webhooks/polling/rss.ts index f98b9d16af9..31044fc8924 100644 --- a/apps/sim/lib/webhooks/polling/rss.ts +++ b/apps/sim/lib/webhooks/polling/rss.ts @@ -275,7 +275,7 @@ async function processRssItems( const result = await processPolledWebhookEvent( webhookData, workflowData, - payload as unknown as Record, + payload, requestId ) diff --git a/apps/sim/lib/webhooks/processor.ts b/apps/sim/lib/webhooks/processor.ts index 64877fb6664..d75f539ee38 100644 --- a/apps/sim/lib/webhooks/processor.ts +++ b/apps/sim/lib/webhooks/processor.ts @@ -708,7 +708,7 @@ interface PolledWorkflowRecord { export async function processPolledWebhookEvent( foundWebhook: PolledWebhookRecord, foundWorkflow: PolledWorkflowRecord, - body: Record, + body: Record | object, requestId: string ): Promise { if (!foundWebhook.provider) { From 4a1000c0d87ccd218f65342601b821d83fece2a3 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Tue, 7 Apr 2026 20:42:31 -0700 Subject: [PATCH 3/4] fix(polling): address PR review feedback - Add archivedAt filters to fetchActiveWebhooks query, matching findWebhookAndWorkflow in processor.ts to prevent polling archived webhooks/workflows - Move provider validation after auth check to prevent provider enumeration by unauthenticated callers - Fix inconsistent pollingIdempotency import path in outlook.ts to match other handlers Co-Authored-By: Claude Opus 4.6 --- apps/sim/app/api/webhooks/poll/[provider]/route.ts | 8 ++++---- apps/sim/lib/webhooks/polling/outlook.ts | 2 +- apps/sim/lib/webhooks/polling/utils.ts | 2 ++ 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/apps/sim/app/api/webhooks/poll/[provider]/route.ts 
b/apps/sim/app/api/webhooks/poll/[provider]/route.ts index dd0a230e95f..078e907dfe2 100644 --- a/apps/sim/app/api/webhooks/poll/[provider]/route.ts +++ b/apps/sim/app/api/webhooks/poll/[provider]/route.ts @@ -20,10 +20,6 @@ export async function GET( const { provider } = await params const requestId = generateShortId() - if (!VALID_POLLING_PROVIDERS.has(provider)) { - return NextResponse.json({ error: `Unknown polling provider: ${provider}` }, { status: 404 }) - } - const LOCK_KEY = `${provider}-polling-lock` let lockValue: string | undefined @@ -31,6 +27,10 @@ export async function GET( const authError = verifyCronAuth(request, `${provider} webhook polling`) if (authError) return authError + if (!VALID_POLLING_PROVIDERS.has(provider)) { + return NextResponse.json({ error: `Unknown polling provider: ${provider}` }, { status: 404 }) + } + lockValue = requestId const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS) if (!locked) { diff --git a/apps/sim/lib/webhooks/polling/outlook.ts b/apps/sim/lib/webhooks/polling/outlook.ts index f5600148a2c..e6874940c61 100644 --- a/apps/sim/lib/webhooks/polling/outlook.ts +++ b/apps/sim/lib/webhooks/polling/outlook.ts @@ -1,5 +1,5 @@ import { htmlToText } from 'html-to-text' -import { pollingIdempotency } from '@/lib/core/idempotency' +import { pollingIdempotency } from '@/lib/core/idempotency/service' import type { PollingProviderHandler, PollWebhookContext } from '@/lib/webhooks/polling/types' import { markWebhookFailed, diff --git a/apps/sim/lib/webhooks/polling/utils.ts b/apps/sim/lib/webhooks/polling/utils.ts index 3578c97c28b..cca46585258 100644 --- a/apps/sim/lib/webhooks/polling/utils.ts +++ b/apps/sim/lib/webhooks/polling/utils.ts @@ -86,7 +86,9 @@ export async function fetchActiveWebhooks( and( eq(webhook.provider, provider), eq(webhook.isActive, true), + isNull(webhook.archivedAt), eq(workflow.isDeployed, true), + isNull(workflow.archivedAt), or( eq(webhook.deploymentVersionId, 
workflowDeploymentVersion.id), and(isNull(workflowDeploymentVersion.id), isNull(webhook.deploymentVersionId)) From 7828b1e6849484d54fdbcb54b2f5d842f25d0155 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Tue, 7 Apr 2026 20:48:52 -0700 Subject: [PATCH 4/4] fix(polling): use literal for maxDuration segment config Next.js requires segment config exports to be statically analyzable literals. Using a variable reference caused build failure. Co-Authored-By: Claude Opus 4.6 --- apps/sim/app/api/webhooks/poll/[provider]/route.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/sim/app/api/webhooks/poll/[provider]/route.ts b/apps/sim/app/api/webhooks/poll/[provider]/route.ts index 078e907dfe2..d314e8563bb 100644 --- a/apps/sim/app/api/webhooks/poll/[provider]/route.ts +++ b/apps/sim/app/api/webhooks/poll/[provider]/route.ts @@ -7,11 +7,11 @@ import { pollProvider, VALID_POLLING_PROVIDERS } from '@/lib/webhooks/polling' const logger = createLogger('PollingAPI') -/** Lock TTL in seconds — matches maxDuration so the lock auto-expires if the function times out. */ +/** Lock TTL in seconds — must match maxDuration so the lock auto-expires if the function times out. */ const LOCK_TTL_SECONDS = 180 export const dynamic = 'force-dynamic' -export const maxDuration = LOCK_TTL_SECONDS +export const maxDuration = 180 export async function GET( request: NextRequest,