-
-
Notifications
You must be signed in to change notification settings - Fork 358
feat: add email send idempotency caching #281
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| --- | ||
| title: Email idempotency | ||
| description: Deduplicate email send requests by reusing the same queued job. | ||
| --- | ||
|
|
||
| ## Overview | ||
|
|
||
| When you provide an `idempotencyKey` with the email send endpoint, useSend will | ||
| short-circuit duplicate requests and return the first queued email. The key is | ||
| scoped to the team and automatically expires after three days so you do not need | ||
| any manual cleanup. | ||
|
|
||
| ## JavaScript SDK example | ||
|
|
||
| ```ts | ||
| import { UseSend } from "usesend-js"; | ||
|
|
||
| const usesend = new UseSend(process.env.USESEND_API_KEY!); | ||
|
|
||
| await usesend.emails.send({ | ||
| to: "recipient@example.com", | ||
| from: "hello@example.com", | ||
| subject: "Receipt", | ||
| html: "<p>Thanks for your order!</p>", | ||
| idempotencyKey: "order-123", | ||
| }); | ||
| ``` | ||
|
|
||
| ## Python SDK example | ||
|
|
||
| ```py | ||
| from usesend import UseSend | ||
|
|
||
| client = UseSend("us_your_api_key") | ||
|
|
||
| client.emails.send({ | ||
| "to": "recipient@example.com", | ||
| "from": "hello@example.com", | ||
| "subject": "Receipt", | ||
| "html": "<p>Thanks for your order!</p>", | ||
| "idempotencyKey": "order-123", | ||
| }) | ||
| ``` | ||
|
|
||
| :::note | ||
| Repeated calls with the same key will reuse the cached response until the | ||
| three-day expiration window passes. If a send fails, a new request can reuse the | ||
| same key after the failure record is cleared automatically. | ||
| ::: | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -48,3 +48,52 @@ export async function withCache<T>( | |
|
|
||
| return value; | ||
| } | ||
|
|
||
| export type RedisSetJsonOptions = { | ||
| ttlSeconds?: number; | ||
| mode?: "NX" | "XX"; | ||
| }; | ||
|
|
||
| export async function setJsonValue( | ||
| key: string, | ||
| value: unknown, | ||
| options?: RedisSetJsonOptions | ||
| ): Promise<"OK" | null> { | ||
| const redis = getRedis(); | ||
| const payload = JSON.stringify(value); | ||
| const mode = options?.mode; | ||
| const ttlSeconds = options?.ttlSeconds; | ||
|
|
||
| if (mode && ttlSeconds) { | ||
| return redis.set(key, payload, mode, "EX", ttlSeconds); | ||
| } | ||
|
|
||
| if (mode) { | ||
| return redis.set(key, payload, mode); | ||
| } | ||
|
|
||
| if (ttlSeconds) { | ||
| return redis.set(key, payload, "EX", ttlSeconds); | ||
| } | ||
|
|
||
| return redis.set(key, payload); | ||
| } | ||
|
Comment on lines
+57
to
+80
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chainVerify ioredis SET command argument order. The Alternatively, test this syntax in the integration tests or verify against ioredis documentation: 🏁 Script executed: #!/bin/bash
# Verify ioredis SET command syntax supports NX/XX with EX in this argument order
# Search for ioredis SET usage patterns in the codebase
rg -n -C3 'redis\.set\(' --type=ts
# Check ioredis type definitions
fd -e d.ts -x cat {} \; | rg -A5 'interface Redis' | head -50Length of output: 960 🌐 Web query: 💡 Result: Use positional arguments (they're passed straight to Redis). Example:
Notes:
Sources: Fix argument order for redis.set() with NX/XX and EX options on line 68. The call on line 68 passes arguments in incorrect order: 🤖 Prompt for AI Agents |
||
|
|
||
| export async function getJsonValue<T>(key: string): Promise<T | null> { | ||
| const redis = getRedis(); | ||
| const cached = await redis.get(key); | ||
| if (!cached) { | ||
| return null; | ||
| } | ||
|
|
||
| try { | ||
| return JSON.parse(cached) as T; | ||
| } catch { | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| export async function deleteKey(key: string): Promise<number> { | ||
| const redis = getRedis(); | ||
| return redis.del(key); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,65 @@ import { logger } from "../logger/log"; | |
| import { SuppressionService } from "./suppression-service"; | ||
| import { sanitizeCustomHeaders } from "~/server/utils/email-headers"; | ||
| import { Prisma } from "@prisma/client"; | ||
| import { deleteKey, getJsonValue, setJsonValue } from "../redis"; | ||
|
|
||
| const IDEMPOTENCY_TTL_SECONDS = 60 * 60 * 24 * 3; // 3 days | ||
| const IDEMPOTENCY_WAIT_ATTEMPTS = 5; | ||
| const IDEMPOTENCY_WAIT_DELAY_MS = 100; | ||
|
|
||
| type EmailIdempotencyStatus = "PENDING" | "CREATED" | "QUEUED" | "FAILED"; | ||
|
|
||
| type EmailIdempotencyRecord = { | ||
| status: EmailIdempotencyStatus; | ||
| emailId?: string; | ||
| createdAt: string; | ||
| queuedAt?: string; | ||
| lastUpdatedAt: string; | ||
| error?: string; | ||
| }; | ||
|
|
||
| async function waitForIdempotentRecord( | ||
| redisKey: string | ||
| ): Promise<EmailIdempotencyRecord | null> { | ||
| for (let attempt = 0; attempt < IDEMPOTENCY_WAIT_ATTEMPTS; attempt += 1) { | ||
| await new Promise((resolve) => setTimeout(resolve, IDEMPOTENCY_WAIT_DELAY_MS)); | ||
| const record = await getJsonValue<EmailIdempotencyRecord>(redisKey); | ||
| if (!record || record.status === "FAILED" || record.emailId) { | ||
| return record ?? null; | ||
| } | ||
| } | ||
|
|
||
| return getJsonValue<EmailIdempotencyRecord>(redisKey); | ||
| } | ||
|
|
||
| async function resolveEmailFromRecord( | ||
| record: EmailIdempotencyRecord | null, | ||
| redisKey: string, | ||
| teamId: number | ||
| ) { | ||
| if (!record || !record.emailId) { | ||
| return null; | ||
| } | ||
|
|
||
| const email = await db.email.findUnique({ where: { id: record.emailId } }); | ||
|
|
||
| if (!email) { | ||
| await deleteKey(redisKey); | ||
| return null; | ||
| } | ||
|
|
||
| logger.info( | ||
| { | ||
| emailId: email.id, | ||
| teamId, | ||
| redisKey, | ||
| status: record.status, | ||
| }, | ||
| "Returning cached idempotent email send result (entries expire after 3 days)." | ||
| ); | ||
|
|
||
| return email; | ||
| } | ||
|
|
||
| async function checkIfValidEmail(emailId: string) { | ||
| const email = await db.email.findUnique({ | ||
|
|
@@ -72,10 +131,98 @@ export async function sendEmail( | |
| apiKeyId, | ||
| inReplyToId, | ||
| headers, | ||
| idempotencyKey, | ||
| } = emailContent; | ||
| let subject = subjectFromApiCall; | ||
| let html = htmlFromApiCall; | ||
|
|
||
| const normalizedIdempotencyKey = idempotencyKey?.trim(); | ||
| const idempotencyRedisKey = normalizedIdempotencyKey | ||
| ? `email-idempotency:${teamId}:${normalizedIdempotencyKey}` | ||
| : null; | ||
| let idempotencyRecordSnapshot: EmailIdempotencyRecord | null = null; | ||
| let idempotencyCreatedAt = new Date().toISOString(); | ||
|
|
||
| if (idempotencyRedisKey) { | ||
| let existingRecord = await getJsonValue<EmailIdempotencyRecord>( | ||
| idempotencyRedisKey | ||
| ); | ||
|
|
||
| if (existingRecord?.status === "FAILED") { | ||
| await deleteKey(idempotencyRedisKey); | ||
| existingRecord = null; | ||
| } else { | ||
| const cachedEmail = await resolveEmailFromRecord( | ||
| existingRecord, | ||
| idempotencyRedisKey, | ||
| teamId | ||
| ); | ||
|
|
||
| if (cachedEmail) { | ||
| return cachedEmail; | ||
| } | ||
|
|
||
| if (existingRecord?.status === "PENDING") { | ||
| const awaitedRecord = await waitForIdempotentRecord( | ||
| idempotencyRedisKey | ||
| ); | ||
| const awaitedEmail = await resolveEmailFromRecord( | ||
| awaitedRecord, | ||
| idempotencyRedisKey, | ||
| teamId | ||
| ); | ||
|
|
||
| if (awaitedEmail) { | ||
| return awaitedEmail; | ||
| } | ||
|
|
||
| if (awaitedRecord?.status === "FAILED") { | ||
| await deleteKey(idempotencyRedisKey); | ||
| existingRecord = null; | ||
| } else { | ||
| existingRecord = awaitedRecord; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| const nowIso = new Date().toISOString(); | ||
| const pendingRecord: EmailIdempotencyRecord = { | ||
| status: "PENDING", | ||
| createdAt: existingRecord?.createdAt ?? nowIso, | ||
| lastUpdatedAt: nowIso, | ||
| }; | ||
|
Comment on lines
+189
to
+193
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The code writes a Useful? React with 👍 / 👎. |
||
|
|
||
| const setResult = await setJsonValue(idempotencyRedisKey, pendingRecord, { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Errors thrown after creating the PENDING idempotency record can leave the key stuck for up to 3 days, causing subsequent requests with the same idempotencyKey to fail with NOT_UNIQUE. Wrap the processing that follows creation of the PENDING record in a try-catch and, on any error, update the Redis record to status "FAILED" (preserving createdAt and setting lastUpdatedAt) before rethrowing to avoid blocking retries. Prompt for AI agents |
||
| ttlSeconds: IDEMPOTENCY_TTL_SECONDS, | ||
| mode: "NX", | ||
| }); | ||
|
|
||
| if (setResult !== "OK") { | ||
| const latestRecord = await waitForIdempotentRecord(idempotencyRedisKey); | ||
| const latestEmail = await resolveEmailFromRecord( | ||
| latestRecord, | ||
| idempotencyRedisKey, | ||
| teamId | ||
| ); | ||
|
|
||
| if (latestEmail) { | ||
| return latestEmail; | ||
| } | ||
|
|
||
| if (latestRecord?.status === "FAILED") { | ||
| await deleteKey(idempotencyRedisKey); | ||
| } | ||
|
|
||
| throw new UnsendApiError({ | ||
| code: "NOT_UNIQUE", | ||
| message: "A request with this idempotency key is already in progress.", | ||
| }); | ||
| } | ||
|
|
||
| idempotencyCreatedAt = pendingRecord.createdAt; | ||
| idempotencyRecordSnapshot = pendingRecord; | ||
| } | ||
|
Comment on lines
+139
to
+224
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Critical: Wrap idempotency flow in try-catch to clean up PENDING records. If an error occurs after creating the PENDING record (line 195) but before updating to CREATED (line 430)—such as validation failures on lines 236-246 or lines 377-389—the PENDING record remains stuck in Redis for 3 days. Subsequent requests with the same Wrap the entire flow from line 224 onward in a try-catch block to update the record to FAILED on any error: idempotencyCreatedAt = pendingRecord.createdAt;
idempotencyRecordSnapshot = pendingRecord;
}
+
+ try {
let domain: Awaited<ReturnType<typeof validateDomainFromEmail>>;
// ... (rest of the email creation flow)
return email;
+
+ } catch (error: any) {
+ if (idempotencyRedisKey) {
+ const failedAt = new Date().toISOString();
+ const failureRecord: EmailIdempotencyRecord = {
+ status: "FAILED",
+ createdAt: idempotencyCreatedAt,
+ lastUpdatedAt: failedAt,
+ error: error instanceof Error ? error.message : String(error),
+ };
+ await setJsonValue(idempotencyRedisKey, failureRecord, {
+ ttlSeconds: IDEMPOTENCY_TTL_SECONDS,
+ });
+ }
+ throw error;
+ }
}
🤖 Prompt for AI Agents |
||
|
|
||
| let domain: Awaited<ReturnType<typeof validateDomainFromEmail>>; | ||
|
|
||
| // If this is an API call with an API key, validate domain access | ||
|
|
@@ -271,6 +418,22 @@ export async function sendEmail( | |
| }, | ||
| }); | ||
|
|
||
| if (idempotencyRedisKey) { | ||
| const createdAtIso = new Date().toISOString(); | ||
| const createdRecord: EmailIdempotencyRecord = { | ||
| status: "CREATED", | ||
| emailId: email.id, | ||
| createdAt: idempotencyCreatedAt, | ||
| lastUpdatedAt: createdAtIso, | ||
| }; | ||
|
|
||
| await setJsonValue(idempotencyRedisKey, createdRecord, { | ||
| ttlSeconds: IDEMPOTENCY_TTL_SECONDS, | ||
| }); | ||
|
|
||
| idempotencyRecordSnapshot = createdRecord; | ||
| } | ||
|
|
||
| try { | ||
| await EmailQueueService.queueEmail( | ||
| email.id, | ||
|
|
@@ -280,6 +443,23 @@ export async function sendEmail( | |
| undefined, | ||
| delay | ||
| ); | ||
|
|
||
| if (idempotencyRedisKey) { | ||
| const queuedAt = new Date().toISOString(); | ||
| const queuedRecord: EmailIdempotencyRecord = { | ||
| status: "QUEUED", | ||
| emailId: email.id, | ||
| createdAt: idempotencyRecordSnapshot?.createdAt ?? idempotencyCreatedAt, | ||
| queuedAt, | ||
| lastUpdatedAt: queuedAt, | ||
| }; | ||
|
|
||
| await setJsonValue(idempotencyRedisKey, queuedRecord, { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the Redis cache write fails after queueEmail succeeds, this line throws and we immediately mark the send as FAILED even though the job is already queued, so callers see an error and may retry, generating duplicate sends. Guard the idempotency cache update so Redis failures don't flip the request into the failure path. Prompt for AI agents |
||
| ttlSeconds: IDEMPOTENCY_TTL_SECONDS, | ||
| }); | ||
|
|
||
| idempotencyRecordSnapshot = queuedRecord; | ||
| } | ||
| } catch (error: any) { | ||
| await db.emailEvent.create({ | ||
| data: { | ||
|
|
@@ -295,6 +475,22 @@ export async function sendEmail( | |
| where: { id: email.id }, | ||
| data: { latestStatus: "FAILED" }, | ||
| }); | ||
|
|
||
| if (idempotencyRedisKey) { | ||
| const failedAt = new Date().toISOString(); | ||
| const failureRecord: EmailIdempotencyRecord = { | ||
| status: "FAILED", | ||
| emailId: email.id, | ||
| createdAt: | ||
| idempotencyRecordSnapshot?.createdAt ?? idempotencyCreatedAt, | ||
| lastUpdatedAt: failedAt, | ||
| error: error instanceof Error ? error.message : String(error), | ||
| }; | ||
|
|
||
| await setJsonValue(idempotencyRedisKey, failureRecord, { | ||
| ttlSeconds: IDEMPOTENCY_TTL_SECONDS, | ||
| }); | ||
| } | ||
| throw error; | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line claims all duplicate requests return the first queued email, but the API sends an "already in progress" error when the original request hasn't finished. Please clarify the pending-request behavior.
Prompt for AI agents