Skip to content

Phase/worker retry dlq#4

Merged
Ryanakml merged 4 commits intomainfrom
phase/worker-retry-dlq
Mar 7, 2026
Merged

Phase/worker retry dlq#4
Ryanakml merged 4 commits intomainfrom
phase/worker-retry-dlq

Conversation

@Ryanakml
Copy link
Copy Markdown
Owner

@Ryanakml Ryanakml commented Mar 7, 2026

Summary by CodeRabbit

Release Notes

  • New Features

    • Configurable worker retry policies with exponential backoff and jitter
    • Dead-letter queue replay functionality to re-process failed jobs
    • Job timeout enforcement to prevent indefinite processing
    • Structured payload schema versioning for job tracking
  • Configuration

    • New environment variables for worker concurrency, job timeout, and retry behavior (transient/permanent max attempts, backoff delay, backoff jitter)

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Mar 7, 2026

📝 Walkthrough

Walkthrough

This PR introduces a comprehensive worker retry and dead-letter queue system. It adds configurable retry policies via environment variables, shared payload schemas with versioning, a new worker service for processing ingress jobs with transient/permanent error classification, and DLQ replay functionality for recovering failed jobs.

Changes

Cohort / File(s) Summary
Configuration Defaults
apps/api/.env.example, apps/worker/.env.example, packages/config/src/index.*
Adds four worker retry environment variables (WORKER_RETRY_TRANSIENT_MAX_ATTEMPTS, WORKER_RETRY_PERMANENT_MAX_ATTEMPTS, WORKER_RETRY_BACKOFF_DELAY_MS, WORKER_RETRY_BACKOFF_JITTER) plus defaults for concurrency and job timeout to both .env examples and config package exports with new resolveWorkerRetryPolicy function.
Shared Payload & Validation
packages/shared/src/index.*
Introduces comprehensive JSON/payload type definitions (JsonValue, IngressJobPayload, IngressDlqJobPayload) with schema versioning, queue/job name constants, and full runtime validators (isIngressJobPayload, assertIngressJobPayload, createIngressJobPayload, and DLQ equivalents).
API Ingress Queue Enhancement
apps/api/src/index.*, apps/api/tsconfig.json
Modifies createBullMqIngressQueue to accept optional retryPolicy parameter, adds createIngressQueueRetryJobOptions helper, imports IngressJobPayload from shared, updates enqueue payloads to use createIngressJobPayload helper, and adds @wa-chat/shared path alias.
Worker Service & DLQ Processing
apps/worker/src/index.ts, apps/worker/src/queue/consumer.ts
Introduces new worker service initialization (startWorker), DLQ queue creation and routing logic (createBullMqIngressDlqQueue, routeFailedIngressJobToDlq, buildIngressDlqPayloadFromFailure), job processor with timeout and error classification (runIngressJob, classifyWorkerError), and policy resolution (resolveWorkerPolicies, resolveWillRetry).
DLQ Replay Utility
apps/worker/src/dlq/replay.ts
Adds comprehensive DLQ replay module with CLI argument parsing (parseDlqReplayCliArgs), candidate evaluation (evaluateDlqReplayCandidate), job option resolution (resolveReplayJobOptions), main replay orchestration (runDlqReplay), and audit logging (emitDlqReplayAuditLog) with configurable limits and age-based filtering.
Test Scripts & Configuration
apps/api/package.json, apps/worker/package.json, packages/config/package.json, packages/shared/tsconfig.json
Updates test scripts to use node --import tsx instead of tsx directly; adds worker build scripts to include shared package build; adds test and dlq:replay scripts to worker; adds @wa-chat/shared and bullmq dependencies to worker; enables concrete TypeScript output directories in shared tsconfig.
Test Coverage
apps/api/src/index.test.*, apps/worker/src/index.test.ts, apps/worker/src/dlq/replay.test.ts
Adds test cases for retry configuration validation, enqueued job schema version assertion, worker policy resolution defaults/parsing, job processing with timeouts, error classification and retry logic, DLQ payload construction, and end-to-end DLQ replay scenarios with CLI argument validation.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant API
    participant IngressQueue as Ingress Queue<br/>(Redis/BullMQ)
    participant Worker
    participant Processor
    participant DLQQueue as DLQ Queue<br/>(Redis/BullMQ)

    Client->>API: POST webhook
    activate API
    API->>API: validateEnv & resolveWorkerRetryPolicy
    API->>IngressQueue: createBullMqIngressQueue(redisUrl, retryPolicy)
    API->>IngressQueue: enqueue(createIngressJobPayload({...}))
    deactivate API
    
    IngressQueue->>Worker: Job available
    activate Worker
    Worker->>Worker: runIngressJob with timeout
    Worker->>Processor: Execute processor(payload)
    activate Processor
    alt Processor Success
        Processor-->>Worker: resolved
        Worker->>IngressQueue: Job complete
    else Transient Error (retryable)
        Processor-->>Worker: Error thrown
        Worker->>Worker: classifyWorkerError → 'transient'
        Worker->>Worker: Check attemptsMade < maxAttempts
        alt Within retry limit
            Worker->>IngressQueue: Retry (exponential backoff)
        else Retries exhausted
            Worker->>Worker: buildIngressDlqPayloadFromFailure
            Worker->>DLQQueue: routeFailedIngressJobToDlq
            DLQQueue->>DLQQueue: enqueue(dlqPayload)
        end
    else Permanent Error
        Processor-->>Worker: Error thrown
        Worker->>Worker: classifyWorkerError → 'permanent'
        alt permanentMaxAttempts exceeded
            Worker->>Worker: buildIngressDlqPayloadFromFailure
            Worker->>DLQQueue: routeFailedIngressJobToDlq
            DLQQueue->>DLQQueue: enqueue(dlqPayload)
        else Still has attempts
            Worker->>IngressQueue: Retry
        end
    end
    deactivate Processor
    deactivate Worker
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

  • PR #3 — Introduced initial ingress queue and enqueue logic; this PR extends it with retry policies, payload schemas, and DLQ integration, building directly on that foundation.

