From 65a09046990645b10da788df163b783ec49c2ac4 Mon Sep 17 00:00:00 2001 From: KM Koushik Date: Sat, 25 Oct 2025 06:22:04 +1100 Subject: [PATCH] feat: add email send idempotency caching --- apps/docs/docs.json | 5 +- apps/docs/guides/email-idempotency.mdx | 49 +++++ .../public-api/api/emails/send-email.ts | 8 +- .../server/public-api/schemas/email-schema.ts | 10 + apps/web/src/server/redis.ts | 49 +++++ apps/web/src/server/service/email-service.ts | 196 ++++++++++++++++++ apps/web/src/types/index.ts | 1 + packages/python-sdk/usesend/emails.py | 7 + packages/python-sdk/usesend/types.py | 1 + packages/sdk/src/email.ts | 10 + packages/sdk/types/schema.d.ts | 2 + 11 files changed, 334 insertions(+), 4 deletions(-) create mode 100644 apps/docs/guides/email-idempotency.mdx diff --git a/apps/docs/docs.json b/apps/docs/docs.json index eac7cf29..ebb4be2c 100644 --- a/apps/docs/docs.json +++ b/apps/docs/docs.json @@ -38,7 +38,10 @@ }, { "group": "Guides", - "pages": ["guides/use-with-react-email"] + "pages": [ + "guides/use-with-react-email", + "guides/email-idempotency" + ] }, { "group": "Community SDKs", diff --git a/apps/docs/guides/email-idempotency.mdx b/apps/docs/guides/email-idempotency.mdx new file mode 100644 index 00000000..d14c96a1 --- /dev/null +++ b/apps/docs/guides/email-idempotency.mdx @@ -0,0 +1,49 @@ +--- +title: Email idempotency +description: Deduplicate email send requests by reusing the same queued job. +--- + +## Overview + +When you provide an `idempotencyKey` with the email send endpoint, useSend will +short-circuit duplicate requests and return the first queued email. The key is +scoped to the team and automatically expires after three days so you do not need +any manual cleanup. + +## JavaScript SDK example + +```ts +import { UseSend } from "usesend-js"; + +const usesend = new UseSend(process.env.USESEND_API_KEY!); + +await usesend.emails.send({ + to: "recipient@example.com", + from: "hello@example.com", + subject: "Receipt", + html: "

Thanks for your order!

", + idempotencyKey: "order-123", +}); +``` + +## Python SDK example + +```py +from usesend import UseSend + +client = UseSend("us_your_api_key") + +client.emails.send({ + "to": "recipient@example.com", + "from": "hello@example.com", + "subject": "Receipt", + "html": "

Thanks for your order!

", + "idempotencyKey": "order-123", +}) +``` + +:::note +Repeated calls with the same key will reuse the cached response until the +three-day expiration window passes. If a send fails, a new request can reuse the +same key after the failure record is cleared automatically. +::: diff --git a/apps/web/src/server/public-api/api/emails/send-email.ts b/apps/web/src/server/public-api/api/emails/send-email.ts index 5a6164ca..8b213538 100644 --- a/apps/web/src/server/public-api/api/emails/send-email.ts +++ b/apps/web/src/server/public-api/api/emails/send-email.ts @@ -32,19 +32,21 @@ function send(app: PublicAPIApp) { app.openapi(route, async (c) => { const team = c.var.team; + const body = c.req.valid("json"); + let html = undefined; - const _html = c.req.valid("json")?.html?.toString(); + const _html = body?.html?.toString(); if (_html && _html !== "true" && _html !== "false") { html = _html; } const email = await sendEmail({ - ...c.req.valid("json"), + ...body, teamId: team.id, apiKeyId: team.apiKeyId, - text: c.req.valid("json").text ?? undefined, + text: body.text ?? undefined, html: html, }); diff --git a/apps/web/src/server/public-api/schemas/email-schema.ts b/apps/web/src/server/public-api/schemas/email-schema.ts index 1a49a08c..cdc6b51b 100644 --- a/apps/web/src/server/public-api/schemas/email-schema.ts +++ b/apps/web/src/server/public-api/schemas/email-schema.ts @@ -7,6 +7,16 @@ export const emailSchema = z .object({ to: z.string().or(z.array(z.string())), from: z.string(), + idempotencyKey: z + .string() + .trim() + .min(1) + .max(255) + .optional() + .openapi({ + description: + "Optional key to deduplicate send requests. Duplicate keys reuse results.", + }), subject: z.string().min(1).optional().openapi({ description: "Optional when templateId is provided", }), diff --git a/apps/web/src/server/redis.ts b/apps/web/src/server/redis.ts index 9813a891..cf9ad8e1 100644 --- a/apps/web/src/server/redis.ts +++ b/apps/web/src/server/redis.ts @@ -48,3 +48,52 @@ export async function withCache( return value; } + +export type RedisSetJsonOptions = { + ttlSeconds?: number; + mode?: "NX" | "XX"; +}; + +export async function setJsonValue( + key: string, + value: unknown, + options?: RedisSetJsonOptions +): Promise<"OK" | null> { + const redis = getRedis(); + const payload = JSON.stringify(value); + const mode = options?.mode; + const ttlSeconds = options?.ttlSeconds; + + if (mode && ttlSeconds) { + return redis.set(key, payload, mode, "EX", ttlSeconds); + } + + if (mode) { + return redis.set(key, payload, mode); + } + + if (ttlSeconds) { + return redis.set(key, payload, "EX", ttlSeconds); + } + + return redis.set(key, payload); +} + +export async function getJsonValue(key: string): Promise { + const redis = getRedis(); + const cached = await redis.get(key); + if (!cached) { + return null; + } + + try { + return JSON.parse(cached) as T; + } catch { + return null; + } +} + +export async function deleteKey(key: string): Promise { + const redis = getRedis(); + return redis.del(key); +} diff --git a/apps/web/src/server/service/email-service.ts b/apps/web/src/server/service/email-service.ts index 58a36e19..5e7fbde2 100644 --- a/apps/web/src/server/service/email-service.ts +++ b/apps/web/src/server/service/email-service.ts @@ -11,6 +11,65 @@ import { logger } from "../logger/log"; import { SuppressionService } from "./suppression-service"; import { sanitizeCustomHeaders } from "~/server/utils/email-headers"; import { Prisma } from "@prisma/client"; +import { deleteKey, getJsonValue, setJsonValue } from "../redis"; + +const IDEMPOTENCY_TTL_SECONDS = 60 * 60 * 24 * 3; // 3 days +const IDEMPOTENCY_WAIT_ATTEMPTS = 5; +const IDEMPOTENCY_WAIT_DELAY_MS = 100; + +type EmailIdempotencyStatus = "PENDING" | "CREATED" | "QUEUED" | "FAILED"; + +type EmailIdempotencyRecord = { + status: EmailIdempotencyStatus; + emailId?: string; + createdAt: string; + queuedAt?: string; + lastUpdatedAt: string; + error?: string; +}; + +async function waitForIdempotentRecord( + redisKey: string +): Promise { + for (let attempt = 0; attempt < IDEMPOTENCY_WAIT_ATTEMPTS; attempt += 1) { + await new Promise((resolve) => setTimeout(resolve, IDEMPOTENCY_WAIT_DELAY_MS)); + const record = await getJsonValue(redisKey); + if (!record || record.status === "FAILED" || record.emailId) { + return record ?? null; + } + } + + return getJsonValue(redisKey); +} + +async function resolveEmailFromRecord( + record: EmailIdempotencyRecord | null, + redisKey: string, + teamId: number +) { + if (!record || !record.emailId) { + return null; + } + + const email = await db.email.findUnique({ where: { id: record.emailId } }); + + if (!email) { + await deleteKey(redisKey); + return null; + } + + logger.info( + { + emailId: email.id, + teamId, + redisKey, + status: record.status, + }, + "Returning cached idempotent email send result (entries expire after 3 days)." + ); + + return email; +} async function checkIfValidEmail(emailId: string) { const email = await db.email.findUnique({ @@ -72,10 +131,98 @@ export async function sendEmail( apiKeyId, inReplyToId, headers, + idempotencyKey, } = emailContent; let subject = subjectFromApiCall; let html = htmlFromApiCall; + const normalizedIdempotencyKey = idempotencyKey?.trim(); + const idempotencyRedisKey = normalizedIdempotencyKey + ? `email-idempotency:${teamId}:${normalizedIdempotencyKey}` + : null; + let idempotencyRecordSnapshot: EmailIdempotencyRecord | null = null; + let idempotencyCreatedAt = new Date().toISOString(); + + if (idempotencyRedisKey) { + let existingRecord = await getJsonValue( + idempotencyRedisKey + ); + + if (existingRecord?.status === "FAILED") { + await deleteKey(idempotencyRedisKey); + existingRecord = null; + } else { + const cachedEmail = await resolveEmailFromRecord( + existingRecord, + idempotencyRedisKey, + teamId + ); + + if (cachedEmail) { + return cachedEmail; + } + + if (existingRecord?.status === "PENDING") { + const awaitedRecord = await waitForIdempotentRecord( + idempotencyRedisKey + ); + const awaitedEmail = await resolveEmailFromRecord( + awaitedRecord, + idempotencyRedisKey, + teamId + ); + + if (awaitedEmail) { + return awaitedEmail; + } + + if (awaitedRecord?.status === "FAILED") { + await deleteKey(idempotencyRedisKey); + existingRecord = null; + } else { + existingRecord = awaitedRecord; + } + } + } + + const nowIso = new Date().toISOString(); + const pendingRecord: EmailIdempotencyRecord = { + status: "PENDING", + createdAt: existingRecord?.createdAt ?? nowIso, + lastUpdatedAt: nowIso, + }; + + const setResult = await setJsonValue(idempotencyRedisKey, pendingRecord, { + ttlSeconds: IDEMPOTENCY_TTL_SECONDS, + mode: "NX", + }); + + if (setResult !== "OK") { + const latestRecord = await waitForIdempotentRecord(idempotencyRedisKey); + const latestEmail = await resolveEmailFromRecord( + latestRecord, + idempotencyRedisKey, + teamId + ); + + if (latestEmail) { + return latestEmail; + } + + if (latestRecord?.status === "FAILED") { + await deleteKey(idempotencyRedisKey); + } + + throw new UnsendApiError({ + code: "NOT_UNIQUE", + message: "A request with this idempotency key is already in progress.", + }); + } + + idempotencyCreatedAt = pendingRecord.createdAt; + idempotencyRecordSnapshot = pendingRecord; + } + let domain: Awaited>; // If this is an API call with an API key, validate domain access @@ -271,6 +418,22 @@ export async function sendEmail( }, }); + if (idempotencyRedisKey) { + const createdAtIso = new Date().toISOString(); + const createdRecord: EmailIdempotencyRecord = { + status: "CREATED", + emailId: email.id, + createdAt: idempotencyCreatedAt, + lastUpdatedAt: createdAtIso, + }; + + await setJsonValue(idempotencyRedisKey, createdRecord, { + ttlSeconds: IDEMPOTENCY_TTL_SECONDS, + }); + + idempotencyRecordSnapshot = createdRecord; + } + try { await EmailQueueService.queueEmail( email.id, @@ -280,6 +443,23 @@ export async function sendEmail( undefined, delay ); + + if (idempotencyRedisKey) { + const queuedAt = new Date().toISOString(); + const queuedRecord: EmailIdempotencyRecord = { + status: "QUEUED", + emailId: email.id, + createdAt: idempotencyRecordSnapshot?.createdAt ?? idempotencyCreatedAt, + queuedAt, + lastUpdatedAt: queuedAt, + }; + + await setJsonValue(idempotencyRedisKey, queuedRecord, { + ttlSeconds: IDEMPOTENCY_TTL_SECONDS, + }); + + idempotencyRecordSnapshot = queuedRecord; + } } catch (error: any) { await db.emailEvent.create({ data: { @@ -295,6 +475,22 @@ export async function sendEmail( where: { id: email.id }, data: { latestStatus: "FAILED" }, }); + + if (idempotencyRedisKey) { + const failedAt = new Date().toISOString(); + const failureRecord: EmailIdempotencyRecord = { + status: "FAILED", + emailId: email.id, + createdAt: + idempotencyRecordSnapshot?.createdAt ?? idempotencyCreatedAt, + lastUpdatedAt: failedAt, + error: error instanceof Error ? error.message : String(error), + }; + + await setJsonValue(idempotencyRedisKey, failureRecord, { + ttlSeconds: IDEMPOTENCY_TTL_SECONDS, + }); + } throw error; } diff --git a/apps/web/src/types/index.ts b/apps/web/src/types/index.ts index f71955c4..e4678498 100644 --- a/apps/web/src/types/index.ts +++ b/apps/web/src/types/index.ts @@ -1,6 +1,7 @@ export type EmailContent = { to: string | string[]; from: string; + idempotencyKey?: string; subject?: string; templateId?: string; variables?: Record; diff --git a/packages/python-sdk/usesend/emails.py b/packages/python-sdk/usesend/emails.py index 3756fcbe..cb990735 100644 --- a/packages/python-sdk/usesend/emails.py +++ b/packages/python-sdk/usesend/emails.py @@ -42,6 +42,13 @@ def create(self, payload: Union[EmailCreate, Dict[str, Any]]) -> Tuple[Optional[ if isinstance(body.get("scheduledAt"), datetime): body["scheduledAt"] = body["scheduledAt"].isoformat() + if body.get("idempotencyKey") is not None: + key = str(body["idempotencyKey"]).strip() + if key: + body["idempotencyKey"] = key + else: + body.pop("idempotencyKey", None) + data, err = self.usesend.post("/emails", body) return (data, err) # type: ignore[return-value] diff --git a/packages/python-sdk/usesend/types.py b/packages/python-sdk/usesend/types.py index 6ec188de..1a3d8e88 100644 --- a/packages/python-sdk/usesend/types.py +++ b/packages/python-sdk/usesend/types.py @@ -210,6 +210,7 @@ class Attachment(TypedDict): { "to": Required[Union[str, List[str]]], "from": Required[str], + "idempotencyKey": NotRequired[str], "subject": NotRequired[str], "templateId": NotRequired[str], "variables": NotRequired[Dict[str, str]], diff --git a/packages/sdk/src/email.ts b/packages/sdk/src/email.ts index 14ede274..736ff3b8 100644 --- a/packages/sdk/src/email.ts +++ b/packages/sdk/src/email.ts @@ -82,6 +82,16 @@ export class Emails { delete payload.react; } + if (payload.idempotencyKey !== undefined) { + const normalizedKey = String(payload.idempotencyKey).trim(); + + if (normalizedKey.length > 0) { + payload.idempotencyKey = normalizedKey; + } else { + delete payload.idempotencyKey; + } + } + const data = await this.usesend.post( "/emails", payload diff --git a/packages/sdk/types/schema.d.ts b/packages/sdk/types/schema.d.ts index 6ef77a9c..0590ed38 100644 --- a/packages/sdk/types/schema.d.ts +++ b/packages/sdk/types/schema.d.ts @@ -570,6 +570,8 @@ export interface paths { "application/json": { to: string | string[]; from: string; + /** @description Optional key to deduplicate send requests. Duplicate keys reuse results. */ + idempotencyKey?: string; /** @description Optional when templateId is provided */ subject?: string; /** @description ID of a template from the dashboard */