diff --git a/backend/drizzle/0010_noisy_deathbird.sql b/backend/drizzle/0010_noisy_deathbird.sql new file mode 100644 index 0000000..71bbe04 --- /dev/null +++ b/backend/drizzle/0010_noisy_deathbird.sql @@ -0,0 +1 @@ +ALTER TYPE "public"."completions_status" ADD VALUE 'aborted'; \ No newline at end of file diff --git a/backend/drizzle/meta/0010_snapshot.json b/backend/drizzle/meta/0010_snapshot.json new file mode 100644 index 0000000..a39d245 --- /dev/null +++ b/backend/drizzle/meta/0010_snapshot.json @@ -0,0 +1,1002 @@ +{ + "id": "6be63fd9-50d1-4d49-acea-e53401c54b2f", + "prevId": "1fcc8e66-5eaf-433f-9537-89ea8601aa65", + "version": "7", + "dialect": "postgresql", + "tables": { + "public.api_keys": { + "name": "api_keys", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "api_keys_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "key": { + "name": "key", + "type": "varchar(63)", + "primaryKey": false, + "notNull": true + }, + "comment": { + "name": "comment", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "expires_at": { + "name": "expires_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "last_seen": { + "name": "last_seen", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "revoked": { + "name": "revoked", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + }, + "rpm_limit": { + "name": "rpm_limit", + "type": "integer", + "primaryKey": false, + "notNull": true, + "default": 50 
+ }, + "tpm_limit": { + "name": "tpm_limit", + "type": "integer", + "primaryKey": false, + "notNull": true, + "default": 50000 + }, + "external_id": { + "name": "external_id", + "type": "varchar(127)", + "primaryKey": false, + "notNull": false + }, + "source": { + "name": "source", + "type": "api_key_source", + "typeSchema": "public", + "primaryKey": false, + "notNull": false, + "default": "'manual'" + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "api_keys_key_unique": { + "name": "api_keys_key_unique", + "nullsNotDistinct": false, + "columns": [ + "key" + ] + }, + "api_keys_external_id_unique": { + "name": "api_keys_external_id_unique", + "nullsNotDistinct": false, + "columns": [ + "external_id" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.completions": { + "name": "completions", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "completions_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "api_key_id": { + "name": "api_key_id", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "upstream_id": { + "name": "upstream_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "model_id": { + "name": "model_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "model": { + "name": "model", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "prompt": { + "name": "prompt", + "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + "prompt_tokens": { + "name": "prompt_tokens", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "completion": { + "name": "completion", + "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + 
"completion_tokens": { + "name": "completion_tokens", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "status": { + "name": "status", + "type": "completions_status", + "typeSchema": "public", + "primaryKey": false, + "notNull": true, + "default": "'pending'" + }, + "ttft": { + "name": "ttft", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "duration": { + "name": "duration", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "deleted": { + "name": "deleted", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + }, + "rating": { + "name": "rating", + "type": "real", + "primaryKey": false, + "notNull": false + } + }, + "indexes": {}, + "foreignKeys": { + "completions_api_key_id_api_keys_id_fk": { + "name": "completions_api_key_id_api_keys_id_fk", + "tableFrom": "completions", + "tableTo": "api_keys", + "columnsFrom": [ + "api_key_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + }, + "completions_upstream_id_upstreams_id_fk": { + "name": "completions_upstream_id_upstreams_id_fk", + "tableFrom": "completions", + "tableTo": "upstreams", + "columnsFrom": [ + "upstream_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "completions_id_unique": { + "name": "completions_id_unique", + "nullsNotDistinct": false, + "columns": [ + "id" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.embeddings": { + "name": "embeddings", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": 
true, + "identity": { + "type": "always", + "name": "embeddings_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "api_key_id": { + "name": "api_key_id", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "model_id": { + "name": "model_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "model": { + "name": "model", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "input": { + "name": "input", + "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + "input_tokens": { + "name": "input_tokens", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "embedding": { + "name": "embedding", + "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + "dimensions": { + "name": "dimensions", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "status": { + "name": "status", + "type": "completions_status", + "typeSchema": "public", + "primaryKey": false, + "notNull": true, + "default": "'pending'" + }, + "duration": { + "name": "duration", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "deleted": { + "name": "deleted", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + } + }, + "indexes": {}, + "foreignKeys": { + "embeddings_api_key_id_api_keys_id_fk": { + "name": "embeddings_api_key_id_api_keys_id_fk", + "tableFrom": "embeddings", + "tableTo": "api_keys", + "columnsFrom": [ + "api_key_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + }, + "embeddings_model_id_models_id_fk": { + "name": 
"embeddings_model_id_models_id_fk", + "tableFrom": "embeddings", + "tableTo": "models", + "columnsFrom": [ + "model_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "embeddings_id_unique": { + "name": "embeddings_id_unique", + "nullsNotDistinct": false, + "columns": [ + "id" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.models": { + "name": "models", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "models_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "provider_id": { + "name": "provider_id", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "system_name": { + "name": "system_name", + "type": "varchar(63)", + "primaryKey": false, + "notNull": true + }, + "remote_id": { + "name": "remote_id", + "type": "varchar(63)", + "primaryKey": false, + "notNull": false + }, + "model_type": { + "name": "model_type", + "type": "model_type", + "typeSchema": "public", + "primaryKey": false, + "notNull": true, + "default": "'chat'" + }, + "context_length": { + "name": "context_length", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "input_price": { + "name": "input_price", + "type": "real", + "primaryKey": false, + "notNull": false + }, + "output_price": { + "name": "output_price", + "type": "real", + "primaryKey": false, + "notNull": false + }, + "weight": { + "name": "weight", + "type": "real", + "primaryKey": false, + "notNull": true, + "default": 1 + }, + "comment": { + "name": "comment", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": 
true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "deleted": { + "name": "deleted", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + } + }, + "indexes": {}, + "foreignKeys": { + "models_provider_id_providers_id_fk": { + "name": "models_provider_id_providers_id_fk", + "tableFrom": "models", + "tableTo": "providers", + "columnsFrom": [ + "provider_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "models_provider_system_name_unique": { + "name": "models_provider_system_name_unique", + "nullsNotDistinct": false, + "columns": [ + "provider_id", + "system_name" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.providers": { + "name": "providers", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "providers_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "name": { + "name": "name", + "type": "varchar(63)", + "primaryKey": false, + "notNull": true + }, + "type": { + "name": "type", + "type": "provider_type", + "typeSchema": "public", + "primaryKey": false, + "notNull": true, + "default": "'openai'" + }, + "base_url": { + "name": "base_url", + "type": "varchar(255)", + "primaryKey": false, + "notNull": true + }, + "api_key": { + "name": "api_key", + "type": "varchar(255)", + "primaryKey": false, + "notNull": false + }, + "api_version": { + "name": "api_version", + "type": "varchar(31)", + "primaryKey": false, + "notNull": false + }, + "comment": { + "name": "comment", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + 
"created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "deleted": { + "name": "deleted", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "providers_name_unique": { + "name": "providers_name_unique", + "nullsNotDistinct": false, + "columns": [ + "name" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.settings": { + "name": "settings", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "settings_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "key": { + "name": "key", + "type": "varchar(63)", + "primaryKey": false, + "notNull": true + }, + "value": { + "name": "value", + "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "settings_key_unique": { + "name": "settings_key_unique", + "nullsNotDistinct": false, + "columns": [ + "key" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.srv_logs": { + "name": "srv_logs", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "srv_logs_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": 
"1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "related_api_key_id": { + "name": "related_api_key_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "related_upstream_id": { + "name": "related_upstream_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "related_completion_id": { + "name": "related_completion_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "message": { + "name": "message", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "level": { + "name": "level", + "type": "srv_logs_level", + "typeSchema": "public", + "primaryKey": false, + "notNull": true + }, + "details": { + "name": "details", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "acknowledged": { + "name": "acknowledged", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + }, + "ack_at": { + "name": "ack_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + } + }, + "indexes": {}, + "foreignKeys": { + "srv_logs_related_api_key_id_api_keys_id_fk": { + "name": "srv_logs_related_api_key_id_api_keys_id_fk", + "tableFrom": "srv_logs", + "tableTo": "api_keys", + "columnsFrom": [ + "related_api_key_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + }, + "srv_logs_related_upstream_id_upstreams_id_fk": { + "name": "srv_logs_related_upstream_id_upstreams_id_fk", + "tableFrom": "srv_logs", + "tableTo": "upstreams", + "columnsFrom": [ + "related_upstream_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + }, + "srv_logs_related_completion_id_completions_id_fk": { + "name": "srv_logs_related_completion_id_completions_id_fk", + "tableFrom": "srv_logs", + "tableTo": "completions", + "columnsFrom": [ + 
"related_completion_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "srv_logs_id_unique": { + "name": "srv_logs_id_unique", + "nullsNotDistinct": false, + "columns": [ + "id" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.upstreams": { + "name": "upstreams", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "upstreams_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "name": { + "name": "name", + "type": "varchar(63)", + "primaryKey": false, + "notNull": true + }, + "url": { + "name": "url", + "type": "varchar(255)", + "primaryKey": false, + "notNull": true + }, + "model": { + "name": "model", + "type": "varchar(63)", + "primaryKey": false, + "notNull": true + }, + "upstream_model": { + "name": "upstream_model", + "type": "varchar(63)", + "primaryKey": false, + "notNull": false + }, + "api_key": { + "name": "api_key", + "type": "varchar(255)", + "primaryKey": false, + "notNull": false + }, + "weight": { + "name": "weight", + "type": "real", + "primaryKey": false, + "notNull": true, + "default": 1 + }, + "comment": { + "name": "comment", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "deleted": { + "name": "deleted", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + 
"policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + } + }, + "enums": { + "public.api_key_source": { + "name": "api_key_source", + "schema": "public", + "values": [ + "manual", + "operator", + "init" + ] + }, + "public.completions_status": { + "name": "completions_status", + "schema": "public", + "values": [ + "pending", + "completed", + "failed", + "aborted" + ] + }, + "public.model_type": { + "name": "model_type", + "schema": "public", + "values": [ + "chat", + "embedding" + ] + }, + "public.provider_type": { + "name": "provider_type", + "schema": "public", + "values": [ + "openai", + "openai-responses", + "anthropic", + "azure", + "ollama" + ] + }, + "public.srv_logs_level": { + "name": "srv_logs_level", + "schema": "public", + "values": [ + "unspecific", + "info", + "warn", + "error" + ] + } + }, + "schemas": {}, + "sequences": {}, + "roles": {}, + "policies": {}, + "views": {}, + "_meta": { + "columns": {}, + "schemas": {}, + "tables": {} + } +} \ No newline at end of file diff --git a/backend/drizzle/meta/_journal.json b/backend/drizzle/meta/_journal.json index e31209d..9f2d7e8 100644 --- a/backend/drizzle/meta/_journal.json +++ b/backend/drizzle/meta/_journal.json @@ -71,6 +71,13 @@ "when": 1768653472259, "tag": "0009_api_key_external_id", "breakpoints": true + }, + { + "idx": 10, + "version": "7", + "when": 1769256296714, + "tag": "0010_noisy_deathbird", + "breakpoints": true } ] } \ No newline at end of file diff --git a/backend/src/api/v1/completions.ts b/backend/src/api/v1/completions.ts index 068829e..97df736 100644 --- a/backend/src/api/v1/completions.ts +++ b/backend/src/api/v1/completions.ts @@ -18,7 +18,6 @@ import { rateLimitPlugin } from "@/plugins/rateLimitPlugin"; import type { CompletionsMessageType, ToolDefinitionType, - ToolCallType, ToolChoiceType, } from "@/db/schema"; import { @@ -31,6 +30,7 @@ import { PROVIDER_HEADER, } from "@/utils/api-helpers"; import { addCompletions, type Completion } from "@/utils/completions"; 
+import { StreamingContext } from "@/utils/streaming-context"; import { executeWithFailover, selectMultipleCandidates, @@ -162,6 +162,7 @@ function buildCompletionRecord( /** * Process a successful non-streaming response + * Ensures completion is saved to database before returning */ async function processNonStreamingResponse( resp: Response, @@ -170,6 +171,7 @@ async function processNonStreamingResponse( providerType: string, apiKeyRecord: ApiKey | null, begin: number, + signal?: AbortSignal, ): Promise { // Parse response using upstream adapter const upstreamAdapter = getUpstreamAdapter(providerType); @@ -182,7 +184,6 @@ async function processNonStreamingResponse( // Update completion record completion.promptTokens = internalResponse.usage.inputTokens; completion.completionTokens = internalResponse.usage.outputTokens; - completion.status = "completed"; completion.ttft = Date.now() - begin; completion.duration = Date.now() - begin; @@ -195,9 +196,23 @@ async function processNonStreamingResponse( tool_calls: toolCalls, }, ]; - addCompletions(completion, bearer).catch(() => { - logger.error("Failed to log completion after non-streaming"); - }); + + // Check if client disconnected during processing + if (signal?.aborted) { + completion.status = "aborted"; + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: "Client disconnected" }, + }, + }); + } else { + completion.status = "completed"; + // Use await to ensure database write completes before returning + await addCompletions(completion, bearer); + } // Consume tokens for TPM rate limiting (post-flight) // Only consume if token counts are valid (not -1 which indicates parsing failure) @@ -211,6 +226,7 @@ async function processNonStreamingResponse( /** * Process a successful streaming response + * Uses StreamingContext to ensure completion is saved even on client disconnect * 
@yields string chunks in OpenAI format */ async function* processStreamingResponse( @@ -220,6 +236,7 @@ async function* processStreamingResponse( providerType: string, apiKeyRecord: ApiKey | null, begin: number, + signal?: AbortSignal, ): AsyncGenerator { // Get adapters const upstreamAdapter = getUpstreamAdapter(providerType); @@ -227,38 +244,35 @@ async function* processStreamingResponse( logger.debug("parse stream completions response"); - let ttft = -1; - let isFirstChunk = true; - const textParts: string[] = []; - const thinkingParts: string[] = []; - let inputTokens = -1; - let outputTokens = -1; - - // Track tool calls during streaming - // Use Map with tool call ID as key to avoid index collision issues - const streamToolCalls: Map = new Map(); - const toolCallArguments: Map = new Map(); - // Track index-to-id mapping for chunks that only provide index - const indexToIdMap: Map = new Map(); - let nextToolCallIndex = 0; + // Create streaming context with abort handling + const ctx = new StreamingContext(completion, bearer, apiKeyRecord, begin, signal); + + // Track whether we've logged the client abort (to avoid duplicate logs) + let loggedAbort = false; try { const chunks = upstreamAdapter.parseStreamResponse(resp); for await (const chunk of chunks) { - if (isFirstChunk) { - isFirstChunk = false; - ttft = Date.now() - begin; + // Check if client has disconnected (but continue processing to collect all data) + const clientAborted = ctx.isAborted(); + + // Log client disconnect once when first detected + if (clientAborted && !loggedAbort) { + loggedAbort = true; + logger.info("Client disconnected during streaming, continuing to collect upstream data"); } - // Collect content for completion record + ctx.recordTTFT(); + + // Collect content for completion record (always, even if client aborted) if (chunk.type === "content_block_start") { // Track new tool call block if (chunk.contentBlock?.type === "tool_use") { const toolId = chunk.contentBlock.id; - const 
index = chunk.index ?? nextToolCallIndex++; - indexToIdMap.set(index, toolId); - streamToolCalls.set(toolId, { + const index = chunk.index ?? ctx.nextToolCallIndex++; + ctx.indexToIdMap.set(index, toolId); + ctx.streamToolCalls.set(toolId, { id: toolId, type: "function", function: { @@ -266,23 +280,23 @@ async function* processStreamingResponse( arguments: "", }, }); - toolCallArguments.set(toolId, []); + ctx.toolCallArguments.set(toolId, []); } } else if (chunk.type === "content_block_delta") { if (chunk.delta?.type === "text_delta" && chunk.delta.text) { - textParts.push(chunk.delta.text); + ctx.textParts.push(chunk.delta.text); } else if ( chunk.delta?.type === "thinking_delta" && chunk.delta.thinking ) { - thinkingParts.push(chunk.delta.thinking); + ctx.thinkingParts.push(chunk.delta.thinking); } else if (chunk.delta?.type === "input_json_delta" && chunk.delta.partialJson) { // Collect tool call arguments - lookup by index to get tool ID // Skip if index is missing to avoid data corruption if (chunk.index !== undefined) { - const toolId = indexToIdMap.get(chunk.index); + const toolId = ctx.indexToIdMap.get(chunk.index); if (toolId) { - const args = toolCallArguments.get(toolId); + const args = ctx.toolCallArguments.get(toolId); if (args) { args.push(chunk.delta.partialJson); } @@ -295,10 +309,10 @@ async function* processStreamingResponse( // Finalize tool call arguments - lookup by index to get tool ID // Skip if index is missing to avoid data corruption if (chunk.index !== undefined) { - const toolId = indexToIdMap.get(chunk.index); + const toolId = ctx.indexToIdMap.get(chunk.index); if (toolId) { - const toolCall = streamToolCalls.get(toolId); - const args = toolCallArguments.get(toolId); + const toolCall = ctx.streamToolCalls.get(toolId); + const args = ctx.toolCallArguments.get(toolId); if (toolCall && args) { toolCall.function.arguments = args.join(""); } @@ -308,74 +322,60 @@ async function* processStreamingResponse( // Collect usage info if 
(chunk.usage) { - inputTokens = chunk.usage.inputTokens; - outputTokens = chunk.usage.outputTokens; + ctx.inputTokens = chunk.usage.inputTokens; + ctx.outputTokens = chunk.usage.outputTokens; } - // Convert to OpenAI format and yield - const serialized = responseAdapter.serializeStreamChunk(chunk); - if (serialized) { - yield serialized; - } + // Only yield to client if not aborted (client is still listening) + if (!clientAborted) { + // Convert to OpenAI format and yield + const serialized = responseAdapter.serializeStreamChunk(chunk); + if (serialized) { + yield serialized; + } - // Handle message_stop - if (chunk.type === "message_stop") { - yield responseAdapter.getDoneMarker(); + // Handle message_stop + if (chunk.type === "message_stop") { + yield responseAdapter.getDoneMarker(); + } } } - // Collect final tool calls - const finalToolCalls: ToolCallType[] | undefined = - streamToolCalls.size > 0 - ? Array.from(streamToolCalls.values()) - : undefined; - - // Finalize completion record - const contentText = - (thinkingParts.length > 0 - ? 
`${thinkingParts.join("")}\n` - : "") + textParts.join(""); - - completion.completion = [ - { - role: "assistant", - content: contentText || null, - tool_calls: finalToolCalls, - }, - ]; - completion.promptTokens = inputTokens; - completion.completionTokens = outputTokens; - completion.status = "completed"; - completion.ttft = ttft; - completion.duration = Date.now() - begin; - addCompletions(completion, bearer).catch((error: unknown) => { - logger.error("Failed to log completion after streaming", error); - }); - - // Consume tokens for TPM rate limiting (post-flight) - if (apiKeyRecord && inputTokens > 0 && outputTokens > 0) { - const totalTokens = inputTokens + outputTokens; - await consumeTokens(apiKeyRecord.id, apiKeyRecord.tpmLimit, totalTokens); - } - // Handle case where no chunks were received - if (isFirstChunk) { + if (ctx.isFirstChunk) { throw new Error("No chunk received from upstream"); } + + // Save completion with appropriate status + if (!ctx.isSaved()) { + if (ctx.isAborted()) { + await ctx.saveCompletion("aborted", "Client disconnected"); + } else { + await ctx.saveCompletion("completed"); + } + } } catch (error) { - logger.error("Stream processing error", error); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Stream processing error: ${String(error)}`, - details: { - type: "completionError", - data: { type: "streamError", msg: String(error) }, - }, - }).catch(() => { - logger.error("Failed to log completion after stream processing error"); - }); - throw error; + // Only log error if not due to client abort + if (!ctx.isAborted()) { + logger.error("Stream processing error", error); + } + + // Save failed completion + if (!ctx.isSaved()) { + if (ctx.isAborted()) { + // If client aborted and we got an error, still save as aborted with the error info + await ctx.saveCompletion("aborted", `Client disconnected, stream error: ${String(error)}`); + } else { + await ctx.saveCompletion("failed", 
String(error)); + } + } + + // Only re-throw if client is still connected + if (!ctx.isAborted()) { + throw error; + } + } finally { + ctx.cleanup(); } } @@ -401,8 +401,7 @@ export const completionsApi = new Elysia({ async function* ({ body, set, bearer, request, apiKeyRecord }) { if (bearer === undefined) { set.status = 500; - yield JSON.stringify({ error: "Internal server error" }); - return; + return { error: "Internal server error" }; } const reqHeaders = request.headers; @@ -423,14 +422,13 @@ export const completionsApi = new Elysia({ // Check if model exists if (modelsWithProviders.length === 0) { set.status = 404; - yield JSON.stringify({ + return { error: { message: `Model '${systemName}' not found`, type: "invalid_request_error", code: "model_not_found", }, - }); - return; + }; } // Filter candidates by target provider (if specified) @@ -441,14 +439,13 @@ export const completionsApi = new Elysia({ if (filteredCandidates.length === 0) { set.status = 404; - yield JSON.stringify({ + return { error: { message: `No available provider for model '${systemName}'`, type: "invalid_request_error", code: "no_provider", }, - }); - return; + }; } // Select candidates for failover (weighted random order) @@ -487,12 +484,10 @@ export const completionsApi = new Elysia({ // Handle streaming vs non-streaming if (internalRequest.stream) { + // Streaming request - use yield for streaming responses if (body.n && body.n > 1) { set.status = 400; - yield JSON.stringify({ - error: "Stream completions with n > 1 is not supported", - }); - return; + return { error: "Stream completions with n > 1 is not supported" }; } // For streaming, use failover only for connection establishment @@ -517,31 +512,27 @@ export const completionsApi = new Elysia({ if (errorResult.type === "upstream_error") { set.status = errorResult.status; - yield errorResult.body; - return; + return JSON.parse(errorResult.body) as Record; } set.status = 502; - yield JSON.stringify({ + return { error: { message: "All 
upstream providers failed", type: "upstream_error", code: "all_providers_failed", }, - }); - return; + }; } if (!result.response || !result.provider) { set.status = 500; - yield JSON.stringify({ error: "Internal server error" }); - return; + return { error: "Internal server error" }; } if (!result.response.body) { set.status = 500; - yield JSON.stringify({ error: "No body in response" }); - return; + return { error: "No body in response" }; } const providerType = result.provider.provider.type || "openai"; @@ -563,14 +554,18 @@ export const completionsApi = new Elysia({ providerType, apiKeyRecord ?? null, begin, + request.signal, ); } catch (error) { - logger.error("Stream processing error", error); - set.status = 500; - yield JSON.stringify({ error: "Stream processing error" }); + // Don't log error if it's due to client abort + if (!request.signal.aborted) { + logger.error("Stream processing error", error); + set.status = 500; + yield JSON.stringify({ error: "Stream processing error" }); + } } } else { - // Non-streaming request with failover + // Non-streaming request - use return for normal JSON response const result = await executeWithFailover( candidates, buildRequestForProvider, @@ -592,25 +587,22 @@ export const completionsApi = new Elysia({ if (errorResult.type === "upstream_error") { set.status = errorResult.status; - yield errorResult.body; - return; + return JSON.parse(errorResult.body) as Record; } set.status = 502; - yield JSON.stringify({ + return { error: { message: "All upstream providers failed", type: "upstream_error", code: "all_providers_failed", }, - }); - return; + }; } if (!result.response || !result.provider) { set.status = 500; - yield JSON.stringify({ error: "Internal server error" }); - return; + return { error: "Internal server error" }; } const providerType = result.provider.provider.type || "openai"; @@ -632,23 +624,54 @@ export const completionsApi = new Elysia({ providerType, apiKeyRecord ?? 
null, begin, + request.signal, ); - yield response; + // Return parsed JSON object for proper content-type + return JSON.parse(response) as Record; } catch (error) { - logger.error("Failed to process response", error); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Response processing error: ${String(error)}`, - details: { - type: "completionError", - data: { type: "processingError", msg: String(error) }, - }, - }).catch(() => { - logger.error("Failed to log completion after processing error"); - }); - set.status = 500; - yield JSON.stringify({ error: "Failed to process response" }); + // Handle error based on whether client aborted + const errorMsg = error instanceof Error ? error.message : String(error); + // Only save if completion wasn't already saved in processNonStreamingResponse + const alreadySaved = completion.status !== "pending"; + if (request.signal.aborted) { + // Client disconnected - save as aborted (if not already saved) + if (!alreadySaved) { + completion.status = "aborted"; + try { + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: errorMsg }, + }, + }); + } catch (logError: unknown) { + logger.error("Failed to log aborted completion after processing error", logError); + } + } + // Return nothing for aborted requests + return; + } else { + logger.error("Failed to process response", error); + if (!alreadySaved) { + completion.status = "failed"; + try { + await addCompletions(completion, bearer, { + level: "error", + message: `Response processing error: ${errorMsg}`, + details: { + type: "completionError", + data: { type: "processingError", msg: errorMsg }, + }, + }); + } catch (logError: unknown) { + logger.error("Failed to log completion after processing error", logError); + } + } + set.status = 500; + return { error: "Failed to process response" }; 
+ } } } }, diff --git a/backend/src/api/v1/messages.ts b/backend/src/api/v1/messages.ts index 9b93e66..9eb6a36 100644 --- a/backend/src/api/v1/messages.ts +++ b/backend/src/api/v1/messages.ts @@ -24,6 +24,7 @@ import { PROVIDER_HEADER, } from "@/utils/api-helpers"; import { addCompletions, type Completion } from "@/utils/completions"; +import { StreamingContext } from "@/utils/streaming-context"; import { executeWithFailover, selectMultipleCandidates, @@ -153,6 +154,7 @@ function buildCompletionRecord( /** * Process a successful non-streaming message response + * Ensures completion is saved to database before returning */ async function processNonStreamingResponse( resp: Response, @@ -161,6 +163,7 @@ async function processNonStreamingResponse( providerType: string, apiKeyRecord: ApiKey | null, begin: number, + signal?: AbortSignal, ): Promise { // Parse response using upstream adapter const upstreamAdapter = getUpstreamAdapter(providerType); @@ -173,7 +176,6 @@ async function processNonStreamingResponse( // Update completion record completion.promptTokens = internalResponse.usage.inputTokens; completion.completionTokens = internalResponse.usage.outputTokens; - completion.status = "completed"; completion.ttft = Date.now() - begin; completion.duration = Date.now() - begin; completion.completion = [ @@ -182,9 +184,22 @@ async function processNonStreamingResponse( content: extractContentText(internalResponse), }, ]; - addCompletions(completion, bearer).catch(() => { - logger.error("Failed to log completion after non-streaming"); - }); + + // Check if client disconnected during processing + if (signal?.aborted) { + completion.status = "aborted"; + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: "Client disconnected" }, + }, + }); + } else { + completion.status = "completed"; + await addCompletions(completion, bearer); + } // 
Consume tokens for TPM rate limiting (post-flight) // Only consume if token counts are valid (not -1 which indicates parsing failure) @@ -198,6 +213,7 @@ async function processNonStreamingResponse( /** * Process a successful streaming message response + * Uses StreamingContext to ensure completion is saved even on client disconnect * @yields string - SSE formatted string chunks */ async function* processStreamingResponse( @@ -207,6 +223,7 @@ async function* processStreamingResponse( providerType: string, apiKeyRecord: ApiKey | null, begin: number, + signal?: AbortSignal, ): AsyncGenerator { // Get adapters const upstreamAdapter = getUpstreamAdapter(providerType); @@ -214,90 +231,133 @@ async function* processStreamingResponse( logger.debug("parse stream messages response"); - let ttft = -1; - let isFirstChunk = true; - const textParts: string[] = []; - const thinkingParts: string[] = []; - let inputTokens = -1; - let outputTokens = -1; + // Create streaming context with abort handling + const ctx = new StreamingContext(completion, bearer, apiKeyRecord, begin, signal); + + // Track whether we've logged the client abort (to avoid duplicate logs) + let loggedAbort = false; try { const chunks = upstreamAdapter.parseStreamResponse(resp); for await (const chunk of chunks) { - if (isFirstChunk) { - isFirstChunk = false; - ttft = Date.now() - begin; + // Check if client has disconnected (but continue processing to collect all data) + const clientAborted = ctx.isAborted(); + + // Log client disconnect once when first detected + if (clientAborted && !loggedAbort) { + loggedAbort = true; + logger.info("Client disconnected during streaming, continuing to collect upstream data"); } - // Collect content for completion record - if (chunk.type === "content_block_delta") { + ctx.recordTTFT(); + + // Collect content for completion record (always, even if client aborted) + if (chunk.type === "content_block_start") { + // Track new tool call block + if (chunk.contentBlock?.type === 
"tool_use") { + const toolId = chunk.contentBlock.id; + const index = chunk.index ?? ctx.nextToolCallIndex++; + ctx.indexToIdMap.set(index, toolId); + ctx.streamToolCalls.set(toolId, { + id: toolId, + type: "function", + function: { + name: chunk.contentBlock.name, + arguments: "", + }, + }); + ctx.toolCallArguments.set(toolId, []); + } + } else if (chunk.type === "content_block_delta") { if (chunk.delta?.type === "text_delta" && chunk.delta.text) { - textParts.push(chunk.delta.text); + ctx.textParts.push(chunk.delta.text); } else if ( chunk.delta?.type === "thinking_delta" && chunk.delta.thinking ) { - thinkingParts.push(chunk.delta.thinking); + ctx.thinkingParts.push(chunk.delta.thinking); + } else if (chunk.delta?.type === "input_json_delta" && chunk.delta.partialJson) { + // Collect tool call arguments - lookup by index to get tool ID + // Skip if index is missing to avoid data corruption + if (chunk.index !== undefined) { + const toolId = ctx.indexToIdMap.get(chunk.index); + if (toolId) { + const args = ctx.toolCallArguments.get(toolId); + if (args) { + args.push(chunk.delta.partialJson); + } + } + } else { + logger.warn("Received input_json_delta without index, skipping"); + } + } + } else if (chunk.type === "content_block_stop") { + // Finalize tool call arguments - lookup by index to get tool ID + // Skip if index is missing to avoid data corruption + if (chunk.index !== undefined) { + const toolId = ctx.indexToIdMap.get(chunk.index); + if (toolId) { + const toolCall = ctx.streamToolCalls.get(toolId); + const args = ctx.toolCallArguments.get(toolId); + if (toolCall && args) { + toolCall.function.arguments = args.join(""); + } + } } } // Collect usage info if (chunk.usage) { - inputTokens = chunk.usage.inputTokens; - outputTokens = chunk.usage.outputTokens; + ctx.inputTokens = chunk.usage.inputTokens; + ctx.outputTokens = chunk.usage.outputTokens; } - // Convert to Anthropic format and yield - const serialized = responseAdapter.serializeStreamChunk(chunk); - 
if (serialized) { - yield serialized; + // Only yield to client if not aborted (client is still listening) + if (!clientAborted) { + // Convert to Anthropic format and yield + const serialized = responseAdapter.serializeStreamChunk(chunk); + if (serialized) { + yield serialized; + } } } - // Finalize completion record - completion.completion = [ - { - role: undefined, - content: - (thinkingParts.length > 0 - ? `${thinkingParts.join("")}\n` - : "") + textParts.join(""), - }, - ]; - completion.promptTokens = inputTokens; - completion.completionTokens = outputTokens; - completion.status = "completed"; - completion.ttft = ttft; - completion.duration = Date.now() - begin; - addCompletions(completion, bearer).catch(() => { - logger.error("Failed to log completion after streaming"); - }); - - // Consume tokens for TPM rate limiting (post-flight) - if (apiKeyRecord && inputTokens > 0 && outputTokens > 0) { - const totalTokens = inputTokens + outputTokens; - await consumeTokens(apiKeyRecord.id, apiKeyRecord.tpmLimit, totalTokens); - } - // Handle case where no chunks were received - if (isFirstChunk) { + if (ctx.isFirstChunk) { throw new Error("No chunk received from upstream"); } + + // Save completion with appropriate status + if (!ctx.isSaved()) { + if (ctx.isAborted()) { + await ctx.saveCompletion("aborted", "Client disconnected"); + } else { + await ctx.saveCompletion("completed"); + } + } } catch (error) { - logger.error("Stream processing error", error); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Stream processing error: ${String(error)}`, - details: { - type: "completionError", - data: { type: "streamError", msg: String(error) }, - }, - }).catch(() => { - logger.error("Failed to log completion after stream error"); - }); - throw error; + // Only log error if not due to client abort + if (!ctx.isAborted()) { + logger.error("Stream processing error", error); + } + + // Save failed completion + if 
(!ctx.isSaved()) { + if (ctx.isAborted()) { + // If client aborted and we got an error, still save as aborted with the error info + await ctx.saveCompletion("aborted", `Client disconnected, stream error: ${String(error)}`); + } else { + await ctx.saveCompletion("failed", String(error)); + } + } + + // Only re-throw if client is still connected + if (!ctx.isAborted()) { + throw error; + } + } finally { + ctx.cleanup(); } } @@ -322,11 +382,10 @@ export const messagesApi = new Elysia({ async function* ({ body, set, bearer, request, apiKeyRecord }) { if (bearer === undefined) { set.status = 500; - yield JSON.stringify({ + return { type: "error", error: { type: "api_error", message: "Internal server error" }, - }); - return; + }; } const reqHeaders = request.headers; @@ -347,14 +406,13 @@ export const messagesApi = new Elysia({ // Check if model exists if (modelsWithProviders.length === 0) { set.status = 404; - yield JSON.stringify({ + return { type: "error", error: { type: "not_found_error", message: `Model '${systemName}' not found`, }, - }); - return; + }; } // Filter candidates by target provider (if specified) @@ -365,14 +423,13 @@ export const messagesApi = new Elysia({ if (filteredCandidates.length === 0) { set.status = 404; - yield JSON.stringify({ + return { type: "error", error: { type: "not_found_error", message: `No available provider for model '${systemName}'`, }, - }); - return; + }; } // Select candidates for failover (weighted random order) @@ -411,7 +468,7 @@ export const messagesApi = new Elysia({ // Handle streaming vs non-streaming if (internalRequest.stream) { - // For streaming, use failover only for connection establishment + // Streaming request - use yield for streaming responses const result = await executeWithFailover( candidates, buildRequestForProvider, @@ -431,37 +488,33 @@ export const messagesApi = new Elysia({ if (errorResult.type === "upstream_error") { set.status = errorResult.status; - yield errorResult.body; - return; + return 
JSON.parse(errorResult.body) as Record; } set.status = 502; - yield JSON.stringify({ + return { type: "error", error: { type: "api_error", message: "All upstream providers failed", }, - }); - return; + }; } if (!result.response || !result.provider) { set.status = 500; - yield JSON.stringify({ + return { type: "error", error: { type: "api_error", message: "Internal server error" }, - }); - return; + }; } if (!result.response.body) { set.status = 500; - yield JSON.stringify({ + return { type: "error", error: { type: "api_error", message: "No body in response" }, - }); - return; + }; } const providerType = result.provider.provider.type || "openai"; @@ -481,14 +534,18 @@ export const messagesApi = new Elysia({ providerType, apiKeyRecord ?? null, begin, + request.signal, ); } catch (error) { - logger.error("Stream processing error", error); - set.status = 500; - yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "Stream processing error" } })}\n\n`; + // Don't log error if it's due to client abort + if (!request.signal.aborted) { + logger.error("Stream processing error", error); + set.status = 500; + yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "Stream processing error" } })}\n\n`; + } } } else { - // Non-streaming request with failover + // Non-streaming request - use return for normal JSON response const result = await executeWithFailover( candidates, buildRequestForProvider, @@ -508,28 +565,25 @@ export const messagesApi = new Elysia({ if (errorResult.type === "upstream_error") { set.status = errorResult.status; - yield errorResult.body; - return; + return JSON.parse(errorResult.body) as Record; } set.status = 502; - yield JSON.stringify({ + return { type: "error", error: { type: "api_error", message: "All upstream providers failed", }, - }); - return; + }; } if (!result.response || !result.provider) { set.status = 500; - yield JSON.stringify({ + return { type: 
"error", error: { type: "api_error", message: "Internal server error" }, - }); - return; + }; } const providerType = result.provider.provider.type || "openai"; @@ -549,26 +603,57 @@ export const messagesApi = new Elysia({ providerType, apiKeyRecord ?? null, begin, + request.signal, ); - yield response; + // Return parsed JSON object for proper content-type + return JSON.parse(response) as Record; } catch (error) { - logger.error("Failed to process response", error); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Response processing error: ${String(error)}`, - details: { - type: "completionError", - data: { type: "processingError", msg: String(error) }, - }, - }).catch(() => { - logger.error("Failed to log completion after processing error"); - }); - set.status = 500; - yield JSON.stringify({ - type: "error", - error: { type: "api_error", message: "Failed to process response" }, - }); + // Handle error based on whether client aborted + const errorMsg = error instanceof Error ? 
error.message : String(error); + // Only save if completion wasn't already saved in processNonStreamingResponse + const alreadySaved = completion.status !== "pending"; + if (request.signal.aborted) { + // Client disconnected - save as aborted (if not already saved) + if (!alreadySaved) { + completion.status = "aborted"; + try { + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: errorMsg }, + }, + }); + } catch (logError: unknown) { + logger.error("Failed to log aborted completion after processing error", logError); + } + } + // Return nothing for aborted requests + return; + } else { + logger.error("Failed to process response", error); + if (!alreadySaved) { + completion.status = "failed"; + try { + await addCompletions(completion, bearer, { + level: "error", + message: `Response processing error: ${errorMsg}`, + details: { + type: "completionError", + data: { type: "processingError", msg: errorMsg }, + }, + }); + } catch (logError: unknown) { + logger.error("Failed to log completion after processing error", logError); + } + } + set.status = 500; + return { + type: "error", + error: { type: "api_error", message: "Failed to process response" }, + }; + } } } }, diff --git a/backend/src/api/v1/responses.ts b/backend/src/api/v1/responses.ts index 224e060..d86b44f 100644 --- a/backend/src/api/v1/responses.ts +++ b/backend/src/api/v1/responses.ts @@ -24,6 +24,7 @@ import { PROVIDER_HEADER, } from "@/utils/api-helpers"; import { addCompletions, type Completion } from "@/utils/completions"; +import { StreamingContext } from "@/utils/streaming-context"; import { executeWithFailover, selectMultipleCandidates, @@ -164,6 +165,7 @@ function buildCompletionRecord( /** * Process a successful non-streaming response + * Ensures completion is saved to database before returning */ async function processNonStreamingResponse( resp: Response, 
@@ -172,6 +174,7 @@ async function processNonStreamingResponse( providerType: string, apiKeyRecord: ApiKey | null, begin: number, + signal?: AbortSignal, ): Promise { // Parse response using upstream adapter const upstreamAdapter = getUpstreamAdapter(providerType); @@ -184,7 +187,6 @@ async function processNonStreamingResponse( // Update completion record completion.promptTokens = internalResponse.usage.inputTokens; completion.completionTokens = internalResponse.usage.outputTokens; - completion.status = "completed"; completion.ttft = Date.now() - begin; completion.duration = Date.now() - begin; completion.completion = [ @@ -193,9 +195,22 @@ async function processNonStreamingResponse( content: extractContentText(internalResponse), }, ]; - addCompletions(completion, bearer).catch(() => { - logger.error("Failed to log completion after non-streaming"); - }); + + // Check if client disconnected during processing + if (signal?.aborted) { + completion.status = "aborted"; + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: "Client disconnected" }, + }, + }); + } else { + completion.status = "completed"; + await addCompletions(completion, bearer); + } // Consume tokens for TPM rate limiting (post-flight) // Only consume if token counts are valid (not -1 which indicates parsing failure) @@ -209,6 +224,7 @@ async function processNonStreamingResponse( /** * Process a successful streaming response + * Uses StreamingContext to ensure completion is saved even on client disconnect * @yields SSE formatted strings */ async function* processStreamingResponse( @@ -218,6 +234,7 @@ async function* processStreamingResponse( providerType: string, apiKeyRecord: ApiKey | null, begin: number, + signal?: AbortSignal, ): AsyncGenerator { // Get adapters const upstreamAdapter = getUpstreamAdapter(providerType); @@ -225,95 +242,138 @@ async function* 
processStreamingResponse( logger.debug("parse stream responses"); - let ttft = -1; - let isFirstChunk = true; - const textParts: string[] = []; - const thinkingParts: string[] = []; - let inputTokens = -1; - let outputTokens = -1; + // Create streaming context with abort handling + const ctx = new StreamingContext(completion, bearer, apiKeyRecord, begin, signal); + + // Track whether we've logged the client abort (to avoid duplicate logs) + let loggedAbort = false; try { const chunks = upstreamAdapter.parseStreamResponse(resp); for await (const chunk of chunks) { - if (isFirstChunk) { - isFirstChunk = false; - ttft = Date.now() - begin; + // Check if client has disconnected (but continue processing to collect all data) + const clientAborted = ctx.isAborted(); + + // Log client disconnect once when first detected + if (clientAborted && !loggedAbort) { + loggedAbort = true; + logger.info("Client disconnected during streaming, continuing to collect upstream data"); } - // Collect content for completion record - if (chunk.type === "content_block_delta") { + ctx.recordTTFT(); + + // Collect content for completion record (always, even if client aborted) + if (chunk.type === "content_block_start") { + // Track new tool call block + if (chunk.contentBlock?.type === "tool_use") { + const toolId = chunk.contentBlock.id; + const index = chunk.index ?? 
ctx.nextToolCallIndex++; + ctx.indexToIdMap.set(index, toolId); + ctx.streamToolCalls.set(toolId, { + id: toolId, + type: "function", + function: { + name: chunk.contentBlock.name, + arguments: "", + }, + }); + ctx.toolCallArguments.set(toolId, []); + } + } else if (chunk.type === "content_block_delta") { if (chunk.delta?.type === "text_delta" && chunk.delta.text) { - textParts.push(chunk.delta.text); + ctx.textParts.push(chunk.delta.text); } else if ( chunk.delta?.type === "thinking_delta" && chunk.delta.thinking ) { - thinkingParts.push(chunk.delta.thinking); + ctx.thinkingParts.push(chunk.delta.thinking); + } else if (chunk.delta?.type === "input_json_delta" && chunk.delta.partialJson) { + // Collect tool call arguments - lookup by index to get tool ID + // Skip if index is missing to avoid data corruption + if (chunk.index !== undefined) { + const toolId = ctx.indexToIdMap.get(chunk.index); + if (toolId) { + const args = ctx.toolCallArguments.get(toolId); + if (args) { + args.push(chunk.delta.partialJson); + } + } + } else { + logger.warn("Received input_json_delta without index, skipping"); + } + } + } else if (chunk.type === "content_block_stop") { + // Finalize tool call arguments - lookup by index to get tool ID + // Skip if index is missing to avoid data corruption + if (chunk.index !== undefined) { + const toolId = ctx.indexToIdMap.get(chunk.index); + if (toolId) { + const toolCall = ctx.streamToolCalls.get(toolId); + const args = ctx.toolCallArguments.get(toolId); + if (toolCall && args) { + toolCall.function.arguments = args.join(""); + } + } } } // Collect usage info if (chunk.usage) { - inputTokens = chunk.usage.inputTokens; - outputTokens = chunk.usage.outputTokens; + ctx.inputTokens = chunk.usage.inputTokens; + ctx.outputTokens = chunk.usage.outputTokens; } - // Convert to Response API format and yield - const serialized = responseAdapter.serializeStreamChunk(chunk); - if (serialized) { - yield serialized; - } + // Only yield to client if not 
aborted (client is still listening) + if (!clientAborted) { + // Convert to Response API format and yield + const serialized = responseAdapter.serializeStreamChunk(chunk); + if (serialized) { + yield serialized; + } - // Handle message_stop - if (chunk.type === "message_stop") { - yield responseAdapter.getDoneMarker(); + // Handle message_stop + if (chunk.type === "message_stop") { + yield responseAdapter.getDoneMarker(); + } } } - // Finalize completion record - completion.completion = [ - { - role: undefined, - content: - (thinkingParts.length > 0 - ? `${thinkingParts.join("")}\n` - : "") + textParts.join(""), - }, - ]; - completion.promptTokens = inputTokens; - completion.completionTokens = outputTokens; - completion.status = "completed"; - completion.ttft = ttft; - completion.duration = Date.now() - begin; - addCompletions(completion, bearer).catch(() => { - logger.error("Failed to log completion after streaming"); - }); - - // Consume tokens for TPM rate limiting (post-flight) - if (apiKeyRecord && inputTokens > 0 && outputTokens > 0) { - const totalTokens = inputTokens + outputTokens; - await consumeTokens(apiKeyRecord.id, apiKeyRecord.tpmLimit, totalTokens); - } - // Handle case where no chunks were received - if (isFirstChunk) { + if (ctx.isFirstChunk) { throw new Error("No chunk received from upstream"); } + + // Save completion with appropriate status + if (!ctx.isSaved()) { + if (ctx.isAborted()) { + await ctx.saveCompletion("aborted", "Client disconnected"); + } else { + await ctx.saveCompletion("completed"); + } + } } catch (error) { - logger.error("Stream processing error", error); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Stream processing error: ${String(error)}`, - details: { - type: "completionError", - data: { type: "streamError", msg: String(error) }, - }, - }).catch(() => { - logger.error("Failed to log completion error after stream failure"); - }); - throw error; + // Only log error if 
not due to client abort + if (!ctx.isAborted()) { + logger.error("Stream processing error", error); + } + + // Save failed completion + if (!ctx.isSaved()) { + if (ctx.isAborted()) { + // If client aborted and we got an error, still save as aborted with the error info + await ctx.saveCompletion("aborted", `Client disconnected, stream error: ${String(error)}`); + } else { + await ctx.saveCompletion("failed", String(error)); + } + } + + // Only re-throw if client is still connected + if (!ctx.isAborted()) { + throw error; + } + } finally { + ctx.cleanup(); } } @@ -338,11 +398,10 @@ export const responsesApi = new Elysia({ async function* ({ body, set, bearer, request, apiKeyRecord }) { if (bearer === undefined) { set.status = 500; - yield JSON.stringify({ + return { object: "error", error: { type: "server_error", message: "Internal server error" }, - }); - return; + }; } const reqHeaders = request.headers; @@ -363,14 +422,13 @@ export const responsesApi = new Elysia({ // Check if model exists if (modelsWithProviders.length === 0) { set.status = 404; - yield JSON.stringify({ + return { object: "error", error: { type: "invalid_request_error", message: `Model '${systemName}' not found`, }, - }); - return; + }; } // Filter candidates by target provider (if specified) @@ -381,14 +439,13 @@ export const responsesApi = new Elysia({ if (filteredCandidates.length === 0) { set.status = 404; - yield JSON.stringify({ + return { object: "error", error: { type: "invalid_request_error", message: `No available provider for model '${systemName}'`, }, - }); - return; + }; } // Select candidates for failover (weighted random order) @@ -427,7 +484,7 @@ export const responsesApi = new Elysia({ // Handle streaming vs non-streaming if (internalRequest.stream) { - // For streaming, use failover only for connection establishment + // Streaming request - use yield for streaming responses const result = await executeWithFailover( candidates, buildRequestForProvider, @@ -447,37 +504,33 @@ 
export const responsesApi = new Elysia({ if (errorResult.type === "upstream_error") { set.status = errorResult.status; - yield errorResult.body; - return; + return JSON.parse(errorResult.body) as Record; } set.status = 502; - yield JSON.stringify({ + return { object: "error", error: { type: "server_error", message: "All upstream providers failed", }, - }); - return; + }; } if (!result.response || !result.provider) { set.status = 500; - yield JSON.stringify({ + return { object: "error", error: { type: "server_error", message: "Internal server error" }, - }); - return; + }; } if (!result.response.body) { set.status = 500; - yield JSON.stringify({ + return { object: "error", error: { type: "server_error", message: "No body in response" }, - }); - return; + }; } const providerType = result.provider.provider.type || "openai"; @@ -497,14 +550,18 @@ export const responsesApi = new Elysia({ providerType, apiKeyRecord ?? null, begin, + request.signal, ); } catch (error) { - logger.error("Stream processing error", error); - set.status = 500; - yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { code: "internal_error", message: "Stream processing error", param: null, help_url: null } })}\n\n`; + // Don't log error if it's due to client abort + if (!request.signal.aborted) { + logger.error("Stream processing error", error); + set.status = 500; + yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { code: "internal_error", message: "Stream processing error", param: null, help_url: null } })}\n\n`; + } } } else { - // Non-streaming request with failover + // Non-streaming request - use return for normal JSON response const result = await executeWithFailover( candidates, buildRequestForProvider, @@ -524,28 +581,25 @@ export const responsesApi = new Elysia({ if (errorResult.type === "upstream_error") { set.status = errorResult.status; - yield errorResult.body; - return; + return JSON.parse(errorResult.body) as Record; } set.status = 502; - yield 
JSON.stringify({ + return { object: "error", error: { type: "server_error", message: "All upstream providers failed", }, - }); - return; + }; } if (!result.response || !result.provider) { set.status = 500; - yield JSON.stringify({ + return { object: "error", error: { type: "server_error", message: "Internal server error" }, - }); - return; + }; } const providerType = result.provider.provider.type || "openai"; @@ -565,26 +619,57 @@ export const responsesApi = new Elysia({ providerType, apiKeyRecord ?? null, begin, + request.signal, ); - yield response; + // Return parsed JSON object for proper content-type + return JSON.parse(response) as Record; } catch (error) { - logger.error("Failed to process response", error); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Response processing error: ${String(error)}`, - details: { - type: "completionError", - data: { type: "processingError", msg: String(error) }, - }, - }).catch(() => { - logger.error("Failed to log completion after processing error"); - }); - set.status = 500; - yield JSON.stringify({ - object: "error", - error: { type: "server_error", message: "Failed to process response" }, - }); + // Handle error based on whether client aborted + const errorMsg = error instanceof Error ? 
error.message : String(error); + // Only save if completion wasn't already saved in processNonStreamingResponse + const alreadySaved = completion.status !== "pending"; + if (request.signal.aborted) { + // Client disconnected - save as aborted (if not already saved) + if (!alreadySaved) { + completion.status = "aborted"; + try { + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: errorMsg }, + }, + }); + } catch (logError: unknown) { + logger.error("Failed to log aborted completion after processing error", logError); + } + } + // Return nothing for aborted requests + return; + } else { + logger.error("Failed to process response", error); + if (!alreadySaved) { + completion.status = "failed"; + try { + await addCompletions(completion, bearer, { + level: "error", + message: `Response processing error: ${errorMsg}`, + details: { + type: "completionError", + data: { type: "processingError", msg: errorMsg }, + }, + }); + } catch (logError: unknown) { + logger.error("Failed to log completion after processing error", logError); + } + } + set.status = 500; + return { + object: "error", + error: { type: "server_error", message: "Failed to process response" }, + }; + } } } }, diff --git a/backend/src/db/schema.ts b/backend/src/db/schema.ts index b0a8cf5..5ea0561 100644 --- a/backend/src/db/schema.ts +++ b/backend/src/db/schema.ts @@ -150,8 +150,9 @@ export const CompletionsStatusEnum = pgEnum("completions_status", [ "pending", "completed", "failed", + "aborted", ]); -export type CompletionsStatusEnumType = "pending" | "completed" | "failed"; +export type CompletionsStatusEnumType = "pending" | "completed" | "failed" | "aborted"; export const CompletionsTable = pgTable("completions", { id: integer("id").primaryKey().generatedAlwaysAsIdentity().unique(), diff --git a/backend/src/utils/streaming-context.ts 
/**
 * Streaming Context for Abort Handling
 * Ensures completion records are saved even when client disconnects
 */

import type { CompletionsStatusEnumType, ToolCallType } from "@/db/schema";
import { addCompletions, type Completion } from "@/utils/completions";
import { consumeTokens } from "@/plugins/apiKeyRateLimitPlugin";
import type { ApiKey } from "@/plugins/apiKeyPlugin";

/**
 * StreamingContext holds the state accumulated while a streaming response is
 * relayed, and writes the completion record exactly once via
 * {@link StreamingContext.saveCompletion}.
 *
 * No abort listener is registered on the signal. The caller is expected to
 * keep consuming upstream chunks after a client disconnect, poll
 * {@link StreamingContext.isAborted}, and pass the `"aborted"` status to the
 * final save so that everything received so far is recorded.
 */
export class StreamingContext {
  private readonly completion: Completion;
  private readonly bearer: string;
  private readonly apiKeyRecord: ApiKey | null;
  private readonly begin: number;
  private readonly signal?: AbortSignal;
  // Latched on the first saveCompletion() call; later calls are no-ops.
  private saved = false;

  // Accumulated data during streaming
  textParts: string[] = [];
  thinkingParts: string[] = [];
  // -1 means "not reported yet"; saveCompletion() stores the value as-is on
  // the record but clamps it to 0 for rate limiting.
  inputTokens = -1;
  outputTokens = -1;
  // -1 until recordTTFT() has been called once.
  ttft = -1;
  // NOTE(review): the type arguments of the three maps below were lost from
  // the text this file was recovered from. The value type of streamToolCalls
  // follows from its use in saveCompletion(); the key types and the other
  // value types are inferred from the field names (tool-call id: string,
  // stream index: number) and must be confirmed against the call sites.
  streamToolCalls: Map<string, ToolCallType> = new Map();
  toolCallArguments: Map<string, string> = new Map();
  indexToIdMap: Map<number, string> = new Map();
  nextToolCallIndex = 0;
  isFirstChunk = true;

  /**
   * @param completion - Record that saveCompletion() fills in and persists.
   * @param bearer - Passed through to addCompletions() unchanged.
   * @param apiKeyRecord - Key whose token budget is charged; `null` disables charging.
   * @param begin - Start timestamp; TTFT and duration are `Date.now() - begin`.
   * @param signal - Optional abort signal, only ever polled by isAborted().
   */
  constructor(
    completion: Completion,
    bearer: string,
    apiKeyRecord: ApiKey | null,
    begin: number,
    signal?: AbortSignal,
  ) {
    this.completion = completion;
    this.bearer = bearer;
    this.apiKeyRecord = apiKeyRecord;
    this.begin = begin;
    this.signal = signal;
  }

  /**
   * Record time to first token. Only the first call has an effect.
   */
  recordTTFT(): void {
    if (this.isFirstChunk) {
      this.isFirstChunk = false;
      this.ttft = Date.now() - this.begin;
    }
  }

  /**
   * Check if client has aborted.
   *
   * @returns `true` when a signal was supplied and it is aborted, else `false`.
   */
  isAborted(): boolean {
    return this.signal?.aborted ?? false;
  }

  /**
   * Fill the completion record from the accumulated stream state and persist
   * it through addCompletions(), then charge the consumed tokens.
   *
   * Only the first call does anything; the guard is set before the write
   * starts, so a concurrent or later call returns immediately.
   *
   * @param status - Final status stored on the record.
   * @param error - Optional error text. When non-empty, a log entry is passed
   *   to addCompletions() (level "info" for `"aborted"`, "error" otherwise).
   * @throws Whatever addCompletions() or consumeTokens() rejects with.
   */
  async saveCompletion(
    status: CompletionsStatusEnumType,
    error?: string,
  ): Promise<void> {
    // Prevent double-save
    if (this.saved) {
      return;
    }
    this.saved = true;

    // Collect final tool calls
    const finalToolCalls: ToolCallType[] | undefined =
      this.streamToolCalls.size > 0
        ? Array.from(this.streamToolCalls.values())
        : undefined;

    // Build content text: thinking output (if any) on its own leading line,
    // followed by the regular text.
    const contentText =
      (this.thinkingParts.length > 0
        ? `${this.thinkingParts.join("")}\n`
        : "") + this.textParts.join("");

    // Update completion record
    this.completion.completion = [
      {
        role: "assistant",
        content: contentText || null,
        tool_calls: finalToolCalls,
      },
    ];
    this.completion.promptTokens = this.inputTokens;
    this.completion.completionTokens = this.outputTokens;
    this.completion.status = status;
    this.completion.ttft = this.ttft;
    this.completion.duration = Date.now() - this.begin;

    try {
      if (error) {
        await addCompletions(this.completion, this.bearer, {
          level: status === "aborted" ? "info" : "error",
          message: `Stream ${status}: ${error}`,
          details: {
            type: "completionError",
            data: { type: status, msg: error },
          },
        });
      } else {
        await addCompletions(this.completion, this.bearer);
      }
    } finally {
      // Charge even when the write above rejects: the `saved` latch prevents
      // any retry, so skipping here would leave the usage uncharged for good.
      await this.chargeTokens();
    }
  }

  /**
   * Consume tokens for TPM rate limiting.
   * Negative (unknown) counts are clamped to 0 so partial usage is still charged.
   */
  private async chargeTokens(): Promise<void> {
    const totalTokens =
      Math.max(0, this.inputTokens) + Math.max(0, this.outputTokens);
    if (this.apiKeyRecord && totalTokens > 0) {
      await consumeTokens(
        this.apiKeyRecord.id,
        this.apiKeyRecord.tpmLimit,
        totalTokens,
      );
    }
  }

  /**
   * Check if completion has already been saved (or a save has been started).
   */
  isSaved(): boolean {
    return this.saved;
  }

  /**
   * Clean up resources.
   * No-op: no abort handlers are registered. Kept for API compatibility.
   */
  cleanup(): void {
    // Intentionally empty.
  }
}
b/frontend/src/i18n/locales/zh-CN.json index 6e224ec..61780ac 100644 --- a/frontend/src/i18n/locales/zh-CN.json +++ b/frontend/src/i18n/locales/zh-CN.json @@ -94,6 +94,7 @@ "pages.requests.columns.Pending": "待处理", "pages.requests.columns.Completed": "已完成", "pages.requests.columns.Failed": "失败", + "pages.requests.columns.Aborted": "已中止", "pages.requests.columns.Model": "模型", "pages.requests.columns.TTFT": "TTFT", "pages.requests.columns.TimeToFirstToken": "首 Token 返回时间", @@ -119,6 +120,7 @@ "pages.requests.detail-panel.header.Pending": "待处理", "pages.requests.detail-panel.header.Completed": "已完成", "pages.requests.detail-panel.header.Failed": "失败", + "pages.requests.detail-panel.header.Aborted": "已中止", "pages.requests.detail-panel.header.ClosePanel": "关闭面板", "pages.requests.detail-panel.index.Close": "关闭", "pages.requests.detail-panel.index.Retry": "重试", @@ -191,6 +193,7 @@ "pages.embeddings.columns.Pending": "待处理", "pages.embeddings.columns.Completed": "已完成", "pages.embeddings.columns.Failed": "失败", + "pages.embeddings.columns.Aborted": "已中止", "pages.embeddings.columns.Model": "模型", "pages.embeddings.columns.Input": "输入", "pages.embeddings.columns.Tokens": "Tokens", diff --git a/frontend/src/pages/embeddings/columns.tsx b/frontend/src/pages/embeddings/columns.tsx index f098f0d..1153d4b 100644 --- a/frontend/src/pages/embeddings/columns.tsx +++ b/frontend/src/pages/embeddings/columns.tsx @@ -35,6 +35,11 @@ export const columns: ColumnDef[] = [ {i18n.t('pages.embeddings.columns.Failed')} )) + .with('aborted', () => ( + + {i18n.t('pages.embeddings.columns.Aborted')} + + )) .exhaustive() return (
diff --git a/frontend/src/pages/requests/columns.tsx b/frontend/src/pages/requests/columns.tsx index 02072b1..d6eb2f5 100644 --- a/frontend/src/pages/requests/columns.tsx +++ b/frontend/src/pages/requests/columns.tsx @@ -41,6 +41,9 @@ export const columns: ColumnDef[] = [ .with('failed', () => ( {i18n.t('pages.requests.columns.Failed')} )) + .with('aborted', () => ( + {i18n.t('pages.requests.columns.Aborted')} + )) .exhaustive() return (
diff --git a/frontend/src/pages/requests/detail-panel/header.tsx b/frontend/src/pages/requests/detail-panel/header.tsx index 05536d6..587b65d 100644 --- a/frontend/src/pages/requests/detail-panel/header.tsx +++ b/frontend/src/pages/requests/detail-panel/header.tsx @@ -62,6 +62,11 @@ function StatusIndicator({ status }: { status: ChatRequest['status'] }) { {t('pages.requests.detail-panel.header.Failed')} )) + .with('aborted', () => ( + + {t('pages.requests.detail-panel.header.Aborted')} + + )) .exhaustive() }