Poem

🐰 Workers hop with retry might,
Dead letters sorted, wrongs made right,
Exponential backoff brings delight,
Queues and schemas, schemas tight!
Transient errors? Hop again—
Permanent fails find the pen. 🐇

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Title check ❓ Inconclusive The title 'Phase/worker retry dlq' is vague and uses non-descriptive formatting that doesn't clearly convey the main changes in the changeset. Revise the title to clearly describe the primary change, such as 'Add worker retry and dead-letter queue (DLQ) handling' or 'Implement configurable retry policies and DLQ routing for ingress jobs'.
✅ Passed checks (1 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch phase/worker-retry-dlq

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🧹 Nitpick comments (4)
packages/shared/src/index.js (1)

140-146: Validate failure.reason and failure.errorClass as a pair.

The current guard accepts contradictory combinations like transient_retries_exhausted with errorClass: "permanent". Tightening that invariant here keeps DLQ records self-consistent for replay and reporting.

💡 Suggested fix
     if (failure.reason !== 'transient_retries_exhausted' &&
         failure.reason !== 'permanent_retries_exhausted') {
         return false;
     }
     if (failure.errorClass !== 'transient' && failure.errorClass !== 'permanent') {
         return false;
     }
+    if ((failure.reason === 'transient_retries_exhausted' && failure.errorClass !== 'transient') ||
+        (failure.reason === 'permanent_retries_exhausted' && failure.errorClass !== 'permanent')) {
+        return false;
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/shared/src/index.js` around lines 140 - 146, The current guards
validate failure.reason and failure.errorClass independently, allowing
inconsistent pairs (e.g., reason 'transient_retries_exhausted' with errorClass
'permanent'); update the logic around the failure.reason/failure.errorClass
checks to enforce a consistent pair: if failure.reason startsWith('transient')
or equals 'transient_retries_exhausted' then require failure.errorClass ===
'transient', and if failure.reason startsWith('permanent') or equals
'permanent_retries_exhausted' then require failure.errorClass === 'permanent'
(return false on mismatch); locate and modify the block using failure.reason and
failure.errorClass to implement this paired validation.
apps/api/src/index.js (1)

3-4: Use the shared JSON coercer on this enqueue path.

This code now relies on @wa-chat/shared for the payload contract, but it still bypasses the shared coerceJsonValue implementation in packages/shared/src/index.ts Lines 107-131 by keeping a second copy locally. That makes payload semantics drift-prone: the next fix in the shared package will have to be duplicated here by hand.

Also applies to: 457-460

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/api/src/index.js` around lines 3 - 4, The enqueue path duplicates JSON
coercion instead of using the shared implementation; import and use the shared
coerceJsonValue from `@wa-chat/shared` where the code builds the ingress payload
(alongside INGRESS_JOB_NAME, INGRESS_QUEUE_NAME, createIngressJobPayload) and
remove the local duplicate coercer; also search for the same local copy used in
the other enqueue location (the later enqueue block) and replace it with the
shared coerceJsonValue so payload semantics follow the single source of truth.
apps/worker/src/dlq/replay.ts (1)

417-432: Minor inefficiency in job fetching logic.

Line 429 uses Math.max(args.limit * 5, args.limit) - 1, but since limit is always positive (validated earlier), limit * 5 is always greater than or equal to limit, making Math.max redundant.

Additionally, fetching 5x the limit to account for filtered jobs may be inefficient for large queues. Consider documenting this trade-off or using pagination.

🔧 Simplify the expression
     return dlqQueue.getJobs(
       ['wait', 'paused', 'prioritized', 'delayed'],
       0,
-      Math.max(args.limit * 5, args.limit) - 1,
+      args.limit * 5 - 1,
       false,
     );
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/worker/src/dlq/replay.ts` around lines 417 - 432, The inspectJobs
function currently calls dlqQueue.getJobs with an upper index computed as
Math.max(args.limit * 5, args.limit) - 1 which is redundant because args.limit
is validated positive; replace that expression with (args.limit * 5) - 1 (or
better, implement simple pagination) and document the trade-off that we fetch 5×
the requested limit to compensate for filtered/skip-prone jobs; update the call
site in inspectJobs and any related comments to reflect the simplified
expression and optional pagination note.
apps/worker/src/queue/consumer.ts (1)

104-142: Consider documenting the timeout error classification behavior.

WorkerJobTimeoutError is classified as transient (not in the permanent error list), meaning timeout failures will be retried. This seems intentional (transient network/service delays), but it's worth confirming this matches the desired behavior for job processing timeouts.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/worker/src/queue/consumer.ts` around lines 104 - 142, The runIngressJob
function currently lets WorkerJobTimeoutError be classified as transient by
classifyWorkerError (so timeouts are retried); add an explicit code comment in
runIngressJob (near the try/catch and error-class handling) stating that
WorkerJobTimeoutError is treated as transient and will be retried per
policies.retry.permanentMaxAttempts, and include guidance that if timeouts
should be treated as permanent instead the change must be made in
classifyWorkerError or by adding an explicit branch in runIngressJob to handle
WorkerJobTimeoutError as permanent; reference runIngressJob,
classifyWorkerError, WorkerJobTimeoutError, and
policies.retry.permanentMaxAttempts in the comment for clarity.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/worker/package.json`:
- Line 10: Update the "dlq:replay" npm script to prebuild the workspace packages
before running the replay script: ensure it runs the same prebuild command used
by other scripts to build `@wa-chat/config` and `@wa-chat/shared` first, then
execute the node --import tsx src/dlq/replay.ts command so imports resolve from
the generated dist/ entrypoints for those packages.

In `@packages/shared/src/index.js`:
- Around line 67-81: createIngressJobPayload currently returns input.payload
untouched which can later fail assertIngressJobPayload or serialize
inconsistently; mirror the behavior of createIngressDlqJobPayload by
constructing the job object (using INGRESS_JOB_SCHEMA_VERSION, eventKey,
payload, receivedAt) and then validate it immediately with
assertIngressJobPayload(job) (or throw the assertion error) and optionally
deep-clone/normalize payload if createIngressDlqJobPayload does so; update
createIngressJobPayload to call assertIngressJobPayload before returning to
ensure the helper validates its own output.
- Around line 7-45: isJsonValue and coerceJsonValue currently accept non-plain
objects (Date, Map) and non-finite numbers (NaN, Infinity); update validations
so they only allow plain objects and finite numbers. Change isRecord to detect
plain objects (e.g., Object.getPrototypeOf(value) === Object.prototype or
value.constructor === Object) and use Number.isFinite to ensure numbers are
finite in isJsonValue and coerceJsonValue; for arrays still recurse, but when
encountering a non-plain object or a non-finite number, return false from
isJsonValue and return null from coerceJsonValue (instead of silently coercing),
referencing the functions isRecord, isJsonValue, and coerceJsonValue to locate
edits.

In `@packages/shared/src/index.ts`:
- Around line 146-147: The current checks use Date.parse which accepts non-ISO
formats; update isIngressJobPayload, createIngressJobPayload, and
isIngressDlqJobPayload to validate receivedAt with a strict ISO 8601 timestamp
pattern (e.g. regex that enforces YYYY-MM-DDThh:mm:ss(.sss)?(Z|±hh:mm)) instead
of relying solely on Date.parse, and only accept the value if it matches the
regex and parses to a valid Date; replace the `typeof value.receivedAt !==
'string' || Number.isNaN(Date.parse(value.receivedAt))` checks with a combined
regex.test(value.receivedAt) && !Number.isNaN(Date.parse(value.receivedAt))
guard so the code enforces strict ISO format while still ensuring it parses.
- Around line 66-67: Update isJsonValue, coerceJsonValue, and
createIngressJobPayload to reject non-JSON-serializable values: in isJsonValue
use Number.isFinite for numeric checks (so NaN and Infinity return false), treat
only plain objects (use the existing isRecord predicate) as valid object values
(so Date and other class instances are rejected), and ensure arrays and plain
objects are validated recursively; in coerceJsonValue mirror those rules by
rejecting non-plain objects instead of letting them pass through and only coerce
arrays/objects that consist entirely of valid JSON values; finally, validate the
final payload in createIngressJobPayload with isJsonValue before returning and
throw or return an error on invalid payloads.

---

Nitpick comments:
In `@apps/api/src/index.js`:
- Around line 3-4: The enqueue path duplicates JSON coercion instead of using
the shared implementation; import and use the shared coerceJsonValue from
`@wa-chat/shared` where the code builds the ingress payload (alongside
INGRESS_JOB_NAME, INGRESS_QUEUE_NAME, createIngressJobPayload) and remove the
local duplicate coercer; also search for the same local copy used in the other
enqueue location (the later enqueue block) and replace it with the shared
coerceJsonValue so payload semantics follow the single source of truth.

In `@apps/worker/src/dlq/replay.ts`:
- Around line 417-432: The inspectJobs function currently calls dlqQueue.getJobs
with an upper index computed as Math.max(args.limit * 5, args.limit) - 1 which
is redundant because args.limit is validated positive; replace that expression
with (args.limit * 5) - 1 (or better, implement simple pagination) and document
the trade-off that we fetch 5× the requested limit to compensate for
filtered/skip-prone jobs; update the call site in inspectJobs and any related
comments to reflect the simplified expression and optional pagination note.

In `@apps/worker/src/queue/consumer.ts`:
- Around line 104-142: The runIngressJob function currently lets
WorkerJobTimeoutError be classified as transient by classifyWorkerError (so
timeouts are retried); add an explicit code comment in runIngressJob (near the
try/catch and error-class handling) stating that WorkerJobTimeoutError is
treated as transient and will be retried per
policies.retry.permanentMaxAttempts, and include guidance that if timeouts
should be treated as permanent instead the change must be made in
classifyWorkerError or by adding an explicit branch in runIngressJob to handle
WorkerJobTimeoutError as permanent; reference runIngressJob,
classifyWorkerError, WorkerJobTimeoutError, and
policies.retry.permanentMaxAttempts in the comment for clarity.

In `@packages/shared/src/index.js`:
- Around line 140-146: The current guards validate failure.reason and
failure.errorClass independently, allowing inconsistent pairs (e.g., reason
'transient_retries_exhausted' with errorClass 'permanent'); update the logic
around the failure.reason/failure.errorClass checks to enforce a consistent
pair: if failure.reason startsWith('transient') or equals
'transient_retries_exhausted' then require failure.errorClass === 'transient',
and if failure.reason startsWith('permanent') or equals
'permanent_retries_exhausted' then require failure.errorClass === 'permanent'
(return false on mismatch); locate and modify the block using failure.reason and
failure.errorClass to implement this paired validation.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 7ced5f86-5f40-4d7b-9ec2-c52419e4d72e

📥 Commits

Reviewing files that changed from the base of the PR and between 51f1f9c and b047b1a.

⛔ Files ignored due to path filters (8)
  • apps/api/src/index.d.ts.map is excluded by !**/*.map
  • apps/api/src/index.js.map is excluded by !**/*.map
  • apps/api/src/index.test.js.map is excluded by !**/*.map
  • package-lock.json is excluded by !**/package-lock.json
  • packages/config/src/index.d.ts.map is excluded by !**/*.map
  • packages/config/src/index.js.map is excluded by !**/*.map
  • packages/shared/src/index.d.ts.map is excluded by !**/*.map
  • packages/shared/src/index.js.map is excluded by !**/*.map
📒 Files selected for processing (24)
  • apps/api/.env.example
  • apps/api/package.json
  • apps/api/src/index.d.ts
  • apps/api/src/index.js
  • apps/api/src/index.test.js
  • apps/api/src/index.test.ts
  • apps/api/src/index.ts
  • apps/api/tsconfig.json
  • apps/worker/.env.example
  • apps/worker/package.json
  • apps/worker/src/dlq/replay.test.ts
  • apps/worker/src/dlq/replay.ts
  • apps/worker/src/index.test.ts
  • apps/worker/src/index.ts
  • apps/worker/src/queue/consumer.ts
  • packages/config/package.json
  • packages/config/src/env.test.ts
  • packages/config/src/index.d.ts
  • packages/config/src/index.js
  • packages/config/src/index.ts
  • packages/shared/src/index.d.ts
  • packages/shared/src/index.js
  • packages/shared/src/index.ts
  • packages/shared/tsconfig.json

"build": "npm --workspace @wa-chat/config run build && tsc",
"build": "npm --workspace @wa-chat/config run build && npm --workspace @wa-chat/shared run build && tsc",
"dev": "tsx watch src/index.ts",
"dlq:replay": "node --import tsx src/dlq/replay.ts",
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== replay imports =="
sed -n '1,25p' apps/worker/src/dlq/replay.ts

echo
echo "== workspace package entrypoints =="
python <<'PY'
import json
from pathlib import Path

for manifest_path in [
    Path("packages/config/package.json"),
    Path("packages/shared/package.json"),
]:
    data = json.loads(manifest_path.read_text())
    print(f"--- {manifest_path} ---")
    subset = {key: data.get(key) for key in ("name", "main", "module", "types", "exports")}
    print(json.dumps(subset, indent=2))
    print("dist exists:", (manifest_path.parent / "dist").exists())
    print()
PY

Repository: Ryanakml/ChatFuse

Length of output: 1418


Add prebuild step to dlq:replay to match other scripts.

The replay script imports @wa-chat/config and @wa-chat/shared, both of which have entrypoints configured to resolve through dist/ directories. Those directories don't currently exist on disk. Without prebuilding these workspace packages first, npm run dlq:replay will fail on a clean checkout. Add the same prebuild prefix you used for build, test, and typecheck:

"dlq:replay": "npm run build -w `@wa-chat/config` -w `@wa-chat/shared` && node --import tsx src/dlq/replay.ts",
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/worker/package.json` at line 10, Update the "dlq:replay" npm script to
prebuild the workspace packages before running the replay script: ensure it runs
the same prebuild command used by other scripts to build `@wa-chat/config` and
`@wa-chat/shared` first, then execute the node --import tsx src/dlq/replay.ts
command so imports resolve from the generated dist/ entrypoints for those
packages.

Comment on lines +7 to +45
const isRecord = (value) => typeof value === 'object' && value !== null && !Array.isArray(value);
const isNonEmptyString = (value) => typeof value === 'string' && value.trim() !== '';
const isNullableInteger = (value, options) => value === null || (typeof value === 'number' && Number.isInteger(value) && value >= options.minimum);
const isNullableFiniteNumber = (value) => value === null || (typeof value === 'number' && Number.isFinite(value));
const isNullableJitter = (value) => value === null || (typeof value === 'number' && Number.isFinite(value) && value >= 0 && value <= 1);
export const isJsonValue = (value) => {
if (value === null ||
typeof value === 'string' ||
typeof value === 'number' ||
typeof value === 'boolean') {
return true;
}
if (Array.isArray(value)) {
return value.every((entry) => isJsonValue(entry));
}
if (!isRecord(value)) {
return false;
}
return Object.values(value).every((entry) => isJsonValue(entry));
};
export const coerceJsonValue = (value) => {
if (value === null ||
typeof value === 'string' ||
typeof value === 'number' ||
typeof value === 'boolean') {
return value;
}
if (Array.isArray(value)) {
return value.map((entry) => coerceJsonValue(entry));
}
if (!isRecord(value)) {
return null;
}
const jsonObject = {};
for (const [key, entry] of Object.entries(value)) {
jsonObject[key] = coerceJsonValue(entry);
}
return jsonObject;
};
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

In JavaScript, are NaNandInfinityvalid JSON numbers, and what doesJSON.stringifyemit for them? Also, doDateandMap instances count as plain JSON objects, or do they serialize differently?

💡 Result:

  • NaN / Infinity as JSON numbers: No—JSON numbers must match the JSON number grammar, and non-finite numeric values like NaN and Infinity are not permitted in JSON. [1]
    JSON.stringify output: JavaScript’s JSON.stringify serializes NaN, Infinity, and -Infinity as the JSON literal null (e.g., JSON.stringify(NaN) === "null"). [2]

  • Date instances: A Date is not a “plain JSON object” in the sense of preserving its object structure/type. During JSON.stringify, Date.prototype.toJSON() is called, and it produces an ISO string (same format as toISOString()), so dates serialize as JSON strings. [3]

  • Map instances: A Map is also not a plain JSON object for JSON.stringify. JSON.stringify only serializes an object’s own enumerable string-keyed properties; Map entries aren’t stored that way, so a Map typically stringifies as {} unless you transform it (e.g., Object.fromEntries(map) or Array.from(map)) or provide a replacer. [4][5]

Sources
[1] RFC 8259 (JSON) — non-finite numbers not permitted: “Infinity and NaN are not permitted.” (datatracker.ietf.org)
[2] ECMAScript spec — JSON.stringify renders NaN/Infinity as "null". (tc39.es)
[3] MDN — Date.prototype.toJSON() returns an ISO string and is called by JSON.stringify. (developer.mozilla.org)
[4] ECMAScript spec — serialization uses enumerable own properties. (tc39.es)
[5] Map has no enumerable own properties for entries → typically {}. (stackoverflow.com)

Citations:


Restrict the JSON helpers to plain objects and finite numbers.

isJsonValue currently accepts NaN and Infinity, which are invalid in JSON and serialize to null during JSON.stringify. Additionally, isRecord accepts non-plain objects like Date and Map instances, which serialize to JSON strings and empty objects respectively—causing payloads to round-trip unpredictably. coerceJsonValue silently rewrites these values instead of rejecting them.

Suggested fix
-const isRecord = (value) => typeof value === 'object' && value !== null && !Array.isArray(value);
+const isRecord = (value) => {
+    if (typeof value !== 'object' || value === null || Array.isArray(value)) {
+        return false;
+    }
+    const prototype = Object.getPrototypeOf(value);
+    return prototype === Object.prototype || prototype === null;
+};
 ...
-        typeof value === 'number' ||
+        (typeof value === 'number' && Number.isFinite(value)) ||
 ...
-        typeof value === 'number' ||
+        (typeof value === 'number' && Number.isFinite(value)) ||
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const isRecord = (value) => typeof value === 'object' && value !== null && !Array.isArray(value);
const isNonEmptyString = (value) => typeof value === 'string' && value.trim() !== '';
const isNullableInteger = (value, options) => value === null || (typeof value === 'number' && Number.isInteger(value) && value >= options.minimum);
const isNullableFiniteNumber = (value) => value === null || (typeof value === 'number' && Number.isFinite(value));
const isNullableJitter = (value) => value === null || (typeof value === 'number' && Number.isFinite(value) && value >= 0 && value <= 1);
export const isJsonValue = (value) => {
if (value === null ||
typeof value === 'string' ||
typeof value === 'number' ||
typeof value === 'boolean') {
return true;
}
if (Array.isArray(value)) {
return value.every((entry) => isJsonValue(entry));
}
if (!isRecord(value)) {
return false;
}
return Object.values(value).every((entry) => isJsonValue(entry));
};
export const coerceJsonValue = (value) => {
if (value === null ||
typeof value === 'string' ||
typeof value === 'number' ||
typeof value === 'boolean') {
return value;
}
if (Array.isArray(value)) {
return value.map((entry) => coerceJsonValue(entry));
}
if (!isRecord(value)) {
return null;
}
const jsonObject = {};
for (const [key, entry] of Object.entries(value)) {
jsonObject[key] = coerceJsonValue(entry);
}
return jsonObject;
};
const isRecord = (value) => {
if (typeof value !== 'object' || value === null || Array.isArray(value)) {
return false;
}
const prototype = Object.getPrototypeOf(value);
return prototype === Object.prototype || prototype === null;
};
const isNonEmptyString = (value) => typeof value === 'string' && value.trim() !== '';
const isNullableInteger = (value, options) => value === null || (typeof value === 'number' && Number.isInteger(value) && value >= options.minimum);
const isNullableFiniteNumber = (value) => value === null || (typeof value === 'number' && Number.isFinite(value));
const isNullableJitter = (value) => value === null || (typeof value === 'number' && Number.isFinite(value) && value >= 0 && value <= 1);
export const isJsonValue = (value) => {
if (value === null ||
typeof value === 'string' ||
(typeof value === 'number' && Number.isFinite(value)) ||
typeof value === 'boolean') {
return true;
}
if (Array.isArray(value)) {
return value.every((entry) => isJsonValue(entry));
}
if (!isRecord(value)) {
return false;
}
return Object.values(value).every((entry) => isJsonValue(entry));
};
export const coerceJsonValue = (value) => {
if (value === null ||
typeof value === 'string' ||
(typeof value === 'number' && Number.isFinite(value)) ||
typeof value === 'boolean') {
return value;
}
if (Array.isArray(value)) {
return value.map((entry) => coerceJsonValue(entry));
}
if (!isRecord(value)) {
return null;
}
const jsonObject = {};
for (const [key, entry] of Object.entries(value)) {
jsonObject[key] = coerceJsonValue(entry);
}
return jsonObject;
};
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/shared/src/index.js` around lines 7 - 45, isJsonValue and
coerceJsonValue currently accept non-plain objects (Date, Map) and non-finite
numbers (NaN, Infinity); update validations so they only allow plain objects and
finite numbers. Change isRecord to detect plain objects (e.g.,
Object.getPrototypeOf(value) === Object.prototype or value.constructor ===
Object) and use Number.isFinite to ensure numbers are finite in isJsonValue and
coerceJsonValue; for arrays still recurse, but when encountering a non-plain
object or a non-finite number, return false from isJsonValue and return null
from coerceJsonValue (instead of silently coercing), referencing the functions
isRecord, isJsonValue, and coerceJsonValue to locate edits.

Comment on lines +67 to +81
export const createIngressJobPayload = (input) => {
const eventKey = input.eventKey.trim();
if (!eventKey) {
throw new Error('Ingress job eventKey must be a non-empty string');
}
const receivedAt = input.receivedAt ?? new Date().toISOString();
if (Number.isNaN(Date.parse(receivedAt))) {
throw new Error('Ingress job receivedAt must be a valid ISO timestamp');
}
return {
schemaVersion: INGRESS_JOB_SCHEMA_VERSION,
eventKey,
payload: input.payload,
receivedAt,
};
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Have the ingress builder validate its own output.

Unlike createIngressDlqJobPayload, this helper returns input.payload untouched. A caller can build an object that later fails assertIngressJobPayload or serializes differently once enqueued.

💡 Suggested fix
 export const createIngressJobPayload = (input) => {
     const eventKey = input.eventKey.trim();
     if (!eventKey) {
         throw new Error('Ingress job eventKey must be a non-empty string');
     }
     const receivedAt = input.receivedAt ?? new Date().toISOString();
     if (Number.isNaN(Date.parse(receivedAt))) {
         throw new Error('Ingress job receivedAt must be a valid ISO timestamp');
     }
-    return {
+    return assertIngressJobPayload({
         schemaVersion: INGRESS_JOB_SCHEMA_VERSION,
         eventKey,
         payload: input.payload,
         receivedAt,
-    };
+    });
 };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
export const createIngressJobPayload = (input) => {
const eventKey = input.eventKey.trim();
if (!eventKey) {
throw new Error('Ingress job eventKey must be a non-empty string');
}
const receivedAt = input.receivedAt ?? new Date().toISOString();
if (Number.isNaN(Date.parse(receivedAt))) {
throw new Error('Ingress job receivedAt must be a valid ISO timestamp');
}
return {
schemaVersion: INGRESS_JOB_SCHEMA_VERSION,
eventKey,
payload: input.payload,
receivedAt,
};
export const createIngressJobPayload = (input) => {
const eventKey = input.eventKey.trim();
if (!eventKey) {
throw new Error('Ingress job eventKey must be a non-empty string');
}
const receivedAt = input.receivedAt ?? new Date().toISOString();
if (Number.isNaN(Date.parse(receivedAt))) {
throw new Error('Ingress job receivedAt must be a valid ISO timestamp');
}
return assertIngressJobPayload({
schemaVersion: INGRESS_JOB_SCHEMA_VERSION,
eventKey,
payload: input.payload,
receivedAt,
});
};
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/shared/src/index.js` around lines 67 - 81, createIngressJobPayload
currently returns input.payload untouched which can later fail
assertIngressJobPayload or serialize inconsistently; mirror the behavior of
createIngressDlqJobPayload by constructing the job object (using
INGRESS_JOB_SCHEMA_VERSION, eventKey, payload, receivedAt) and then validate it
immediately with assertIngressJobPayload(job) (or throw the assertion error) and
optionally deep-clone/normalize payload if createIngressDlqJobPayload does so;
update createIngressJobPayload to call assertIngressJobPayload before returning
to ensure the helper validates its own output.

Comment on lines +66 to +67
const isRecord = (value: unknown): value is Record<string, unknown> =>
typeof value === 'object' && value !== null && !Array.isArray(value);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

wc -l packages/shared/src/index.ts

Repository: Ryanakml/ChatFuse

Length of output: 93


🏁 Script executed:

cat -n packages/shared/src/index.ts | head -200

Repository: Ryanakml/ChatFuse

Length of output: 6805


🏁 Script executed:

cat -n packages/shared/src/index.ts | tail -130

Repository: Ryanakml/ChatFuse

Length of output: 3911


🏁 Script executed:

node <<'EOF'
// Test what Object.values returns for a Date object
const dateObj = new Date();
console.log("Object.values(new Date()):", Object.values(dateObj));
console.log("Length:", Object.values(dateObj).length);

// Test isRecord on Date
const isRecord = (value) => typeof value === 'object' && value !== null && !Array.isArray(value);
console.log("isRecord(new Date()):", isRecord(dateObj));

// Test NaN and Infinity
console.log("typeof NaN:", typeof NaN);
console.log("typeof Infinity:", typeof Infinity);
console.log("NaN === NaN:", NaN === NaN);
console.log("JSON.stringify(NaN):", JSON.stringify(NaN));
console.log("JSON.stringify(Infinity):", JSON.stringify(Infinity));
console.log("JSON.stringify({a: NaN}):", JSON.stringify({a: NaN}));
EOF

Repository: Ryanakml/ChatFuse

Length of output: 283


🏁 Script executed:

rg "createIngressJobPayload|isJsonValue|coerceJsonValue" -A 3 --type ts --type tsx -g "!index.ts"

Repository: Ryanakml/ChatFuse

Length of output: 88


🏁 Script executed:

node <<'EOF'
// Simulating the actual behavior
const isRecord = (value) => typeof value === 'object' && value !== null && !Array.isArray(value);

const isJsonValue = (value) => {
  if (
    value === null ||
    typeof value === 'string' ||
    typeof value === 'number' ||
    typeof value === 'boolean'
  ) {
    return true;
  }

  if (Array.isArray(value)) {
    return value.every((entry) => isJsonValue(entry));
  }

  if (!isRecord(value)) {
    return false;
  }

  return Object.values(value).every((entry) => isJsonValue(entry));
};

// Test cases from the review
console.log("=== Date Objects ===");
const dateObj = new Date();
console.log("isJsonValue(new Date()):", isJsonValue(dateObj));

console.log("\n=== NaN/Infinity ===");
console.log("isJsonValue(NaN):", isJsonValue(NaN));
console.log("isJsonValue(Infinity):", isJsonValue(Infinity));

console.log("\n=== Objects with NaN/Infinity ===");
console.log("isJsonValue({a: NaN, b: Infinity}):", isJsonValue({a: NaN, b: Infinity}));

console.log("\n=== What actually happens when serialized ===");
const payload = {a: NaN, b: Infinity, c: new Date()};
console.log("Original:", payload);
console.log("After JSON round-trip:", JSON.parse(JSON.stringify(payload)));
EOF

Repository: Ryanakml/ChatFuse

Length of output: 445


🏁 Script executed:

rg "createIngressJobPayload|isJsonValue|coerceJsonValue" --type ts -g "!index.ts"

Repository: Ryanakml/ChatFuse

Length of output: 1577


🏁 Script executed:

# Also check if there's any input validation before createIngressJobPayload is called
rg "createIngressJobPayload" -B 5 -A 2 --type ts

Repository: Ryanakml/ChatFuse

Length of output: 8600


🏁 Script executed:

rg "createIngressJobPayload" -B 10 -A 5 apps/api/src/index.ts

Repository: Ryanakml/ChatFuse

Length of output: 909


Fix isJsonValue and coerceJsonValue to reject non-JSON-serializable values.

Both functions currently accept values that cannot be safely serialized to JSON:

  • isJsonValue accepts NaN and Infinity (no finite-number check)
  • isJsonValue accepts Date objects and other non-plain objects (only checks !isRecord)
  • coerceJsonValue has the same issues, allowing these invalid values to pass through

When payloads containing NaN, Infinity, or Date objects reach downstream serialization (e.g., JSON.stringify), they are silently converted to null, causing data loss. Replace the number check with Number.isFinite(), reject non-plain objects in coerceJsonValue, and validate the payload before returning from createIngressJobPayload.

Also applies to: 86–105 (isJsonValue), 107–131 (coerceJsonValue)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/shared/src/index.ts` around lines 66 - 67, Update isJsonValue,
coerceJsonValue, and createIngressJobPayload to reject non-JSON-serializable
values: in isJsonValue use Number.isFinite for numeric checks (so NaN and
Infinity return false), treat only plain objects (use the existing isRecord
predicate) as valid object values (so Date and other class instances are
rejected), and ensure arrays and plain objects are validated recursively; in
coerceJsonValue mirror those rules by rejecting non-plain objects instead of
letting them pass through and only coerce arrays/objects that consist entirely
of valid JSON values; finally, validate the final payload in
createIngressJobPayload with isJsonValue before returning and throw or return an
error on invalid payloads.

Comment on lines +146 to +147
if (typeof value.receivedAt !== 'string' || Number.isNaN(Date.parse(value.receivedAt))) {
return false;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

wc -l packages/shared/src/index.ts

Repository: Ryanakml/ChatFuse

Length of output: 93


🏁 Script executed:

head -200 packages/shared/src/index.ts | tail -60

Repository: Ryanakml/ChatFuse

Length of output: 1524


🏁 Script executed:

sed -n '140,180p' packages/shared/src/index.ts

Repository: Ryanakml/ChatFuse

Length of output: 1104


🏁 Script executed:

sed -n '270,290p' packages/shared/src/index.ts

Repository: Ryanakml/ChatFuse

Length of output: 598


🏁 Script executed:

rg -i "iso.*timestamp|timestamp.*iso" packages/shared/src/index.ts -B 3 -A 3

Repository: Ryanakml/ChatFuse

Length of output: 267


🏁 Script executed:

fd -t f "\.ts$" packages/shared/src | head -20

Repository: Ryanakml/ChatFuse

Length of output: 120


🏁 Script executed:

rg "IngressJobPayload|IngressDlqJobPayload" packages/shared/src -B 2 -A 5

Repository: Ryanakml/ChatFuse

Length of output: 10756


🏁 Script executed:

rg "receivedAt|failedAt" packages/shared/src -B 1 -A 1 | head -40

Repository: Ryanakml/ChatFuse

Length of output: 1794


🏁 Script executed:

node <<'EOF'
// Test Date.parse behavior
const testCases = [
  '2024-01-15T10:30:45.123Z',  // ISO 8601 with milliseconds
  '2024-01-15T10:30:45Z',      // ISO 8601 without milliseconds
  '2024-01-15',                 // Date only (non-ISO)
  '01/15/2024',                 // US format
  '15 January 2024',            // Locale format
  'Jan 15, 2024',               // Another locale format
];

testCases.forEach(test => {
  const parsed = Date.parse(test);
  const isValid = !Number.isNaN(parsed);
  console.log(`"${test}": ${isValid ? 'ACCEPTED' : 'REJECTED'}`);
});
EOF

Repository: Ryanakml/ChatFuse

Length of output: 229


Use a strict ISO 8601 timestamp check instead of Date.parse.

Date.parse() accepts many non-ISO formats including date-only strings ("2024-01-15"), US formats ("01/15/2024"), and locale-dependent strings ("15 January 2024"), even though the error message claims "valid ISO timestamp". This looser validation breaks the contract, can disrupt lexical ordering of timestamp strings, and may cause parsing issues across different languages expecting strict ISO 8601 format.

Locations: 146-147 (isIngressJobPayload), 171-174 (createIngressJobPayload), 279-280 (isIngressDlqJobPayload)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/shared/src/index.ts` around lines 146 - 147, The current checks use
Date.parse which accepts non-ISO formats; update isIngressJobPayload,
createIngressJobPayload, and isIngressDlqJobPayload to validate receivedAt with
a strict ISO 8601 timestamp pattern (e.g. regex that enforces
YYYY-MM-DDThh:mm:ss(.sss)?(Z|±hh:mm)) instead of relying solely on Date.parse,
and only accept the value if it matches the regex and parses to a valid Date;
replace the `typeof value.receivedAt !== 'string' ||
Number.isNaN(Date.parse(value.receivedAt))` checks with a combined
regex.test(value.receivedAt) && !Number.isNaN(Date.parse(value.receivedAt))
guard so the code enforces strict ISO format while still ensuring it parses.

@Ryanakml Ryanakml merged commit abe71d7 into main Mar 7, 2026
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant