feat(api): add X-NexusGate-ReqId request deduplication#59
Conversation
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. 📝 WalkthroughWalkthrough引入基于请求 ID(ReqId)的去重与缓存命中:数据库模式扩展并新增枚举;新增 Redis in‑flight 跟踪、ReqId 生命周期处理器与 DB 辅助函数;在 completions/messages/responses 的流式与非流式路径中集成去重、缓存返回与最终化逻辑;前端展示 Cache Hit 状态。 Changes
Sequence Diagram(s)sequenceDiagram
participant Client as Client
participant API as API Endpoint
participant ReqHandler as ReqId Handler
participant DB as Database
participant Redis as Redis
participant Upstream as Upstream Provider
Client->>API: POST /completions (含 x-request-id)
API->>ReqHandler: extractReqId(headers) / checkReqId(reqId, pendingData)
alt cache_hit
ReqHandler->>DB: findCompletionByReqId(apiKeyId, reqId)
DB-->>ReqHandler: cached completion
ReqHandler->>DB: recordCacheHit(...)
ReqHandler-->>API: { type: "cache_hit", completion }
API-->>Client: 返回缓存响应(200)
else in_flight
ReqHandler->>Redis: getInFlight(apiKeyId, reqId)
Redis-->>ReqHandler: InFlightRequest
ReqHandler->>ReqHandler: calculateRetryAfter(...)
ReqHandler-->>API: { type: "in_flight", retryAfter }
API-->>Client: 409 + Retry-After
else new_request
ReqHandler->>DB: createPendingCompletion(...)
DB-->>ReqHandler: completionId
ReqHandler->>Redis: markInFlight(apiKeyId, reqId, completionId, ...)
ReqHandler-->>API: { type: "new_request", completionId }
API->>Upstream: 转发请求
Upstream-->>API: 响应(流或非流)
API->>ReqHandler: finalizeReqId(apiKeyId, reqId, completionId, updates 包含 cached_response)
ReqHandler->>DB: update completion (cached_response ...)
ReqHandler->>Redis: clearInFlight(apiKeyId, reqId)
API-->>Client: 返回新响应(200)
else no_reqid
ReqHandler-->>API: { type: "no_reqid" }
API->>Upstream: 转发请求
Upstream-->>API: 响应
API-->>Client: 返回响应(200)
end
Estimated code review effort🎯 4 (复杂) | ⏱️ ~60 分钟 Possibly related PRs
诗歌
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
📜 Recent review detailsConfiguration used: Organization UI Review profile: CHILL Plan: Pro 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
✏️ Tip: You can disable this entire section by setting Comment |
Summary of ChangesHello @pescn, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the API's efficiency and reliability by introducing a robust request deduplication mechanism. By leveraging a client-provided Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces a robust request deduplication feature using the X-NexusGate-ReqId header. The implementation is comprehensive, covering database schema changes, Redis-based in-flight request tracking, and integration across multiple API endpoints (completions, messages, responses) for both streaming and non-streaming requests. The use of a partial unique index in PostgreSQL and atomic SETNX in Redis are excellent choices for ensuring data integrity and handling concurrency. The logic for cache hits and in-flight requests, including dynamic Retry-After headers, is well-designed. The frontend changes to reflect the new cache_hit status are also a nice touch. Overall, this is a solid and well-executed feature. I have one suggestion to improve code clarity in the reqIdHandler.
Add support for request deduplication using X-NexusGate-ReqId header: - Cache completed responses and return them for duplicate requests - Return 409 Conflict with Retry-After for in-flight requests - Support both streaming and non-streaming requests - Scope ReqId by API key to prevent cross-user collisions - Use Redis SETNX for atomic in-flight marking (600s TTL) - Store cachedResponse in database for future cache hits - Add cache_hit status to completions table and frontend UI Database changes: - Add req_id, source_completion_id, api_format, cached_response columns - Add cache_hit status to completions_status enum - Create partial unique index on (api_key_id, req_id) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
cd89f99 to
4336fff
Compare
There was a problem hiding this comment.
Actionable comments posted: 5
🤖 Fix all issues with AI agents
In `@backend/src/api/v1/responses.ts`:
- Around line 566-577: The use of the non-null assertion apiKeyRecord!.id is
unsafe—replace it with an explicit null/undefined check on apiKeyRecord before
calling checkReqId (or use optional chaining with an early return/error) so we
never pass undefined to checkReqId; locate the call site in the responses
handler where checkReqId(...) is invoked and validate apiKeyRecord (and
similarly update the other occurrences that use apiKeyRecord!.id, e.g., the
subsequent checkReqId calls around the responses flow and any usages in
functions handling /v1/responses) to throw/return an appropriate error if
apiKeyRecord is missing, then pass apiKeyRecord.id without the non-null
assertion.
- Around line 723-743: The catch block inside the async generator wrapping
processStreamingResponse should not attempt to change the HTTP status after the
stream has started; remove or move the set.status = 500 assignment and instead
only emit the SSE-formatted error event (or ensure status is set before
initiating the stream). Specifically, update the wrapper around
processStreamingResponse (the anonymous async function that yields from
processStreamingResponse) to: 1) keep the existing aborted check on
streamSignal, 2) avoid modifying set.status inside the catch, and 3) exclusively
yield the SSE error event JSON when an unrecoverable error occurs (or surface
synchronous errors before starting the stream so set.status can be set
reliably).
In `@backend/src/utils/reqIdHandler.ts`:
- Around line 313-324: In extractReqId, trim the header value before any
validation: retrieve headers.get(REQID_HEADER), call .trim() once into a local
(e.g., reqIdTrimmed), then check emptiness, then enforce the 127-char limit on
the trimmed value (logging using its length via logger.warn) and return either
the trimmed value or the trimmed-and-truncated substring; this ensures
consistent behavior for inputs with leading/trailing spaces.
- Around line 156-174: The current flow in reqIdHandler (when isRedisAvailable()
and after markInFlight(...) returns false) merely logs a warning (logger.warn)
and continues, which can allow concurrent requests to bypass deduplication;
change the behavior in the markInFlight failure branch inside the request
handling logic to take an active corrective action: either retry markInFlight a
small number of times with short backoff, or fall back to a deterministic
rejection/early error response (e.g., return an error type instead of
proceeding), or consult Redis again and if still unavailable fail fast—ensure
the code path around markInFlight, logger.warn, and the subsequent return {
type: "new_request", ... } is updated so a failed Redis mark does not silently
continue but triggers retry/fallback/reject behavior and keeps
findCompletionByReqId semantics consistent.
In `@backend/src/utils/streaming-context.ts`:
- Around line 132-146: When reqIdContext is present the current branch always
calls finalizeReqId and never triggers the else-if (error) path so errors are
never written to the srv_logs DB via addCompletions; update the reqIdContext
branch around finalizeReqId (symbols: reqIdContext, finalizeReqId,
buildCachedResponse, completion, error, addCompletions, insertLog) to ensure DB
logging on failure—either (A) detect error before/after calling finalizeReqId
and call insertLog/addCompletions with the error details into srv_logs, or (B)
extend finalizeReqId to accept an optional error payload and inside
finalizeReqId reuse the addCompletions/insertLog logic to persist the error to
srv_logs; implement one of these so failures in the reqIdContext path are
persisted to the DB.
♻️ Duplicate comments (2)
backend/src/api/v1/completions.ts (1)
172-180:ReqIdContext接口重复定义。如前所述,此接口在三个 API 文件中重复定义。参见
responses.ts的评论建议提取为共享类型。backend/src/api/v1/messages.ts (1)
164-172:ReqIdContext接口重复定义。与 completions.ts 和 responses.ts 中的定义相同。建议统一提取。
🧹 Nitpick comments (3)
backend/src/utils/reqIdCache.ts (1)
100-110: JSON 解析缺少 schema 验证
JSON.parse(data) as InFlightRequest使用了类型断言,但没有验证解析后的数据是否符合InFlightRequest接口。如果 Redis 中存储了损坏或格式不正确的数据,类型断言不会捕获这个问题,可能导致后续使用时出现运行时错误。♻️ 建议添加基本的数据验证
try { const data = await redisClient.get(key); if (!data) { return null; } - return JSON.parse(data) as InFlightRequest; + const parsed = JSON.parse(data); + // Basic validation + if ( + typeof parsed.completionId !== 'number' || + typeof parsed.startTime !== 'number' || + typeof parsed.isStream !== 'boolean' || + typeof parsed.endpoint !== 'string' + ) { + logger.warn("Invalid in-flight data format", { apiKeyId, reqId }); + return null; + } + return parsed as InFlightRequest; } catch (error) {backend/src/db/index.ts (1)
1300-1316: 约束判断方式建议优化,但需要验证错误对象属性。使用
message.includes("unique")过于宽泛,会误吞其他唯一约束错误。建议基于 PostgreSQL 错误码23505判断,并优先检查约束名称completions_api_key_req_id_unique。注意:建议方案中的
pgError.constraint属性需要确认是否由 drizzle-orm 提供;如果不存在,应仅依赖错误码判断,并保留 message 中包含约束名称的兜底检查。可参考backend/src/api/admin/apiKey.ts中 error.code 的现有用法。backend/src/api/v1/responses.ts (1)
175-183: 建议将ReqIdContext接口提取为共享类型。
ReqIdContext接口在completions.ts、messages.ts和responses.ts中重复定义,具有相同的结构。建议将其提取到@/utils/reqIdHandler.ts中统一导出,以避免代码重复并确保一致性。♻️ 建议的重构
在
backend/src/utils/reqIdHandler.ts中添加:export interface ReqIdContext { reqId: string; apiKeyId: number; preCreatedCompletionId: number; apiFormat: ApiFormat; }然后在各 API 文件中导入使用。
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (16)
backend/drizzle/0011_outgoing_johnny_blaze.sqlbackend/drizzle/meta/0011_snapshot.jsonbackend/drizzle/meta/_journal.jsonbackend/src/api/v1/completions.tsbackend/src/api/v1/messages.tsbackend/src/api/v1/responses.tsbackend/src/db/index.tsbackend/src/db/schema.tsbackend/src/utils/redisClient.tsbackend/src/utils/reqIdCache.tsbackend/src/utils/reqIdHandler.tsbackend/src/utils/streaming-context.tsfrontend/src/i18n/locales/en-US.jsonfrontend/src/i18n/locales/zh-CN.jsonfrontend/src/pages/requests/columns.tsxfrontend/src/pages/requests/detail-panel/header.tsx
🧰 Additional context used
🧬 Code graph analysis (7)
frontend/src/pages/requests/columns.tsx (1)
frontend/src/components/ui/indicator-badge.tsx (1)
MiniIndicatorBadge(18-32)
backend/src/utils/streaming-context.ts (3)
backend/src/db/index.ts (1)
Completion(33-33)backend/src/db/schema.ts (1)
CachedResponseType(162-165)backend/src/utils/reqIdHandler.ts (1)
finalizeReqId(187-211)
backend/src/utils/reqIdCache.ts (1)
backend/src/utils/redisClient.ts (1)
redisClient(162-162)
backend/src/db/index.ts (1)
backend/src/utils/completions.ts (1)
Completion(10-21)
backend/src/api/v1/messages.ts (4)
backend/src/utils/reqIdHandler.ts (4)
ApiFormat(39-39)finalizeReqId(187-211)extractReqId(313-324)checkReqId(76-175)backend/src/adapters/types.ts (1)
ApiFormat(349-349)backend/src/db/schema.ts (1)
CachedResponseType(162-165)backend/src/utils/streaming-context.ts (1)
StreamingContext(32-183)
backend/src/utils/reqIdHandler.ts (3)
backend/src/db/index.ts (6)
Completion(33-33)CompletionInsert(34-34)findCompletionByReqId(1272-1292)createPendingCompletion(1300-1319)updateCompletion(1328-1340)insertCompletion(304-315)backend/src/utils/reqIdCache.ts (6)
InFlightRequest(16-21)isRedisAvailable(157-159)getInFlight(94-111)calculateRetryAfter(142-150)markInFlight(51-85)clearInFlight(121-134)backend/src/db/schema.ts (1)
CachedResponseType(162-165)
backend/src/api/v1/completions.ts (7)
backend/src/utils/reqIdHandler.ts (6)
ApiFormat(39-39)finalizeReqId(187-211)extractReqId(313-324)checkReqId(76-175)recordCacheHit(251-262)buildInFlightErrorResponse(273-305)backend/src/adapters/types.ts (1)
ApiFormat(349-349)backend/src/db/schema.ts (4)
CachedResponseType(162-165)CompletionsMessageType(118-127)ToolDefinitionType(90-97)ToolChoiceType(102-106)backend/src/utils/completions.ts (2)
addCompletions(29-64)Completion(10-21)backend/src/adapters/index.ts (2)
getUpstreamAdapter(85-92)getResponseAdapter(60-66)backend/src/db/index.ts (1)
Completion(33-33)backend/src/utils/streaming-context.ts (1)
StreamingContext(32-183)
🔇 Additional comments (32)
frontend/src/i18n/locales/zh-CN.json (1)
98-98: LGTM!新增的本地化键命名规范且翻译准确,与现有的状态翻译(待处理、已完成、失败、已中止)保持一致。
Also applies to: 125-125
frontend/src/i18n/locales/en-US.json (1)
97-97: LGTM!新增的本地化键遵循现有命名规范,文本清晰一致。
Also applies to: 124-124
backend/src/utils/redisClient.ts (1)
101-127: LGTM!实现正确地使用了 Redis 的
SET key value EX ttl NX命令来实现原子性的 set-if-not-exists 操作。错误处理与类中其他方法保持一致,返回false作为安全默认值对于去重场景是合理的选择。backend/src/utils/reqIdCache.ts (2)
41-85: LGTM!整体实现逻辑清晰:
- 使用 Redis SETNX 实现原子性的 in-flight 标记
- TTL 设置合理,防止孤儿键
calculateRetryAfter提供了基于请求类型的动态 Retry-After 计算- 完善的日志记录便于调试
Also applies to: 113-134, 136-150, 152-159
80-84: 代码已正确处理 Redis 不可用的情况
markInFlight在线路 157 的调用已被isRedisAvailable()检查所保护。当 Redis 不可用时,markInFlight不会被调用。即使返回false,也只会记录警告日志(线路 169),请求仍会作为新请求正常处理(线路 174),不会导致请求被拒绝。Likely an incorrect or invalid review comment.
backend/drizzle/meta/_journal.json (1)
81-88: LGTM!新的迁移日志条目遵循现有模式,索引序号正确递增。
frontend/src/pages/requests/detail-panel/header.tsx (1)
70-75: LGTM!新增的
cache_hit状态处理遵循现有模式,蓝色配色方案合理地将缓存命中与其他状态(成功/错误/警告)区分开来。使用ts-pattern的.exhaustive()确保了类型安全。frontend/src/pages/requests/columns.tsx (1)
47-49: 新增 CacheHit 状态徽标清晰。与现有状态链一致,列表里能直观看出缓存命中。
backend/src/utils/streaming-context.ts (2)
6-26: ReqId 上下文类型定义清楚。类型与缓存响应结构清晰,利于流式去重流程维护。
39-66: 构造函数注入 reqIdContext 的方式合理。保持可选参数形态,兼容现有调用路径。
backend/drizzle/meta/0011_snapshot.json (1)
1-1040: 快照与迁移变更对齐,LGTM。backend/drizzle/0011_outgoing_johnny_blaze.sql (1)
1-7: 迁移脚本覆盖 ReqId 去重所需字段与索引。backend/src/db/schema.ts (2)
149-165: 新增 cache_hit 状态与缓存响应类型定义清楚。
189-195: completions 表新增 reqId / cachedResponse 字段契合去重需求。backend/src/db/index.ts (2)
1265-1292: ReqId 查询过滤逻辑明确。
1328-1340: 更新接口补齐 updatedAt,逻辑一致。backend/src/api/v1/responses.ts (3)
33-41: 导入和类型定义看起来正确。ReqId 相关的工具函数和类型从
@/utils/reqIdHandler正确导入,CachedResponseType从 schema 导入。
579-605: 缓存命中处理逻辑正确。正确处理了缓存命中情况:记录缓存命中、返回缓存响应,并提供了从 completion 数据重建响应的回退机制。
608-618: in-flight 处理正确设置了 409 状态和 Retry-After 头。符合 PR 目标中描述的并发请求返回 409 Conflict 并带有动态 Retry-After 头的行为。
backend/src/utils/reqIdHandler.ts (4)
1-29: 文件结构和导入清晰。模块注释清楚描述了 ReqId 处理的完整生命周期,导入语句正确引用了必要的数据库和缓存依赖。
76-154:checkReqId函数的竞态条件处理逻辑合理。函数正确处理了以下场景:
- 无 ReqId 时直接返回
- 数据库中存在已完成请求时返回缓存命中
- Redis 中存在 in-flight 请求时返回冲突
- 创建新 pending completion 失败时重新检查状态(处理唯一约束冲突)
第 142-153 行的兜底处理(当状态异常时返回 in-flight)是合理的防御性编程。
187-211:finalizeReqId的错误处理策略合理。即使更新 completion 失败,也会尝试清除 in-flight 标记,这是正确的做法,避免了请求被永久阻塞。
251-262:recordCacheHit正确处理了 ReqId 唯一约束问题。通过将
reqId: null插入 cache_hit 记录,避免了与源 completion 的 ReqId 冲突。这是正确的设计决策。backend/src/api/v1/completions.ts (4)
39-47: 导入正确,与 responses.ts 保持一致。
553-568:checkReqId调用正确传递了所有必要参数。包括
apiKeyId、model、modelId、prompt(含 messages、tools、tool_choice)、apiFormat、endpoint和isStream。
570-604: 缓存命中的响应重建逻辑正确。重建的响应结构符合 OpenAI Chat Completion API 格式,包括
id、object、created、model、choices和usage字段。
725-745: 流式响应包装器与 responses.ts 一致。错误处理逻辑相同,但这里的错误格式使用了简单的 JSON 对象
{ error: "Stream processing error" },而 responses.ts 使用了 SSE 格式。这是正确的,因为 OpenAI Chat API 流式响应使用不同的格式。backend/src/api/v1/messages.ts (5)
33-41: 导入与其他 API 端点一致。
279-308: 流式缓存响应构建正确使用 Anthropic 格式。
buildCachedResponse回调正确构建了 Anthropic 风格的消息响应,包括type: "message"、role: "assistant"、content数组和usage对象。
542-568: 缓存命中的 Anthropic 响应重建正确。回退响应结构符合 Anthropic API 格式:
type: "message"role: "assistant"content为[{ type: "text", text: ... }]数组stop_reason: "end_turn"usage包含input_tokens和output_tokens
685-705: 流式错误事件格式符合 Anthropic SSE 规范。使用
event: error\ndata: ...格式,错误对象包含type: "error"和嵌套的error对象,符合 Anthropic 的 SSE 错误格式。与 responses.ts 相同,
set.status = 500在流开始后可能无效,但错误事件会正确发送给客户端。
524-540: Anthropic prompt 格式化与 OpenAI 端点略有不同。这里将 messages 内容序列化为 JSON 字符串存储,这是正确的做法,因为 Anthropic 的 content 可能是复杂的数组结构。
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
- Simplify buildCacheHitRecord by removing reqId field (was immediately overwritten) - Fix extractReqId to trim before length validation - Remove ineffective set.status=500 in streaming error handlers (status cannot change after streaming starts) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/src/api/v1/messages.ts (1)
613-644: ReqId 失败路径未 finalize,可能导致重复请求被卡住
与 completions 一样,failover 失败或非流式解析抛错时不会调用finalizeReqId,可能造成 in-flight 与 DB pending 残留。请确保失败路径统一 finalize。🐛 建议修复(在非流式 catch 中优先 finalize)
- try { - await addCompletions(completion, bearer, { - level: "info", - message: "Client disconnected during non-streaming response", - details: { - type: "completionError", - data: { type: "aborted", msg: errorMsg }, - }, - }); - } catch (logError: unknown) { + try { + if (nonStreamReqIdContext) { + await finalizeReqId( + nonStreamReqIdContext.apiKeyId, + nonStreamReqIdContext.reqId, + nonStreamReqIdContext.preCreatedCompletionId, + { ...completion }, + ); + } else { + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: errorMsg }, + }, + }); + } + } catch (logError: unknown) { logger.error("Failed to log aborted completion after processing error", logError); } @@ - try { - await addCompletions(completion, bearer, { - level: "error", - message: `Response processing error: ${errorMsg}`, - details: { - type: "completionError", - data: { type: "processingError", msg: errorMsg }, - }, - }); - } catch (logError: unknown) { + try { + if (nonStreamReqIdContext) { + await finalizeReqId( + nonStreamReqIdContext.apiKeyId, + nonStreamReqIdContext.reqId, + nonStreamReqIdContext.preCreatedCompletionId, + { ...completion }, + ); + } else { + await addCompletions(completion, bearer, { + level: "error", + message: `Response processing error: ${errorMsg}`, + details: { + type: "completionError", + data: { type: "processingError", msg: errorMsg }, + }, + }); + } + } catch (logError: unknown) { logger.error("Failed to log completion after processing error", logError); }Also applies to: 706-737, 775-825
backend/src/api/v1/completions.ts (1)
649-688: 在 failover 和 processNonStreamingResponse 错误路径中缺少 ReqId finalize 调用checkReqId 预创建的 completion 在以下失败场景未被 finalize,导致 Redis in-flight 与 DB pending 记录残留:
- streaming failover 失败(行 663-680):
processFailoverError调用时streamReqIdContext尚未创建,无法 finalize- non-streaming failover 失败(行 754-772):
processFailoverError调用时nonStreamReqIdContext尚未创建,无法 finalize- non-streaming 响应处理异常(行 821-865):catch 块中虽有
nonStreamReqIdContext,但调用addCompletions而非finalizeReqId需在这三处失败路径确保调用 finalizeReqId。对于 failover 错误,建议在 processFailoverError 调用前传入 ReqId 上下文;对于响应处理异常,建议检查是否需要 finalize,类似 processNonStreamingResponse 中的做法。
♻️ Duplicate comments (6)
backend/src/utils/streaming-context.ts (1)
132-157: ReqId 分支在错误场景缺少 srv_logs 数据库日志写入。当
reqIdContext存在时,即使发生错误(如status === "failed"),代码也会走finalizeReqId路径,而finalizeReqId仅提供控制台日志(logger),不包含数据库日志写入。原有的addCompletions路径会将错误写入srv_logs表。如果监控/告警系统依赖
srv_logs表的错误记录,此场景下会丢失失败原因。💡 建议的改进
// Save to database - use finalizeReqId if ReqId context is present if (this.reqIdContext) { // Build cached response if callback is provided const cachedResponse = this.reqIdContext.buildCachedResponse?.(this.completion); await finalizeReqId( this.reqIdContext.apiKeyId, this.reqIdContext.reqId, this.reqIdContext.preCreatedCompletionId, { ...this.completion, cachedResponse, }, ); + + // Also log errors to srv_logs for monitoring + if (error && status !== "aborted") { + await insertLog({ + relatedCompletionId: this.reqIdContext.preCreatedCompletionId, + message: `Stream ${status}: ${error}`, + level: "error", + details: { + type: "completionError", + data: { type: status, msg: error }, + }, + }); + } } else if (error) {backend/src/utils/reqIdHandler.ts (3)
156-176: 当 Redis 不可用时,并发去重机制失效。当
markInFlight失败时(Lines 167-170),系统记录警告但继续处理。这意味着:
- 数据库唯一约束可以防止重复插入
- 但当一个请求正在处理(pending 状态)时,后续相同 ReqId 请求可能绕过 Redis 检查,导致返回 409 的时机不一致
考虑到 Redis 标记失败是罕见情况,当前的降级策略可以接受,但建议在文档中说明这一行为。
223-263: buildCacheHitRecord 设置 reqId 后在 recordCacheHit 中被覆盖为 null。
buildCacheHitRecord在 Line 239 设置了reqId: sourceCompletion.reqId,但recordCacheHit在 Line 261 又将其覆盖为null。这种设计意图不够清晰。建议直接在
buildCacheHitRecord中省略reqId字段,使代码意图更明确。♻️ 建议的重构
export function buildCacheHitRecord( sourceCompletion: Completion, apiKeyId: number, ): CompletionInsert { return { apiKeyId, model: sourceCompletion.model, modelId: sourceCompletion.modelId, upstreamId: sourceCompletion.upstreamId, prompt: sourceCompletion.prompt, promptTokens: 0, // Cache hit doesn't consume tokens completion: sourceCompletion.completion, completionTokens: 0, // Cache hit doesn't consume tokens status: "cache_hit", ttft: 0, duration: 0, - reqId: sourceCompletion.reqId, + // reqId is intentionally omitted to avoid unique constraint violations + // since cache_hit records should not occupy the ReqId namespace sourceCompletionId: sourceCompletion.id, apiFormat: sourceCompletion.apiFormat, }; } export async function recordCacheHit( sourceCompletion: Completion, apiKeyId: number, ): Promise<Completion | null> { const record = buildCacheHitRecord(sourceCompletion, apiKeyId); - // Use insertCompletion instead of createPendingCompletion to avoid ReqId conflicts - // since cache_hit records share the same ReqId as the source - return await insertCompletion({ - ...record, - reqId: null, // Don't set ReqId on cache_hit records to avoid unique constraint - }); + return await insertCompletion(record); }
314-325: extractReqId 存在 trim 前后不一致的问题。函数先检查原始字符串长度,截断后才调用
trim()。如果 ReqId 首尾有大量空格,可能导致:
- 原始长度 > 127,被截断
- 截断后 trim 得到更短的有效值
建议先 trim 再检查长度。
🔧 建议的修复
export function extractReqId(headers: Headers): string | null { const reqId = headers.get(REQID_HEADER); if (!reqId || reqId.trim() === "") { return null; } + const trimmedReqId = reqId.trim(); // Validate ReqId length (max 127 chars as per schema) - if (reqId.length > 127) { + if (trimmedReqId.length > 127) { logger.warn("ReqId too long, truncating", { length: reqId.length }); - return reqId.substring(0, 127); + return trimmedReqId.substring(0, 127); } - return reqId.trim(); + return trimmedReqId; }backend/src/api/v1/responses.ts (2)
566-577: apiKeyRecord.id 访问假设 checkApiKey 插件保证其存在。虽然
checkApiKey: true配置应确保apiKeyRecord存在,但直接访问apiKeyRecord.id而非使用防御性检查或非空断言!可能在未来重构时引入风险。当前实现是可接受的,因为类型系统已通过插件保证了
apiKeyRecord的存在。
722-742: 流式响应中 set.status = 500 在流开始后无效。在 SSE 流式响应中,HTTP 状态码在响应开始后就已确定。Line 738 的
set.status = 500不会产生预期效果,因为流式传输已经开始。建议移除此行或添加注释说明其无效性,仅依赖 SSE 错误事件传递错误信息。
🔧 建议的修改
} catch (error) { // Don't log error if it's due to client abort if (!streamSignal.aborted) { logger.error("Stream processing error", error); - set.status = 500; + // Note: HTTP status cannot be changed after streaming has started + // Error is communicated via SSE event below yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { code: "internal_error", message: "Stream processing error", param: null, help_url: null } })}\n\n`; } }
🧹 Nitpick comments (4)
backend/src/db/index.ts (1)
1300-1319: 唯一约束违规检测方式较脆弱。通过
error.message.includes("unique")检测唯一约束违规依赖于错误消息的具体文本,不同数据库驱动或 PostgreSQL 版本可能返回不同的错误格式。♻️ 建议使用更可靠的错误检测
} catch (error) { // Handle unique constraint violation (duplicate ReqId) - if (error instanceof Error && error.message.includes("unique")) { + // PostgreSQL unique violation error code is 23505 + const pgError = error as { code?: string; message?: string }; + if (pgError.code === "23505" || (error instanceof Error && error.message.includes("unique"))) { logger.warn("Duplicate ReqId detected", c.reqId); return null; } throw error; }backend/src/utils/reqIdCache.ts (1)
94-111: getInFlight 使用 JSON.parse 存在潜在风险。如果 Redis 中存储了非法 JSON 数据,
JSON.parse会抛出异常。虽然外层 catch 会捕获,但建议添加更明确的类型验证。♻️ 可选改进
try { const data = await redisClient.get(key); if (!data) { return null; } - return JSON.parse(data) as InFlightRequest; + const parsed = JSON.parse(data); + // Basic validation + if (typeof parsed.completionId !== "number" || typeof parsed.startTime !== "number") { + logger.warn("Invalid in-flight data format", { apiKeyId, reqId }); + return null; + } + return parsed as InFlightRequest; } catch (error) {backend/src/api/v1/responses.ts (2)
175-183: ReqIdContext 接口在多个文件中重复定义。此接口在
completions.ts、messages.ts和responses.ts中都有定义。建议将其提取到共享位置(如reqIdHandler.ts或单独的类型文件)以避免重复和不一致风险。
541-577: 输入转换逻辑与 buildCompletionRecord 重复。Lines 545-564 的输入转换逻辑与
buildCompletionRecord函数(Lines 132-155)中的逻辑高度相似。建议提取为共享的辅助函数。♻️ 建议提取共享函数
function convertInputToMessages( input: unknown ): Array<{ role: string; content: string }> { const messages: Array<{ role: string; content: string }> = []; if (typeof input === "string") { messages.push({ role: "user", content: input }); } else if (Array.isArray(input)) { for (const item of input) { if (typeof item === "object" && item !== null) { const obj = item as Record<string, unknown>; if (obj.type === "message") { messages.push({ role: (obj.role as string) || "user", content: typeof obj.content === "string" ? obj.content : JSON.stringify(obj.content), }); } else if (obj.type === "function_call_output") { messages.push({ role: "tool", content: (obj.output as string) || "", }); } } } } return messages; }
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (16)
backend/drizzle/0011_outgoing_johnny_blaze.sqlbackend/drizzle/meta/0011_snapshot.jsonbackend/drizzle/meta/_journal.jsonbackend/src/api/v1/completions.tsbackend/src/api/v1/messages.tsbackend/src/api/v1/responses.tsbackend/src/db/index.tsbackend/src/db/schema.tsbackend/src/utils/redisClient.tsbackend/src/utils/reqIdCache.tsbackend/src/utils/reqIdHandler.tsbackend/src/utils/streaming-context.tsfrontend/src/i18n/locales/en-US.jsonfrontend/src/i18n/locales/zh-CN.jsonfrontend/src/pages/requests/columns.tsxfrontend/src/pages/requests/detail-panel/header.tsx
🚧 Files skipped from review as they are similar to previous changes (6)
- frontend/src/pages/requests/columns.tsx
- backend/src/utils/redisClient.ts
- backend/drizzle/0011_outgoing_johnny_blaze.sql
- frontend/src/i18n/locales/zh-CN.json
- backend/drizzle/meta/0011_snapshot.json
- frontend/src/i18n/locales/en-US.json
🧰 Additional context used
🧬 Code graph analysis (6)
frontend/src/pages/requests/detail-panel/header.tsx (1)
frontend/src/components/ui/indicator-badge.tsx (1)
IndicatorBadge(6-16)
backend/src/api/v1/completions.ts (3)
backend/src/utils/reqIdHandler.ts (6)
ApiFormat(40-40)finalizeReqId(188-212)extractReqId(314-325)checkReqId(77-176)recordCacheHit(252-263)buildInFlightErrorResponse(274-306)backend/src/adapters/types.ts (1)
ApiFormat(349-349)backend/src/db/schema.ts (1)
CachedResponseType(162-165)
backend/src/utils/streaming-context.ts (3)
backend/src/db/index.ts (1)
Completion(33-33)backend/src/db/schema.ts (1)
CachedResponseType(162-165)backend/src/utils/reqIdHandler.ts (1)
finalizeReqId(188-212)
backend/src/utils/reqIdCache.ts (1)
backend/src/utils/redisClient.ts (1)
redisClient(162-162)
backend/src/api/v1/messages.ts (3)
backend/src/utils/reqIdHandler.ts (6)
ApiFormat(40-40)finalizeReqId(188-212)extractReqId(314-325)checkReqId(77-176)recordCacheHit(252-263)buildInFlightErrorResponse(274-306)backend/src/adapters/types.ts (1)
ApiFormat(349-349)backend/src/db/schema.ts (1)
CachedResponseType(162-165)
backend/src/utils/reqIdHandler.ts (3)
backend/src/db/index.ts (6)
Completion(33-33)CompletionInsert(34-34)findCompletionByReqId(1272-1292)createPendingCompletion(1300-1319)updateCompletion(1328-1340)insertCompletion(304-315)backend/src/utils/reqIdCache.ts (6)
InFlightRequest(16-21)isRedisAvailable(157-159)getInFlight(94-111)calculateRetryAfter(142-150)markInFlight(51-85)clearInFlight(121-134)backend/src/db/schema.ts (1)
CachedResponseType(162-165)
🔇 Additional comments (33)
backend/drizzle/meta/_journal.json (1)
81-88: LGTM!新增的迁移日志条目格式正确,
idx递增顺序正确(10 → 11),version、breakpoints等字段与现有条目保持一致。标签0011_outgoing_johnny_blaze对应 ReqId 去重功能的数据库迁移。frontend/src/pages/requests/detail-panel/header.tsx (1)
70-75: 实现正确,代码风格一致。新增的
cache_hit状态分支遵循了现有的模式:
- 蓝色配色方案与其他状态区分明确(绿色=completed,红色=failed,琥珀色=aborted)
- 翻译键命名规范一致(已验证 en-US "Cache Hit",zh-CN "缓存命中")
.exhaustive()确保了类型安全的穷尽匹配,同时在 columns.tsx 中也使用了相同的状态定义backend/src/db/schema.ts (3)
149-156: 枚举扩展正确实现。
cache_hit状态已正确添加到枚举中,CompletionsStatusEnumType类型也同步更新。这为请求去重功能提供了必要的状态标识。
158-165: CachedResponseType 类型定义清晰。类型结构合理:
body使用unknown类型允许存储不同格式的响应体,format字段明确支持的 API 格式。
189-196: ReqId 去重字段设计合理。
reqId长度限制为 127 与extractReqId中的验证一致(第 320-322 行)sourceCompletionId正确使用自引用外键,指向同表completions的id列cachedResponse使用 JSONB 配合$type<CachedResponseType>()提供完整的类型安全数据库迁移文件已包含
(api_key_id, req_id)的部分唯一索引(WHERE req_id IS NOT NULL),去重约束生效。backend/src/utils/streaming-context.ts (2)
16-26: StreamingReqIdContext 接口设计合理。接口结构清晰,
buildCachedResponse回调模式允许调用方自定义缓存响应的构建逻辑,增加了灵活性。
53-66: 构造函数正确扩展以支持 ReqId 上下文。
reqIdContext作为可选参数添加,保持了向后兼容性。backend/src/db/index.ts (2)
1272-1292: findCompletionByReqId 查询逻辑正确。正确过滤掉
pending状态和已删除的记录,使用limit(1)优化查询性能。
1328-1340: updateCompletion 实现正确。使用展开运算符合并更新,自动设置
updatedAt时间戳。backend/src/utils/reqIdCache.ts (4)
1-39: 模块结构清晰,常量定义合理。
- 10 分钟的 TTL 足以处理大多数请求并清理孤立键
- 流式/非流式的预估时长为 Retry-After 计算提供了合理基准
51-85: markInFlight 错误处理采用安全默认值。当 Redis 操作失败时返回
false,将请求视为"已在处理中"。这是一个保守的策略,避免在 Redis 不可用时产生重复请求。但需注意:如果 Redis 持续不可用,所有新请求都会被当作重复请求处理。可考虑在日志中增加更明显的告警级别。
121-134: clearInFlight 错误处理恰当。非关键操作,失败时仅记录日志,依赖 TTL 自动过期。这是正确的设计选择。
142-150: calculateRetryAfter 计算逻辑正确。确保返回值最小为 1 秒,避免立即重试。
backend/src/utils/reqIdHandler.ts (4)
35-62: 类型定义完善,覆盖了所有 ReqId 检查结果场景。
ReqIdCheckResult联合类型清晰定义了四种可能的结果:缓存命中、请求进行中、新请求、无 ReqId。PendingCompletionData接口包含创建待处理记录所需的所有字段。
77-106: checkReqId 前两步逻辑清晰正确。
- 无 ReqId 时直接返回,保持现有流程
- 数据库查询优先于 Redis,确保已完成的请求能正确返回缓存
107-155: 并发竞争处理逻辑完善。当
createPendingCompletion返回null(唯一约束违规)时,代码会重新检查数据库和 Redis 状态。Lines 143-154 的兜底逻辑(返回合成的 in_flight)是合理的防御性编码。但
completionId: 0作为占位符可能在下游代码中引起混淆,建议在注释中说明这是合成数据。
274-306: buildInFlightErrorResponse 正确区分 API 格式。Anthropic 和 OpenAI 格式的错误响应结构不同,代码正确处理了这一差异。响应中包含有用的调试信息(
req_id、retry_after、started_at)。backend/src/api/v1/responses.ts (4)
33-41: ReqId 相关导入正确添加。从
reqIdHandler导入所需的函数和类型,从schema导入CachedResponseType。
219-257: 非流式响应的 ReqId 处理逻辑正确。正确构建
cachedResponse并在客户端断开或正常完成时调用finalizeReqId。保留了原有的addCompletions路径作为无 ReqId 时的后备。
290-323: 流式响应的缓存构建逻辑正确。
buildCachedResponse回调正确构建了完整的非流式 Response API 响应格式,使得缓存命中时可以返回完整响应而非流式数据。
579-617: 缓存命中和请求进行中的处理正确。
- 缓存命中时优先返回
cachedResponse.body,并有重建响应的降级方案- 请求进行中时正确返回 409 状态和
Retry-After头backend/src/api/v1/completions.ts (6)
22-47: ReqId 相关导入与上下文定义清晰
工具函数引入 + ReqIdContext 结构让去重流程传递一致,易读。Also applies to: 172-180
186-266: 非流式缓存与 finalize 路径实现合理
cachedResponse 构建与 finalizeReqId / addCompletions 的分支处理清楚。
292-337: 流式缓存构建 + StreamingContext 接入 OK
streamingReqIdContext 的 buildCachedResponse 便于后续 cache hit 回放。
490-620: ReqId 去重分支处理完整
cache_hit / in_flight / new_request 的返回逻辑与 Retry-After 处理清晰。
711-745: 流式处理的异步生成器封装清晰
异步生成器的 try/catch 组织方式更直观,便于控制错误输出。
797-806: 非流式 ReqIdContext 传递一致
preCreatedCompletionId 的封装逻辑直观,便于后续 finalize。backend/src/api/v1/messages.ts (6)
33-42: ReqId 相关导入与上下文定义清晰
上下文模型与去重工具链路引入完整。Also applies to: 164-172
178-246: 非流式缓存与 finalize 路径实现合理
cachedResponse 构建与 finalizeReqId 分支清晰。
263-312: 流式缓存构建 + StreamingContext 接入 OK
streamingReqIdContext 的缓存构建策略清楚且可复用。
470-585: ReqId 去重分支处理完整
cache_hit / in_flight 的返回与重建逻辑清晰。
671-705: 流式异步生成器封装清晰
流式处理的封装与错误输出更统一。
756-776: 非流式 ReqIdContext 传递一致
preCreatedCompletionId 的封装逻辑直观。
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a robust request deduplication feature using the X-NexusGate-ReqId header. The implementation is comprehensive, covering database schema changes, new Redis-based caching for in-flight requests, and integration into all three completion-style API endpoints (completions, messages, responses). The support for both streaming and non-streaming requests is well-handled, with a clever approach to caching streaming responses as complete non-streaming objects. The code is well-structured, with new logic encapsulated in reqIdHandler.ts and reqIdCache.ts, and it demonstrates robust error handling and consideration for race conditions. Overall, this is an excellent and well-executed feature addition.
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@backend/src/api/v1/completions.ts`:
- Around line 618-620: The early/failure paths don't call finalizeReqId for
pre-created pending completions, leaving records stuck in in_flight; update all
branches that can return early or throw (where preCreatedCompletionId may be
set) to detect preCreatedCompletionId and call
finalizeReqId(preCreatedCompletionId, { status: "failed" } or "aborted") before
returning/throwing; additionally persist/cache the error response or failure
payload keyed by reqIdResult.completionId to keep responses idempotent for
subsequent same-reqid requests. Make these fixes around the logic that sets
preCreatedCompletionId (referencing reqIdResult, preCreatedCompletionId, and
finalizeReqId) and apply the same pattern to the other early/exception ranges
noted (lines ~711-742, 797-805, 808-816).
In `@backend/src/api/v1/messages.ts`:
- Around line 581-585: The pre-created pending completion
(preCreatedCompletionId obtained from reqIdResult in the new_request branch) is
not being finalized on failure/early-return paths, leaving records stuck in
pending/in_flight; update all error/early-exit branches that can return
empty/failed/non-stream responses (including the branches near where
preCreatedCompletionId is set and the other similar regions noted) to call
finalizeReqId(preCreatedCompletionId, <failed|aborted>) before returning, and
optionally persist/cache the error response tied to the ReqId so subsequent
requests are idempotent; ensure you reference and use preCreatedCompletionId and
reqIdResult and handle null checks so finalizeReqId is only called for actual
pre-created IDs.
In `@backend/src/api/v1/responses.ts`:
- Around line 619-622: The pre-created pending/in-flight record for new_request
(reqIdResult.type === "new_request" => preCreatedCompletionId) is not finalized
on failure/early-return paths, leaving ReqId stuck in in_flight; update all
error/early-return branches that may return without streaming (the branches
around where preCreatedCompletionId is set and the ranges you noted) to call
finalizeReqId(preCreatedCompletionId, "failed" or "aborted") before returning,
and optionally persist/cache the error response for idempotency; ensure you
check preCreatedCompletionId !== null before finalizing and reuse the same
completionId when caching the error response so subsequent calls observe a
terminal state.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
backend/src/api/v1/completions.tsbackend/src/api/v1/messages.tsbackend/src/api/v1/responses.tsbackend/src/utils/reqIdHandler.ts
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-01-24T18:23:42.635Z
Learnt from: pescn
Repo: EM-GeekLab/NexusGate PR: 59
File: backend/src/api/v1/responses.ts:566-577
Timestamp: 2026-01-24T18:23:42.635Z
Learning: When using Elysia with the apiKeyPlugin, if a route option has checkApiKey: true, the apiKeyRecord parameter is guaranteed to be non-null. Do not add explicit null checks or non-null assertions for apiKeyRecord in such routes; rely on this contract and avoid unnecessary guards to improve readability. If there is any doubt about the guarantee in a specific place, add a runtime assertion at the boundary where the plugin is configured, but otherwise omit null checks for apiKeyRecord in backend code under backend/.
Applied to files:
backend/src/api/v1/completions.tsbackend/src/utils/reqIdHandler.tsbackend/src/api/v1/responses.tsbackend/src/api/v1/messages.ts
🧬 Code graph analysis (2)
backend/src/api/v1/completions.ts (6)
backend/src/utils/reqIdHandler.ts (6)
ApiFormat(40-40)finalizeReqId(188-212)extractReqId(310-326)checkReqId(77-176)recordCacheHit(253-259)buildInFlightErrorResponse(270-302)backend/src/adapters/types.ts (1)
ApiFormat(349-349)backend/src/db/schema.ts (1)
CachedResponseType(162-165)backend/src/utils/completions.ts (2)
addCompletions(29-64)Completion(10-21)backend/src/adapters/index.ts (2)
getUpstreamAdapter(85-92)getResponseAdapter(60-66)backend/src/utils/streaming-context.ts (1)
StreamingContext(32-183)
backend/src/utils/reqIdHandler.ts (3)
backend/src/db/index.ts (6)
Completion(33-33)CompletionInsert(34-34)findCompletionByReqId(1272-1292)createPendingCompletion(1300-1319)updateCompletion(1328-1340)insertCompletion(304-315)backend/src/utils/reqIdCache.ts (6)
InFlightRequest(16-21)isRedisAvailable(157-159)getInFlight(94-111)calculateRetryAfter(142-150)markInFlight(51-85)clearInFlight(121-134)backend/src/db/schema.ts (1)
CachedResponseType(162-165)
🔇 Additional comments (21)
backend/src/utils/reqIdHandler.ts (6)
11-62: ReqId 常量/类型定义清晰。结构和注释一致,便于复用与维护。
77-176: 去重流程分支覆盖完整。cache_hit / in_flight / new_request / no_reqid 路径清晰。
188-211: Finalize 与清理时序合理。更新完成后再清理 in-flight,异常分支也有兜底。
223-259: cache_hit 记录构建与入库清晰。省略 reqId 的约束说明明确。
270-302: 409 冲突响应格式统一。不同格式分支清楚,字段含义明确。
310-325: ReqId 归一化处理到位。先 trim 再做长度校验,行为一致。
backend/src/api/v1/completions.ts (5)
22-47: ReqId 工具与类型引入清楚。依赖组织合理,便于后续扩展。
172-180: ReqIdContext 结构简洁明确。字段最小化且语义清晰。
186-266: 非流式路径的缓存与 finalize 集成合理。缓存落库与状态更新顺序清楚。
284-338: 流式 ReqId 上下文构建清晰。缓存构建与 StreamingContext 传递逻辑一致。
499-615: 去重入口处理完整。cache_hit / in_flight / new_request 分支逻辑清楚。
backend/src/api/v1/responses.ts (5)
33-41: ReqId 相关引入清晰。依赖集中,阅读顺畅。
175-183: ReqIdContext 定义简洁。字段覆盖所需信息。
189-257: 非流式缓存与 finalize 整合到位。缓存写入与状态更新一致。
274-324: 流式上下文与缓存构建逻辑清晰。StreamingContext 传递一致性好。
487-617: 去重入口与 cache_hit/in_flight 处理完整。返回格式与重试头设置明确。
backend/src/api/v1/messages.ts (5)
33-41: ReqId 相关引入清晰。依赖分组合理。
164-172: ReqIdContext 定义清楚。字段语义一致。
178-247: 非流式缓存与 finalize 整合合理。状态更新与缓存一致。
263-312: 流式上下文构建清晰。缓存构建与 StreamingContext 传递一致。
470-579: 去重入口与 cache_hit/in_flight 分支完整。错误格式与重试头处理清晰。
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
There was a problem hiding this comment.
Pull request overview
This PR implements request deduplication using the X-NexusGate-ReqId header, allowing clients to safely retry requests without duplicate processing. The feature caches completed responses for instant replay and returns 409 Conflict for concurrent duplicate requests.
Changes:
- Added database schema changes to support ReqId tracking, cache storage, and a new
cache_hitstatus - Implemented Redis-based in-flight request tracking with atomic SETNX operations (600s TTL)
- Integrated ReqId handling into all three API endpoints (completions, messages, responses) with support for both streaming and non-streaming requests
- Added frontend UI support for displaying cache_hit status with proper i18n translations
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| backend/drizzle/0011_outgoing_johnny_blaze.sql | Migration adding req_id, source_completion_id, api_format, cached_response columns and cache_hit enum value with partial unique index |
| backend/drizzle/meta/0011_snapshot.json | Drizzle schema snapshot for migration 0011 |
| backend/drizzle/meta/_journal.json | Migration journal entry |
| backend/src/db/schema.ts | Schema updates for cache_hit status, CachedResponseType, and new completion columns |
| backend/src/db/index.ts | New functions: findCompletionByReqId, createPendingCompletion, updateCompletion |
| backend/src/utils/reqIdCache.ts | Redis in-flight tracking with SETNX, TTL management, and retry-after calculation |
| backend/src/utils/reqIdHandler.ts | Core ReqId logic: checkReqId, finalizeReqId, recordCacheHit, buildInFlightErrorResponse |
| backend/src/utils/redisClient.ts | Added setnx method for atomic set-if-not-exists operations |
| backend/src/utils/streaming-context.ts | Added ReqId context support for streaming requests with cached response building |
| backend/src/api/v1/completions.ts | Integrated ReqId handling: cache hit detection, in-flight 409 responses, finalization |
| backend/src/api/v1/messages.ts | Same ReqId integration for Anthropic Messages API |
| backend/src/api/v1/responses.ts | Same ReqId integration for OpenAI Responses API |
| frontend/src/pages/requests/detail-panel/header.tsx | Added cache_hit status indicator with blue badge |
| frontend/src/pages/requests/columns.tsx | Added cache_hit status display in requests table |
| frontend/src/i18n/locales/en-US.json | English translations for "Cache Hit" |
| frontend/src/i18n/locales/zh-CN.json | Chinese translations for "缓存命中" |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
When ReqId is used and a pending completion is pre-created, failure paths (failover error, internal error, no response body) must finalize the completion to prevent ReqId from being stuck in pending/in-flight state. This ensures subsequent requests with the same ReqId can succeed. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@backend/src/api/v1/completions.ts`:
- Around line 771-776: The SSE error path currently yields JSON.stringify(...)
which breaks SSE clients; in the error handling block for streamSignal (check
streamSignal.aborted) replace the raw JSON yield with a properly formatted SSE
packet using the same error payload but prefixed with "data: " and terminated
with "\n\n" (and include any stream end marker if your stream protocol requires
it); keep the logger.error("Stream processing error", error) call and ensure the
yielded string uses the JSON error object from the existing code but wrapped as
SSE data so clients can parse the event.
In `@backend/src/api/v1/messages.ts`:
- Around line 286-305: The buildCachedResponse function is converting streamed
completion blocks to cache but only preserves text, losing non-text blocks like
tool_use; update buildCachedResponse (used in the streaming path that references
reqIdContext.preCreatedCompletionId and the comp: Completion object) to map each
comp.completion element by preserving its original block type and relevant
fields (e.g., keep type, text, tool_use payload, metadata) instead of forcing
every block to {type: "text", text: ...}, or alternatively call the same
Anthropic-response adapter used at stream end to construct a full, canonical
response before storing; ensure the CachedResponseType.body.content entries
mirror the original block shapes so cache hits restore tool_use and non-text
semantics.
In `@backend/src/api/v1/responses.ts`:
- Around line 297-316: buildCachedResponse currently serializes only output_text
into the cached response (in buildCachedResponse for Completion), which drops
tool_call and other non-text blocks and breaks idempotence for streaming
results; update buildCachedResponse (or generate the cache at StreamingContext
end via an adapter) to preserve the original Completion.completion items' full
block types by mapping each comp.completion item to Response API output entries
(e.g., include tool_call objects and any non-text block shapes instead of always
converting to {type:"output_text",text:...}), ensure usage still reflects
tokens, and keep the cache id using reqIdContext.preCreatedCompletionId so
cached responses match the original response shape and semantics.
- Around line 566-577: The query used by findCompletionByReqId currently filters
with not(eq(status, "pending")) and can return failed/aborted rows; update
findCompletionByReqId to restrict results to only status IN ('completed',
'cache_hit') so checkReqId (and the reqIdResult usage in responses.ts) only
treats true completed or cache hits as cached responses; locate the
findCompletionByReqId implementation and replace the not(eq(status, "pending"))
predicate with an explicit inclusion filter for 'completed' and 'cache_hit'.
♻️ Duplicate comments (1)
backend/src/api/v1/responses.ts (1)
580-583: 缓存命中记录建议降级为 best‑effort
recordCacheHit失败会直接导致缓存命中请求失败,影响可用性。建议 try/catch 记录日志后继续返回缓存结果。♻️ 建议改动
- await recordCacheHit(sourceCompletion, apiKeyRecord.id); + try { + await recordCacheHit(sourceCompletion, apiKeyRecord.id); + } catch (error) { + logger.error("Failed to record cache hit", error); + }
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
backend/src/api/v1/completions.tsbackend/src/api/v1/messages.tsbackend/src/api/v1/responses.ts
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-01-24T18:23:42.635Z
Learnt from: pescn
Repo: EM-GeekLab/NexusGate PR: 59
File: backend/src/api/v1/responses.ts:566-577
Timestamp: 2026-01-24T18:23:42.635Z
Learning: When using Elysia with the apiKeyPlugin, if a route option has checkApiKey: true, the apiKeyRecord parameter is guaranteed to be non-null. Do not add explicit null checks or non-null assertions for apiKeyRecord in such routes; rely on this contract and avoid unnecessary guards to improve readability. If there is any doubt about the guarantee in a specific place, add a runtime assertion at the boundary where the plugin is configured, but otherwise omit null checks for apiKeyRecord in backend code under backend/.
Applied to files:
backend/src/api/v1/messages.tsbackend/src/api/v1/responses.tsbackend/src/api/v1/completions.ts
🧬 Code graph analysis (2)
backend/src/api/v1/messages.ts (4)
backend/src/utils/reqIdHandler.ts (6)
ApiFormat(40-40)finalizeReqId(188-212)extractReqId(310-326)checkReqId(77-176)recordCacheHit(253-259)buildInFlightErrorResponse(270-302)backend/src/adapters/types.ts (1)
ApiFormat(349-349)backend/src/db/schema.ts (1)
CachedResponseType(162-165)backend/src/utils/streaming-context.ts (1)
StreamingContext(32-183)
backend/src/api/v1/completions.ts (4)
backend/src/utils/reqIdHandler.ts (5)
ApiFormat(40-40)finalizeReqId(188-212)checkReqId(77-176)recordCacheHit(253-259)buildInFlightErrorResponse(270-302)backend/src/adapters/types.ts (1)
ApiFormat(349-349)backend/src/db/schema.ts (1)
CachedResponseType(162-165)backend/src/utils/streaming-context.ts (1)
StreamingContext(32-183)
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
- Wrap recordCacheHit in try-catch to prevent cache recording failures from blocking cached response delivery - Replace reqId! non-null assertions with explicit null checks and invariant error throws for better type safety - Add documentation about race conditions when Redis is unavailable Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Instead of silently truncating ReqId values longer than 127 characters, now returns a 400 Bad Request error with a clear message indicating the length constraint. This provides explicit feedback to API consumers. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…onses
- Fix completions.ts SSE error format to use `data: {...}\n\n` prefix
- Fix messages.ts buildCachedResponse to include tool_use blocks from completion.tool_calls
- Fix responses.ts buildCachedResponse to include function_call output items from completion.tool_calls
- Update fallback response reconstruction to also handle tool_calls
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…idation - Extract shared utilities to reqIdHandler.ts: - finalizeReqIdOnError() helper for error path finalization - Response builders for each API format (OpenAI Chat, Anthropic, OpenAI Responses) - buildReqIdValidationErrorResponse() for format-appropriate errors - Export ReqIdContext type for endpoint files - Enhance ReqId validation: - Add regex pattern to reject control characters and special characters - Only allow alphanumeric, hyphens, underscores, dots, colons, forward slashes - Add "invalid_characters" error type - Fix inconsistent logging: - Use warn level for non-critical recordCacheHit failures - Reduce code duplication: - Replace 6+ error finalization blocks per endpoint with single function calls - Replace ~40 lines of response reconstruction per endpoint with shared builders Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
|
/gemini review |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@backend/src/api/v1/messages.ts`:
- Around line 292-299: The code currently does JSON.parse(tc.function.arguments
|| "{}") inside the loop over c.tool_calls which can throw on invalid JSON and
break cache/stream handling; wrap parsing in a safe try/catch (or create a small
helper like safeParseJson) and set the input to the parsed object on success, or
to tc.function.arguments (raw string) or an empty object {} on failure, ensuring
the contentBlocks.push for type "tool_use" never throws—update the block around
c.tool_calls / tc.function.arguments where contentBlocks is populated.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
backend/src/api/v1/completions.tsbackend/src/api/v1/messages.tsbackend/src/api/v1/responses.tsbackend/src/utils/reqIdHandler.ts
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/src/utils/reqIdHandler.ts
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: pescn
Repo: EM-GeekLab/NexusGate PR: 59
File: backend/src/api/v1/responses.ts:608-619
Timestamp: 2026-01-24T19:02:05.570Z
Learning: In NexusGate's ReqId deduplication feature (backend/src/db/index.ts, backend/src/utils/reqIdHandler.ts), the `findCompletionByReqId` query intentionally returns completions with status 'completed', 'failed', 'aborted', or 'cache_hit' (filtering only 'pending'). This design caches and returns errors for failed/aborted requests to ensure idempotency—the same ReqId always returns the same result. Users must use a different ReqId to retry failed requests.
📚 Learning: 2026-01-24T19:02:05.570Z
Learnt from: pescn
Repo: EM-GeekLab/NexusGate PR: 59
File: backend/src/api/v1/responses.ts:608-619
Timestamp: 2026-01-24T19:02:05.570Z
Learning: In NexusGate's ReqId deduplication feature (backend/src/db/index.ts, backend/src/utils/reqIdHandler.ts), the `findCompletionByReqId` query intentionally returns completions with status 'completed', 'failed', 'aborted', or 'cache_hit' (filtering only 'pending'). This design caches and returns errors for failed/aborted requests to ensure idempotency—the same ReqId always returns the same result. Users must use a different ReqId to retry failed requests.
Applied to files:
backend/src/api/v1/responses.tsbackend/src/api/v1/completions.tsbackend/src/api/v1/messages.ts
📚 Learning: 2026-01-24T18:23:42.635Z
Learnt from: pescn
Repo: EM-GeekLab/NexusGate PR: 59
File: backend/src/api/v1/responses.ts:566-577
Timestamp: 2026-01-24T18:23:42.635Z
Learning: When using Elysia with the apiKeyPlugin, if a route option has checkApiKey: true, the apiKeyRecord parameter is guaranteed to be non-null. Do not add explicit null checks or non-null assertions for apiKeyRecord in such routes; rely on this contract and avoid unnecessary guards to improve readability. If there is any doubt about the guarantee in a specific place, add a runtime assertion at the boundary where the plugin is configured, but otherwise omit null checks for apiKeyRecord in backend code under backend/.
Applied to files:
backend/src/api/v1/responses.tsbackend/src/api/v1/completions.tsbackend/src/api/v1/messages.ts
🧬 Code graph analysis (1)
backend/src/api/v1/completions.ts (3)
backend/src/utils/reqIdHandler.ts (9)
ReqIdContext(505-510)finalizeReqId(195-219)ApiFormat(40-40)extractReqId(338-359)buildReqIdValidationErrorResponse(545-570)checkReqId(77-183)recordCacheHit(260-266)buildCachedResponseByFormat(484-496)finalizeReqIdOnError(520-536)backend/src/db/schema.ts (1)
CachedResponseType(162-165)backend/src/adapters/types.ts (1)
ApiFormat(349-349)
🔇 Additional comments (2)
backend/src/api/v1/completions.ts (1)
495-613: ReqId 校验与去重分支实现清晰且稳健。校验、cache hit、in-flight 以及 new_request 的处理路径完整,且 cache hit 记录为 best-effort,整体可用性不错。
backend/src/api/v1/responses.ts (1)
512-652: ReqId 分支处理与缓存命中路径实现合理。校验、缓存命中与 409 in-flight 的返回逻辑清晰、与 completions 保持一致。
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
There was a problem hiding this comment.
Code Review
This pull request introduces a robust request deduplication and caching mechanism using the X-NexusGate-ReqId header. The implementation is well-structured, with core logic cleanly separated into reqIdHandler.ts and reqIdCache.ts. The combination of Redis for in-flight tracking and a database unique constraint for persistence provides strong guarantees against race conditions. I'm impressed by the comprehensive support for both streaming and non-streaming requests across all three API formats. The error handling for failover and client disconnects has also been thoughtfully updated. Overall, this is a solid feature addition. I have a couple of suggestions to further improve robustness in edge-case scenarios.
- Add extractAndValidateReqId() to combine extraction, validation, and error response building - Add handleReqIdResult() to consolidate cache_hit, in_flight, and context building logic - Update completions, messages, and responses endpoints to use new helpers - Reduces ~24 lines of duplicated code per endpoint (~72 lines total) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
|
/gemini review |
There was a problem hiding this comment.
Code Review
这是一个非常出色的功能实现,为 API 增加了基于 X-NexusGate-ReqId 的请求去重和缓存能力。整体设计考虑周全,通过结合数据库唯一约束和 Redis SETNX,稳健地处理了并发请求和竞态条件。代码结构清晰,将核心逻辑封装在 reqIdHandler 和 reqIdCache 中,使得各 API 端点的集成非常干净。对流式请求的支持(通过在流结束后缓存完整的非流式响应)是一个亮点,设计得非常巧妙。
我在代码中发现了一些可以改进的地方,主要集中在数据保真度和错误处理的健壮性上:
- 在
messages和responsesAPI 中,为请求去重而存储的prompt数据对复杂内容(如图片)进行了字符串化,这会导致信息丢失。 - 在从缓存构建 Anthropic 格式的响应时,对
tool_calls的arguments字段进行 JSON 解析的地方缺少错误处理,可能因上游数据格式错误而导致异常。
除了这些小问题,整个 PR 的质量非常高,代码可读性和可维护性都很好。这些改动将极大地提升 API 的幂等性和性能。
Replace brittle string matching (error.message.includes("unique")) with
PostgreSQL error code check (code === "23505") for detecting unique
constraint violations in createPendingCompletion.
This matches the pattern already used in apiKey.ts and is more reliable
across different PostgreSQL versions and locales.
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Summary
X-NexusGate-ReqIdheader support for request deduplicationRetry-Afterheader for in-flight requestsImplementation Details
Database Changes:
req_id,source_completion_id,api_format,cached_responsecolumns tocompletionstablecache_hitstatus tocompletions_statusenum(api_key_id, req_id)wherereq_id IS NOT NULLNew Files:
backend/src/utils/reqIdCache.ts- Redis in-flight tracking with SETNX for atomic marking (600s TTL)backend/src/utils/reqIdHandler.ts- ReqId processing logicModified Files:
streaming-context.ts- Added ReqId support for streaming requestscache_hitstatus displayTest plan
🤖 Generated with Claude Code
Summary by CodeRabbit
新功能
改进
✏️ Tip: You can customize this high-level summary in your review settings.