From 1fbef4fd835437bd89e32ba5b2211fa867c4f257 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E7=BF=94=E5=AE=87?= Date: Sat, 24 Jan 2026 21:19:14 +0800 Subject: [PATCH 01/10] fix(api): record completions even when client disconnects (#21) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This fix ensures that LLM completions are properly recorded to the database even when the client disconnects mid-stream or times out. Backend changes: - Add 'aborted' status to CompletionsStatusEnum - Create StreamingContext class to manage streaming state with abort detection - Register abort event listener on request.signal to save completion on disconnect - Update processStreamingResponse to use StreamingContext - Update processNonStreamingResponse to await database writes and detect aborts - Apply changes to all three endpoints: completions, messages, responses Frontend changes: - Add 'aborted' status handling in ts-pattern exhaustive matching - Add amber indicator badges for aborted requests in requests and embeddings tables - Add i18n translations for 'Aborted' status (en-US: "Aborted", zh-CN: "已中止") Database migration: - Add 'aborted' value to completions_status enum Co-Authored-By: Claude Opus 4.5 --- backend/drizzle/0010_noisy_deathbird.sql | 1 + backend/drizzle/meta/0010_snapshot.json | 1002 +++++++++++++++++ backend/drizzle/meta/_journal.json | 7 + backend/src/api/v1/completions.ts | 179 ++- backend/src/api/v1/messages.ts | 148 +-- backend/src/api/v1/responses.ts | 148 +-- backend/src/db/schema.ts | 3 +- backend/src/utils/streaming-context.ts | 161 +++ frontend/src/i18n/locales/en-US.json | 3 + frontend/src/i18n/locales/zh-CN.json | 3 + frontend/src/pages/embeddings/columns.tsx | 5 + frontend/src/pages/requests/columns.tsx | 3 + .../pages/requests/detail-panel/header.tsx | 5 + 13 files changed, 1421 insertions(+), 247 deletions(-) create mode 100644 backend/drizzle/0010_noisy_deathbird.sql create mode 100644 backend/drizzle/meta/0010_snapshot.json create mode 100644 backend/src/utils/streaming-context.ts diff --git a/backend/drizzle/0010_noisy_deathbird.sql b/backend/drizzle/0010_noisy_deathbird.sql new file mode 100644 index 0000000..71bbe04 --- /dev/null +++ b/backend/drizzle/0010_noisy_deathbird.sql @@ -0,0 +1 @@ +ALTER TYPE "public"."completions_status" ADD VALUE 'aborted'; \ No newline at end of file diff --git a/backend/drizzle/meta/0010_snapshot.json b/backend/drizzle/meta/0010_snapshot.json new file mode 100644 index 0000000..a39d245 --- /dev/null +++ b/backend/drizzle/meta/0010_snapshot.json @@ -0,0 +1,1002 @@ +{ + "id": "6be63fd9-50d1-4d49-acea-e53401c54b2f", + "prevId": "1fcc8e66-5eaf-433f-9537-89ea8601aa65", + "version": "7", + "dialect": "postgresql", + "tables": { + "public.api_keys": { + "name": "api_keys", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "api_keys_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "key": { + "name": "key", + "type": "varchar(63)", + "primaryKey": false, + "notNull": true + }, + "comment": { + "name": "comment", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "expires_at": { + "name": "expires_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "last_seen": { + "name": "last_seen", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "revoked": { + "name": "revoked", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + }, + "rpm_limit": { + "name": "rpm_limit", + "type": "integer", + "primaryKey": false, + "notNull": true, + "default": 50 + }, + "tpm_limit": { + "name": "tpm_limit", + "type": "integer", + "primaryKey": false, + "notNull": true, + "default": 50000 + }, + "external_id": { + "name": "external_id", + "type": "varchar(127)", + "primaryKey": false, + "notNull": false + }, + "source": { + "name": "source", + "type": "api_key_source", + "typeSchema": "public", + "primaryKey": false, + "notNull": false, + "default": "'manual'" + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "api_keys_key_unique": { + "name": "api_keys_key_unique", + "nullsNotDistinct": false, + "columns": [ + "key" + ] + }, + "api_keys_external_id_unique": { + "name": "api_keys_external_id_unique", + "nullsNotDistinct": false, + "columns": [ + "external_id" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.completions": { + "name": "completions", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "completions_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "api_key_id": { + "name": "api_key_id", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "upstream_id": { + "name": "upstream_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "model_id": { + "name": "model_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "model": { + "name": "model", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "prompt": { + "name": "prompt", + "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + "prompt_tokens": { + "name": "prompt_tokens", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "completion": { + "name": "completion", + "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + "completion_tokens": { + "name": "completion_tokens", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "status": { + "name": "status", + "type": "completions_status", + "typeSchema": "public", + "primaryKey": false, + "notNull": true, + "default": "'pending'" + }, + "ttft": { + "name": "ttft", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "duration": { + "name": "duration", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "deleted": { + "name": "deleted", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + }, + "rating": { + "name": "rating", + "type": "real", + "primaryKey": false, + "notNull": false + } + }, + "indexes": {}, + "foreignKeys": { + "completions_api_key_id_api_keys_id_fk": { + "name": "completions_api_key_id_api_keys_id_fk", + "tableFrom": "completions", + "tableTo": "api_keys", + "columnsFrom": [ + "api_key_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + }, + "completions_upstream_id_upstreams_id_fk": { + "name": "completions_upstream_id_upstreams_id_fk", + "tableFrom": "completions", + "tableTo": "upstreams", + "columnsFrom": [ + "upstream_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "completions_id_unique": { + "name": "completions_id_unique", + "nullsNotDistinct": false, + "columns": [ + "id" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.embeddings": { + "name": "embeddings", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "embeddings_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "api_key_id": { + "name": "api_key_id", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "model_id": { + "name": "model_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "model": { + "name": "model", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "input": { + "name": "input", + "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + "input_tokens": { + "name": "input_tokens", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "embedding": { + "name": "embedding", + "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + "dimensions": { + "name": "dimensions", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "status": { + "name": "status", + "type": "completions_status", + "typeSchema": "public", + "primaryKey": false, + "notNull": true, + "default": "'pending'" + }, + "duration": { + "name": "duration", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "deleted": { + "name": "deleted", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + } + }, + "indexes": {}, + "foreignKeys": { + "embeddings_api_key_id_api_keys_id_fk": { + "name": "embeddings_api_key_id_api_keys_id_fk", + "tableFrom": "embeddings", + "tableTo": "api_keys", + "columnsFrom": [ + "api_key_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + }, + "embeddings_model_id_models_id_fk": { + "name": "embeddings_model_id_models_id_fk", + "tableFrom": "embeddings", + "tableTo": "models", + "columnsFrom": [ + "model_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "embeddings_id_unique": { + "name": "embeddings_id_unique", + "nullsNotDistinct": false, + "columns": [ + "id" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.models": { + "name": "models", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "models_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "provider_id": { + "name": "provider_id", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "system_name": { + "name": "system_name", + "type": "varchar(63)", + "primaryKey": false, + "notNull": true + }, + "remote_id": { + "name": "remote_id", + "type": "varchar(63)", + "primaryKey": false, + "notNull": false + }, + "model_type": { + "name": "model_type", + "type": "model_type", + "typeSchema": "public", + "primaryKey": false, + "notNull": true, + "default": "'chat'" + }, + "context_length": { + "name": "context_length", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "input_price": { + "name": "input_price", + "type": "real", + "primaryKey": false, + "notNull": false + }, + "output_price": { + "name": "output_price", + "type": "real", + "primaryKey": false, + "notNull": false + }, + "weight": { + "name": "weight", + "type": "real", + "primaryKey": false, + "notNull": true, + "default": 1 + }, + "comment": { + "name": "comment", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "deleted": { + "name": "deleted", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + } + }, + "indexes": {}, + "foreignKeys": { + "models_provider_id_providers_id_fk": { + "name": "models_provider_id_providers_id_fk", + "tableFrom": "models", + "tableTo": "providers", + "columnsFrom": [ + "provider_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "models_provider_system_name_unique": { + "name": "models_provider_system_name_unique", + "nullsNotDistinct": false, + "columns": [ + "provider_id", + "system_name" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.providers": { + "name": "providers", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "providers_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "name": { + "name": "name", + "type": "varchar(63)", + "primaryKey": false, + "notNull": true + }, + "type": { + "name": "type", + "type": "provider_type", + "typeSchema": "public", + "primaryKey": false, + "notNull": true, + "default": "'openai'" + }, + "base_url": { + "name": "base_url", + "type": "varchar(255)", + "primaryKey": false, + "notNull": true + }, + "api_key": { + "name": "api_key", + "type": "varchar(255)", + "primaryKey": false, + "notNull": false + }, + "api_version": { + "name": "api_version", + "type": "varchar(31)", + "primaryKey": false, + "notNull": false + }, + "comment": { + "name": "comment", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "deleted": { + "name": "deleted", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "providers_name_unique": { + "name": "providers_name_unique", + "nullsNotDistinct": false, + "columns": [ + "name" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.settings": { + "name": "settings", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "settings_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "key": { + "name": "key", + "type": "varchar(63)", + "primaryKey": false, + "notNull": true + }, + "value": { + "name": "value", + "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "settings_key_unique": { + "name": "settings_key_unique", + "nullsNotDistinct": false, + "columns": [ + "key" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.srv_logs": { + "name": "srv_logs", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "srv_logs_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "related_api_key_id": { + "name": "related_api_key_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "related_upstream_id": { + "name": "related_upstream_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "related_completion_id": { + "name": "related_completion_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "message": { + "name": "message", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "level": { + "name": "level", + "type": "srv_logs_level", + "typeSchema": "public", + "primaryKey": false, + "notNull": true + }, + "details": { + "name": "details", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "acknowledged": { + "name": "acknowledged", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + }, + "ack_at": { + "name": "ack_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + } + }, + "indexes": {}, + "foreignKeys": { + "srv_logs_related_api_key_id_api_keys_id_fk": { + "name": "srv_logs_related_api_key_id_api_keys_id_fk", + "tableFrom": "srv_logs", + "tableTo": "api_keys", + "columnsFrom": [ + "related_api_key_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + }, + "srv_logs_related_upstream_id_upstreams_id_fk": { + "name": "srv_logs_related_upstream_id_upstreams_id_fk", + "tableFrom": "srv_logs", + "tableTo": "upstreams", + "columnsFrom": [ + "related_upstream_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + }, + "srv_logs_related_completion_id_completions_id_fk": { + "name": "srv_logs_related_completion_id_completions_id_fk", + "tableFrom": "srv_logs", + "tableTo": "completions", + "columnsFrom": [ + "related_completion_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "srv_logs_id_unique": { + "name": "srv_logs_id_unique", + "nullsNotDistinct": false, + "columns": [ + "id" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.upstreams": { + "name": "upstreams", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "upstreams_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "name": { + "name": "name", + "type": "varchar(63)", + "primaryKey": false, + "notNull": true + }, + "url": { + "name": "url", + "type": "varchar(255)", + "primaryKey": false, + "notNull": true + }, + "model": { + "name": "model", + "type": "varchar(63)", + "primaryKey": false, + "notNull": true + }, + "upstream_model": { + "name": "upstream_model", + "type": "varchar(63)", + "primaryKey": false, + "notNull": false + }, + "api_key": { + "name": "api_key", + "type": "varchar(255)", + "primaryKey": false, + "notNull": false + }, + "weight": { + "name": "weight", + "type": "real", + "primaryKey": false, + "notNull": true, + "default": 1 + }, + "comment": { + "name": "comment", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "deleted": { + "name": "deleted", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + } + }, + "enums": { + "public.api_key_source": { + "name": "api_key_source", + "schema": "public", + "values": [ + "manual", + "operator", + "init" + ] + }, + "public.completions_status": { + "name": "completions_status", + "schema": "public", + "values": [ + "pending", + "completed", + "failed", + "aborted" + ] + }, + "public.model_type": { + "name": "model_type", + "schema": "public", + "values": [ + "chat", + "embedding" + ] + }, + "public.provider_type": { + "name": "provider_type", + "schema": "public", + "values": [ + "openai", + "openai-responses", + "anthropic", + "azure", + "ollama" + ] + }, + "public.srv_logs_level": { + "name": "srv_logs_level", + "schema": "public", + "values": [ + "unspecific", + "info", + "warn", + "error" + ] + } + }, + "schemas": {}, + "sequences": {}, + "roles": {}, + "policies": {}, + "views": {}, + "_meta": { + "columns": {}, + "schemas": {}, + "tables": {} + } +} \ No newline at end of file diff --git a/backend/drizzle/meta/_journal.json b/backend/drizzle/meta/_journal.json index e31209d..9f2d7e8 100644 --- a/backend/drizzle/meta/_journal.json +++ b/backend/drizzle/meta/_journal.json @@ -71,6 +71,13 @@ "when": 1768653472259, "tag": "0009_api_key_external_id", "breakpoints": true + }, + { + "idx": 10, + "version": "7", + "when": 1769256296714, + "tag": "0010_noisy_deathbird", + "breakpoints": true } ] } \ No newline at end of file diff --git a/backend/src/api/v1/completions.ts b/backend/src/api/v1/completions.ts index 068829e..2ea8d2e 100644 --- a/backend/src/api/v1/completions.ts +++ b/backend/src/api/v1/completions.ts @@ -18,7 +18,6 @@ import { rateLimitPlugin } from "@/plugins/rateLimitPlugin"; import type { CompletionsMessageType, ToolDefinitionType, - ToolCallType, ToolChoiceType, } from "@/db/schema"; import { @@ -31,6 +30,7 @@ import { PROVIDER_HEADER, } from "@/utils/api-helpers"; import { addCompletions, type Completion } from "@/utils/completions"; +import { StreamingContext } from "@/utils/streaming-context"; import { executeWithFailover, selectMultipleCandidates, @@ -162,6 +162,7 @@ function buildCompletionRecord( /** * Process a successful non-streaming response + * Ensures completion is saved to database before returning */ async function processNonStreamingResponse( resp: Response, @@ -170,6 +171,7 @@ async function processNonStreamingResponse( providerType: string, apiKeyRecord: ApiKey | null, begin: number, + signal?: AbortSignal, ): Promise { // Parse response using upstream adapter const upstreamAdapter = getUpstreamAdapter(providerType); @@ -182,7 +184,6 @@ async function processNonStreamingResponse( // Update completion record completion.promptTokens = internalResponse.usage.inputTokens; completion.completionTokens = internalResponse.usage.outputTokens; - completion.status = "completed"; completion.ttft = Date.now() - begin; completion.duration = Date.now() - begin; @@ -195,9 +196,23 @@ async function processNonStreamingResponse( tool_calls: toolCalls, }, ]; - addCompletions(completion, bearer).catch(() => { - logger.error("Failed to log completion after non-streaming"); - }); + + // Check if client disconnected during processing + if (signal?.aborted) { + completion.status = "aborted"; + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: "Client disconnected" }, + }, + }); + } else { + completion.status = "completed"; + // Use await to ensure database write completes before returning + await addCompletions(completion, bearer); + } // Consume tokens for TPM rate limiting (post-flight) // Only consume if token counts are valid (not -1 which indicates parsing failure) @@ -211,6 +226,7 @@ async function processNonStreamingResponse( /** * Process a successful streaming response + * Uses StreamingContext to ensure completion is saved even on client disconnect * @yields string chunks in OpenAI format */ async function* processStreamingResponse( @@ -220,6 +236,7 @@ async function* processStreamingResponse( providerType: string, apiKeyRecord: ApiKey | null, begin: number, + signal?: AbortSignal, ): AsyncGenerator { // Get adapters const upstreamAdapter = getUpstreamAdapter(providerType); @@ -227,38 +244,29 @@ async function* processStreamingResponse( logger.debug("parse stream completions response"); - let ttft = -1; - let isFirstChunk = true; - const textParts: string[] = []; - const thinkingParts: string[] = []; - let inputTokens = -1; - let outputTokens = -1; - - // Track tool calls during streaming - // Use Map with tool call ID as key to avoid index collision issues - const streamToolCalls: Map = new Map(); - const toolCallArguments: Map = new Map(); - // Track index-to-id mapping for chunks that only provide index - const indexToIdMap: Map = new Map(); - let nextToolCallIndex = 0; + // Create streaming context with abort handling + const ctx = new StreamingContext(completion, bearer, apiKeyRecord, begin, signal); try { const chunks = upstreamAdapter.parseStreamResponse(resp); for await (const chunk of chunks) { - if (isFirstChunk) { - isFirstChunk = false; - ttft = Date.now() - begin; + // Check if client has disconnected + if (ctx.isAborted()) { + logger.debug("Client aborted, stopping stream processing"); + break; } + ctx.recordTTFT(); + // Collect content for completion record if (chunk.type === "content_block_start") { // Track new tool call block if (chunk.contentBlock?.type === "tool_use") { const toolId = chunk.contentBlock.id; - const index = chunk.index ?? nextToolCallIndex++; - indexToIdMap.set(index, toolId); - streamToolCalls.set(toolId, { + const index = chunk.index ?? ctx.nextToolCallIndex++; + ctx.indexToIdMap.set(index, toolId); + ctx.streamToolCalls.set(toolId, { id: toolId, type: "function", function: { @@ -266,23 +274,23 @@ async function* processStreamingResponse( arguments: "", }, }); - toolCallArguments.set(toolId, []); + ctx.toolCallArguments.set(toolId, []); } } else if (chunk.type === "content_block_delta") { if (chunk.delta?.type === "text_delta" && chunk.delta.text) { - textParts.push(chunk.delta.text); + ctx.textParts.push(chunk.delta.text); } else if ( chunk.delta?.type === "thinking_delta" && chunk.delta.thinking ) { - thinkingParts.push(chunk.delta.thinking); + ctx.thinkingParts.push(chunk.delta.thinking); } else if (chunk.delta?.type === "input_json_delta" && chunk.delta.partialJson) { // Collect tool call arguments - lookup by index to get tool ID // Skip if index is missing to avoid data corruption if (chunk.index !== undefined) { - const toolId = indexToIdMap.get(chunk.index); + const toolId = ctx.indexToIdMap.get(chunk.index); if (toolId) { - const args = toolCallArguments.get(toolId); + const args = ctx.toolCallArguments.get(toolId); if (args) { args.push(chunk.delta.partialJson); } @@ -295,10 +303,10 @@ async function* processStreamingResponse( // Finalize tool call arguments - lookup by index to get tool ID // Skip if index is missing to avoid data corruption if (chunk.index !== undefined) { - const toolId = indexToIdMap.get(chunk.index); + const toolId = ctx.indexToIdMap.get(chunk.index); if (toolId) { - const toolCall = streamToolCalls.get(toolId); - const args = toolCallArguments.get(toolId); + const toolCall = ctx.streamToolCalls.get(toolId); + const args = ctx.toolCallArguments.get(toolId); if (toolCall && args) { toolCall.function.arguments = args.join(""); } @@ -308,8 +316,8 @@ async function* processStreamingResponse( // Collect usage info if (chunk.usage) { - inputTokens = chunk.usage.inputTokens; - outputTokens = chunk.usage.outputTokens; + ctx.inputTokens = chunk.usage.inputTokens; + ctx.outputTokens = chunk.usage.outputTokens; } // Convert to OpenAI format and yield @@ -324,58 +332,27 @@ async function* processStreamingResponse( } } - // Collect final tool calls - const finalToolCalls: ToolCallType[] | undefined = - streamToolCalls.size > 0 - ? Array.from(streamToolCalls.values()) - : undefined; - - // Finalize completion record - const contentText = - (thinkingParts.length > 0 - ? `${thinkingParts.join("")}\n` - : "") + textParts.join(""); - - completion.completion = [ - { - role: "assistant", - content: contentText || null, - tool_calls: finalToolCalls, - }, - ]; - completion.promptTokens = inputTokens; - completion.completionTokens = outputTokens; - completion.status = "completed"; - completion.ttft = ttft; - completion.duration = Date.now() - begin; - addCompletions(completion, bearer).catch((error: unknown) => { - logger.error("Failed to log completion after streaming", error); - }); - - // Consume tokens for TPM rate limiting (post-flight) - if (apiKeyRecord && inputTokens > 0 && outputTokens > 0) { - const totalTokens = inputTokens + outputTokens; - await consumeTokens(apiKeyRecord.id, apiKeyRecord.tpmLimit, totalTokens); - } - // Handle case where no chunks were received - if (isFirstChunk) { + if (ctx.isFirstChunk && !ctx.isAborted()) { throw new Error("No chunk received from upstream"); } + + // Save completed completion (if not already saved by abort handler) + if (!ctx.isSaved()) { + await ctx.saveCompletion("completed"); + } } catch (error) { logger.error("Stream processing error", error); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Stream processing error: ${String(error)}`, - details: { - type: "completionError", - data: { type: "streamError", msg: String(error) }, - }, - }).catch(() => { - logger.error("Failed to log completion after stream processing error"); - }); + + // Save failed completion (if not already saved by abort handler) + if (!ctx.isSaved()) { + await ctx.saveCompletion("failed", String(error)); + } + throw error; + } finally { + // Clean up abort handler + ctx.cleanup(); } } @@ -563,11 +540,15 @@ export const completionsApi = new Elysia({ providerType, apiKeyRecord ?? null, begin, + request.signal, ); } catch (error) { - logger.error("Stream processing error", error); - set.status = 500; - yield JSON.stringify({ error: "Stream processing error" }); + // Don't log error if it's due to client abort + if (!request.signal.aborted) { + logger.error("Stream processing error", error); + set.status = 500; + yield JSON.stringify({ error: "Stream processing error" }); + } } } else { // Non-streaming request with failover @@ -632,23 +613,25 @@ export const completionsApi = new Elysia({ providerType, apiKeyRecord ?? null, begin, + request.signal, ); yield response; } catch (error) { - logger.error("Failed to process response", error); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Response processing error: ${String(error)}`, - details: { - type: "completionError", - data: { type: "processingError", msg: String(error) }, - }, - }).catch(() => { - logger.error("Failed to log completion after processing error"); - }); - set.status = 500; - yield JSON.stringify({ error: "Failed to process response" }); + // Don't log error if it's due to client abort + if (!request.signal.aborted) { + logger.error("Failed to process response", error); + completion.status = "failed"; + await addCompletions(completion, bearer, { + level: "error", + message: `Response processing error: ${String(error)}`, + details: { + type: "completionError", + data: { type: "processingError", msg: String(error) }, + }, + }); + set.status = 500; + yield JSON.stringify({ error: "Failed to process response" }); + } } } }, diff --git a/backend/src/api/v1/messages.ts b/backend/src/api/v1/messages.ts index 9b93e66..33c0444 100644 --- a/backend/src/api/v1/messages.ts +++ b/backend/src/api/v1/messages.ts @@ -24,6 +24,7 @@ import { PROVIDER_HEADER, } from "@/utils/api-helpers"; import { addCompletions, type Completion } from "@/utils/completions"; +import { StreamingContext } from "@/utils/streaming-context"; import { executeWithFailover, selectMultipleCandidates, @@ -153,6 +154,7 @@ function buildCompletionRecord( /** * Process a successful non-streaming message response + * Ensures completion is saved to database before returning */ async function processNonStreamingResponse( resp: Response, @@ -161,6 +163,7 @@ async function processNonStreamingResponse( providerType: string, apiKeyRecord: ApiKey | null, begin: number, + signal?: AbortSignal, ): Promise { // Parse response using upstream adapter const upstreamAdapter = getUpstreamAdapter(providerType); @@ -173,7 +176,6 @@ async function processNonStreamingResponse( // Update completion record completion.promptTokens = internalResponse.usage.inputTokens; completion.completionTokens = internalResponse.usage.outputTokens; - completion.status = "completed"; completion.ttft = Date.now() - begin; completion.duration = Date.now() - begin; completion.completion = [ @@ -182,9 +184,22 @@ async function processNonStreamingResponse( content: extractContentText(internalResponse), }, ]; - addCompletions(completion, bearer).catch(() => { - logger.error("Failed to log completion after non-streaming"); - }); + + // Check if client disconnected during processing + if (signal?.aborted) { + completion.status = "aborted"; + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: "Client disconnected" }, + }, + }); + } else { + completion.status = "completed"; + await addCompletions(completion, bearer); + } // Consume tokens for TPM rate limiting (post-flight) // Only consume if token counts are valid (not -1 which indicates parsing failure) @@ -198,6 +213,7 @@ async function processNonStreamingResponse( /** * Process a successful streaming message response + * Uses StreamingContext to ensure completion is saved even on client disconnect * @yields string - SSE formatted string chunks */ async function* processStreamingResponse( @@ -207,6 +223,7 @@ async function* processStreamingResponse( providerType: string, apiKeyRecord: ApiKey | null, begin: number, + signal?: AbortSignal, ): AsyncGenerator { // Get adapters const upstreamAdapter = getUpstreamAdapter(providerType); @@ -214,38 +231,37 @@ async function* processStreamingResponse( logger.debug("parse stream messages response"); - let ttft = -1; - let isFirstChunk = true; - const textParts: string[] = []; - const thinkingParts: string[] = []; - let inputTokens = -1; - let outputTokens = -1; + // Create streaming context with abort handling + const ctx = new StreamingContext(completion, bearer, apiKeyRecord, begin, signal); try { const chunks = upstreamAdapter.parseStreamResponse(resp); for await (const chunk of chunks) { - if (isFirstChunk) { - isFirstChunk = false; - ttft = Date.now() - begin; + // Check if client has disconnected + if (ctx.isAborted()) { + logger.debug("Client aborted, stopping stream processing"); + break; } + ctx.recordTTFT(); + // Collect content for completion record if (chunk.type === "content_block_delta") { if (chunk.delta?.type === "text_delta" && chunk.delta.text) { - textParts.push(chunk.delta.text); + ctx.textParts.push(chunk.delta.text); } else if ( chunk.delta?.type === "thinking_delta" && chunk.delta.thinking ) { - thinkingParts.push(chunk.delta.thinking); + ctx.thinkingParts.push(chunk.delta.thinking); } } // Collect usage info if (chunk.usage) { - inputTokens = chunk.usage.inputTokens; - outputTokens = chunk.usage.outputTokens; + ctx.inputTokens = chunk.usage.inputTokens; + ctx.outputTokens = chunk.usage.outputTokens; } // Convert to Anthropic format and yield @@ -255,49 +271,27 @@ async function* processStreamingResponse( } } - // Finalize completion record - completion.completion = [ - { - role: undefined, - content: - (thinkingParts.length > 0 - ? `${thinkingParts.join("")}\n` - : "") + textParts.join(""), - }, - ]; - completion.promptTokens = inputTokens; - completion.completionTokens = outputTokens; - completion.status = "completed"; - completion.ttft = ttft; - completion.duration = Date.now() - begin; - addCompletions(completion, bearer).catch(() => { - logger.error("Failed to log completion after streaming"); - }); - - // Consume tokens for TPM rate limiting (post-flight) - if (apiKeyRecord && inputTokens > 0 && outputTokens > 0) { - const totalTokens = inputTokens + outputTokens; - await consumeTokens(apiKeyRecord.id, apiKeyRecord.tpmLimit, totalTokens); - } - // Handle case where no chunks were received - if (isFirstChunk) { + if (ctx.isFirstChunk && !ctx.isAborted()) { throw new Error("No chunk received from upstream"); } + + // Save completed completion (if not already saved by abort handler) + if (!ctx.isSaved()) { + await ctx.saveCompletion("completed"); + } } catch (error) { logger.error("Stream processing error", error); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Stream processing error: ${String(error)}`, - details: { - type: "completionError", - data: { type: "streamError", msg: String(error) }, - }, - }).catch(() => { - logger.error("Failed to log completion after stream error"); - }); + + // Save failed completion (if not already saved by abort handler) + if (!ctx.isSaved()) { + await ctx.saveCompletion("failed", String(error)); + } + throw error; + } finally { + // Clean up abort handler + ctx.cleanup(); } } @@ -481,11 +475,15 @@ export const messagesApi = new Elysia({ providerType, apiKeyRecord ?? null, begin, + request.signal, ); } catch (error) { - logger.error("Stream processing error", error); - set.status = 500; - yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "Stream processing error" } })}\n\n`; + // Don't log error if it's due to client abort + if (!request.signal.aborted) { + logger.error("Stream processing error", error); + set.status = 500; + yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "Stream processing error" } })}\n\n`; + } } } else { // Non-streaming request with failover @@ -549,26 +547,28 @@ export const messagesApi = new Elysia({ providerType, apiKeyRecord ?? null, begin, + request.signal, ); yield response; } catch (error) { - logger.error("Failed to process response", error); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Response processing error: ${String(error)}`, - details: { - type: "completionError", - data: { type: "processingError", msg: String(error) }, - }, - }).catch(() => { - logger.error("Failed to log completion after processing error"); - }); - set.status = 500; - yield JSON.stringify({ - type: "error", - error: { type: "api_error", message: "Failed to process response" }, - }); + // Don't log error if it's due to client abort + if (!request.signal.aborted) { + logger.error("Failed to process response", error); + completion.status = "failed"; + await addCompletions(completion, bearer, { + level: "error", + message: `Response processing error: ${String(error)}`, + details: { + type: "completionError", + data: { type: "processingError", msg: String(error) }, + }, + }); + set.status = 500; + yield JSON.stringify({ + type: "error", + error: { type: "api_error", message: "Failed to process response" }, + }); + } } } }, diff --git a/backend/src/api/v1/responses.ts b/backend/src/api/v1/responses.ts index 224e060..ef9b485 100644 --- a/backend/src/api/v1/responses.ts +++ b/backend/src/api/v1/responses.ts @@ -24,6 +24,7 @@ import { PROVIDER_HEADER, } from "@/utils/api-helpers"; import { addCompletions, type Completion } from "@/utils/completions"; +import { StreamingContext } from "@/utils/streaming-context"; import { executeWithFailover, selectMultipleCandidates, @@ -164,6 +165,7 @@ function buildCompletionRecord( /** * Process a successful non-streaming response + * Ensures completion is saved to database before returning */ async function processNonStreamingResponse( resp: Response, @@ -172,6 +174,7 @@ async function processNonStreamingResponse( providerType: string, apiKeyRecord: ApiKey | null, begin: number, + signal?: AbortSignal, ): Promise { // Parse response using upstream adapter const upstreamAdapter = getUpstreamAdapter(providerType); @@ -184,7 +187,6 @@ async function processNonStreamingResponse( // Update completion record completion.promptTokens = internalResponse.usage.inputTokens; completion.completionTokens = internalResponse.usage.outputTokens; - completion.status = "completed"; completion.ttft = Date.now() - begin; completion.duration = Date.now() - begin; completion.completion = [ @@ -193,9 +195,22 @@ async function processNonStreamingResponse( content: extractContentText(internalResponse), }, ]; - addCompletions(completion, bearer).catch(() => { - logger.error("Failed to log completion after non-streaming"); - }); + + // Check if client disconnected during processing + if (signal?.aborted) { + completion.status = "aborted"; + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: "Client disconnected" }, + }, + }); + } else { + completion.status = "completed"; + await addCompletions(completion, bearer); + } // Consume tokens for TPM rate limiting (post-flight) // Only consume if token counts are valid (not -1 which indicates parsing failure) @@ -209,6 +224,7 @@ async function processNonStreamingResponse( /** * Process a successful streaming response + * Uses StreamingContext to ensure completion is saved even on client disconnect * @yields SSE formatted strings */ async function* processStreamingResponse( @@ -218,6 +234,7 @@ async function* processStreamingResponse( providerType: string, apiKeyRecord: ApiKey | null, begin: number, + signal?: AbortSignal, ): AsyncGenerator { // Get adapters const upstreamAdapter = getUpstreamAdapter(providerType); @@ -225,38 +242,37 @@ async function* processStreamingResponse( logger.debug("parse stream responses"); - let ttft = -1; - let isFirstChunk = true; - const textParts: string[] = []; - const thinkingParts: string[] = []; - let inputTokens = -1; - let outputTokens = -1; + // Create streaming context with abort handling + const ctx = new StreamingContext(completion, bearer, apiKeyRecord, begin, signal); try { const chunks = upstreamAdapter.parseStreamResponse(resp); for await (const chunk of chunks) { - if (isFirstChunk) { - isFirstChunk = false; - ttft = Date.now() - begin; + // Check if client has disconnected + if (ctx.isAborted()) { + logger.debug("Client aborted, stopping stream processing"); + break; } + ctx.recordTTFT(); + // Collect content for completion record if (chunk.type === "content_block_delta") { if (chunk.delta?.type === "text_delta" && chunk.delta.text) { - textParts.push(chunk.delta.text); + ctx.textParts.push(chunk.delta.text); } else if ( chunk.delta?.type === "thinking_delta" && chunk.delta.thinking ) { - thinkingParts.push(chunk.delta.thinking); + ctx.thinkingParts.push(chunk.delta.thinking); } } // Collect usage info if (chunk.usage) { - inputTokens = chunk.usage.inputTokens; - outputTokens = chunk.usage.outputTokens; + ctx.inputTokens = chunk.usage.inputTokens; + ctx.outputTokens = chunk.usage.outputTokens; } // Convert to Response API format and yield @@ -271,49 +287,27 @@ async function* processStreamingResponse( } } - // Finalize completion record - completion.completion = [ - { - role: undefined, - content: - (thinkingParts.length > 0 - ? `${thinkingParts.join("")}\n` - : "") + textParts.join(""), - }, - ]; - completion.promptTokens = inputTokens; - completion.completionTokens = outputTokens; - completion.status = "completed"; - completion.ttft = ttft; - completion.duration = Date.now() - begin; - addCompletions(completion, bearer).catch(() => { - logger.error("Failed to log completion after streaming"); - }); - - // Consume tokens for TPM rate limiting (post-flight) - if (apiKeyRecord && inputTokens > 0 && outputTokens > 0) { - const totalTokens = inputTokens + outputTokens; - await consumeTokens(apiKeyRecord.id, apiKeyRecord.tpmLimit, totalTokens); - } - // Handle case where no chunks were received - if (isFirstChunk) { + if (ctx.isFirstChunk && !ctx.isAborted()) { throw new Error("No chunk received from upstream"); } + + // Save completed completion (if not already saved by abort handler) + if (!ctx.isSaved()) { + await ctx.saveCompletion("completed"); + } } catch (error) { logger.error("Stream processing error", error); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Stream processing error: ${String(error)}`, - details: { - type: "completionError", - data: { type: "streamError", msg: String(error) }, - }, - }).catch(() => { - logger.error("Failed to log completion error after stream failure"); - }); + + // Save failed completion (if not already saved by abort handler) + if (!ctx.isSaved()) { + await ctx.saveCompletion("failed", String(error)); + } + throw error; + } finally { + // Clean up abort handler + ctx.cleanup(); } } @@ -497,11 +491,15 @@ export const responsesApi = new Elysia({ providerType, apiKeyRecord ?? null, begin, + request.signal, ); } catch (error) { - logger.error("Stream processing error", error); - set.status = 500; - yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { code: "internal_error", message: "Stream processing error", param: null, help_url: null } })}\n\n`; + // Don't log error if it's due to client abort + if (!request.signal.aborted) { + logger.error("Stream processing error", error); + set.status = 500; + yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { code: "internal_error", message: "Stream processing error", param: null, help_url: null } })}\n\n`; + } } } else { // Non-streaming request with failover @@ -565,26 +563,28 @@ export const responsesApi = new Elysia({ providerType, apiKeyRecord ?? null, begin, + request.signal, ); yield response; } catch (error) { - logger.error("Failed to process response", error); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Response processing error: ${String(error)}`, - details: { - type: "completionError", - data: { type: "processingError", msg: String(error) }, - }, - }).catch(() => { - logger.error("Failed to log completion after processing error"); - }); - set.status = 500; - yield JSON.stringify({ - object: "error", - error: { type: "server_error", message: "Failed to process response" }, - }); + // Don't log error if it's due to client abort + if (!request.signal.aborted) { + logger.error("Failed to process response", error); + completion.status = "failed"; + await addCompletions(completion, bearer, { + level: "error", + message: `Response processing error: ${String(error)}`, + details: { + type: "completionError", + data: { type: "processingError", msg: String(error) }, + }, + }); + set.status = 500; + yield JSON.stringify({ + object: "error", + error: { type: "server_error", message: "Failed to process response" }, + }); + } } } }, diff --git a/backend/src/db/schema.ts b/backend/src/db/schema.ts index b0a8cf5..5ea0561 100644 --- a/backend/src/db/schema.ts +++ b/backend/src/db/schema.ts @@ -150,8 +150,9 @@ export const CompletionsStatusEnum = pgEnum("completions_status", [ "pending", "completed", "failed", + "aborted", ]); -export type CompletionsStatusEnumType = "pending" | "completed" | "failed"; +export type CompletionsStatusEnumType = "pending" | "completed" | "failed" | "aborted"; export const CompletionsTable = pgTable("completions", { id: integer("id").primaryKey().generatedAlwaysAsIdentity().unique(), diff --git a/backend/src/utils/streaming-context.ts b/backend/src/utils/streaming-context.ts new file mode 100644 index 0000000..f7862aa --- /dev/null +++ b/backend/src/utils/streaming-context.ts @@ -0,0 +1,161 @@ +/** + * Streaming Context for Abort Handling + * Ensures completion records are saved even when client disconnects + */ + +import { consola } from "consola"; +import type { + CompletionsStatusEnumType, + ToolCallType, +} from "@/db/schema"; +import { addCompletions, type Completion } from "@/utils/completions"; +import { consumeTokens } from "@/plugins/apiKeyRateLimitPlugin"; +import type { ApiKey } from "@/plugins/apiKeyPlugin"; + +const logger = consola.withTag("streamingContext"); + +/** + * StreamingContext manages the state of a streaming response. + * It ensures completion records are saved even when client disconnects. + */ +export class StreamingContext { + private completion: Completion; + private bearer: string; + private apiKeyRecord: ApiKey | null; + private begin: number; + private saved = false; + private abortHandler: (() => void) | null = null; + private signal?: AbortSignal; + + // Accumulated data during streaming + textParts: string[] = []; + thinkingParts: string[] = []; + inputTokens = -1; + outputTokens = -1; + ttft = -1; + streamToolCalls: Map = new Map(); + toolCallArguments: Map = new Map(); + indexToIdMap: Map = new Map(); + nextToolCallIndex = 0; + isFirstChunk = true; + + constructor( + completion: Completion, + bearer: string, + apiKeyRecord: ApiKey | null, + begin: number, + signal?: AbortSignal, + ) { + this.completion = completion; + this.bearer = bearer; + this.apiKeyRecord = apiKeyRecord; + this.begin = begin; + this.signal = signal; + + // Register abort handler to save completion when client disconnects + if (signal) { + this.abortHandler = () => { + logger.info("Client disconnected, saving partial completion"); + this.saveCompletion("aborted").catch((saveError: unknown) => { + const errorMessage = saveError instanceof Error ? saveError.message : String(saveError); + logger.error("Failed to save aborted completion", errorMessage); + }); + }; + signal.addEventListener("abort", this.abortHandler); + } + } + + /** + * Record time to first token + */ + recordTTFT(): void { + if (this.isFirstChunk) { + this.isFirstChunk = false; + this.ttft = Date.now() - this.begin; + } + } + + /** + * Check if client has aborted + */ + isAborted(): boolean { + return this.signal?.aborted ?? false; + } + + /** + * Save the completion record to database + */ + async saveCompletion( + status: CompletionsStatusEnumType, + error?: string, + ): Promise { + // Prevent double-save + if (this.saved) { + return; + } + this.saved = true; + + // Collect final tool calls + const finalToolCalls: ToolCallType[] | undefined = + this.streamToolCalls.size > 0 + ? Array.from(this.streamToolCalls.values()) + : undefined; + + // Build content text + const contentText = + (this.thinkingParts.length > 0 + ? `${this.thinkingParts.join("")}\n` + : "") + this.textParts.join(""); + + // Update completion record + this.completion.completion = [ + { + role: "assistant", + content: contentText || null, + tool_calls: finalToolCalls, + }, + ]; + this.completion.promptTokens = this.inputTokens; + this.completion.completionTokens = this.outputTokens; + this.completion.status = status; + this.completion.ttft = this.ttft; + this.completion.duration = Date.now() - this.begin; + + // Save to database + if (error) { + await addCompletions(this.completion, this.bearer, { + level: status === "aborted" ? "info" : "error", + message: `Stream ${status}: ${error}`, + details: { + type: "completionError", + data: { type: status, msg: error }, + }, + }); + } else { + await addCompletions(this.completion, this.bearer); + } + + // Consume tokens for TPM rate limiting (only if we have valid counts) + if (this.apiKeyRecord && this.inputTokens > 0 && this.outputTokens > 0) { + const totalTokens = this.inputTokens + this.outputTokens; + await consumeTokens(this.apiKeyRecord.id, this.apiKeyRecord.tpmLimit, totalTokens); + } + } + + /** + * Check if completion has already been saved + */ + isSaved(): boolean { + return this.saved; + } + + /** + * Clean up resources + */ + cleanup(): void { + if (this.signal && this.abortHandler) { + this.signal.removeEventListener("abort", this.abortHandler); + this.abortHandler = null; + } + } +} diff --git a/frontend/src/i18n/locales/en-US.json b/frontend/src/i18n/locales/en-US.json index d05ec11..8611ccc 100644 --- a/frontend/src/i18n/locales/en-US.json +++ b/frontend/src/i18n/locales/en-US.json @@ -93,6 +93,7 @@ "pages.requests.columns.Pending": "Pending", "pages.requests.columns.Completed": "Completed", "pages.requests.columns.Failed": "Failed", + "pages.requests.columns.Aborted": "Aborted", "pages.requests.columns.Model": "Model", "pages.requests.columns.TTFT": "TTFT", "pages.requests.columns.TimeToFirstToken": "Time To First Token", @@ -118,6 +119,7 @@ "pages.requests.detail-panel.header.Pending": "Pending", "pages.requests.detail-panel.header.Completed": "Completed", "pages.requests.detail-panel.header.Failed": "Failed", + "pages.requests.detail-panel.header.Aborted": "Aborted", "pages.requests.detail-panel.header.ClosePanel": "Close panel", "pages.requests.detail-panel.index.Close": "Close", "pages.requests.detail-panel.index.Retry": "Retry", @@ -190,6 +192,7 @@ "pages.embeddings.columns.Pending": "Pending", "pages.embeddings.columns.Completed": "Completed", "pages.embeddings.columns.Failed": "Failed", + "pages.embeddings.columns.Aborted": "Aborted", "pages.embeddings.columns.Model": "Model", "pages.embeddings.columns.Input": "Input", "pages.embeddings.columns.Tokens": "Tokens", diff --git a/frontend/src/i18n/locales/zh-CN.json b/frontend/src/i18n/locales/zh-CN.json index 6e224ec..61780ac 100644 --- a/frontend/src/i18n/locales/zh-CN.json +++ b/frontend/src/i18n/locales/zh-CN.json @@ -94,6 +94,7 @@ "pages.requests.columns.Pending": "待处理", "pages.requests.columns.Completed": "已完成", "pages.requests.columns.Failed": "失败", + "pages.requests.columns.Aborted": "已中止", "pages.requests.columns.Model": "模型", "pages.requests.columns.TTFT": "TTFT", "pages.requests.columns.TimeToFirstToken": "首 Token 返回时间", @@ -119,6 +120,7 @@ "pages.requests.detail-panel.header.Pending": "待处理", "pages.requests.detail-panel.header.Completed": "已完成", "pages.requests.detail-panel.header.Failed": "失败", + "pages.requests.detail-panel.header.Aborted": "已中止", "pages.requests.detail-panel.header.ClosePanel": "关闭面板", "pages.requests.detail-panel.index.Close": "关闭", "pages.requests.detail-panel.index.Retry": "重试", @@ -191,6 +193,7 @@ "pages.embeddings.columns.Pending": "待处理", "pages.embeddings.columns.Completed": "已完成", "pages.embeddings.columns.Failed": "失败", + "pages.embeddings.columns.Aborted": "已中止", "pages.embeddings.columns.Model": "模型", "pages.embeddings.columns.Input": "输入", "pages.embeddings.columns.Tokens": "Tokens", diff --git a/frontend/src/pages/embeddings/columns.tsx b/frontend/src/pages/embeddings/columns.tsx index f098f0d..1153d4b 100644 --- a/frontend/src/pages/embeddings/columns.tsx +++ b/frontend/src/pages/embeddings/columns.tsx @@ -35,6 +35,11 @@ export const columns: ColumnDef[] = [ {i18n.t('pages.embeddings.columns.Failed')} )) + .with('aborted', () => ( + + {i18n.t('pages.embeddings.columns.Aborted')} + + )) .exhaustive() return (
diff --git a/frontend/src/pages/requests/columns.tsx b/frontend/src/pages/requests/columns.tsx index 02072b1..d6eb2f5 100644 --- a/frontend/src/pages/requests/columns.tsx +++ b/frontend/src/pages/requests/columns.tsx @@ -41,6 +41,9 @@ export const columns: ColumnDef[] = [ .with('failed', () => ( {i18n.t('pages.requests.columns.Failed')} )) + .with('aborted', () => ( + {i18n.t('pages.requests.columns.Aborted')} + )) .exhaustive() return (
diff --git a/frontend/src/pages/requests/detail-panel/header.tsx b/frontend/src/pages/requests/detail-panel/header.tsx index 05536d6..587b65d 100644 --- a/frontend/src/pages/requests/detail-panel/header.tsx +++ b/frontend/src/pages/requests/detail-panel/header.tsx @@ -62,6 +62,11 @@ function StatusIndicator({ status }: { status: ChatRequest['status'] }) { {t('pages.requests.detail-panel.header.Failed')} )) + .with('aborted', () => ( + + {t('pages.requests.detail-panel.header.Aborted')} + + )) .exhaustive() } From 7e5cad5d40ad4f2e0d6fa83e269bc0e341f74ec2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E7=BF=94=E5=AE=87?= Date: Sat, 24 Jan 2026 21:33:12 +0800 Subject: [PATCH 02/10] fix(api): address PR review feedback for abort handling Addresses review comments from CodeRabbit and Gemini: 1. StreamingContext improvements: - Pass "Client disconnected" reason to saveCompletion for better logging - Handle already-aborted signals (AbortSignal spec: addEventListener won't trigger for already-aborted signals) - Fix TPM consumption to use Math.max(0, ...) for token values, ensuring partial usage is charged even when output is 0 2. Non-streaming abort handling: - Save completion as "aborted" when processing throws after client disconnect (previously left as "pending") 3. Streaming error logging: - Add ctx.isAborted() check before logger.error to reduce noise when client disconnects Co-Authored-By: Claude Opus 4.5 --- backend/src/api/v1/completions.ts | 22 +++++++++++++++++++--- backend/src/api/v1/messages.ts | 22 +++++++++++++++++++--- backend/src/api/v1/responses.ts | 22 +++++++++++++++++++--- backend/src/utils/streaming-context.ts | 16 ++++++++++++---- 4 files changed, 69 insertions(+), 13 deletions(-) diff --git a/backend/src/api/v1/completions.ts b/backend/src/api/v1/completions.ts index 2ea8d2e..9986905 100644 --- a/backend/src/api/v1/completions.ts +++ b/backend/src/api/v1/completions.ts @@ -342,7 +342,10 @@ async function* processStreamingResponse( await ctx.saveCompletion("completed"); } } catch (error) { - logger.error("Stream processing error", error); + // Only log error if not due to client abort + if (!ctx.isAborted()) { + logger.error("Stream processing error", error); + } // Save failed completion (if not already saved by abort handler) if (!ctx.isSaved()) { @@ -617,8 +620,21 @@ export const completionsApi = new Elysia({ ); yield response; } catch (error) { - // Don't log error if it's due to client abort - if (!request.signal.aborted) { + // Handle error based on whether client aborted + if (request.signal.aborted) { + // Client disconnected - save as aborted if not already saved + if (completion.status === "pending") { + completion.status = "aborted"; + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: String(error ?? "Client disconnected") }, + }, + }); + } + } else { logger.error("Failed to process response", error); completion.status = "failed"; await addCompletions(completion, bearer, { diff --git a/backend/src/api/v1/messages.ts b/backend/src/api/v1/messages.ts index 33c0444..e4d7617 100644 --- a/backend/src/api/v1/messages.ts +++ b/backend/src/api/v1/messages.ts @@ -281,7 +281,10 @@ async function* processStreamingResponse( await ctx.saveCompletion("completed"); } } catch (error) { - logger.error("Stream processing error", error); + // Only log error if not due to client abort + if (!ctx.isAborted()) { + logger.error("Stream processing error", error); + } // Save failed completion (if not already saved by abort handler) if (!ctx.isSaved()) { @@ -551,8 +554,21 @@ export const messagesApi = new Elysia({ ); yield response; } catch (error) { - // Don't log error if it's due to client abort - if (!request.signal.aborted) { + // Handle error based on whether client aborted + if (request.signal.aborted) { + // Client disconnected - save as aborted if not already saved + if (completion.status === "pending") { + completion.status = "aborted"; + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: String(error ?? "Client disconnected") }, + }, + }); + } + } else { logger.error("Failed to process response", error); completion.status = "failed"; await addCompletions(completion, bearer, { diff --git a/backend/src/api/v1/responses.ts b/backend/src/api/v1/responses.ts index ef9b485..9febc1e 100644 --- a/backend/src/api/v1/responses.ts +++ b/backend/src/api/v1/responses.ts @@ -297,7 +297,10 @@ async function* processStreamingResponse( await ctx.saveCompletion("completed"); } } catch (error) { - logger.error("Stream processing error", error); + // Only log error if not due to client abort + if (!ctx.isAborted()) { + logger.error("Stream processing error", error); + } // Save failed completion (if not already saved by abort handler) if (!ctx.isSaved()) { @@ -567,8 +570,21 @@ export const responsesApi = new Elysia({ ); yield response; } catch (error) { - // Don't log error if it's due to client abort - if (!request.signal.aborted) { + // Handle error based on whether client aborted + if (request.signal.aborted) { + // Client disconnected - save as aborted if not already saved + if (completion.status === "pending") { + completion.status = "aborted"; + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: String(error ?? "Client disconnected") }, + }, + }); + } + } else { logger.error("Failed to process response", error); completion.status = "failed"; await addCompletions(completion, bearer, { diff --git a/backend/src/utils/streaming-context.ts b/backend/src/utils/streaming-context.ts index f7862aa..d7dedb0 100644 --- a/backend/src/utils/streaming-context.ts +++ b/backend/src/utils/streaming-context.ts @@ -56,12 +56,17 @@ export class StreamingContext { if (signal) { this.abortHandler = () => { logger.info("Client disconnected, saving partial completion"); - this.saveCompletion("aborted").catch((saveError: unknown) => { + this.saveCompletion("aborted", "Client disconnected").catch((saveError: unknown) => { const errorMessage = saveError instanceof Error ? saveError.message : String(saveError); logger.error("Failed to save aborted completion", errorMessage); }); }; signal.addEventListener("abort", this.abortHandler); + // Handle case where signal is already aborted before listener registration + // (AbortSignal spec: addEventListener won't trigger for already-aborted signals) + if (signal.aborted) { + this.abortHandler(); + } } } @@ -135,9 +140,12 @@ export class StreamingContext { await addCompletions(this.completion, this.bearer); } - // Consume tokens for TPM rate limiting (only if we have valid counts) - if (this.apiKeyRecord && this.inputTokens > 0 && this.outputTokens > 0) { - const totalTokens = this.inputTokens + this.outputTokens; + // Consume tokens for TPM rate limiting + // Use Math.max(0, ...) to handle -1 (unknown) values and ensure partial usage is charged + const inputTokens = Math.max(0, this.inputTokens); + const outputTokens = Math.max(0, this.outputTokens); + const totalTokens = inputTokens + outputTokens; + if (this.apiKeyRecord && totalTokens > 0) { await consumeTokens(this.apiKeyRecord.id, this.apiKeyRecord.tpmLimit, totalTokens); } } From 5566bc070202cbbd0f346a8021ad4c9deadc3735 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E7=BF=94=E5=AE=87?= Date: Sat, 24 Jan 2026 21:37:44 +0800 Subject: [PATCH 03/10] fix(api): use type guard for error message in abort handling Fix no-base-to-string lint error by using instanceof Error check instead of String(error ?? "...") pattern. Co-Authored-By: Claude Opus 4.5 --- backend/src/api/v1/completions.ts | 3 ++- backend/src/api/v1/messages.ts | 3 ++- backend/src/api/v1/responses.ts | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/backend/src/api/v1/completions.ts b/backend/src/api/v1/completions.ts index 9986905..80be95f 100644 --- a/backend/src/api/v1/completions.ts +++ b/backend/src/api/v1/completions.ts @@ -625,12 +625,13 @@ export const completionsApi = new Elysia({ // Client disconnected - save as aborted if not already saved if (completion.status === "pending") { completion.status = "aborted"; + const errorMsg = error instanceof Error ? error.message : "Client disconnected"; await addCompletions(completion, bearer, { level: "info", message: "Client disconnected during non-streaming response", details: { type: "completionError", - data: { type: "aborted", msg: String(error ?? "Client disconnected") }, + data: { type: "aborted", msg: errorMsg }, }, }); } diff --git a/backend/src/api/v1/messages.ts b/backend/src/api/v1/messages.ts index e4d7617..c886605 100644 --- a/backend/src/api/v1/messages.ts +++ b/backend/src/api/v1/messages.ts @@ -559,12 +559,13 @@ export const messagesApi = new Elysia({ // Client disconnected - save as aborted if not already saved if (completion.status === "pending") { completion.status = "aborted"; + const errorMsg = error instanceof Error ? error.message : "Client disconnected"; await addCompletions(completion, bearer, { level: "info", message: "Client disconnected during non-streaming response", details: { type: "completionError", - data: { type: "aborted", msg: String(error ?? "Client disconnected") }, + data: { type: "aborted", msg: errorMsg }, }, }); } diff --git a/backend/src/api/v1/responses.ts b/backend/src/api/v1/responses.ts index 9febc1e..c458d71 100644 --- a/backend/src/api/v1/responses.ts +++ b/backend/src/api/v1/responses.ts @@ -575,12 +575,13 @@ export const responsesApi = new Elysia({ // Client disconnected - save as aborted if not already saved if (completion.status === "pending") { completion.status = "aborted"; + const errorMsg = error instanceof Error ? error.message : "Client disconnected"; await addCompletions(completion, bearer, { level: "info", message: "Client disconnected during non-streaming response", details: { type: "completionError", - data: { type: "aborted", msg: String(error ?? "Client disconnected") }, + data: { type: "aborted", msg: errorMsg }, }, }); } From 72397496dfd17148e3b59c9b622b7e70f9e59db8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E7=BF=94=E5=AE=87?= Date: Sat, 24 Jan 2026 21:48:03 +0800 Subject: [PATCH 04/10] fix(api): improve error handling robustness in abort catch blocks Address Gemini critical review: 1. Remove flawed `completion.status === "pending"` check that could prevent logging when status was already updated 2. Use .catch() instead of await for addCompletions to handle database failures gracefully without blocking error response Co-Authored-By: Claude Opus 4.5 --- backend/src/api/v1/completions.ts | 34 ++++++++++++++++--------------- backend/src/api/v1/messages.ts | 34 ++++++++++++++++--------------- backend/src/api/v1/responses.ts | 34 ++++++++++++++++--------------- 3 files changed, 54 insertions(+), 48 deletions(-) diff --git a/backend/src/api/v1/completions.ts b/backend/src/api/v1/completions.ts index 80be95f..12ff976 100644 --- a/backend/src/api/v1/completions.ts +++ b/backend/src/api/v1/completions.ts @@ -621,30 +621,32 @@ export const completionsApi = new Elysia({ yield response; } catch (error) { // Handle error based on whether client aborted + const errorMsg = error instanceof Error ? error.message : String(error); if (request.signal.aborted) { - // Client disconnected - save as aborted if not already saved - if (completion.status === "pending") { - completion.status = "aborted"; - const errorMsg = error instanceof Error ? error.message : "Client disconnected"; - await addCompletions(completion, bearer, { - level: "info", - message: "Client disconnected during non-streaming response", - details: { - type: "completionError", - data: { type: "aborted", msg: errorMsg }, - }, - }); - } + // Client disconnected - save as aborted + completion.status = "aborted"; + addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: errorMsg }, + }, + }).catch((logError: unknown) => { + logger.error("Failed to log aborted completion after processing error", logError); + }); } else { logger.error("Failed to process response", error); completion.status = "failed"; - await addCompletions(completion, bearer, { + addCompletions(completion, bearer, { level: "error", - message: `Response processing error: ${String(error)}`, + message: `Response processing error: ${errorMsg}`, details: { type: "completionError", - data: { type: "processingError", msg: String(error) }, + data: { type: "processingError", msg: errorMsg }, }, + }).catch((logError: unknown) => { + logger.error("Failed to log completion after processing error", logError); }); set.status = 500; yield JSON.stringify({ error: "Failed to process response" }); diff --git a/backend/src/api/v1/messages.ts b/backend/src/api/v1/messages.ts index c886605..28e2330 100644 --- a/backend/src/api/v1/messages.ts +++ b/backend/src/api/v1/messages.ts @@ -555,30 +555,32 @@ export const messagesApi = new Elysia({ yield response; } catch (error) { // Handle error based on whether client aborted + const errorMsg = error instanceof Error ? error.message : String(error); if (request.signal.aborted) { - // Client disconnected - save as aborted if not already saved - if (completion.status === "pending") { - completion.status = "aborted"; - const errorMsg = error instanceof Error ? error.message : "Client disconnected"; - await addCompletions(completion, bearer, { - level: "info", - message: "Client disconnected during non-streaming response", - details: { - type: "completionError", - data: { type: "aborted", msg: errorMsg }, - }, - }); - } + // Client disconnected - save as aborted + completion.status = "aborted"; + addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: errorMsg }, + }, + }).catch((logError: unknown) => { + logger.error("Failed to log aborted completion after processing error", logError); + }); } else { logger.error("Failed to process response", error); completion.status = "failed"; - await addCompletions(completion, bearer, { + addCompletions(completion, bearer, { level: "error", - message: `Response processing error: ${String(error)}`, + message: `Response processing error: ${errorMsg}`, details: { type: "completionError", - data: { type: "processingError", msg: String(error) }, + data: { type: "processingError", msg: errorMsg }, }, + }).catch((logError: unknown) => { + logger.error("Failed to log completion after processing error", logError); }); set.status = 500; yield JSON.stringify({ diff --git a/backend/src/api/v1/responses.ts b/backend/src/api/v1/responses.ts index c458d71..83b19b3 100644 --- a/backend/src/api/v1/responses.ts +++ b/backend/src/api/v1/responses.ts @@ -571,30 +571,32 @@ export const responsesApi = new Elysia({ yield response; } catch (error) { // Handle error based on whether client aborted + const errorMsg = error instanceof Error ? error.message : String(error); if (request.signal.aborted) { - // Client disconnected - save as aborted if not already saved - if (completion.status === "pending") { - completion.status = "aborted"; - const errorMsg = error instanceof Error ? error.message : "Client disconnected"; - await addCompletions(completion, bearer, { - level: "info", - message: "Client disconnected during non-streaming response", - details: { - type: "completionError", - data: { type: "aborted", msg: errorMsg }, - }, - }); - } + // Client disconnected - save as aborted + completion.status = "aborted"; + addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: errorMsg }, + }, + }).catch((logError: unknown) => { + logger.error("Failed to log aborted completion after processing error", logError); + }); } else { logger.error("Failed to process response", error); completion.status = "failed"; - await addCompletions(completion, bearer, { + addCompletions(completion, bearer, { level: "error", - message: `Response processing error: ${String(error)}`, + message: `Response processing error: ${errorMsg}`, details: { type: "completionError", - data: { type: "processingError", msg: String(error) }, + data: { type: "processingError", msg: errorMsg }, }, + }).catch((logError: unknown) => { + logger.error("Failed to log completion after processing error", logError); }); set.status = 500; yield JSON.stringify({ From 61ea83a021023f93726611e2f16ee95dc77e29a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E7=BF=94=E5=AE=87?= Date: Sat, 24 Jan 2026 21:54:43 +0800 Subject: [PATCH 05/10] fix(api): add explicit Content-Type headers for streaming and non-streaming responses Addresses reviewer feedback: - Set Content-Type: text/event-stream for SSE streaming responses - Set Content-Type: application/json for non-streaming JSON responses When using async generators in Elysia, the content-type cannot be auto-detected, so we need to explicitly set the appropriate headers based on the request type. Co-Authored-By: Claude Opus 4.5 --- backend/src/api/v1/completions.ts | 6 ++++++ backend/src/api/v1/messages.ts | 6 ++++++ backend/src/api/v1/responses.ts | 6 ++++++ 3 files changed, 18 insertions(+) diff --git a/backend/src/api/v1/completions.ts b/backend/src/api/v1/completions.ts index 12ff976..2a2e2cf 100644 --- a/backend/src/api/v1/completions.ts +++ b/backend/src/api/v1/completions.ts @@ -467,6 +467,9 @@ export const completionsApi = new Elysia({ // Handle streaming vs non-streaming if (internalRequest.stream) { + // Set content-type for SSE streaming + set.headers["Content-Type"] = "text/event-stream"; + if (body.n && body.n > 1) { set.status = 400; yield JSON.stringify({ @@ -554,6 +557,9 @@ export const completionsApi = new Elysia({ } } } else { + // Set content-type for JSON response + set.headers["Content-Type"] = "application/json"; + // Non-streaming request with failover const result = await executeWithFailover( candidates, diff --git a/backend/src/api/v1/messages.ts b/backend/src/api/v1/messages.ts index 28e2330..6cb5019 100644 --- a/backend/src/api/v1/messages.ts +++ b/backend/src/api/v1/messages.ts @@ -408,6 +408,9 @@ export const messagesApi = new Elysia({ // Handle streaming vs non-streaming if (internalRequest.stream) { + // Set content-type for SSE streaming + set.headers["Content-Type"] = "text/event-stream"; + // For streaming, use failover only for connection establishment const result = await executeWithFailover( candidates, @@ -489,6 +492,9 @@ export const messagesApi = new Elysia({ } } } else { + // Set content-type for JSON response + set.headers["Content-Type"] = "application/json"; + // Non-streaming request with failover const result = await executeWithFailover( candidates, diff --git a/backend/src/api/v1/responses.ts b/backend/src/api/v1/responses.ts index 83b19b3..a9720d9 100644 --- a/backend/src/api/v1/responses.ts +++ b/backend/src/api/v1/responses.ts @@ -424,6 +424,9 @@ export const responsesApi = new Elysia({ // Handle streaming vs non-streaming if (internalRequest.stream) { + // Set content-type for SSE streaming + set.headers["Content-Type"] = "text/event-stream"; + // For streaming, use failover only for connection establishment const result = await executeWithFailover( candidates, @@ -505,6 +508,9 @@ export const responsesApi = new Elysia({ } } } else { + // Set content-type for JSON response + set.headers["Content-Type"] = "application/json"; + // Non-streaming request with failover const result = await executeWithFailover( candidates, From 65bbdb82f27a98f4c5e7a5a1ecbac2e03c8df860 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E7=BF=94=E5=AE=87?= Date: Sat, 24 Jan 2026 22:01:29 +0800 Subject: [PATCH 06/10] refactor(api): use return for non-streaming responses in async generators Per Elysia documentation, async generators should: - Use `return` for non-streaming responses (Elysia converts to normal JSON response) - Use `yield` only for streaming responses This ensures: - Non-streaming responses get proper `application/json` content-type - Streaming responses get proper streaming behavior - Early error responses are returned as normal JSON, not streamed References: - https://elysiajs.com/essential/handler - https://github.com/elysiajs/elysia/issues/742 Co-Authored-By: Claude Opus 4.5 --- backend/src/api/v1/completions.ts | 59 +++++++++++----------------- backend/src/api/v1/messages.ts | 65 +++++++++++++------------------ backend/src/api/v1/responses.ts | 65 +++++++++++++------------------ 3 files changed, 74 insertions(+), 115 deletions(-) diff --git a/backend/src/api/v1/completions.ts b/backend/src/api/v1/completions.ts index 2a2e2cf..9b57005 100644 --- a/backend/src/api/v1/completions.ts +++ b/backend/src/api/v1/completions.ts @@ -381,8 +381,7 @@ export const completionsApi = new Elysia({ async function* ({ body, set, bearer, request, apiKeyRecord }) { if (bearer === undefined) { set.status = 500; - yield JSON.stringify({ error: "Internal server error" }); - return; + return { error: "Internal server error" }; } const reqHeaders = request.headers; @@ -403,14 +402,13 @@ export const completionsApi = new Elysia({ // Check if model exists if (modelsWithProviders.length === 0) { set.status = 404; - yield JSON.stringify({ + return { error: { message: `Model '${systemName}' not found`, type: "invalid_request_error", code: "model_not_found", }, - }); - return; + }; } // Filter candidates by target provider (if specified) @@ -421,14 +419,13 @@ export const completionsApi = new Elysia({ if (filteredCandidates.length === 0) { set.status = 404; - yield JSON.stringify({ + return { error: { message: `No available provider for model '${systemName}'`, type: "invalid_request_error", code: "no_provider", }, - }); - return; + }; } // Select candidates for failover (weighted random order) @@ -467,15 +464,10 @@ export const completionsApi = new Elysia({ // Handle streaming vs non-streaming if (internalRequest.stream) { - // Set content-type for SSE streaming - set.headers["Content-Type"] = "text/event-stream"; - + // Streaming request - use yield for streaming responses if (body.n && body.n > 1) { set.status = 400; - yield JSON.stringify({ - error: "Stream completions with n > 1 is not supported", - }); - return; + return { error: "Stream completions with n > 1 is not supported" }; } // For streaming, use failover only for connection establishment @@ -500,31 +492,27 @@ export const completionsApi = new Elysia({ if (errorResult.type === "upstream_error") { set.status = errorResult.status; - yield errorResult.body; - return; + return JSON.parse(errorResult.body) as Record; } set.status = 502; - yield JSON.stringify({ + return { error: { message: "All upstream providers failed", type: "upstream_error", code: "all_providers_failed", }, - }); - return; + }; } if (!result.response || !result.provider) { set.status = 500; - yield JSON.stringify({ error: "Internal server error" }); - return; + return { error: "Internal server error" }; } if (!result.response.body) { set.status = 500; - yield JSON.stringify({ error: "No body in response" }); - return; + return { error: "No body in response" }; } const providerType = result.provider.provider.type || "openai"; @@ -557,10 +545,7 @@ export const completionsApi = new Elysia({ } } } else { - // Set content-type for JSON response - set.headers["Content-Type"] = "application/json"; - - // Non-streaming request with failover + // Non-streaming request - use return for normal JSON response const result = await executeWithFailover( candidates, buildRequestForProvider, @@ -582,25 +567,22 @@ export const completionsApi = new Elysia({ if (errorResult.type === "upstream_error") { set.status = errorResult.status; - yield errorResult.body; - return; + return JSON.parse(errorResult.body) as Record; } set.status = 502; - yield JSON.stringify({ + return { error: { message: "All upstream providers failed", type: "upstream_error", code: "all_providers_failed", }, - }); - return; + }; } if (!result.response || !result.provider) { set.status = 500; - yield JSON.stringify({ error: "Internal server error" }); - return; + return { error: "Internal server error" }; } const providerType = result.provider.provider.type || "openai"; @@ -624,7 +606,8 @@ export const completionsApi = new Elysia({ begin, request.signal, ); - yield response; + // Return parsed JSON object for proper content-type + return JSON.parse(response) as Record; } catch (error) { // Handle error based on whether client aborted const errorMsg = error instanceof Error ? error.message : String(error); @@ -641,6 +624,8 @@ export const completionsApi = new Elysia({ }).catch((logError: unknown) => { logger.error("Failed to log aborted completion after processing error", logError); }); + // Return nothing for aborted requests + return; } else { logger.error("Failed to process response", error); completion.status = "failed"; @@ -655,7 +640,7 @@ export const completionsApi = new Elysia({ logger.error("Failed to log completion after processing error", logError); }); set.status = 500; - yield JSON.stringify({ error: "Failed to process response" }); + return { error: "Failed to process response" }; } } } diff --git a/backend/src/api/v1/messages.ts b/backend/src/api/v1/messages.ts index 6cb5019..edebab4 100644 --- a/backend/src/api/v1/messages.ts +++ b/backend/src/api/v1/messages.ts @@ -319,11 +319,10 @@ export const messagesApi = new Elysia({ async function* ({ body, set, bearer, request, apiKeyRecord }) { if (bearer === undefined) { set.status = 500; - yield JSON.stringify({ + return { type: "error", error: { type: "api_error", message: "Internal server error" }, - }); - return; + }; } const reqHeaders = request.headers; @@ -344,14 +343,13 @@ export const messagesApi = new Elysia({ // Check if model exists if (modelsWithProviders.length === 0) { set.status = 404; - yield JSON.stringify({ + return { type: "error", error: { type: "not_found_error", message: `Model '${systemName}' not found`, }, - }); - return; + }; } // Filter candidates by target provider (if specified) @@ -362,14 +360,13 @@ export const messagesApi = new Elysia({ if (filteredCandidates.length === 0) { set.status = 404; - yield JSON.stringify({ + return { type: "error", error: { type: "not_found_error", message: `No available provider for model '${systemName}'`, }, - }); - return; + }; } // Select candidates for failover (weighted random order) @@ -408,10 +405,7 @@ export const messagesApi = new Elysia({ // Handle streaming vs non-streaming if (internalRequest.stream) { - // Set content-type for SSE streaming - set.headers["Content-Type"] = "text/event-stream"; - - // For streaming, use failover only for connection establishment + // Streaming request - use yield for streaming responses const result = await executeWithFailover( candidates, buildRequestForProvider, @@ -431,37 +425,33 @@ export const messagesApi = new Elysia({ if (errorResult.type === "upstream_error") { set.status = errorResult.status; - yield errorResult.body; - return; + return JSON.parse(errorResult.body) as Record; } set.status = 502; - yield JSON.stringify({ + return { type: "error", error: { type: "api_error", message: "All upstream providers failed", }, - }); - return; + }; } if (!result.response || !result.provider) { set.status = 500; - yield JSON.stringify({ + return { type: "error", error: { type: "api_error", message: "Internal server error" }, - }); - return; + }; } if (!result.response.body) { set.status = 500; - yield JSON.stringify({ + return { type: "error", error: { type: "api_error", message: "No body in response" }, - }); - return; + }; } const providerType = result.provider.provider.type || "openai"; @@ -492,10 +482,7 @@ export const messagesApi = new Elysia({ } } } else { - // Set content-type for JSON response - set.headers["Content-Type"] = "application/json"; - - // Non-streaming request with failover + // Non-streaming request - use return for normal JSON response const result = await executeWithFailover( candidates, buildRequestForProvider, @@ -515,28 +502,25 @@ export const messagesApi = new Elysia({ if (errorResult.type === "upstream_error") { set.status = errorResult.status; - yield errorResult.body; - return; + return JSON.parse(errorResult.body) as Record; } set.status = 502; - yield JSON.stringify({ + return { type: "error", error: { type: "api_error", message: "All upstream providers failed", }, - }); - return; + }; } if (!result.response || !result.provider) { set.status = 500; - yield JSON.stringify({ + return { type: "error", error: { type: "api_error", message: "Internal server error" }, - }); - return; + }; } const providerType = result.provider.provider.type || "openai"; @@ -558,7 +542,8 @@ export const messagesApi = new Elysia({ begin, request.signal, ); - yield response; + // Return parsed JSON object for proper content-type + return JSON.parse(response) as Record; } catch (error) { // Handle error based on whether client aborted const errorMsg = error instanceof Error ? error.message : String(error); @@ -575,6 +560,8 @@ export const messagesApi = new Elysia({ }).catch((logError: unknown) => { logger.error("Failed to log aborted completion after processing error", logError); }); + // Return nothing for aborted requests + return; } else { logger.error("Failed to process response", error); completion.status = "failed"; @@ -589,10 +576,10 @@ export const messagesApi = new Elysia({ logger.error("Failed to log completion after processing error", logError); }); set.status = 500; - yield JSON.stringify({ + return { type: "error", error: { type: "api_error", message: "Failed to process response" }, - }); + }; } } } diff --git a/backend/src/api/v1/responses.ts b/backend/src/api/v1/responses.ts index a9720d9..49511a5 100644 --- a/backend/src/api/v1/responses.ts +++ b/backend/src/api/v1/responses.ts @@ -335,11 +335,10 @@ export const responsesApi = new Elysia({ async function* ({ body, set, bearer, request, apiKeyRecord }) { if (bearer === undefined) { set.status = 500; - yield JSON.stringify({ + return { object: "error", error: { type: "server_error", message: "Internal server error" }, - }); - return; + }; } const reqHeaders = request.headers; @@ -360,14 +359,13 @@ export const responsesApi = new Elysia({ // Check if model exists if (modelsWithProviders.length === 0) { set.status = 404; - yield JSON.stringify({ + return { object: "error", error: { type: "invalid_request_error", message: `Model '${systemName}' not found`, }, - }); - return; + }; } // Filter candidates by target provider (if specified) @@ -378,14 +376,13 @@ export const responsesApi = new Elysia({ if (filteredCandidates.length === 0) { set.status = 404; - yield JSON.stringify({ + return { object: "error", error: { type: "invalid_request_error", message: `No available provider for model '${systemName}'`, }, - }); - return; + }; } // Select candidates for failover (weighted random order) @@ -424,10 +421,7 @@ export const responsesApi = new Elysia({ // Handle streaming vs non-streaming if (internalRequest.stream) { - // Set content-type for SSE streaming - set.headers["Content-Type"] = "text/event-stream"; - - // For streaming, use failover only for connection establishment + // Streaming request - use yield for streaming responses const result = await executeWithFailover( candidates, buildRequestForProvider, @@ -447,37 +441,33 @@ export const responsesApi = new Elysia({ if (errorResult.type === "upstream_error") { set.status = errorResult.status; - yield errorResult.body; - return; + return JSON.parse(errorResult.body) as Record; } set.status = 502; - yield JSON.stringify({ + return { object: "error", error: { type: "server_error", message: "All upstream providers failed", }, - }); - return; + }; } if (!result.response || !result.provider) { set.status = 500; - yield JSON.stringify({ + return { object: "error", error: { type: "server_error", message: "Internal server error" }, - }); - return; + }; } if (!result.response.body) { set.status = 500; - yield JSON.stringify({ + return { object: "error", error: { type: "server_error", message: "No body in response" }, - }); - return; + }; } const providerType = result.provider.provider.type || "openai"; @@ -508,10 +498,7 @@ export const responsesApi = new Elysia({ } } } else { - // Set content-type for JSON response - set.headers["Content-Type"] = "application/json"; - - // Non-streaming request with failover + // Non-streaming request - use return for normal JSON response const result = await executeWithFailover( candidates, buildRequestForProvider, @@ -531,28 +518,25 @@ export const responsesApi = new Elysia({ if (errorResult.type === "upstream_error") { set.status = errorResult.status; - yield errorResult.body; - return; + return JSON.parse(errorResult.body) as Record; } set.status = 502; - yield JSON.stringify({ + return { object: "error", error: { type: "server_error", message: "All upstream providers failed", }, - }); - return; + }; } if (!result.response || !result.provider) { set.status = 500; - yield JSON.stringify({ + return { object: "error", error: { type: "server_error", message: "Internal server error" }, - }); - return; + }; } const providerType = result.provider.provider.type || "openai"; @@ -574,7 +558,8 @@ export const responsesApi = new Elysia({ begin, request.signal, ); - yield response; + // Return parsed JSON object for proper content-type + return JSON.parse(response) as Record; } catch (error) { // Handle error based on whether client aborted const errorMsg = error instanceof Error ? error.message : String(error); @@ -591,6 +576,8 @@ export const responsesApi = new Elysia({ }).catch((logError: unknown) => { logger.error("Failed to log aborted completion after processing error", logError); }); + // Return nothing for aborted requests + return; } else { logger.error("Failed to process response", error); completion.status = "failed"; @@ -605,10 +592,10 @@ export const responsesApi = new Elysia({ logger.error("Failed to log completion after processing error", logError); }); set.status = 500; - yield JSON.stringify({ + return { object: "error", error: { type: "server_error", message: "Failed to process response" }, - }); + }; } } } From 3399b603ddff12872c0a9050f88108649008d457 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E7=BF=94=E5=AE=87?= Date: Sat, 24 Jan 2026 22:11:42 +0800 Subject: [PATCH 07/10] fix(api): use await with try-catch for addCompletions in error handlers Changed from fire-and-forget .catch() pattern to await with try-catch for addCompletions calls in non-streaming error handlers. With the refactoring to use `return` instead of `yield`, the fire-and-forget pattern is problematic because: 1. The function returns immediately after return statement 2. The pending promise may not complete before the function exits Using await ensures the completion record is reliably saved before returning the response or exiting the handler. Co-Authored-By: Claude Opus 4.5 --- backend/src/api/v1/completions.ts | 40 +++++++++++++++++-------------- backend/src/api/v1/messages.ts | 40 +++++++++++++++++-------------- backend/src/api/v1/responses.ts | 40 +++++++++++++++++-------------- 3 files changed, 66 insertions(+), 54 deletions(-) diff --git a/backend/src/api/v1/completions.ts b/backend/src/api/v1/completions.ts index 9b57005..e38272d 100644 --- a/backend/src/api/v1/completions.ts +++ b/backend/src/api/v1/completions.ts @@ -614,31 +614,35 @@ export const completionsApi = new Elysia({ if (request.signal.aborted) { // Client disconnected - save as aborted completion.status = "aborted"; - addCompletions(completion, bearer, { - level: "info", - message: "Client disconnected during non-streaming response", - details: { - type: "completionError", - data: { type: "aborted", msg: errorMsg }, - }, - }).catch((logError: unknown) => { + try { + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: errorMsg }, + }, + }); + } catch (logError: unknown) { logger.error("Failed to log aborted completion after processing error", logError); - }); + } // Return nothing for aborted requests return; } else { logger.error("Failed to process response", error); completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Response processing error: ${errorMsg}`, - details: { - type: "completionError", - data: { type: "processingError", msg: errorMsg }, - }, - }).catch((logError: unknown) => { + try { + await addCompletions(completion, bearer, { + level: "error", + message: `Response processing error: ${errorMsg}`, + details: { + type: "completionError", + data: { type: "processingError", msg: errorMsg }, + }, + }); + } catch (logError: unknown) { logger.error("Failed to log completion after processing error", logError); - }); + } set.status = 500; return { error: "Failed to process response" }; } diff --git a/backend/src/api/v1/messages.ts b/backend/src/api/v1/messages.ts index edebab4..56cd063 100644 --- a/backend/src/api/v1/messages.ts +++ b/backend/src/api/v1/messages.ts @@ -550,31 +550,35 @@ export const messagesApi = new Elysia({ if (request.signal.aborted) { // Client disconnected - save as aborted completion.status = "aborted"; - addCompletions(completion, bearer, { - level: "info", - message: "Client disconnected during non-streaming response", - details: { - type: "completionError", - data: { type: "aborted", msg: errorMsg }, - }, - }).catch((logError: unknown) => { + try { + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: errorMsg }, + }, + }); + } catch (logError: unknown) { logger.error("Failed to log aborted completion after processing error", logError); - }); + } // Return nothing for aborted requests return; } else { logger.error("Failed to process response", error); completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Response processing error: ${errorMsg}`, - details: { - type: "completionError", - data: { type: "processingError", msg: errorMsg }, - }, - }).catch((logError: unknown) => { + try { + await addCompletions(completion, bearer, { + level: "error", + message: `Response processing error: ${errorMsg}`, + details: { + type: "completionError", + data: { type: "processingError", msg: errorMsg }, + }, + }); + } catch (logError: unknown) { logger.error("Failed to log completion after processing error", logError); - }); + } set.status = 500; return { type: "error", diff --git a/backend/src/api/v1/responses.ts b/backend/src/api/v1/responses.ts index 49511a5..aa6c462 100644 --- a/backend/src/api/v1/responses.ts +++ b/backend/src/api/v1/responses.ts @@ -566,31 +566,35 @@ export const responsesApi = new Elysia({ if (request.signal.aborted) { // Client disconnected - save as aborted completion.status = "aborted"; - addCompletions(completion, bearer, { - level: "info", - message: "Client disconnected during non-streaming response", - details: { - type: "completionError", - data: { type: "aborted", msg: errorMsg }, - }, - }).catch((logError: unknown) => { + try { + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: errorMsg }, + }, + }); + } catch (logError: unknown) { logger.error("Failed to log aborted completion after processing error", logError); - }); + } // Return nothing for aborted requests return; } else { logger.error("Failed to process response", error); completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Response processing error: ${errorMsg}`, - details: { - type: "completionError", - data: { type: "processingError", msg: errorMsg }, - }, - }).catch((logError: unknown) => { + try { + await addCompletions(completion, bearer, { + level: "error", + message: `Response processing error: ${errorMsg}`, + details: { + type: "completionError", + data: { type: "processingError", msg: errorMsg }, + }, + }); + } catch (logError: unknown) { logger.error("Failed to log completion after processing error", logError); - }); + } set.status = 500; return { object: "error", From 64b28eda794af2e00a6d64eaed870727c4028ee1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E7=BF=94=E5=AE=87?= Date: Sat, 24 Jan 2026 23:02:26 +0800 Subject: [PATCH 08/10] fix(api): continue collecting chunks after client disconnect Previously, when a client disconnected mid-stream: 1. The abort handler immediately saved partial data 2. The for-await loop broke, discarding remaining chunks Now the behavior is: 1. No immediate save on abort (removed abort handler) 2. Continue reading all chunks from upstream 3. Only skip yielding to disconnected client 4. Save full response with "aborted" status when stream ends This ensures complete data is captured for observability even when clients disconnect before the stream finishes. Co-Authored-By: Claude Opus 4.5 --- backend/src/api/v1/completions.ts | 53 ++++++++++++++++---------- backend/src/api/v1/messages.ts | 45 +++++++++++++--------- backend/src/api/v1/responses.ts | 53 ++++++++++++++++---------- backend/src/utils/streaming-context.ts | 32 ++++------------ 4 files changed, 100 insertions(+), 83 deletions(-) diff --git a/backend/src/api/v1/completions.ts b/backend/src/api/v1/completions.ts index e38272d..e6376d9 100644 --- a/backend/src/api/v1/completions.ts +++ b/backend/src/api/v1/completions.ts @@ -251,15 +251,12 @@ async function* processStreamingResponse( const chunks = upstreamAdapter.parseStreamResponse(resp); for await (const chunk of chunks) { - // Check if client has disconnected - if (ctx.isAborted()) { - logger.debug("Client aborted, stopping stream processing"); - break; - } + // Check if client has disconnected (but continue processing to collect all data) + const clientAborted = ctx.isAborted(); ctx.recordTTFT(); - // Collect content for completion record + // Collect content for completion record (always, even if client aborted) if (chunk.type === "content_block_start") { // Track new tool call block if (chunk.contentBlock?.type === "tool_use") { @@ -320,26 +317,33 @@ async function* processStreamingResponse( ctx.outputTokens = chunk.usage.outputTokens; } - // Convert to OpenAI format and yield - const serialized = responseAdapter.serializeStreamChunk(chunk); - if (serialized) { - yield serialized; - } + // Only yield to client if not aborted (client is still listening) + if (!clientAborted) { + // Convert to OpenAI format and yield + const serialized = responseAdapter.serializeStreamChunk(chunk); + if (serialized) { + yield serialized; + } - // Handle message_stop - if (chunk.type === "message_stop") { - yield responseAdapter.getDoneMarker(); + // Handle message_stop + if (chunk.type === "message_stop") { + yield responseAdapter.getDoneMarker(); + } } } // Handle case where no chunks were received - if (ctx.isFirstChunk && !ctx.isAborted()) { + if (ctx.isFirstChunk) { throw new Error("No chunk received from upstream"); } - // Save completed completion (if not already saved by abort handler) + // Save completion with appropriate status if (!ctx.isSaved()) { - await ctx.saveCompletion("completed"); + if (ctx.isAborted()) { + await ctx.saveCompletion("aborted", "Client disconnected"); + } else { + await ctx.saveCompletion("completed"); + } } } catch (error) { // Only log error if not due to client abort @@ -347,14 +351,21 @@ async function* processStreamingResponse( logger.error("Stream processing error", error); } - // Save failed completion (if not already saved by abort handler) + // Save failed completion if (!ctx.isSaved()) { - await ctx.saveCompletion("failed", String(error)); + if (ctx.isAborted()) { + // If client aborted and we got an error, still save as aborted with the error info + await ctx.saveCompletion("aborted", `Client disconnected, stream error: ${String(error)}`); + } else { + await ctx.saveCompletion("failed", String(error)); + } } - throw error; + // Only re-throw if client is still connected + if (!ctx.isAborted()) { + throw error; + } } finally { - // Clean up abort handler ctx.cleanup(); } } diff --git a/backend/src/api/v1/messages.ts b/backend/src/api/v1/messages.ts index 56cd063..c03f40e 100644 --- a/backend/src/api/v1/messages.ts +++ b/backend/src/api/v1/messages.ts @@ -238,15 +238,12 @@ async function* processStreamingResponse( const chunks = upstreamAdapter.parseStreamResponse(resp); for await (const chunk of chunks) { - // Check if client has disconnected - if (ctx.isAborted()) { - logger.debug("Client aborted, stopping stream processing"); - break; - } + // Check if client has disconnected (but continue processing to collect all data) + const clientAborted = ctx.isAborted(); ctx.recordTTFT(); - // Collect content for completion record + // Collect content for completion record (always, even if client aborted) if (chunk.type === "content_block_delta") { if (chunk.delta?.type === "text_delta" && chunk.delta.text) { ctx.textParts.push(chunk.delta.text); @@ -264,21 +261,28 @@ async function* processStreamingResponse( ctx.outputTokens = chunk.usage.outputTokens; } - // Convert to Anthropic format and yield - const serialized = responseAdapter.serializeStreamChunk(chunk); - if (serialized) { - yield serialized; + // Only yield to client if not aborted (client is still listening) + if (!clientAborted) { + // Convert to Anthropic format and yield + const serialized = responseAdapter.serializeStreamChunk(chunk); + if (serialized) { + yield serialized; + } } } // Handle case where no chunks were received - if (ctx.isFirstChunk && !ctx.isAborted()) { + if (ctx.isFirstChunk) { throw new Error("No chunk received from upstream"); } - // Save completed completion (if not already saved by abort handler) + // Save completion with appropriate status if (!ctx.isSaved()) { - await ctx.saveCompletion("completed"); + if (ctx.isAborted()) { + await ctx.saveCompletion("aborted", "Client disconnected"); + } else { + await ctx.saveCompletion("completed"); + } } } catch (error) { // Only log error if not due to client abort @@ -286,14 +290,21 @@ async function* processStreamingResponse( logger.error("Stream processing error", error); } - // Save failed completion (if not already saved by abort handler) + // Save failed completion if (!ctx.isSaved()) { - await ctx.saveCompletion("failed", String(error)); + if (ctx.isAborted()) { + // If client aborted and we got an error, still save as aborted with the error info + await ctx.saveCompletion("aborted", `Client disconnected, stream error: ${String(error)}`); + } else { + await ctx.saveCompletion("failed", String(error)); + } } - throw error; + // Only re-throw if client is still connected + if (!ctx.isAborted()) { + throw error; + } } finally { - // Clean up abort handler ctx.cleanup(); } } diff --git a/backend/src/api/v1/responses.ts b/backend/src/api/v1/responses.ts index aa6c462..8c99247 100644 --- a/backend/src/api/v1/responses.ts +++ b/backend/src/api/v1/responses.ts @@ -249,15 +249,12 @@ async function* processStreamingResponse( const chunks = upstreamAdapter.parseStreamResponse(resp); for await (const chunk of chunks) { - // Check if client has disconnected - if (ctx.isAborted()) { - logger.debug("Client aborted, stopping stream processing"); - break; - } + // Check if client has disconnected (but continue processing to collect all data) + const clientAborted = ctx.isAborted(); ctx.recordTTFT(); - // Collect content for completion record + // Collect content for completion record (always, even if client aborted) if (chunk.type === "content_block_delta") { if (chunk.delta?.type === "text_delta" && chunk.delta.text) { ctx.textParts.push(chunk.delta.text); @@ -275,26 +272,33 @@ async function* processStreamingResponse( ctx.outputTokens = chunk.usage.outputTokens; } - // Convert to Response API format and yield - const serialized = responseAdapter.serializeStreamChunk(chunk); - if (serialized) { - yield serialized; - } + // Only yield to client if not aborted (client is still listening) + if (!clientAborted) { + // Convert to Response API format and yield + const serialized = responseAdapter.serializeStreamChunk(chunk); + if (serialized) { + yield serialized; + } - // Handle message_stop - if (chunk.type === "message_stop") { - yield responseAdapter.getDoneMarker(); + // Handle message_stop + if (chunk.type === "message_stop") { + yield responseAdapter.getDoneMarker(); + } } } // Handle case where no chunks were received - if (ctx.isFirstChunk && !ctx.isAborted()) { + if (ctx.isFirstChunk) { throw new Error("No chunk received from upstream"); } - // Save completed completion (if not already saved by abort handler) + // Save completion with appropriate status if (!ctx.isSaved()) { - await ctx.saveCompletion("completed"); + if (ctx.isAborted()) { + await ctx.saveCompletion("aborted", "Client disconnected"); + } else { + await ctx.saveCompletion("completed"); + } } } catch (error) { // Only log error if not due to client abort @@ -302,14 +306,21 @@ async function* processStreamingResponse( logger.error("Stream processing error", error); } - // Save failed completion (if not already saved by abort handler) + // Save failed completion if (!ctx.isSaved()) { - await ctx.saveCompletion("failed", String(error)); + if (ctx.isAborted()) { + // If client aborted and we got an error, still save as aborted with the error info + await ctx.saveCompletion("aborted", `Client disconnected, stream error: ${String(error)}`); + } else { + await ctx.saveCompletion("failed", String(error)); + } } - throw error; + // Only re-throw if client is still connected + if (!ctx.isAborted()) { + throw error; + } } finally { - // Clean up abort handler ctx.cleanup(); } } diff --git a/backend/src/utils/streaming-context.ts b/backend/src/utils/streaming-context.ts index d7dedb0..4554f46 100644 --- a/backend/src/utils/streaming-context.ts +++ b/backend/src/utils/streaming-context.ts @@ -3,7 +3,6 @@ * Ensures completion records are saved even when client disconnects */ -import { consola } from "consola"; import type { CompletionsStatusEnumType, ToolCallType, @@ -12,8 +11,6 @@ import { addCompletions, type Completion } from "@/utils/completions"; import { consumeTokens } from "@/plugins/apiKeyRateLimitPlugin"; import type { ApiKey } from "@/plugins/apiKeyPlugin"; -const logger = consola.withTag("streamingContext"); - /** * StreamingContext manages the state of a streaming response. * It ensures completion records are saved even when client disconnects. @@ -24,7 +21,6 @@ export class StreamingContext { private apiKeyRecord: ApiKey | null; private begin: number; private saved = false; - private abortHandler: (() => void) | null = null; private signal?: AbortSignal; // Accumulated data during streaming @@ -52,22 +48,12 @@ export class StreamingContext { this.begin = begin; this.signal = signal; - // Register abort handler to save completion when client disconnects - if (signal) { - this.abortHandler = () => { - logger.info("Client disconnected, saving partial completion"); - this.saveCompletion("aborted", "Client disconnected").catch((saveError: unknown) => { - const errorMessage = saveError instanceof Error ? saveError.message : String(saveError); - logger.error("Failed to save aborted completion", errorMessage); - }); - }; - signal.addEventListener("abort", this.abortHandler); - // Handle case where signal is already aborted before listener registration - // (AbortSignal spec: addEventListener won't trigger for already-aborted signals) - if (signal.aborted) { - this.abortHandler(); - } - } + // Note: We don't save immediately on abort anymore. + // Instead, we continue processing chunks from upstream and save the full + // response when the stream ends. This ensures we capture all data even + // when the client disconnects mid-stream. + // The abort status is checked via isAborted() and the final save uses + // "aborted" status if the client disconnected. } /** @@ -161,9 +147,7 @@ export class StreamingContext { * Clean up resources */ cleanup(): void { - if (this.signal && this.abortHandler) { - this.signal.removeEventListener("abort", this.abortHandler); - this.abortHandler = null; - } + // No-op now since we don't register abort handlers anymore + // Kept for API compatibility } } From 80684f94767e1d4a9a6954b79a536583f26ffabf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E7=BF=94=E5=AE=87?= Date: Sat, 24 Jan 2026 23:06:11 +0800 Subject: [PATCH 09/10] feat: add logging when client disconnects during streaming Log an info-level message when we first detect that a client has disconnected during streaming. This helps track client disconnect events while still continuing to collect all upstream data for accurate token metering. Co-Authored-By: Claude Opus 4.5 --- backend/src/api/v1/completions.ts | 9 +++++++++ backend/src/api/v1/messages.ts | 9 +++++++++ backend/src/api/v1/responses.ts | 9 +++++++++ 3 files changed, 27 insertions(+) diff --git a/backend/src/api/v1/completions.ts b/backend/src/api/v1/completions.ts index e6376d9..33047a1 100644 --- a/backend/src/api/v1/completions.ts +++ b/backend/src/api/v1/completions.ts @@ -247,6 +247,9 @@ async function* processStreamingResponse( // Create streaming context with abort handling const ctx = new StreamingContext(completion, bearer, apiKeyRecord, begin, signal); + // Track whether we've logged the client abort (to avoid duplicate logs) + let loggedAbort = false; + try { const chunks = upstreamAdapter.parseStreamResponse(resp); @@ -254,6 +257,12 @@ async function* processStreamingResponse( // Check if client has disconnected (but continue processing to collect all data) const clientAborted = ctx.isAborted(); + // Log client disconnect once when first detected + if (clientAborted && !loggedAbort) { + loggedAbort = true; + logger.info("Client disconnected during streaming, continuing to collect upstream data"); + } + ctx.recordTTFT(); // Collect content for completion record (always, even if client aborted) diff --git a/backend/src/api/v1/messages.ts b/backend/src/api/v1/messages.ts index c03f40e..e17c957 100644 --- a/backend/src/api/v1/messages.ts +++ b/backend/src/api/v1/messages.ts @@ -234,6 +234,9 @@ async function* processStreamingResponse( // Create streaming context with abort handling const ctx = new StreamingContext(completion, bearer, apiKeyRecord, begin, signal); + // Track whether we've logged the client abort (to avoid duplicate logs) + let loggedAbort = false; + try { const chunks = upstreamAdapter.parseStreamResponse(resp); @@ -241,6 +244,12 @@ async function* processStreamingResponse( // Check if client has disconnected (but continue processing to collect all data) const clientAborted = ctx.isAborted(); + // Log client disconnect once when first detected + if (clientAborted && !loggedAbort) { + loggedAbort = true; + logger.info("Client disconnected during streaming, continuing to collect upstream data"); + } + ctx.recordTTFT(); // Collect content for completion record (always, even if client aborted) diff --git a/backend/src/api/v1/responses.ts b/backend/src/api/v1/responses.ts index 8c99247..19ed929 100644 --- a/backend/src/api/v1/responses.ts +++ b/backend/src/api/v1/responses.ts @@ -245,6 +245,9 @@ async function* processStreamingResponse( // Create streaming context with abort handling const ctx = new StreamingContext(completion, bearer, apiKeyRecord, begin, signal); + // Track whether we've logged the client abort (to avoid duplicate logs) + let loggedAbort = false; + try { const chunks = upstreamAdapter.parseStreamResponse(resp); @@ -252,6 +255,12 @@ async function* processStreamingResponse( // Check if client has disconnected (but continue processing to collect all data) const clientAborted = ctx.isAborted(); + // Log client disconnect once when first detected + if (clientAborted && !loggedAbort) { + loggedAbort = true; + logger.info("Client disconnected during streaming, continuing to collect upstream data"); + } + ctx.recordTTFT(); // Collect content for completion record (always, even if client aborted) From babc04a2bf7c89edb4cacffb264385c778f18393 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E7=BF=94=E5=AE=87?= Date: Sat, 24 Jan 2026 23:17:38 +0800 Subject: [PATCH 10/10] fix: prevent duplicate completion saves and add tool call handling Address CodeRabbit review feedback: - Add alreadySaved check in catch blocks to prevent duplicate database writes if processNonStreamingResponse already saved the completion before consumeTokens throws an error - Add tool call handling (content_block_start/stop) to messages.ts and responses.ts streaming to match completions.ts behavior Co-Authored-By: Claude Opus 4.5 --- backend/src/api/v1/completions.ts | 56 +++++++++-------- backend/src/api/v1/messages.ts | 101 ++++++++++++++++++++++-------- backend/src/api/v1/responses.ts | 101 ++++++++++++++++++++++-------- 3 files changed, 181 insertions(+), 77 deletions(-) diff --git a/backend/src/api/v1/completions.ts b/backend/src/api/v1/completions.ts index 33047a1..97df736 100644 --- a/backend/src/api/v1/completions.ts +++ b/backend/src/api/v1/completions.ts @@ -631,37 +631,43 @@ export const completionsApi = new Elysia({ } catch (error) { // Handle error based on whether client aborted const errorMsg = error instanceof Error ? error.message : String(error); + // Only save if completion wasn't already saved in processNonStreamingResponse + const alreadySaved = completion.status !== "pending"; if (request.signal.aborted) { - // Client disconnected - save as aborted - completion.status = "aborted"; - try { - await addCompletions(completion, bearer, { - level: "info", - message: "Client disconnected during non-streaming response", - details: { - type: "completionError", - data: { type: "aborted", msg: errorMsg }, - }, - }); - } catch (logError: unknown) { - logger.error("Failed to log aborted completion after processing error", logError); + // Client disconnected - save as aborted (if not already saved) + if (!alreadySaved) { + completion.status = "aborted"; + try { + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: errorMsg }, + }, + }); + } catch (logError: unknown) { + logger.error("Failed to log aborted completion after processing error", logError); + } } // Return nothing for aborted requests return; } else { logger.error("Failed to process response", error); - completion.status = "failed"; - try { - await addCompletions(completion, bearer, { - level: "error", - message: `Response processing error: ${errorMsg}`, - details: { - type: "completionError", - data: { type: "processingError", msg: errorMsg }, - }, - }); - } catch (logError: unknown) { - logger.error("Failed to log completion after processing error", logError); + if (!alreadySaved) { + completion.status = "failed"; + try { + await addCompletions(completion, bearer, { + level: "error", + message: `Response processing error: ${errorMsg}`, + details: { + type: "completionError", + data: { type: "processingError", msg: errorMsg }, + }, + }); + } catch (logError: unknown) { + logger.error("Failed to log completion after processing error", logError); + } } set.status = 500; return { error: "Failed to process response" }; diff --git a/backend/src/api/v1/messages.ts b/backend/src/api/v1/messages.ts index e17c957..9eb6a36 100644 --- a/backend/src/api/v1/messages.ts +++ b/backend/src/api/v1/messages.ts @@ -253,7 +253,23 @@ async function* processStreamingResponse( ctx.recordTTFT(); // Collect content for completion record (always, even if client aborted) - if (chunk.type === "content_block_delta") { + if (chunk.type === "content_block_start") { + // Track new tool call block + if (chunk.contentBlock?.type === "tool_use") { + const toolId = chunk.contentBlock.id; + const index = chunk.index ?? ctx.nextToolCallIndex++; + ctx.indexToIdMap.set(index, toolId); + ctx.streamToolCalls.set(toolId, { + id: toolId, + type: "function", + function: { + name: chunk.contentBlock.name, + arguments: "", + }, + }); + ctx.toolCallArguments.set(toolId, []); + } + } else if (chunk.type === "content_block_delta") { if (chunk.delta?.type === "text_delta" && chunk.delta.text) { ctx.textParts.push(chunk.delta.text); } else if ( @@ -261,6 +277,33 @@ async function* processStreamingResponse( chunk.delta.thinking ) { ctx.thinkingParts.push(chunk.delta.thinking); + } else if (chunk.delta?.type === "input_json_delta" && chunk.delta.partialJson) { + // Collect tool call arguments - lookup by index to get tool ID + // Skip if index is missing to avoid data corruption + if (chunk.index !== undefined) { + const toolId = ctx.indexToIdMap.get(chunk.index); + if (toolId) { + const args = ctx.toolCallArguments.get(toolId); + if (args) { + args.push(chunk.delta.partialJson); + } + } + } else { + logger.warn("Received input_json_delta without index, skipping"); + } + } + } else if (chunk.type === "content_block_stop") { + // Finalize tool call arguments - lookup by index to get tool ID + // Skip if index is missing to avoid data corruption + if (chunk.index !== undefined) { + const toolId = ctx.indexToIdMap.get(chunk.index); + if (toolId) { + const toolCall = ctx.streamToolCalls.get(toolId); + const args = ctx.toolCallArguments.get(toolId); + if (toolCall && args) { + toolCall.function.arguments = args.join(""); + } + } } } @@ -567,37 +610,43 @@ export const messagesApi = new Elysia({ } catch (error) { // Handle error based on whether client aborted const errorMsg = error instanceof Error ? error.message : String(error); + // Only save if completion wasn't already saved in processNonStreamingResponse + const alreadySaved = completion.status !== "pending"; if (request.signal.aborted) { - // Client disconnected - save as aborted - completion.status = "aborted"; - try { - await addCompletions(completion, bearer, { - level: "info", - message: "Client disconnected during non-streaming response", - details: { - type: "completionError", - data: { type: "aborted", msg: errorMsg }, - }, - }); - } catch (logError: unknown) { - logger.error("Failed to log aborted completion after processing error", logError); + // Client disconnected - save as aborted (if not already saved) + if (!alreadySaved) { + completion.status = "aborted"; + try { + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: errorMsg }, + }, + }); + } catch (logError: unknown) { + logger.error("Failed to log aborted completion after processing error", logError); + } } // Return nothing for aborted requests return; } else { logger.error("Failed to process response", error); - completion.status = "failed"; - try { - await addCompletions(completion, bearer, { - level: "error", - message: `Response processing error: ${errorMsg}`, - details: { - type: "completionError", - data: { type: "processingError", msg: errorMsg }, - }, - }); - } catch (logError: unknown) { - logger.error("Failed to log completion after processing error", logError); + if (!alreadySaved) { + completion.status = "failed"; + try { + await addCompletions(completion, bearer, { + level: "error", + message: `Response processing error: ${errorMsg}`, + details: { + type: "completionError", + data: { type: "processingError", msg: errorMsg }, + }, + }); + } catch (logError: unknown) { + logger.error("Failed to log completion after processing error", logError); + } } set.status = 500; return { diff --git a/backend/src/api/v1/responses.ts b/backend/src/api/v1/responses.ts index 19ed929..d86b44f 100644 --- a/backend/src/api/v1/responses.ts +++ b/backend/src/api/v1/responses.ts @@ -264,7 +264,23 @@ async function* processStreamingResponse( ctx.recordTTFT(); // Collect content for completion record (always, even if client aborted) - if (chunk.type === "content_block_delta") { + if (chunk.type === "content_block_start") { + // Track new tool call block + if (chunk.contentBlock?.type === "tool_use") { + const toolId = chunk.contentBlock.id; + const index = chunk.index ?? ctx.nextToolCallIndex++; + ctx.indexToIdMap.set(index, toolId); + ctx.streamToolCalls.set(toolId, { + id: toolId, + type: "function", + function: { + name: chunk.contentBlock.name, + arguments: "", + }, + }); + ctx.toolCallArguments.set(toolId, []); + } + } else if (chunk.type === "content_block_delta") { if (chunk.delta?.type === "text_delta" && chunk.delta.text) { ctx.textParts.push(chunk.delta.text); } else if ( @@ -272,6 +288,33 @@ async function* processStreamingResponse( chunk.delta.thinking ) { ctx.thinkingParts.push(chunk.delta.thinking); + } else if (chunk.delta?.type === "input_json_delta" && chunk.delta.partialJson) { + // Collect tool call arguments - lookup by index to get tool ID + // Skip if index is missing to avoid data corruption + if (chunk.index !== undefined) { + const toolId = ctx.indexToIdMap.get(chunk.index); + if (toolId) { + const args = ctx.toolCallArguments.get(toolId); + if (args) { + args.push(chunk.delta.partialJson); + } + } + } else { + logger.warn("Received input_json_delta without index, skipping"); + } + } + } else if (chunk.type === "content_block_stop") { + // Finalize tool call arguments - lookup by index to get tool ID + // Skip if index is missing to avoid data corruption + if (chunk.index !== undefined) { + const toolId = ctx.indexToIdMap.get(chunk.index); + if (toolId) { + const toolCall = ctx.streamToolCalls.get(toolId); + const args = ctx.toolCallArguments.get(toolId); + if (toolCall && args) { + toolCall.function.arguments = args.join(""); + } + } } } @@ -583,37 +626,43 @@ export const responsesApi = new Elysia({ } catch (error) { // Handle error based on whether client aborted const errorMsg = error instanceof Error ? error.message : String(error); + // Only save if completion wasn't already saved in processNonStreamingResponse + const alreadySaved = completion.status !== "pending"; if (request.signal.aborted) { - // Client disconnected - save as aborted - completion.status = "aborted"; - try { - await addCompletions(completion, bearer, { - level: "info", - message: "Client disconnected during non-streaming response", - details: { - type: "completionError", - data: { type: "aborted", msg: errorMsg }, - }, - }); - } catch (logError: unknown) { - logger.error("Failed to log aborted completion after processing error", logError); + // Client disconnected - save as aborted (if not already saved) + if (!alreadySaved) { + completion.status = "aborted"; + try { + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: errorMsg }, + }, + }); + } catch (logError: unknown) { + logger.error("Failed to log aborted completion after processing error", logError); + } } // Return nothing for aborted requests return; } else { logger.error("Failed to process response", error); - completion.status = "failed"; - try { - await addCompletions(completion, bearer, { - level: "error", - message: `Response processing error: ${errorMsg}`, - details: { - type: "completionError", - data: { type: "processingError", msg: errorMsg }, - }, - }); - } catch (logError: unknown) { - logger.error("Failed to log completion after processing error", logError); + if (!alreadySaved) { + completion.status = "failed"; + try { + await addCompletions(completion, bearer, { + level: "error", + message: `Response processing error: ${errorMsg}`, + details: { + type: "completionError", + data: { type: "processingError", msg: errorMsg }, + }, + }); + } catch (logError: unknown) { + logger.error("Failed to log completion after processing error", logError); + } } set.status = 500; return {