fix(api): record completions even when client disconnects #58
Conversation
This fix ensures that LLM completions are properly recorded to the database even when the client disconnects mid-stream or times out.

Backend changes:
- Add 'aborted' status to CompletionsStatusEnum
- Create StreamingContext class to manage streaming state with abort detection
- Register abort event listener on request.signal to save completion on disconnect
- Update processStreamingResponse to use StreamingContext
- Update processNonStreamingResponse to await database writes and detect aborts
- Apply changes to all three endpoints: completions, messages, responses

Frontend changes:
- Add 'aborted' status handling in ts-pattern exhaustive matching
- Add amber indicator badges for aborted requests in the requests and embeddings tables
- Add i18n translations for the 'Aborted' status (en-US: "Aborted", zh-CN: "已中止")

Database migration:
- Add 'aborted' value to the completions_status enum

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Note: CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

📝 Walkthrough
Adds a StreamingContext and threads the AbortSignal through both streaming and non-streaming response handling; when the client disconnects, the completion is marked as aborted and still persisted.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Server as API_Server
    participant Context as StreamingContext
    participant DB as Database
    Client->>Server: Send request (streaming/non-streaming, with AbortSignal)
    Server->>Server: Create Completion (status="pending")
    Server->>Context: new StreamingContext(completion, bearer, apiKey, begin, signal)
    loop Receive upstream chunks
        Server->>Server: Read upstream chunk
        Server->>Context: recordTTFT() / accumulate chunk / count tokens
        alt Client not aborted
            Server->>Client: Push chunk
        else Client aborted
            Server-->>Client: Stop pushing (no further yields)
        end
    end
    alt Normal completion
        Server->>Context: saveCompletion("completed")
        Context->>DB: Insert/update completion (status=completed, parts, tokens, ttft)
        DB-->>Context: OK
        Context->>Server: consumeTokens()
    else Client aborted
        Client-->>Server: Trigger AbortSignal
        Context->>Context: isAborted() -> true
        Context->>DB: Save completion (status=aborted, chunks received so far)
        DB-->>Context: OK
    else Upstream/processing error
        Server->>Context: saveCompletion("failed", error)
        Context->>DB: Save failed status
        DB-->>Context: OK
    end
    Context->>Context: cleanup()
```
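For orientation, here is a compact sketch of the handler-side flow in the diagram above; the function and parameter names are illustrative, not the PR's actual signatures:

```typescript
// Sketch of the streaming flow shown above, with hypothetical helper names.
async function* streamWithAbortTracking(
  upstream: AsyncIterable<string>,
  signal: AbortSignal,
  save: (status: "completed" | "aborted" | "failed", parts: string[]) => Promise<void>,
): AsyncGenerator<string> {
  const parts: string[] = [];
  try {
    for await (const chunk of upstream) {
      parts.push(chunk);        // always collect for metering/analytics
      if (!signal.aborted) {
        yield chunk;            // only push to clients that are still connected
      }
    }
    await save(signal.aborted ? "aborted" : "completed", parts);
  } catch (error) {
    await save(signal.aborted ? "aborted" : "failed", parts);
    if (!signal.aborted) throw error; // suppress errors caused purely by the disconnect
  }
}
```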
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
🚥 Pre-merge checks
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
📜 Recent review details
Configuration used: Organization UI · Review profile: CHILL · Plan: Pro

📒 Files selected for processing (3)

🧰 Additional context used
🧬 Code graph analysis (3)
backend/src/api/v1/messages.ts (3)
backend/src/api/v1/completions.ts (4)
backend/src/api/v1/responses.ts (3)

🔇 Additional comments (10)
Summary of Changes

Hello @pescn, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request addresses a critical issue where API completion records were not reliably saved if a client disconnected prematurely or a request timed out. By introducing an `aborted` status and a `StreamingContext` class, completions are now recorded even when the client goes away.

Highlights
Code Review
This pull request introduces a robust mechanism to handle client disconnects during API requests, ensuring that completions are logged even if the client aborts. This is achieved by adding an aborted status and implementing a StreamingContext class that leverages request.signal to detect disconnections. The refactoring of the completions, messages, and responses API endpoints to use this new context class is a significant improvement, centralizing state management and cleanup logic for streaming responses. The changes are well-structured and also improve correctness by awaiting database operations. The frontend has also been updated to reflect the new aborted status. I have one suggestion to improve logging for aborted streaming requests.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/src/api/v1/responses.ts (1)

245-310: Reduce error-log noise on abort. The current catch block calls logger.error unconditionally, so when a client abort triggers an upstream exception it produces unnecessary error-log noise. Add a conditional check and only log the error when the request was not aborted. Confirmed that StreamingContext correctly saves the completion record on abort and marks isSaved, preventing duplicate saves. Suggested fix:

```diff
- logger.error("Stream processing error", error);
+ if (!ctx.isAborted()) {
+   logger.error("Stream processing error", error);
+ }
```

backend/src/api/v1/messages.ts (1)

234-295: Guard the streaming error log in the abort scenario. The catch block logs the error unconditionally, so an exception triggered by an abort adds noise. Confirmed that on abort StreamingContext immediately calls saveCompletion("aborted") and sets isSaved = true, and the isSaved() check in the subsequent catch block effectively prevents the record from being overwritten with completed. Suggested improvement to reduce spurious abort logging:

```diff
- logger.error("Stream processing error", error);
+ if (!ctx.isAborted()) {
+   logger.error("Stream processing error", error);
+ }
```
🤖 Fix all issues with AI agents
In `@backend/src/api/v1/completions.ts`:
- Around line 620-634: If request.signal.aborted is true the current branch
skips updating the completion, leaving it stuck as "pending"; modify the abort
branch in the error handling around processNonStreamingResponse to persist an
aborted status and a minimal audit entry: set completion.status = "aborted" (or
similar), call await addCompletions(completion, bearer, { level: "info" |
"warn", message: "Request aborted before processing", details: { type:
"abortedDuringProcessing" } }) and ensure set.status = 499 (or appropriate) and
yield a short aborted response; this change should be applied next to the
existing request.signal.aborted check so completion and logs are always written
even if processing threw after the client aborted.
In `@backend/src/utils/streaming-context.ts`:
- Around line 138-142: The current TPM consumption only runs when both
this.inputTokens and this.outputTokens are > 0 which can skip charging when
output is 0 (e.g., aborted streams); clamp negative/undefined token counts to
zero (e.g., safeInput = Math.max(0, this.inputTokens), safeOutput = Math.max(0,
this.outputTokens)), compute totalTokens = safeInput + safeOutput and call await
consumeTokens(this.apiKeyRecord.id, this.apiKeyRecord.tpmLimit, totalTokens)
whenever totalTokens > 0; keep references to this.apiKeyRecord,
this.inputTokens, this.outputTokens and consumeTokens to locate the change.
- Around line 55-65: The abort handler registration should handle the case where
signal is already aborted before listener registration: after creating and
adding this.abortHandler via signal.addEventListener in the constructor (or
initializer) of the StreamingContext, immediately check if (signal.aborted) and
if so invoke this.abortHandler() to ensure saveCompletion("aborted") runs;
reference the existing symbols this.abortHandler, signal, saveCompletion, and
logger and ensure the handler is invoked safely (guard against undefined) so the
aborted path is processed even when the AbortSignal was triggered prior to
listener registration.
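A minimal, self-contained sketch of the pattern these instructions describe; the class name, fields, and the placeholder save/log logic are illustrative, not the actual StreamingContext implementation:

```typescript
// Hypothetical, simplified sketch of abort registration with an already-aborted signal.
type CompletionStatus = "pending" | "completed" | "failed" | "aborted";

class AbortAwareContext {
  private abortHandler?: () => void;
  private saved = false;

  constructor(private readonly signal?: AbortSignal) {
    if (!signal) return;

    this.abortHandler = () => {
      void this.saveCompletion("aborted", "Client disconnected");
    };
    signal.addEventListener("abort", this.abortHandler);

    // An already-aborted signal never fires "abort" for newly added
    // listeners, so invoke the handler explicitly in that case.
    if (signal.aborted) {
      this.abortHandler();
    }
  }

  isAborted(): boolean {
    return this.signal?.aborted ?? false;
  }

  async saveCompletion(status: CompletionStatus, reason?: string): Promise<void> {
    if (this.saved) return; // guard against double saves
    this.saved = true;
    console.log(`save completion with status=${status}`, reason ?? "");
  }

  cleanup(): void {
    if (this.signal && this.abortHandler) {
      this.signal.removeEventListener("abort", this.abortHandler);
    }
  }
}
```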
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
backend/drizzle/0010_noisy_deathbird.sql
backend/drizzle/meta/0010_snapshot.json
backend/drizzle/meta/_journal.json
backend/src/api/v1/completions.ts
backend/src/api/v1/messages.ts
backend/src/api/v1/responses.ts
backend/src/db/schema.ts
backend/src/utils/streaming-context.ts
frontend/src/i18n/locales/en-US.json
frontend/src/i18n/locales/zh-CN.json
frontend/src/pages/embeddings/columns.tsx
frontend/src/pages/requests/columns.tsx
frontend/src/pages/requests/detail-panel/header.tsx
🧰 Additional context used
🧬 Code graph analysis (4)
frontend/src/pages/requests/detail-panel/header.tsx (1)
frontend/src/components/ui/indicator-badge.tsx (1)
IndicatorBadge(6-16)
frontend/src/pages/requests/columns.tsx (1)
frontend/src/components/ui/indicator-badge.tsx (1)
MiniIndicatorBadge(18-32)
backend/src/utils/streaming-context.ts (2)
backend/src/db/schema.ts (2)
ToolCallType(78-85), CompletionsStatusEnumType(155-155)
backend/src/utils/completions.ts (1)
addCompletions(29-64)
frontend/src/pages/embeddings/columns.tsx (1)
frontend/src/components/ui/indicator-badge.tsx (1)
MiniIndicatorBadge(18-32)
🔇 Additional comments (20)
backend/src/db/schema.ts (1)
149-155: Enum and type add aborted in sync.
The pgEnum and the TypeScript union type stay consistent, so status values line up end to end.

backend/src/utils/streaming-context.ts (2)

71-83: TTFT recording and abort detection are clear.
The logic is simple, direct, and easy to maintain.

145-160: cleanup releases the listener properly.
Removing the abort listener and clearing state are handled clearly.

backend/drizzle/meta/_journal.json (1)

74-80: The new migration journal entry is consistent.
The entry format matches the existing records.

frontend/src/i18n/locales/zh-CN.json (1)

93-98: The "已中止" ("Aborted") localization coverage is complete.
The requests list, detail header, and embeddings list are all covered. Also applies to: 118-124, 192-197

frontend/src/pages/requests/detail-panel/header.tsx (1)

65-69: The new aborted indicator in the detail header is consistent.
Its colors match the other statuses.

frontend/src/i18n/locales/en-US.json (1)

92-97: The English "Aborted" copy is filled in.
It stays in sync with the Chinese keys and covers the requests and embeddings views. Also applies to: 117-123, 191-196

frontend/src/pages/requests/columns.tsx (1)

33-46: The new aborted status indicator matches the existing pattern, using the same rendering and i18n approach as pending/completed/failed.

frontend/src/pages/embeddings/columns.tsx (1)

21-42: The aborted status display stays consistent. Its structure and styling match the other status branches, keeping the visual feedback uniform.

backend/drizzle/0010_noisy_deathbird.sql (1)

1-1: Adding aborted to the enum matches the business semantics. The migration is consistent with the new status.

backend/drizzle/meta/0010_snapshot.json (1)

1-1002: Auto-generated snapshot file; nothing to review manually.

backend/src/api/v1/responses.ts (3)

26-28: Introducing StreamingContext is reasonable. It gives streaming a single entry point for abort management and persistence.

166-213: Non-streaming abort handling is clear. When signal.aborted is set, the completion is recorded as aborted and written to the database, which matches the PR goal.

486-587: Passing the signal at the route level and suppressing error responses on abort is well done. It avoids redundant error output and duplicate logs after the client disconnects.

backend/src/api/v1/messages.ts (3)

26-28: Introducing StreamingContext helps unify streaming state management.

156-203: The non-streaming abort save logic works as expected. Writing aborted when the client disconnects reduces missed records.

470-571: Signal propagation and error suppression on abort are handled well, cutting meaningless error responses and logs after a client disconnect.

backend/src/api/v1/completions.ts (3)

33-33: With StreamingContext, stream aggregation and persistence are clearer. The context centralizes content aggregation, TTFT, and save logic, making the flow more consistent and easier to trace.
Also applies to: 227-356

535-552: The exception branches in the streaming route layer are cleaner. They reduce noise and avoid duplicate responses, improving readability overall.

163-215: The non-streaming path persists before returning and distinguishes terminal states 👍. Writing to the database before responding helps guarantee record completeness.
Also applies to: 608-617
Addresses review comments from CodeRabbit and Gemini:
1. StreamingContext improvements:
- Pass "Client disconnected" reason to saveCompletion for better logging
- Handle already-aborted signals (AbortSignal spec: addEventListener
won't trigger for already-aborted signals)
- Fix TPM consumption to use Math.max(0, ...) for token values,
ensuring partial usage is charged even when output is 0
2. Non-streaming abort handling:
- Save completion as "aborted" when processing throws after client
disconnect (previously left as "pending")
3. Streaming error logging:
- Add ctx.isAborted() check before logger.error to reduce noise
when client disconnects
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
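As a sketch of the TPM clamping described in point 1, under the assumption that consumeTokens takes (apiKeyId, tpmLimit, totalTokens) as referenced in the review thread:

```typescript
// Sketch: charge TPM for whatever tokens were actually used, even if the
// output count is 0 (e.g. the stream was aborted before any output tokens).
async function chargePartialUsage(
  apiKeyId: string,
  tpmLimit: number,
  inputTokens: number | undefined,
  outputTokens: number | undefined,
  consumeTokens: (id: string, tpmLimit: number, total: number) => Promise<void>,
): Promise<void> {
  const safeInput = Math.max(0, inputTokens ?? 0);
  const safeOutput = Math.max(0, outputTokens ?? 0);
  const total = safeInput + safeOutput;
  if (total > 0) {
    await consumeTokens(apiKeyId, tpmLimit, total);
  }
}
```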
Fix no-base-to-string lint error by using instanceof Error check instead of String(error ?? "...") pattern. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
/gemini review
Code Review
This pull request introduces robust handling for client disconnects during API requests, ensuring that completions are logged with a new aborted status. The implementation is centered around a new StreamingContext class, which is a clean and effective way to manage state and handle abort signals. The changes are consistently applied across the completions, messages, and responses endpoints.
My review focuses on improving the robustness of the error handling logic in the non-streaming paths. I've identified a critical issue where a failure to log a completion could lead to an unhandled exception, preventing an error response from being sent to the client. I've provided a detailed comment and code suggestion for each of the affected API endpoint files to address this.
Address Gemini critical review:
1. Remove flawed `completion.status === "pending"` check that could prevent logging when the status was already updated
2. Use .catch() instead of await for addCompletions to handle database failures gracefully without blocking the error response

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
/gemini review
Code Review
This pull request introduces a robust mechanism for handling client disconnects by adding an aborted status and a StreamingContext class. This is a significant improvement for ensuring completion records are saved reliably, especially for streaming responses. The logic is well-encapsulated and consistently applied across the completions, messages, and responses API endpoints. I've noted a few areas in the non-streaming error handlers where database calls should be awaited to guarantee data persistence. Overall, this is a very solid and valuable change.
```typescript
if (request.signal.aborted) {
  // Client disconnected - save as aborted
  completion.status = "aborted";
  addCompletions(completion, bearer, {
    level: "info",
    message: "Client disconnected during non-streaming response",
    details: {
      type: "completionError",
      data: { type: "aborted", msg: errorMsg },
    },
  }).catch((logError: unknown) => {
    logger.error("Failed to log aborted completion after processing error", logError);
  });
} else {
  logger.error("Failed to process response", error);
  completion.status = "failed";
  addCompletions(completion, bearer, {
    level: "error",
    message: `Response processing error: ${errorMsg}`,
    details: {
      type: "completionError",
      data: { type: "processingError", msg: errorMsg },
    },
  }).catch((logError: unknown) => {
    logger.error("Failed to log completion after processing error", logError);
  });
  set.status = 500;
  yield JSON.stringify({ error: "Failed to process response" });
}
```
In the error handling logic for non-streaming requests, the addCompletions calls are not awaited. This creates a "fire-and-forget" situation where the application might terminate or the response might be sent before the database operation completes, potentially leading to lost completion records in error scenarios. To ensure data is persisted reliably, you should await these calls.
```typescript
if (request.signal.aborted) {
  // Client disconnected - save as aborted
  completion.status = "aborted";
  try {
    await addCompletions(completion, bearer, {
      level: "info",
      message: "Client disconnected during non-streaming response",
      details: {
        type: "completionError",
        data: { type: "aborted", msg: errorMsg },
      },
    });
  } catch (logError) {
    logger.error("Failed to log aborted completion after processing error", logError);
  }
} else {
  logger.error("Failed to process response", error);
  completion.status = "failed";
  try {
    await addCompletions(completion, bearer, {
      level: "error",
      message: `Response processing error: ${errorMsg}`,
      details: {
        type: "completionError",
        data: { type: "processingError", msg: errorMsg },
      },
    });
  } catch (logError) {
    logger.error("Failed to log completion after processing error", logError);
  }
  set.status = 500;
  yield JSON.stringify({ error: "Failed to process response" });
}
```

```typescript
if (request.signal.aborted) {
  // Client disconnected - save as aborted
  completion.status = "aborted";
  addCompletions(completion, bearer, {
    level: "info",
    message: "Client disconnected during non-streaming response",
    details: {
      type: "completionError",
      data: { type: "aborted", msg: errorMsg },
    },
  }).catch((logError: unknown) => {
    logger.error("Failed to log aborted completion after processing error", logError);
  });
} else {
  logger.error("Failed to process response", error);
  completion.status = "failed";
  addCompletions(completion, bearer, {
    level: "error",
    message: `Response processing error: ${errorMsg}`,
    details: {
      type: "completionError",
      data: { type: "processingError", msg: errorMsg },
    },
  }).catch((logError: unknown) => {
    logger.error("Failed to log completion after processing error", logError);
  });
  set.status = 500;
  yield JSON.stringify({
    type: "error",
    error: { type: "api_error", message: "Failed to process response" },
  });
  }
}
```
The addCompletions calls within this catch block are not awaited. This could lead to a race condition where the function returns or the process exits before the completion record is saved to the database, especially during error handling. It's important to await these calls to ensure the completion status is reliably recorded.
```typescript
if (request.signal.aborted) {
  // Client disconnected - save as aborted
  completion.status = "aborted";
  try {
    await addCompletions(completion, bearer, {
      level: "info",
      message: "Client disconnected during non-streaming response",
      details: {
        type: "completionError",
        data: { type: "aborted", msg: errorMsg },
      },
    });
  } catch (logError) {
    logger.error("Failed to log aborted completion after processing error", logError);
  }
} else {
  logger.error("Failed to process response", error);
  completion.status = "failed";
  try {
    await addCompletions(completion, bearer, {
      level: "error",
      message: `Response processing error: ${errorMsg}`,
      details: {
        type: "completionError",
        data: { type: "processingError", msg: errorMsg },
      },
    });
  } catch (logError) {
    logger.error("Failed to log completion after processing error", logError);
  }
  set.status = 500;
  yield JSON.stringify({
    type: "error",
    error: { type: "api_error", message: "Failed to process response" },
  });
}
```

…eaming responses

Addresses reviewer feedback:
- Set Content-Type: text/event-stream for SSE streaming responses
- Set Content-Type: application/json for non-streaming JSON responses

When using async generators in Elysia, the content-type cannot be auto-detected, so we need to explicitly set the appropriate headers based on the request type.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…tors

Per the Elysia documentation, async generators should:
- Use `return` for non-streaming responses (Elysia converts them to a normal JSON response)
- Use `yield` only for streaming responses

This ensures:
- Non-streaming responses get the proper `application/json` content-type
- Streaming responses get proper streaming behavior
- Early error responses are returned as normal JSON, not streamed

References:
- https://elysiajs.com/essential/handler
- elysiajs/elysia#742

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Fix for the Content-Type issue

After reading the Elysia Handler documentation and Issue #742, the reviewer's point about Elysia's async generator behavior is confirmed to be correct.

The previous problem: in the earlier code, all responses were produced with `yield`.

The fix (commit 65bbdb8): use `return` for non-streaming responses and `yield` only for streaming ones, so Elysia can correctly distinguish the response types and set the appropriate Content-Type.
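A schematic sketch of the return-vs-yield split described above; the handler shape and helper callbacks are placeholders rather than the actual endpoint code:

```typescript
// Sketch of the pattern: `return` for non-streaming JSON, `yield` for SSE chunks.
async function* completionsHandler(opts: {
  stream: boolean;
  signal: AbortSignal;
  fetchUpstream: () => Promise<unknown>;
  streamUpstream: () => AsyncIterable<string>;
}) {
  if (!opts.stream) {
    // Non-streaming: returning a value lets the framework send a normal
    // JSON response with an application/json content type.
    return await opts.fetchUpstream();
  }

  // Streaming: yielding keeps the response open and streams SSE-style chunks.
  for await (const chunk of opts.streamUpstream()) {
    if (opts.signal.aborted) continue; // keep draining upstream, stop pushing
    yield `data: ${chunk}\n\n`;
  }
  yield "data: [DONE]\n\n";
}
```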
/gemini review
@coderabbitai review
✅ Actions performed: Review triggered.
Code Review
This pull request introduces robust handling for client disconnects during API requests, a critical feature for improving reliability. It correctly adds an 'aborted' status to the database and frontend, and refactors the streaming logic into a new StreamingContext class. This new class centralizes the complex state management and ensures that completions are saved even when the client disconnects, which is a significant improvement. The changes are well-structured across all three API endpoints (completions, messages, responses). My review includes a few suggestions to enhance the consistency of asynchronous database operations within error handling blocks to further improve robustness.
Changed from the fire-and-forget .catch() pattern to await with try-catch for addCompletions calls in non-streaming error handlers.

With the refactoring to use `return` instead of `yield`, the fire-and-forget pattern is problematic because:
1. The function returns immediately after the return statement
2. The pending promise may not complete before the function exits

Using await ensures the completion record is reliably saved before returning the response or exiting the handler.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Fix for Gemini's new feedback (commit 3399b60)

Gemini pointed out that after the refactor (using `return`), the fire-and-forget pattern is unreliable. The fix changes the error-handler calls as follows:

```typescript
// Before (fire-and-forget)
addCompletions(completion, bearer, {...}).catch((logError) => {
  logger.error("...", logError);
});
return;

// After (await with try-catch)
try {
  await addCompletions(completion, bearer, {...});
} catch (logError: unknown) {
  logger.error("...", logError);
}
return;
```

This ensures the completion record is reliably saved before the handler returns.

The fix covers three files: backend/src/api/v1/completions.ts, backend/src/api/v1/messages.ts, and backend/src/api/v1/responses.ts.
/gemini review
Code Review
This pull request introduces a robust mechanism for handling client disconnects during API requests, ensuring that completions are logged even if the client aborts. The core of this change is the new StreamingContext class, which is a great addition for managing the lifecycle of streaming responses and handling abort signals. The changes are well-implemented across the completions, messages, and responses APIs.
My review identifies one critical issue in the new StreamingContext that would cause incorrect data to be saved for two of the three updated API endpoints. Overall, this is a valuable improvement to the system's reliability.
Previously, when a client disconnected mid-stream:
1. The abort handler immediately saved partial data
2. The for-await loop broke, discarding remaining chunks

Now the behavior is:
1. No immediate save on abort (removed abort handler)
2. Continue reading all chunks from upstream
3. Only skip yielding to the disconnected client
4. Save the full response with "aborted" status when the stream ends

This ensures complete data is captured for observability even when clients disconnect before the stream finishes.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Fix: keep collecting chunks after the client disconnects (commit 64b28ed)

Problem (previous behavior): on abort, partial data was saved immediately and the remaining upstream chunks were discarded.

Behavior after the fix: all upstream chunks are still read; only the yields to the disconnected client are skipped, and the full response is saved with the aborted status at the end.

Code change:

```typescript
// Before
for await (const chunk of chunks) {
  if (ctx.isAborted()) {
    break; // discards the remaining chunks
  }
  // ... collect and yield
}

// After
for await (const chunk of chunks) {
  const clientAborted = ctx.isAborted();
  // Always collect data
  if (chunk.type === "content_block_delta") {
    ctx.textParts.push(chunk.delta.text);
  }
  // Only yield if client is still connected
  if (!clientAborted) {
    yield serialized;
  }
}

// Save with appropriate status at the end
if (ctx.isAborted()) {
  await ctx.saveCompletion("aborted", "Client disconnected");
} else {
  await ctx.saveCompletion("completed");
}
```

This ensures the full LLM response is recorded even when the client disconnects early, which helps later analysis and billing.
Log an info-level message when we first detect that a client has disconnected during streaming. This helps track client disconnect events while still continuing to collect all upstream data for accurate token metering. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@backend/src/api/v1/completions.ts`:
- Around line 631-669: The outer catch is calling addCompletions again even if
processNonStreamingResponse already saved the completion; to prevent duplicates,
introduce a saved flag or state on the Completion lifecycle (e.g., a boolean
like completionSaved) that processNonStreamingResponse sets when it successfully
persists, or have processNonStreamingResponse return a result indicating
“saved”; then in the outer catch (around processNonStreamingResponse /
consumeTokens) check that flag/result before calling addCompletions or changing
completion.status (referencing processNonStreamingResponse, consumeTokens,
addCompletions, and the completion object) so addCompletions is only invoked
once for the same completion.
In `@backend/src/api/v1/messages.ts`:
- Around line 567-608: Before calling addCompletions in the catch block (both
the aborted branch and the failed branch) guard the save with a check that
completion.status === "pending" to avoid duplicate saves; i.e., inspect the
completion.status field and skip calling addCompletions (and skip related
logging) if it has already been changed from "pending" (this affects the abort
branch that sets completion.status = "aborted" and the error branch that sets
completion.status = "failed", so perform the check before invoking
addCompletions for those cases).
- Around line 234-295: The streaming loop in messages.ts only handles
content_block_delta and thus misses content_block_start/stop tool-call events;
update the for-await loop (where upstreamAdapter.parseStreamResponse(resp) is
iterated and ctx.textParts / thinkingParts are collected) to also handle
chunk.type === "content_block_start" and "content_block_stop" like
completions.ts: on content_block_start record the tool metadata into
ctx.indexToIdMap using ctx.nextToolCallIndex and increment
ctx.nextToolCallIndex, and on content_block_stop finalize any necessary
tool-call entry so the resulting completion saved via
StreamingContext.saveCompletion includes all tool-call entries; ensure
responseAdapter.serializeStreamChunk/yield logic remains unchanged for non-tool
chunks.
In `@backend/src/api/v1/responses.ts`:
- Around line 583-624: The catch block may double-save the completion via
addCompletions if the record was already persisted earlier (same issue as in
completions.ts); introduce and check an alreadySaved flag (either on the
completion object or a local boolean captured in the surrounding scope) before
calling addCompletions in both the aborted and failed branches, skip
logging/saving when alreadySaved is true, and ensure the code that first
persists the completion (the spot in processNonStreamingResponse/consumeTokens
that currently calls addCompletions) sets alreadySaved = true so subsequent
catch handlers do not duplicate the record.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
backend/src/api/v1/completions.ts
backend/src/api/v1/messages.ts
backend/src/api/v1/responses.ts
🧰 Additional context used
🧬 Code graph analysis (3)
backend/src/api/v1/messages.ts (3)
backend/src/utils/completions.ts (1)
addCompletions(29-64)
backend/src/adapters/index.ts (2)
getUpstreamAdapter(85-92), getResponseAdapter(60-66)
backend/src/utils/streaming-context.ts (1)
StreamingContext(18-153)
backend/src/api/v1/responses.ts (3)
backend/src/utils/completions.ts (1)
addCompletions(29-64)
backend/src/adapters/index.ts (2)
getUpstreamAdapter(85-92), getResponseAdapter(60-66)
backend/src/utils/streaming-context.ts (1)
StreamingContext(18-153)
backend/src/api/v1/completions.ts (3)
backend/src/utils/completions.ts (1)
addCompletions(29-64)
backend/src/adapters/index.ts (2)
getUpstreamAdapter(85-92), getResponseAdapter(60-66)
backend/src/utils/streaming-context.ts (1)
StreamingContext(18-153)
🔇 Additional comments (10)
backend/src/api/v1/completions.ts (5)
199-215: The abort handling for non-streaming responses is correct. After the response is parsed, signal?.aborted is checked and the completion is saved as "aborted" or "completed" accordingly. Using await ensures the database write finishes before returning, which matches the PR goal.

247-264: Abort detection and logging in streaming responses are implemented correctly. The loggedAbort flag avoids duplicate logs, and upstream data keeps being collected after the client disconnects to ensure complete metering data. This matches the PR goal of "retaining the full LLM output for metering and analysis."

329-341: Streamed data is only sent while the client is still connected. Gating yield on !clientAborted ensures no data is pushed after the client disconnects, while the upstream response is still collected. The implementation works as expected.

357-376: Streaming error handling distinguishes aborted from non-aborted cases. When the client has aborted and an error occurs, the completion is saved as "aborted" with the error message; the error is only re-thrown while the client is still connected. This avoids unnecessary error propagation while ensuring the completion record is saved correctly.

549-566: The outer error handling for streaming correctly propagates the abort signal. request.signal is passed into processStreamingResponse, and the outer catch block skips error logging and response sending when the client has aborted.

backend/src/api/v1/responses.ts (3)

198-213: Non-streaming abort handling matches completions.ts. The pattern is consistent with completions.ts, keeping the API behavior uniform.

245-311: Streaming responses use StreamingContext to handle aborts correctly. The implementation matches completions.ts; the Response API's streaming only collects text and thinking content, in line with that API's semantics.

512-518: Streaming error responses use the correct SSE format. The event: error\ndata: ... event format follows the OpenAI Response API SSE convention.

backend/src/api/v1/messages.ts (2)

187-202: Non-streaming abort handling is consistent with the other endpoints. The Anthropic Messages API implementation follows the same pattern as completions.ts and responses.ts.

496-502: Streaming error responses use the Anthropic API error format. The error event format follows the Anthropic Messages API conventions.
Address CodeRabbit review feedback:
- Add an alreadySaved check in catch blocks to prevent duplicate database writes if processNonStreamingResponse already saved the completion before consumeTokens throws an error
- Add tool call handling (content_block_start/stop) to messages.ts and responses.ts streaming to match completions.ts behavior

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
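A hypothetical sketch of the tool-call bookkeeping this commit refers to; the chunk shapes and the indexToIdMap / nextToolCallIndex names follow the review comments, but the real types in the PR may differ:

```typescript
// Sketch: record tool-call metadata from content_block_start/stop chunks
// while streaming, so the saved completion includes every tool call.
interface ToolCallDraft {
  id: string;
  name: string;
  argsJson: string;
}

interface StreamState {
  indexToIdMap: Map<number, ToolCallDraft>;
  nextToolCallIndex: number;
}

type StreamChunk =
  | { type: "content_block_start"; toolCall: { id: string; name: string } }
  | { type: "content_block_delta"; index: number; argsDelta: string }
  | { type: "content_block_stop"; index: number };

function handleToolCallChunk(state: StreamState, chunk: StreamChunk): void {
  switch (chunk.type) {
    case "content_block_start":
      // New tool call: remember its metadata under the next free index.
      state.indexToIdMap.set(state.nextToolCallIndex, {
        id: chunk.toolCall.id,
        name: chunk.toolCall.name,
        argsJson: "",
      });
      state.nextToolCallIndex += 1;
      break;
    case "content_block_delta": {
      // Accumulate streamed argument fragments for the matching tool call.
      const draft = state.indexToIdMap.get(chunk.index);
      if (draft) draft.argsJson += chunk.argsDelta;
      break;
    }
    case "content_block_stop":
      // Nothing to finalize in this sketch; the draft is already complete.
      break;
  }
}
```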
Summary
Fixes #21: Completions are now properly recorded to the database even when the client disconnects mid-stream or times out.
Key changes:
- `aborted` status to track client-disconnected requests
- `StreamingContext` class with abort event detection via `request.signal`

Test plan
`python_test_code/test_abort_handling.py`: all 3 tests pass (completed ✅, aborted ✅, aborted ✅)

🤖 Generated with Claude Code
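For illustration, a rough TypeScript sketch of the mid-stream abort the test plan exercises; the endpoint path, model name, and payload are placeholders, and the actual tests live in python_test_code/test_abort_handling.py:

```typescript
// Sketch: start a streaming completion, then abort the request mid-stream and
// verify (out of band) that the server recorded the completion as "aborted".
async function abortMidStream(baseUrl: string, apiKey: string): Promise<void> {
  const controller = new AbortController();
  const resp = await fetch(`${baseUrl}/v1/chat/completions`, {
    method: "POST",
    headers: {
      "Authorization": `Bearer ${apiKey}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      model: "placeholder-model",
      stream: true,
      messages: [{ role: "user", content: "Hi" }],
    }),
    signal: controller.signal,
  });

  const reader = resp.body?.getReader();
  if (!reader) throw new Error("expected a streaming body");
  await reader.read();  // consume the first chunk...
  controller.abort();   // ...then disconnect mid-stream
}
```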
Summary by CodeRabbit
New features
Improvements
Localization