diff --git a/backend/drizzle/0011_outgoing_johnny_blaze.sql b/backend/drizzle/0011_outgoing_johnny_blaze.sql new file mode 100644 index 0000000..76795fb --- /dev/null +++ b/backend/drizzle/0011_outgoing_johnny_blaze.sql @@ -0,0 +1,7 @@ +ALTER TYPE "public"."completions_status" ADD VALUE 'cache_hit';--> statement-breakpoint +ALTER TABLE "completions" ADD COLUMN "req_id" varchar(127);--> statement-breakpoint +ALTER TABLE "completions" ADD COLUMN "source_completion_id" integer;--> statement-breakpoint +ALTER TABLE "completions" ADD COLUMN "api_format" varchar(31);--> statement-breakpoint +ALTER TABLE "completions" ADD COLUMN "cached_response" jsonb;--> statement-breakpoint +ALTER TABLE "completions" ADD CONSTRAINT "completions_source_completion_id_completions_id_fk" FOREIGN KEY ("source_completion_id") REFERENCES "public"."completions"("id") ON DELETE no action ON UPDATE no action;--> statement-breakpoint +CREATE UNIQUE INDEX "completions_api_key_req_id_unique" ON "completions" ("api_key_id", "req_id") WHERE "req_id" IS NOT NULL; \ No newline at end of file diff --git a/backend/drizzle/meta/0011_snapshot.json b/backend/drizzle/meta/0011_snapshot.json new file mode 100644 index 0000000..2df7511 --- /dev/null +++ b/backend/drizzle/meta/0011_snapshot.json @@ -0,0 +1,1040 @@ +{ + "id": "7d19c611-e3a5-4186-b863-f3a792c60de3", + "prevId": "6be63fd9-50d1-4d49-acea-e53401c54b2f", + "version": "7", + "dialect": "postgresql", + "tables": { + "public.api_keys": { + "name": "api_keys", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "api_keys_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "key": { + "name": "key", + "type": "varchar(63)", + "primaryKey": false, + "notNull": true + }, + "comment": { + "name": "comment", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "expires_at": { + "name": "expires_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "last_seen": { + "name": "last_seen", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "revoked": { + "name": "revoked", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + }, + "rpm_limit": { + "name": "rpm_limit", + "type": "integer", + "primaryKey": false, + "notNull": true, + "default": 50 + }, + "tpm_limit": { + "name": "tpm_limit", + "type": "integer", + "primaryKey": false, + "notNull": true, + "default": 50000 + }, + "external_id": { + "name": "external_id", + "type": "varchar(127)", + "primaryKey": false, + "notNull": false + }, + "source": { + "name": "source", + "type": "api_key_source", + "typeSchema": "public", + "primaryKey": false, + "notNull": false, + "default": "'manual'" + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "api_keys_key_unique": { + "name": "api_keys_key_unique", + "nullsNotDistinct": false, + "columns": [ + "key" + ] + }, + "api_keys_external_id_unique": { + "name": "api_keys_external_id_unique", + "nullsNotDistinct": false, + "columns": [ + "external_id" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.completions": { + "name": "completions", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "completions_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "api_key_id": { + "name": "api_key_id", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "upstream_id": { + "name": "upstream_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "model_id": { + "name": "model_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "model": { + "name": "model", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "prompt": { + "name": "prompt", + "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + "prompt_tokens": { + "name": "prompt_tokens", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "completion": { + "name": "completion", + "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + "completion_tokens": { + "name": "completion_tokens", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "status": { + "name": "status", + "type": "completions_status", + "typeSchema": "public", + "primaryKey": false, + "notNull": true, + "default": "'pending'" + }, + "ttft": { + "name": "ttft", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "duration": { + "name": "duration", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "deleted": { + "name": "deleted", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + }, + "rating": { + "name": "rating", + "type": "real", + "primaryKey": false, + "notNull": false + }, + "req_id": { + "name": "req_id", + "type": "varchar(127)", + "primaryKey": false, + "notNull": false + }, + "source_completion_id": { + "name": "source_completion_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "api_format": { + "name": "api_format", + "type": "varchar(31)", + "primaryKey": false, + "notNull": false + }, + "cached_response": { + "name": "cached_response", + "type": "jsonb", + "primaryKey": false, + "notNull": false + } + }, + "indexes": {}, + "foreignKeys": { + "completions_api_key_id_api_keys_id_fk": { + "name": "completions_api_key_id_api_keys_id_fk", + "tableFrom": "completions", + "tableTo": "api_keys", + "columnsFrom": [ + "api_key_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + }, + "completions_upstream_id_upstreams_id_fk": { + "name": "completions_upstream_id_upstreams_id_fk", + "tableFrom": "completions", + "tableTo": "upstreams", + "columnsFrom": [ + "upstream_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + }, + "completions_source_completion_id_completions_id_fk": { + "name": "completions_source_completion_id_completions_id_fk", + "tableFrom": "completions", + "tableTo": "completions", + "columnsFrom": [ + "source_completion_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "completions_id_unique": { + "name": "completions_id_unique", + "nullsNotDistinct": false, + "columns": [ + "id" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.embeddings": { + "name": "embeddings", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "embeddings_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "api_key_id": { + "name": "api_key_id", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "model_id": { + "name": "model_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "model": { + "name": "model", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "input": { + "name": "input", + "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + "input_tokens": { + "name": "input_tokens", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "embedding": { + "name": "embedding", + "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + "dimensions": { + "name": "dimensions", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "status": { + "name": "status", + "type": "completions_status", + "typeSchema": "public", + "primaryKey": false, + "notNull": true, + "default": "'pending'" + }, + "duration": { + "name": "duration", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "deleted": { + "name": "deleted", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + } + }, + "indexes": {}, + "foreignKeys": { + "embeddings_api_key_id_api_keys_id_fk": { + "name": "embeddings_api_key_id_api_keys_id_fk", + "tableFrom": "embeddings", + "tableTo": "api_keys", + "columnsFrom": [ + "api_key_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + }, + "embeddings_model_id_models_id_fk": { + "name": "embeddings_model_id_models_id_fk", + "tableFrom": "embeddings", + "tableTo": "models", + "columnsFrom": [ + "model_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "embeddings_id_unique": { + "name": "embeddings_id_unique", + "nullsNotDistinct": false, + "columns": [ + "id" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.models": { + "name": "models", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "models_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "provider_id": { + "name": "provider_id", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "system_name": { + "name": "system_name", + "type": "varchar(63)", + "primaryKey": false, + "notNull": true + }, + "remote_id": { + "name": "remote_id", + "type": "varchar(63)", + "primaryKey": false, + "notNull": false + }, + "model_type": { + "name": "model_type", + "type": "model_type", + "typeSchema": "public", + "primaryKey": false, + "notNull": true, + "default": "'chat'" + }, + "context_length": { + "name": "context_length", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "input_price": { + "name": "input_price", + "type": "real", + "primaryKey": false, + "notNull": false + }, + "output_price": { + "name": "output_price", + "type": "real", + "primaryKey": false, + "notNull": false + }, + "weight": { + "name": "weight", + "type": "real", + "primaryKey": false, + "notNull": true, + "default": 1 + }, + "comment": { + "name": "comment", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "deleted": { + "name": "deleted", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + } + }, + "indexes": {}, + "foreignKeys": { + "models_provider_id_providers_id_fk": { + "name": "models_provider_id_providers_id_fk", + "tableFrom": "models", + "tableTo": "providers", + "columnsFrom": [ + "provider_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "models_provider_system_name_unique": { + "name": "models_provider_system_name_unique", + "nullsNotDistinct": false, + "columns": [ + "provider_id", + "system_name" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.providers": { + "name": "providers", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "providers_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "name": { + "name": "name", + "type": "varchar(63)", + "primaryKey": false, + "notNull": true + }, + "type": { + "name": "type", + "type": "provider_type", + "typeSchema": "public", + "primaryKey": false, + "notNull": true, + "default": "'openai'" + }, + "base_url": { + "name": "base_url", + "type": "varchar(255)", + "primaryKey": false, + "notNull": true + }, + "api_key": { + "name": "api_key", + "type": "varchar(255)", + "primaryKey": false, + "notNull": false + }, + "api_version": { + "name": "api_version", + "type": "varchar(31)", + "primaryKey": false, + "notNull": false + }, + "comment": { + "name": "comment", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "deleted": { + "name": "deleted", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "providers_name_unique": { + "name": "providers_name_unique", + "nullsNotDistinct": false, + "columns": [ + "name" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.settings": { + "name": "settings", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "settings_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "key": { + "name": "key", + "type": "varchar(63)", + "primaryKey": false, + "notNull": true + }, + "value": { + "name": "value", + "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "settings_key_unique": { + "name": "settings_key_unique", + "nullsNotDistinct": false, + "columns": [ + "key" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.srv_logs": { + "name": "srv_logs", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "srv_logs_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "related_api_key_id": { + "name": "related_api_key_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "related_upstream_id": { + "name": "related_upstream_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "related_completion_id": { + "name": "related_completion_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "message": { + "name": "message", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "level": { + "name": "level", + "type": "srv_logs_level", + "typeSchema": "public", + "primaryKey": false, + "notNull": true + }, + "details": { + "name": "details", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "acknowledged": { + "name": "acknowledged", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + }, + "ack_at": { + "name": "ack_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + } + }, + "indexes": {}, + "foreignKeys": { + "srv_logs_related_api_key_id_api_keys_id_fk": { + "name": "srv_logs_related_api_key_id_api_keys_id_fk", + "tableFrom": "srv_logs", + "tableTo": "api_keys", + "columnsFrom": [ + "related_api_key_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + }, + "srv_logs_related_upstream_id_upstreams_id_fk": { + "name": "srv_logs_related_upstream_id_upstreams_id_fk", + "tableFrom": "srv_logs", + "tableTo": "upstreams", + "columnsFrom": [ + "related_upstream_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + }, + "srv_logs_related_completion_id_completions_id_fk": { + "name": "srv_logs_related_completion_id_completions_id_fk", + "tableFrom": "srv_logs", + "tableTo": "completions", + "columnsFrom": [ + "related_completion_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "no action", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "srv_logs_id_unique": { + "name": "srv_logs_id_unique", + "nullsNotDistinct": false, + "columns": [ + "id" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.upstreams": { + "name": "upstreams", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "identity": { + "type": "always", + "name": "upstreams_id_seq", + "schema": "public", + "increment": "1", + "startWith": "1", + "minValue": "1", + "maxValue": "2147483647", + "cache": "1", + "cycle": false + } + }, + "name": { + "name": "name", + "type": "varchar(63)", + "primaryKey": false, + "notNull": true + }, + "url": { + "name": "url", + "type": "varchar(255)", + "primaryKey": false, + "notNull": true + }, + "model": { + "name": "model", + "type": "varchar(63)", + "primaryKey": false, + "notNull": true + }, + "upstream_model": { + "name": "upstream_model", + "type": "varchar(63)", + "primaryKey": false, + "notNull": false + }, + "api_key": { + "name": "api_key", + "type": "varchar(255)", + "primaryKey": false, + "notNull": false + }, + "weight": { + "name": "weight", + "type": "real", + "primaryKey": false, + "notNull": true, + "default": 1 + }, + "comment": { + "name": "comment", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "deleted": { + "name": "deleted", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + } + }, + "enums": { + "public.api_key_source": { + "name": "api_key_source", + "schema": "public", + "values": [ + "manual", + "operator", + "init" + ] + }, + "public.completions_status": { + "name": "completions_status", + "schema": "public", + "values": [ + "pending", + "completed", + "failed", + "aborted", + "cache_hit" + ] + }, + "public.model_type": { + "name": "model_type", + "schema": "public", + "values": [ + "chat", + "embedding" + ] + }, + "public.provider_type": { + "name": "provider_type", + "schema": "public", + "values": [ + "openai", + "openai-responses", + "anthropic", + "azure", + "ollama" + ] + }, + "public.srv_logs_level": { + "name": "srv_logs_level", + "schema": "public", + "values": [ + "unspecific", + "info", + "warn", + "error" + ] + } + }, + "schemas": {}, + "sequences": {}, + "roles": {}, + "policies": {}, + "views": {}, + "_meta": { + "columns": {}, + "schemas": {}, + "tables": {} + } +} \ No newline at end of file diff --git a/backend/drizzle/meta/_journal.json b/backend/drizzle/meta/_journal.json index 9f2d7e8..8c4593c 100644 --- a/backend/drizzle/meta/_journal.json +++ b/backend/drizzle/meta/_journal.json @@ -78,6 +78,13 @@ "when": 1769256296714, "tag": "0010_noisy_deathbird", "breakpoints": true + }, + { + "idx": 11, + "version": "7", + "when": 1769275632379, + "tag": "0011_outgoing_johnny_blaze", + "breakpoints": true } ] } \ No newline at end of file diff --git a/backend/src/api/v1/completions.ts b/backend/src/api/v1/completions.ts index 97df736..eb1486f 100644 --- a/backend/src/api/v1/completions.ts +++ b/backend/src/api/v1/completions.ts @@ -19,6 +19,7 @@ import type { CompletionsMessageType, ToolDefinitionType, ToolChoiceType, + CachedResponseType, } from "@/db/schema"; import { extractUpstreamHeaders, @@ -36,6 +37,15 @@ import { selectMultipleCandidates, type FailoverConfig, } from "@/services/failover"; +import { + checkReqId, + finalizeReqId, + finalizeReqIdOnError, + extractAndValidateReqId, + handleReqIdResult, + type ApiFormat, + type ReqIdContext, +} from "@/utils/reqIdHandler"; const logger = consola.withTag("completionsApi"); @@ -160,6 +170,8 @@ function buildCompletionRecord( }; } +// ReqIdContext is imported from reqIdHandler + /** * Process a successful non-streaming response * Ensures completion is saved to database before returning @@ -172,6 +184,7 @@ async function processNonStreamingResponse( apiKeyRecord: ApiKey | null, begin: number, signal?: AbortSignal, + reqIdContext?: ReqIdContext, ): Promise { // Parse response using upstream adapter const upstreamAdapter = getUpstreamAdapter(providerType); @@ -197,21 +210,53 @@ async function processNonStreamingResponse( }, ]; + // Build cached response for ReqId deduplication + const cachedResponse: CachedResponseType = { + body: serialized, + format: "openai-chat", + }; + // Check if client disconnected during processing if (signal?.aborted) { completion.status = "aborted"; - await addCompletions(completion, bearer, { - level: "info", - message: "Client disconnected during non-streaming response", - details: { - type: "completionError", - data: { type: "aborted", msg: "Client disconnected" }, - }, - }); + if (reqIdContext) { + // Use finalizeReqId for ReqId requests + await finalizeReqId( + reqIdContext.apiKeyId, + reqIdContext.reqId, + reqIdContext.preCreatedCompletionId, + { + ...completion, + cachedResponse, + }, + ); + } else { + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: "Client disconnected" }, + }, + }); + } } else { completion.status = "completed"; - // Use await to ensure database write completes before returning - await addCompletions(completion, bearer); + if (reqIdContext) { + // Use finalizeReqId for ReqId requests + await finalizeReqId( + reqIdContext.apiKeyId, + reqIdContext.reqId, + reqIdContext.preCreatedCompletionId, + { + ...completion, + cachedResponse, + }, + ); + } else { + // Use await to ensure database write completes before returning + await addCompletions(completion, bearer); + } } // Consume tokens for TPM rate limiting (post-flight) @@ -237,6 +282,7 @@ async function* processStreamingResponse( apiKeyRecord: ApiKey | null, begin: number, signal?: AbortSignal, + reqIdContext?: ReqIdContext, ): AsyncGenerator { // Get adapters const upstreamAdapter = getUpstreamAdapter(providerType); @@ -244,8 +290,44 @@ async function* processStreamingResponse( logger.debug("parse stream completions response"); + // Build streaming ReqId context if provided + const streamingReqIdContext = reqIdContext + ? { + reqId: reqIdContext.reqId, + apiKeyId: reqIdContext.apiKeyId, + preCreatedCompletionId: reqIdContext.preCreatedCompletionId, + apiFormat: reqIdContext.apiFormat, + buildCachedResponse: (comp: Completion): CachedResponseType => { + // For streaming, we build a complete non-streaming response for cache + return { + body: { + id: `chatcmpl-cache-${reqIdContext.preCreatedCompletionId}`, + object: "chat.completion", + created: Math.floor(Date.now() / 1000), + model: comp.model, + choices: comp.completion.map((c, i) => ({ + index: i, + message: { + role: c.role || "assistant", + content: c.content, + tool_calls: c.tool_calls, + }, + finish_reason: c.tool_calls?.length ? "tool_calls" : "stop", + })), + usage: { + prompt_tokens: comp.promptTokens, + completion_tokens: comp.completionTokens, + total_tokens: comp.promptTokens + comp.completionTokens, + }, + }, + format: "openai-chat", + }; + }, + } + : undefined; + // Create streaming context with abort handling - const ctx = new StreamingContext(completion, bearer, apiKeyRecord, begin, signal); + const ctx = new StreamingContext(completion, bearer, apiKeyRecord, begin, signal, streamingReqIdContext); // Track whether we've logged the client abort (to avoid duplicate logs) let loggedAbort = false; @@ -398,7 +480,7 @@ export const completionsApi = new Elysia({ .use(rateLimitPlugin) .post( "/completions", - async function* ({ body, set, bearer, request, apiKeyRecord }) { + async function ({ body, set, bearer, request, apiKeyRecord }) { if (bearer === undefined) { set.status = 500; return { error: "Internal server error" }; @@ -407,6 +489,15 @@ export const completionsApi = new Elysia({ const reqHeaders = request.headers; const begin = Date.now(); + // Extract and validate ReqId for request deduplication + const apiFormat: ApiFormat = "openai-chat"; + const reqIdExtraction = extractAndValidateReqId(reqHeaders, apiFormat); + if (reqIdExtraction.type === "error") { + set.status = reqIdExtraction.status; + return reqIdExtraction.body; + } + const reqId = reqIdExtraction.reqId; + // Parse model@provider format and extract provider from header const { systemName, targetProvider } = parseModelProvider( body.model, @@ -457,6 +548,43 @@ export const completionsApi = new Elysia({ // Extract extra headers for passthrough const extraHeaders = extractUpstreamHeaders(reqHeaders); + // Check ReqId for deduplication (if provided) + const isStream = body.stream === true; + const reqIdResult = await checkReqId(reqId, { + apiKeyId: apiKeyRecord.id, + model: body.model, + modelId: candidates[0]?.model.id, + prompt: { + messages: body.messages as CompletionsMessageType[], + tools: body.tools as ToolDefinitionType[] | undefined, + tool_choice: body.tool_choice as ToolChoiceType | undefined, + extraHeaders, + }, + apiFormat, + endpoint: "/v1/chat/completions", + isStream, + }); + + // Handle ReqId result (cache_hit, in_flight, or continue) + const reqIdHandleResult = await handleReqIdResult( + reqIdResult, + reqId, + apiKeyRecord.id, + apiFormat, + ); + + if (reqIdHandleResult.type === "cache_hit") { + return reqIdHandleResult.response; + } + + if (reqIdHandleResult.type === "in_flight") { + set.status = reqIdHandleResult.status; + set.headers["Retry-After"] = String(reqIdHandleResult.retryAfter); + return reqIdHandleResult.response; + } + + const reqIdContext = reqIdHandleResult.context; + // Parse request using adapter const requestAdapter = getRequestAdapter("openai-chat"); const internalRequest = requestAdapter.parse( @@ -484,7 +612,7 @@ export const completionsApi = new Elysia({ // Handle streaming vs non-streaming if (internalRequest.stream) { - // Streaming request - use yield for streaming responses + // Streaming request - return an async generator if (body.n && body.n > 1) { set.status = 400; return { error: "Stream completions with n > 1 is not supported" }; @@ -510,6 +638,9 @@ export const completionsApi = new Elysia({ const errorResult = await processFailoverError(result, completion, bearer, "streaming"); + // Finalize pre-created completion if ReqId was used + await finalizeReqIdOnError(reqIdContext, begin); + if (errorResult.type === "upstream_error") { set.status = errorResult.status; return JSON.parse(errorResult.body) as Record; @@ -526,11 +657,13 @@ export const completionsApi = new Elysia({ } if (!result.response || !result.provider) { + await finalizeReqIdOnError(reqIdContext, begin); set.status = 500; return { error: "Internal server error" }; } if (!result.response.body) { + await finalizeReqIdOnError(reqIdContext, begin); set.status = 500; return { error: "No body in response" }; } @@ -546,26 +679,33 @@ export const completionsApi = new Elysia({ extraHeaders, ); - try { - yield* processStreamingResponse( - result.response, - completion, - bearer, - providerType, - apiKeyRecord ?? null, - begin, - request.signal, - ); - } catch (error) { - // Don't log error if it's due to client abort - if (!request.signal.aborted) { - logger.error("Stream processing error", error); - set.status = 500; - yield JSON.stringify({ error: "Stream processing error" }); + // Return an async generator for streaming + const streamResponse = result.response; + const streamSignal = request.signal; + return (async function* () { + try { + yield* processStreamingResponse( + streamResponse, + completion, + bearer, + providerType, + apiKeyRecord ?? null, + begin, + streamSignal, + reqIdContext ?? undefined, + ); + } catch (error) { + // Don't log error if it's due to client abort + if (!streamSignal.aborted) { + logger.error("Stream processing error", error); + // Note: HTTP status cannot be changed after streaming has started + // Use SSE format for error: data: {...}\n\n + yield `data: ${JSON.stringify({ error: { message: "Stream processing error", type: "server_error", code: "stream_error" } })}\n\n`; + } } - } + })(); } else { - // Non-streaming request - use return for normal JSON response + // Non-streaming request - return JSON response directly const result = await executeWithFailover( candidates, buildRequestForProvider, @@ -585,6 +725,9 @@ export const completionsApi = new Elysia({ const errorResult = await processFailoverError(result, completion, bearer, "non-streaming"); + // Finalize pre-created completion if ReqId was used + await finalizeReqIdOnError(reqIdContext, begin); + if (errorResult.type === "upstream_error") { set.status = errorResult.status; return JSON.parse(errorResult.body) as Record; @@ -601,6 +744,7 @@ export const completionsApi = new Elysia({ } if (!result.response || !result.provider) { + await finalizeReqIdOnError(reqIdContext, begin); set.status = 500; return { error: "Internal server error" }; } @@ -625,6 +769,7 @@ export const completionsApi = new Elysia({ apiKeyRecord ?? null, begin, request.signal, + reqIdContext ?? undefined, ); // Return parsed JSON object for proper content-type return JSON.parse(response) as Record; diff --git a/backend/src/api/v1/messages.ts b/backend/src/api/v1/messages.ts index 9eb6a36..6a698f9 100644 --- a/backend/src/api/v1/messages.ts +++ b/backend/src/api/v1/messages.ts @@ -30,6 +30,16 @@ import { selectMultipleCandidates, type FailoverConfig, } from "@/services/failover"; +import { + checkReqId, + finalizeReqId, + finalizeReqIdOnError, + extractAndValidateReqId, + handleReqIdResult, + type ApiFormat, + type ReqIdContext, +} from "@/utils/reqIdHandler"; +import type { CachedResponseType } from "@/db/schema"; const logger = consola.withTag("messagesApi"); @@ -152,6 +162,8 @@ function buildCompletionRecord( }; } +// ReqIdContext is imported from reqIdHandler + /** * Process a successful non-streaming message response * Ensures completion is saved to database before returning @@ -164,6 +176,7 @@ async function processNonStreamingResponse( apiKeyRecord: ApiKey | null, begin: number, signal?: AbortSignal, + reqIdContext?: ReqIdContext, ): Promise { // Parse response using upstream adapter const upstreamAdapter = getUpstreamAdapter(providerType); @@ -185,20 +198,44 @@ async function processNonStreamingResponse( }, ]; + // Build cached response for ReqId deduplication + const cachedResponse: CachedResponseType = { + body: serialized, + format: "anthropic", + }; + // Check if client disconnected during processing if (signal?.aborted) { completion.status = "aborted"; - await addCompletions(completion, bearer, { - level: "info", - message: "Client disconnected during non-streaming response", - details: { - type: "completionError", - data: { type: "aborted", msg: "Client disconnected" }, - }, - }); + if (reqIdContext) { + await finalizeReqId( + reqIdContext.apiKeyId, + reqIdContext.reqId, + reqIdContext.preCreatedCompletionId, + { ...completion, cachedResponse }, + ); + } else { + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: "Client disconnected" }, + }, + }); + } } else { completion.status = "completed"; - await addCompletions(completion, bearer); + if (reqIdContext) { + await finalizeReqId( + reqIdContext.apiKeyId, + reqIdContext.reqId, + reqIdContext.preCreatedCompletionId, + { ...completion, cachedResponse }, + ); + } else { + await addCompletions(completion, bearer); + } } // Consume tokens for TPM rate limiting (post-flight) @@ -224,6 +261,7 @@ async function* processStreamingResponse( apiKeyRecord: ApiKey | null, begin: number, signal?: AbortSignal, + reqIdContext?: ReqIdContext, ): AsyncGenerator { // Get adapters const upstreamAdapter = getUpstreamAdapter(providerType); @@ -231,8 +269,57 @@ async function* processStreamingResponse( logger.debug("parse stream messages response"); + // Build streaming ReqId context if provided + const streamingReqIdContext = reqIdContext + ? { + reqId: reqIdContext.reqId, + apiKeyId: reqIdContext.apiKeyId, + preCreatedCompletionId: reqIdContext.preCreatedCompletionId, + apiFormat: reqIdContext.apiFormat, + buildCachedResponse: (comp: Completion): CachedResponseType => { + // For streaming, build a complete non-streaming Anthropic response for cache + // Build content blocks including both text and tool_use + const contentBlocks: Array> = []; + for (const c of comp.completion) { + // Add text content if present + if (c.content) { + contentBlocks.push({ type: "text", text: c.content }); + } + // Add tool_use blocks if present + if (c.tool_calls) { + for (const tc of c.tool_calls) { + contentBlocks.push({ + type: "tool_use", + id: tc.id, + name: tc.function.name, + input: JSON.parse(tc.function.arguments || "{}"), + }); + } + } + } + // Determine stop_reason based on content + const hasToolUse = contentBlocks.some((b) => b.type === "tool_use"); + return { + body: { + id: `msg-cache-${reqIdContext.preCreatedCompletionId}`, + type: "message", + role: "assistant", + content: contentBlocks.length > 0 ? contentBlocks : [{ type: "text", text: "" }], + model: comp.model, + stop_reason: hasToolUse ? "tool_use" : "end_turn", + usage: { + input_tokens: comp.promptTokens, + output_tokens: comp.completionTokens, + }, + }, + format: "anthropic", + }; + }, + } + : undefined; + // Create streaming context with abort handling - const ctx = new StreamingContext(completion, bearer, apiKeyRecord, begin, signal); + const ctx = new StreamingContext(completion, bearer, apiKeyRecord, begin, signal, streamingReqIdContext); // Track whether we've logged the client abort (to avoid duplicate logs) let loggedAbort = false; @@ -379,7 +466,7 @@ export const messagesApi = new Elysia({ .use(rateLimitPlugin) .post( "/messages", - async function* ({ body, set, bearer, request, apiKeyRecord }) { + async function ({ body, set, bearer, request, apiKeyRecord }) { if (bearer === undefined) { set.status = 500; return { @@ -391,6 +478,15 @@ export const messagesApi = new Elysia({ const reqHeaders = request.headers; const begin = Date.now(); + // Extract and validate ReqId for request deduplication + const apiFormat: ApiFormat = "anthropic"; + const reqIdExtraction = extractAndValidateReqId(reqHeaders, apiFormat); + if (reqIdExtraction.type === "error") { + set.status = reqIdExtraction.status; + return reqIdExtraction.body; + } + const reqId = reqIdExtraction.reqId; + // Parse model@provider format and extract provider from header const { systemName, targetProvider } = parseModelProvider( body.model, @@ -441,6 +537,44 @@ export const messagesApi = new Elysia({ // Extract extra headers for passthrough const extraHeaders = extractUpstreamHeaders(reqHeaders); + // Check ReqId for deduplication (if provided) + const isStream = body.stream === true; + const reqIdResult = await checkReqId(reqId, { + apiKeyId: apiKeyRecord.id, + model: body.model, + modelId: candidates[0]?.model.id, + prompt: { + messages: body.messages.map((m: { role: string; content: unknown }) => ({ + role: m.role, + content: typeof m.content === "string" ? m.content : JSON.stringify(m.content), + })), + extraHeaders, + }, + apiFormat, + endpoint: "/v1/messages", + isStream, + }); + + // Handle ReqId result (cache_hit, in_flight, or continue) + const reqIdHandleResult = await handleReqIdResult( + reqIdResult, + reqId, + apiKeyRecord.id, + apiFormat, + ); + + if (reqIdHandleResult.type === "cache_hit") { + return reqIdHandleResult.response; + } + + if (reqIdHandleResult.type === "in_flight") { + set.status = reqIdHandleResult.status; + set.headers["Retry-After"] = String(reqIdHandleResult.retryAfter); + return reqIdHandleResult.response; + } + + const reqIdContext = reqIdHandleResult.context; + // Parse request using Anthropic adapter const requestAdapter = getRequestAdapter("anthropic"); const internalRequest = requestAdapter.parse( @@ -468,7 +602,7 @@ export const messagesApi = new Elysia({ // Handle streaming vs non-streaming if (internalRequest.stream) { - // Streaming request - use yield for streaming responses + // Streaming request - return an async generator const result = await executeWithFailover( candidates, buildRequestForProvider, @@ -486,6 +620,9 @@ export const messagesApi = new Elysia({ const errorResult = await processFailoverError(result, completion, bearer, "streaming"); + // Finalize pre-created completion if ReqId was used + await finalizeReqIdOnError(reqIdContext, begin); + if (errorResult.type === "upstream_error") { set.status = errorResult.status; return JSON.parse(errorResult.body) as Record; @@ -502,6 +639,7 @@ export const messagesApi = new Elysia({ } if (!result.response || !result.provider) { + await finalizeReqIdOnError(reqIdContext, begin); set.status = 500; return { type: "error", @@ -510,6 +648,7 @@ export const messagesApi = new Elysia({ } if (!result.response.body) { + await finalizeReqIdOnError(reqIdContext, begin); set.status = 500; return { type: "error", @@ -526,26 +665,32 @@ export const messagesApi = new Elysia({ extraHeaders, ); - try { - yield* processStreamingResponse( - result.response, - completion, - bearer, - providerType, - apiKeyRecord ?? null, - begin, - request.signal, - ); - } catch (error) { - // Don't log error if it's due to client abort - if (!request.signal.aborted) { - logger.error("Stream processing error", error); - set.status = 500; - yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "Stream processing error" } })}\n\n`; + // Return an async generator for streaming + const streamResponse = result.response; + const streamSignal = request.signal; + return (async function* () { + try { + yield* processStreamingResponse( + streamResponse, + completion, + bearer, + providerType, + apiKeyRecord ?? null, + begin, + streamSignal, + reqIdContext ?? undefined, + ); + } catch (error) { + // Don't log error if it's due to client abort + if (!streamSignal.aborted) { + logger.error("Stream processing error", error); + // Note: HTTP status cannot be changed after streaming has started + yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "Stream processing error" } })}\n\n`; + } } - } + })(); } else { - // Non-streaming request - use return for normal JSON response + // Non-streaming request - return JSON response directly const result = await executeWithFailover( candidates, buildRequestForProvider, @@ -563,6 +708,9 @@ export const messagesApi = new Elysia({ const errorResult = await processFailoverError(result, completion, bearer, "non-streaming"); + // Finalize pre-created completion if ReqId was used + await finalizeReqIdOnError(reqIdContext, begin); + if (errorResult.type === "upstream_error") { set.status = errorResult.status; return JSON.parse(errorResult.body) as Record; @@ -579,6 +727,7 @@ export const messagesApi = new Elysia({ } if (!result.response || !result.provider) { + await finalizeReqIdOnError(reqIdContext, begin); set.status = 500; return { type: "error", @@ -604,6 +753,7 @@ export const messagesApi = new Elysia({ apiKeyRecord ?? null, begin, request.signal, + reqIdContext ?? undefined, ); // Return parsed JSON object for proper content-type return JSON.parse(response) as Record; diff --git a/backend/src/api/v1/responses.ts b/backend/src/api/v1/responses.ts index d86b44f..8cd7ed1 100644 --- a/backend/src/api/v1/responses.ts +++ b/backend/src/api/v1/responses.ts @@ -30,6 +30,16 @@ import { selectMultipleCandidates, type FailoverConfig, } from "@/services/failover"; +import { + checkReqId, + finalizeReqId, + finalizeReqIdOnError, + extractAndValidateReqId, + handleReqIdResult, + type ApiFormat, + type ReqIdContext, +} from "@/utils/reqIdHandler"; +import type { CachedResponseType } from "@/db/schema"; const logger = consola.withTag("responsesApi"); @@ -163,6 +173,8 @@ function buildCompletionRecord( }; } +// ReqIdContext is imported from reqIdHandler + /** * Process a successful non-streaming response * Ensures completion is saved to database before returning @@ -175,6 +187,7 @@ async function processNonStreamingResponse( apiKeyRecord: ApiKey | null, begin: number, signal?: AbortSignal, + reqIdContext?: ReqIdContext, ): Promise { // Parse response using upstream adapter const upstreamAdapter = getUpstreamAdapter(providerType); @@ -196,20 +209,44 @@ async function processNonStreamingResponse( }, ]; + // Build cached response for ReqId deduplication + const cachedResponse: CachedResponseType = { + body: serialized, + format: "openai-responses", + }; + // Check if client disconnected during processing if (signal?.aborted) { completion.status = "aborted"; - await addCompletions(completion, bearer, { - level: "info", - message: "Client disconnected during non-streaming response", - details: { - type: "completionError", - data: { type: "aborted", msg: "Client disconnected" }, - }, - }); + if (reqIdContext) { + await finalizeReqId( + reqIdContext.apiKeyId, + reqIdContext.reqId, + reqIdContext.preCreatedCompletionId, + { ...completion, cachedResponse }, + ); + } else { + await addCompletions(completion, bearer, { + level: "info", + message: "Client disconnected during non-streaming response", + details: { + type: "completionError", + data: { type: "aborted", msg: "Client disconnected" }, + }, + }); + } } else { completion.status = "completed"; - await addCompletions(completion, bearer); + if (reqIdContext) { + await finalizeReqId( + reqIdContext.apiKeyId, + reqIdContext.reqId, + reqIdContext.preCreatedCompletionId, + { ...completion, cachedResponse }, + ); + } else { + await addCompletions(completion, bearer); + } } // Consume tokens for TPM rate limiting (post-flight) @@ -235,6 +272,7 @@ async function* processStreamingResponse( apiKeyRecord: ApiKey | null, begin: number, signal?: AbortSignal, + reqIdContext?: ReqIdContext, ): AsyncGenerator { // Get adapters const upstreamAdapter = getUpstreamAdapter(providerType); @@ -242,8 +280,69 @@ async function* processStreamingResponse( logger.debug("parse stream responses"); + // Build streaming ReqId context if provided + const streamingReqIdContext = reqIdContext + ? { + reqId: reqIdContext.reqId, + apiKeyId: reqIdContext.apiKeyId, + preCreatedCompletionId: reqIdContext.preCreatedCompletionId, + apiFormat: reqIdContext.apiFormat, + buildCachedResponse: (comp: Completion): CachedResponseType => { + // For streaming, build a complete non-streaming Response API response for cache + // Build output items including both messages and function_call + const outputItems: Array> = []; + for (const c of comp.completion) { + // Build content array for message + const content: Array> = []; + if (c.content) { + content.push({ type: "output_text", text: c.content }); + } + // Add message output item if there's text content + if (content.length > 0) { + outputItems.push({ + type: "message", + role: c.role || "assistant", + content, + }); + } + // Add function_call output items for tool_calls + if (c.tool_calls) { + for (const tc of c.tool_calls) { + outputItems.push({ + type: "function_call", + id: tc.id, + call_id: tc.id, + name: tc.function.name, + arguments: tc.function.arguments || "{}", + }); + } + } + } + return { + body: { + id: `resp-cache-${reqIdContext.preCreatedCompletionId}`, + object: "response", + created_at: Math.floor(Date.now() / 1000), + model: comp.model, + output: outputItems.length > 0 ? outputItems : [{ + type: "message", + role: "assistant", + content: [{ type: "output_text", text: "" }], + }], + usage: { + input_tokens: comp.promptTokens, + output_tokens: comp.completionTokens, + total_tokens: comp.promptTokens + comp.completionTokens, + }, + }, + format: "openai-responses", + }; + }, + } + : undefined; + // Create streaming context with abort handling - const ctx = new StreamingContext(completion, bearer, apiKeyRecord, begin, signal); + const ctx = new StreamingContext(completion, bearer, apiKeyRecord, begin, signal, streamingReqIdContext); // Track whether we've logged the client abort (to avoid duplicate logs) let loggedAbort = false; @@ -395,7 +494,7 @@ export const responsesApi = new Elysia({ .use(rateLimitPlugin) .post( "/responses", - async function* ({ body, set, bearer, request, apiKeyRecord }) { + async function ({ body, set, bearer, request, apiKeyRecord }) { if (bearer === undefined) { set.status = 500; return { @@ -407,6 +506,15 @@ export const responsesApi = new Elysia({ const reqHeaders = request.headers; const begin = Date.now(); + // Extract and validate ReqId for request deduplication + const apiFormat: ApiFormat = "openai-responses"; + const reqIdExtraction = extractAndValidateReqId(reqHeaders, apiFormat); + if (reqIdExtraction.type === "error") { + set.status = reqIdExtraction.status; + return reqIdExtraction.body; + } + const reqId = reqIdExtraction.reqId; + // Parse model@provider format and extract provider from header const { systemName, targetProvider } = parseModelProvider( body.model, @@ -457,6 +565,64 @@ export const responsesApi = new Elysia({ // Extract extra headers for passthrough const extraHeaders = extractUpstreamHeaders(reqHeaders); + // Check ReqId for deduplication (if provided) + const isStream = body.stream === true; + + // Convert input to messages format for storage + const inputMessages: Array<{ role: string; content: string }> = []; + if (typeof body.input === "string") { + inputMessages.push({ role: "user", content: body.input }); + } else if (Array.isArray(body.input)) { + for (const item of body.input) { + if (typeof item === "object" && item !== null) { + if (item.type === "message") { + inputMessages.push({ + role: item.role || "user", + content: typeof item.content === "string" ? item.content : JSON.stringify(item.content), + }); + } else if (item.type === "function_call_output") { + inputMessages.push({ + role: "tool", + content: item.output || "", + }); + } + } + } + } + + const reqIdResult = await checkReqId(reqId, { + apiKeyId: apiKeyRecord.id, + model: body.model, + modelId: candidates[0]?.model.id, + prompt: { + messages: inputMessages, + extraHeaders, + }, + apiFormat, + endpoint: "/v1/responses", + isStream, + }); + + // Handle ReqId result (cache_hit, in_flight, or continue) + const reqIdHandleResult = await handleReqIdResult( + reqIdResult, + reqId, + apiKeyRecord.id, + apiFormat, + ); + + if (reqIdHandleResult.type === "cache_hit") { + return reqIdHandleResult.response; + } + + if (reqIdHandleResult.type === "in_flight") { + set.status = reqIdHandleResult.status; + set.headers["Retry-After"] = String(reqIdHandleResult.retryAfter); + return reqIdHandleResult.response; + } + + const reqIdContext = reqIdHandleResult.context; + // Parse request using Response API adapter const requestAdapter = getRequestAdapter("openai-responses"); const internalRequest = requestAdapter.parse( @@ -484,7 +650,7 @@ export const responsesApi = new Elysia({ // Handle streaming vs non-streaming if (internalRequest.stream) { - // Streaming request - use yield for streaming responses + // Streaming request - return an async generator const result = await executeWithFailover( candidates, buildRequestForProvider, @@ -502,6 +668,9 @@ export const responsesApi = new Elysia({ const errorResult = await processFailoverError(result, completion, bearer, "streaming"); + // Finalize pre-created completion if ReqId was used + await finalizeReqIdOnError(reqIdContext, begin); + if (errorResult.type === "upstream_error") { set.status = errorResult.status; return JSON.parse(errorResult.body) as Record; @@ -518,6 +687,7 @@ export const responsesApi = new Elysia({ } if (!result.response || !result.provider) { + await finalizeReqIdOnError(reqIdContext, begin); set.status = 500; return { object: "error", @@ -526,6 +696,7 @@ export const responsesApi = new Elysia({ } if (!result.response.body) { + await finalizeReqIdOnError(reqIdContext, begin); set.status = 500; return { object: "error", @@ -542,26 +713,32 @@ export const responsesApi = new Elysia({ extraHeaders, ); - try { - yield* processStreamingResponse( - result.response, - completion, - bearer, - providerType, - apiKeyRecord ?? null, - begin, - request.signal, - ); - } catch (error) { - // Don't log error if it's due to client abort - if (!request.signal.aborted) { - logger.error("Stream processing error", error); - set.status = 500; - yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { code: "internal_error", message: "Stream processing error", param: null, help_url: null } })}\n\n`; + // Return an async generator for streaming + const streamResponse = result.response; + const streamSignal = request.signal; + return (async function* () { + try { + yield* processStreamingResponse( + streamResponse, + completion, + bearer, + providerType, + apiKeyRecord ?? null, + begin, + streamSignal, + reqIdContext ?? undefined, + ); + } catch (error) { + // Don't log error if it's due to client abort + if (!streamSignal.aborted) { + logger.error("Stream processing error", error); + // Note: HTTP status cannot be changed after streaming has started + yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { code: "internal_error", message: "Stream processing error", param: null, help_url: null } })}\n\n`; + } } - } + })(); } else { - // Non-streaming request - use return for normal JSON response + // Non-streaming request - return JSON response directly const result = await executeWithFailover( candidates, buildRequestForProvider, @@ -579,6 +756,9 @@ export const responsesApi = new Elysia({ const errorResult = await processFailoverError(result, completion, bearer, "non-streaming"); + // Finalize pre-created completion if ReqId was used + await finalizeReqIdOnError(reqIdContext, begin); + if (errorResult.type === "upstream_error") { set.status = errorResult.status; return JSON.parse(errorResult.body) as Record; @@ -595,6 +775,7 @@ export const responsesApi = new Elysia({ } if (!result.response || !result.provider) { + await finalizeReqIdOnError(reqIdContext, begin); set.status = 500; return { object: "error", @@ -620,6 +801,7 @@ export const responsesApi = new Elysia({ apiKeyRecord ?? null, begin, request.signal, + reqIdContext ?? undefined, ); // Return parsed JSON object for proper content-type return JSON.parse(response) as Record; diff --git a/backend/src/db/index.ts b/backend/src/db/index.ts index d51bd5b..72bb219 100644 --- a/backend/src/db/index.ts +++ b/backend/src/db/index.ts @@ -1257,3 +1257,89 @@ export async function getEmbeddingsTimeSeries( avg_duration: string; }[]; } + +// ============================================ +// ReqId Deduplication Operations +// ============================================ + +/** + * Find a completion by ReqId (for cache hit detection) + * Only returns completions that are not pending (completed, failed, aborted, cache_hit) + * @param apiKeyId the API key ID (ReqId is scoped per API key) + * @param reqId the client-provided request ID + * @returns completion record if found and not pending, null otherwise + */ +export async function findCompletionByReqId( + apiKeyId: number, + reqId: string, +): Promise { + logger.debug("findCompletionByReqId", apiKeyId, reqId); + const r = await db + .select() + .from(schema.CompletionsTable) + .where( + and( + eq(schema.CompletionsTable.apiKeyId, apiKeyId), + eq(schema.CompletionsTable.reqId, reqId), + not(schema.CompletionsTable.deleted), + // Only return non-pending completions (completed, failed, aborted, cache_hit) + not(eq(schema.CompletionsTable.status, "pending")), + ), + ) + .limit(1); + const [first] = r; + return first ?? null; +} + +/** + * Create a pending completion record with ReqId + * Used to reserve the ReqId before making the upstream request + * @param c completion data including reqId + * @returns the created completion record, null if ReqId already exists (unique constraint violation) + */ +export async function createPendingCompletion( + c: CompletionInsert, +): Promise { + logger.debug("createPendingCompletion", c.model, c.reqId); + try { + const r = await db + .insert(schema.CompletionsTable) + .values(c) + .returning(); + const [first] = r; + return first ?? null; + } catch (error) { + // Handle unique constraint violation (duplicate ReqId) + // PostgreSQL error code 23505 = unique_violation + if ( + error instanceof Error && + "code" in error && + (error as { code: string }).code === "23505" + ) { + logger.warn("Duplicate ReqId detected", c.reqId); + return null; + } + throw error; + } +} + +/** + * Update a completion record + * Used to update pending completions after upstream request completes + * @param id completion ID + * @param updates partial completion data to update + * @returns updated completion record, null if not found + */ +export async function updateCompletion( + id: number, + updates: Partial, +): Promise { + logger.debug("updateCompletion", id); + const r = await db + .update(schema.CompletionsTable) + .set({ ...updates, updatedAt: new Date() }) + .where(eq(schema.CompletionsTable.id, id)) + .returning(); + const [first] = r; + return first ?? null; +} diff --git a/backend/src/db/schema.ts b/backend/src/db/schema.ts index 5ea0561..e70d6d6 100644 --- a/backend/src/db/schema.ts +++ b/backend/src/db/schema.ts @@ -151,8 +151,18 @@ export const CompletionsStatusEnum = pgEnum("completions_status", [ "completed", "failed", "aborted", + "cache_hit", ]); -export type CompletionsStatusEnumType = "pending" | "completed" | "failed" | "aborted"; +export type CompletionsStatusEnumType = "pending" | "completed" | "failed" | "aborted" | "cache_hit"; + +/** + * Cached response type for ReqId deduplication + * Stores the serialized response for cache_hit returns + */ +export type CachedResponseType = { + body: unknown; + format: "openai-chat" | "openai-responses" | "anthropic"; +}; export const CompletionsTable = pgTable("completions", { id: integer("id").primaryKey().generatedAlwaysAsIdentity().unique(), @@ -176,6 +186,13 @@ export const CompletionsTable = pgTable("completions", { updatedAt: timestamp("updated_at").notNull().defaultNow(), deleted: boolean("deleted").notNull().default(false), rating: real("rating"), + // ReqId deduplication fields + reqId: varchar("req_id", { length: 127 }), + sourceCompletionId: integer("source_completion_id").references( + (): AnyPgColumn => CompletionsTable.id, + ), + apiFormat: varchar("api_format", { length: 31 }), + cachedResponse: jsonb("cached_response").$type(), }); export const SrvLogsLevelEnum = pgEnum("srv_logs_level", [ diff --git a/backend/src/utils/redisClient.ts b/backend/src/utils/redisClient.ts index dc3d086..2a8d3f8 100644 --- a/backend/src/utils/redisClient.ts +++ b/backend/src/utils/redisClient.ts @@ -98,6 +98,34 @@ class RedisClient { } } + /** + * Set a value in Redis only if the key does not exist (atomic SETNX) + * @param {string} key - Key to set + * @param {string | number} value - Value to store + * @param {number} ttlSeconds - Time to live in seconds + * @returns {Promise} true if the key was set, false if it already existed + */ + public async setnx( + key: string, + value: string | number, + ttlSeconds: number, + ): Promise { + try { + // SET key value EX ttl NX - sets only if key doesn't exist + const result = await this.client.set( + key, + value.toString(), + "EX", + ttlSeconds, + "NX", + ); + return result === "OK"; + } catch (error) { + logger.error(`Redis setnx error: ${(error as Error).message}`); + return false; + } + } + /** * Execute a Lua script atomically * @param {string} script - Lua script to execute diff --git a/backend/src/utils/reqIdCache.ts b/backend/src/utils/reqIdCache.ts new file mode 100644 index 0000000..91336ad --- /dev/null +++ b/backend/src/utils/reqIdCache.ts @@ -0,0 +1,159 @@ +/** + * Redis-based In-Flight request tracking for ReqId deduplication + * + * Tracks requests that are currently being processed to prevent duplicate + * concurrent requests with the same ReqId. + */ + +import { consola } from "consola"; +import { redisClient } from "./redisClient"; + +const logger = consola.withTag("reqIdCache"); + +/** + * In-flight request data stored in Redis + */ +export interface InFlightRequest { + completionId: number; + startTime: number; // Unix timestamp in milliseconds + isStream: boolean; + endpoint: string; +} + +// Redis key prefix for in-flight requests +const KEY_PREFIX = "reqid:inflight"; + +// TTL for in-flight markers (10 minutes) +// This prevents orphan keys if the server crashes during request processing +const IN_FLIGHT_TTL_SECONDS = 600; + +// Estimated request durations for Retry-After calculation +const ESTIMATED_STREAM_DURATION_MS = 60000; // 60 seconds for streaming +const ESTIMATED_NON_STREAM_DURATION_MS = 30000; // 30 seconds for non-streaming + +/** + * Build the Redis key for an in-flight request + */ +function buildKey(apiKeyId: number, reqId: string): string { + return `${KEY_PREFIX}:${apiKeyId}:${reqId}`; +} + +/** + * Mark a request as in-flight (atomically using SETNX) + * + * @param apiKeyId - The API key ID + * @param reqId - The client-provided request ID + * @param completionId - The database completion ID + * @param endpoint - The API endpoint being called + * @param isStream - Whether this is a streaming request + * @returns true if successfully marked (request is new), false if already in-flight + */ +export async function markInFlight( + apiKeyId: number, + reqId: string, + completionId: number, + endpoint: string, + isStream: boolean, +): Promise { + const key = buildKey(apiKeyId, reqId); + const data: InFlightRequest = { + completionId, + startTime: Date.now(), + isStream, + endpoint, + }; + + try { + const success = await redisClient.setnx( + key, + JSON.stringify(data), + IN_FLIGHT_TTL_SECONDS, + ); + + if (success) { + logger.debug("Marked request as in-flight", { apiKeyId, reqId, completionId }); + } else { + logger.debug("Request already in-flight", { apiKeyId, reqId }); + } + + return success; + } catch (error) { + logger.error("Failed to mark request as in-flight", error); + // Return false to be safe - treat as if already in-flight + return false; + } +} + +/** + * Get in-flight request data + * + * @param apiKeyId - The API key ID + * @param reqId - The client-provided request ID + * @returns The in-flight request data, or null if not in-flight + */ +export async function getInFlight( + apiKeyId: number, + reqId: string, +): Promise { + const key = buildKey(apiKeyId, reqId); + + try { + const data = await redisClient.get(key); + if (!data) { + return null; + } + + return JSON.parse(data) as InFlightRequest; + } catch (error) { + logger.error("Failed to get in-flight request", error); + return null; + } +} + +/** + * Clear the in-flight marker for a request + * + * Should be called when a request completes (successfully or with error) + * + * @param apiKeyId - The API key ID + * @param reqId - The client-provided request ID + */ +export async function clearInFlight( + apiKeyId: number, + reqId: string, +): Promise { + const key = buildKey(apiKeyId, reqId); + + try { + await redisClient.del(key); + logger.debug("Cleared in-flight marker", { apiKeyId, reqId }); + } catch (error) { + logger.error("Failed to clear in-flight marker", error); + // Non-critical - the TTL will eventually expire the key + } +} + +/** + * Calculate the recommended Retry-After value based on in-flight request state + * + * @param inFlight - The in-flight request data + * @returns Retry-After value in seconds (minimum 1) + */ +export function calculateRetryAfter(inFlight: InFlightRequest): number { + const elapsed = Date.now() - inFlight.startTime; + const estimatedTotal = inFlight.isStream + ? ESTIMATED_STREAM_DURATION_MS + : ESTIMATED_NON_STREAM_DURATION_MS; + + const remainingMs = Math.max(estimatedTotal - elapsed, 1000); + return Math.ceil(remainingMs / 1000); +} + +/** + * Check if Redis is available for in-flight tracking + * + * @returns true if Redis is connected and ready + */ +export function isRedisAvailable(): boolean { + return redisClient.isConnected(); +} diff --git a/backend/src/utils/reqIdHandler.ts b/backend/src/utils/reqIdHandler.ts new file mode 100644 index 0000000..70fb415 --- /dev/null +++ b/backend/src/utils/reqIdHandler.ts @@ -0,0 +1,719 @@ +/** + * ReqId Handler - Main logic for request deduplication + * + * Handles the full lifecycle of ReqId-based request deduplication: + * 1. Check if request is a cache hit (completed/failed/aborted) + * 2. Check if request is currently in-flight + * 3. Create new pending requests + * 4. Finalize requests after completion + */ + +import { consola } from "consola"; +import { + findCompletionByReqId, + createPendingCompletion, + updateCompletion, + insertCompletion, + type Completion, + type CompletionInsert, +} from "@/db"; +import { + markInFlight, + getInFlight, + clearInFlight, + calculateRetryAfter, + isRedisAvailable, + type InFlightRequest, +} from "./reqIdCache"; +import type { CachedResponseType } from "@/db/schema"; + +const logger = consola.withTag("reqIdHandler"); + +/** + * HTTP header name for client-provided request ID + */ +export const REQID_HEADER = "x-nexusgate-reqid"; + +/** + * API format types + */ +export type ApiFormat = "openai-chat" | "openai-responses" | "anthropic"; + +/** + * Result types for ReqId check + */ +export type ReqIdCheckResult = + | { type: "cache_hit"; completion: Completion } + | { type: "in_flight"; inFlight: InFlightRequest; retryAfter: number } + | { type: "new_request"; completionId: number } + | { type: "no_reqid" }; // No ReqId provided - proceed normally + +/** + * Data needed to create a pending completion + */ +export interface PendingCompletionData { + apiKeyId: number; + model: string; + modelId?: number; + prompt: CompletionInsert["prompt"]; + apiFormat: ApiFormat; + endpoint: string; + isStream: boolean; +} + +/** + * Check ReqId status and determine how to handle the request + * + * Flow: + * 1. If no ReqId provided, return no_reqid (proceed normally) + * 2. Check database for completed request with this ReqId + * 3. Check Redis for in-flight request with this ReqId + * 4. Create new pending request and mark as in-flight + * + * @param reqId - The client-provided request ID (from header) + * @param data - Pending completion data + * @returns Check result indicating how to proceed + */ +export async function checkReqId( + reqId: string | null, + data: PendingCompletionData, +): Promise { + // No ReqId provided - proceed with normal request flow + if (!reqId) { + return { type: "no_reqid" }; + } + + const { apiKeyId, model, modelId, prompt, apiFormat, endpoint, isStream } = data; + + // Step 1: Check database for existing completed request + const existingCompletion = await findCompletionByReqId(apiKeyId, reqId); + if (existingCompletion) { + logger.info("Cache hit for ReqId", { reqId, completionId: existingCompletion.id }); + return { type: "cache_hit", completion: existingCompletion }; + } + + // Step 2: Check Redis for in-flight request + // Note on race conditions when Redis is unavailable: + // Without Redis, concurrent requests with the same ReqId will both proceed to step 3 + // (create pending completion). The database unique constraint on (api_key_id, req_id) + // will catch this - one request succeeds and the other fails with a constraint violation, + // triggering a re-check (lines 127-154). This means duplicate processing may occur briefly + // until one request claims the ReqId in the database. Redis provides faster in-flight + // detection but is not required for correctness. + if (isRedisAvailable()) { + const inFlight = await getInFlight(apiKeyId, reqId); + if (inFlight) { + const retryAfter = calculateRetryAfter(inFlight); + logger.info("Request in-flight for ReqId", { reqId, retryAfter }); + return { type: "in_flight", inFlight, retryAfter }; + } + } else { + logger.warn("Redis unavailable, skipping in-flight check - race conditions possible until DB constraint catches duplicates"); + } + + // Step 3: Create pending completion and mark as in-flight + const pendingData: CompletionInsert = { + apiKeyId, + model, + modelId, + prompt, + promptTokens: -1, + completion: [], + completionTokens: -1, + status: "pending", + ttft: -1, + duration: -1, + reqId, + apiFormat, + }; + + const newCompletion = await createPendingCompletion(pendingData); + + if (!newCompletion) { + // Unique constraint violation - another request beat us to it + // Re-check database (might have completed) or Redis (might be in-flight) + logger.warn("Failed to create pending completion, re-checking state", { reqId }); + + const recheck = await findCompletionByReqId(apiKeyId, reqId); + if (recheck) { + return { type: "cache_hit", completion: recheck }; + } + + if (isRedisAvailable()) { + const inFlight = await getInFlight(apiKeyId, reqId); + if (inFlight) { + const retryAfter = calculateRetryAfter(inFlight); + return { type: "in_flight", inFlight, retryAfter }; + } + } + + // Shouldn't happen, but treat as in-flight with default retry + logger.error("Unexpected state: ReqId exists but not found", { reqId }); + return { + type: "in_flight", + inFlight: { + completionId: 0, + startTime: Date.now(), + isStream, + endpoint, + }, + retryAfter: 5, + }; + } + + // Step 4: Mark as in-flight in Redis + if (isRedisAvailable()) { + const marked = await markInFlight( + apiKeyId, + reqId, + newCompletion.id, + endpoint, + isStream, + ); + + if (!marked) { + // Another process beat us - this shouldn't happen since we have DB unique constraint + // But handle gracefully + logger.warn("Failed to mark in-flight after DB insert", { reqId }); + } + } + + logger.debug("Created new pending request", { reqId, completionId: newCompletion.id }); + return { type: "new_request", completionId: newCompletion.id }; +} + +/** + * Finalize a request after it completes + * + * Updates the completion record and clears the in-flight marker + * + * @param apiKeyId - The API key ID + * @param reqId - The client-provided request ID + * @param completionId - The completion ID to update + * @param updates - Completion updates (status, tokens, response, etc.) + */ +export async function finalizeReqId( + apiKeyId: number, + reqId: string, + completionId: number, + updates: Partial & { cachedResponse?: CachedResponseType }, +): Promise { + try { + // Update completion record + await updateCompletion(completionId, updates); + + // Clear in-flight marker + if (isRedisAvailable()) { + await clearInFlight(apiKeyId, reqId); + } + + logger.debug("Finalized request", { reqId, completionId, status: updates.status }); + } catch (error) { + logger.error("Failed to finalize request", { reqId, completionId, error }); + // Still try to clear in-flight marker + if (isRedisAvailable()) { + await clearInFlight(apiKeyId, reqId); + } + throw error; + } +} + +/** + * Build a cache_hit completion record + * + * Creates a new completion record that references the source completion + * + * @param sourceCompletion - The original completion that was cached + * @param apiKeyId - The API key ID for the new request + * @returns CompletionInsert for the cache_hit record + */ +export function buildCacheHitRecord( + sourceCompletion: Completion, + apiKeyId: number, +): CompletionInsert { + return { + apiKeyId, + model: sourceCompletion.model, + modelId: sourceCompletion.modelId, + upstreamId: sourceCompletion.upstreamId, + prompt: sourceCompletion.prompt, + promptTokens: 0, // Cache hit doesn't consume tokens + completion: sourceCompletion.completion, + completionTokens: 0, // Cache hit doesn't consume tokens + status: "cache_hit", + ttft: 0, + duration: 0, + // Note: reqId is intentionally omitted to avoid unique constraint violations + // cache_hit records don't need their own reqId since they reference sourceCompletionId + sourceCompletionId: sourceCompletion.id, + apiFormat: sourceCompletion.apiFormat, + }; +} + +/** + * Record a cache hit in the database + * + * @param sourceCompletion - The original completion that was cached + * @param apiKeyId - The API key ID for the new request + * @returns The created cache_hit completion record + */ +export async function recordCacheHit( + sourceCompletion: Completion, + apiKeyId: number, +): Promise { + const record = buildCacheHitRecord(sourceCompletion, apiKeyId); + return await insertCompletion(record); +} + +/** + * Build the 409 Conflict error response for in-flight requests + * + * @param reqId - The client-provided request ID + * @param inFlight - The in-flight request data + * @param retryAfter - Retry-After value in seconds + * @param format - The API format for response formatting + * @returns Error response object + */ +export function buildInFlightErrorResponse( + reqId: string, + inFlight: InFlightRequest, + retryAfter: number, + format: ApiFormat, +): Record { + const startedAt = new Date(inFlight.startTime).toISOString(); + + if (format === "anthropic") { + return { + type: "error", + error: { + type: "conflict", + message: "A request with this X-NexusGate-ReqId is already being processed", + req_id: reqId, + retry_after: retryAfter, + started_at: startedAt, + }, + }; + } + + // OpenAI format (openai-chat, openai-responses) + return { + error: { + code: "request_in_flight", + message: "A request with this X-NexusGate-ReqId is already being processed", + type: "conflict", + req_id: reqId, + retry_after: retryAfter, + started_at: startedAt, + }, + }; +} + +/** + * Maximum length for ReqId (database schema constraint) + */ +export const REQID_MAX_LENGTH = 127; + +/** + * Regex pattern for valid ReqId characters + * Allows alphanumeric, hyphens, underscores, dots, colons, and forward slashes + * This prevents control characters, null bytes, and other potentially problematic characters + */ +const REQID_VALID_PATTERN = /^[\w\-.:/]+$/; + +/** + * Result type for extractReqId + */ +export type ExtractReqIdResult = + | { type: "valid"; value: string } + | { type: "empty" } + | { type: "too_long"; length: number } + | { type: "invalid_characters" }; + +/** + * Extract and validate ReqId from request headers + * + * @param headers - Request headers + * @returns Extraction result indicating valid value, empty, or error + */ +export function extractReqId(headers: Headers): ExtractReqIdResult { + const reqId = headers.get(REQID_HEADER); + if (!reqId) { + return { type: "empty" }; + } + // Trim first, then validate + const trimmedReqId = reqId.trim(); + if (trimmedReqId === "") { + return { type: "empty" }; + } + // Validate ReqId length (max 127 chars as per schema) + if (trimmedReqId.length > REQID_MAX_LENGTH) { + logger.warn("ReqId too long", { length: trimmedReqId.length, maxLength: REQID_MAX_LENGTH }); + return { type: "too_long", length: trimmedReqId.length }; + } + // Validate ReqId contains only allowed characters + if (!REQID_VALID_PATTERN.test(trimmedReqId)) { + logger.warn("ReqId contains invalid characters", { reqId: trimmedReqId }); + return { type: "invalid_characters" }; + } + return { type: "valid", value: trimmedReqId }; +} + +// ============================================================================= +// Response Builders for Cache Hits +// ============================================================================= + +/** + * Build OpenAI Chat Completion format response from cached completion + */ +export function buildOpenAIChatResponse(completion: Completion): Record { + return { + id: `chatcmpl-cache-${completion.id}`, + object: "chat.completion", + created: Math.floor(completion.createdAt.getTime() / 1000), + model: completion.model, + choices: completion.completion.map((c, i) => ({ + index: i, + message: { + role: c.role || "assistant", + content: c.content, + tool_calls: c.tool_calls, + }, + finish_reason: c.tool_calls?.length ? "tool_calls" : "stop", + })), + usage: { + prompt_tokens: completion.promptTokens, + completion_tokens: completion.completionTokens, + total_tokens: completion.promptTokens + completion.completionTokens, + }, + }; +} + +/** + * Build Anthropic Messages format response from cached completion + */ +export function buildAnthropicResponse(completion: Completion): Record { + // Build content blocks including both text and tool_use + const contentBlocks: Array> = []; + for (const c of completion.completion) { + // Add text content if present + if (c.content) { + contentBlocks.push({ type: "text", text: c.content }); + } + // Add tool_use blocks if present + if (c.tool_calls) { + for (const tc of c.tool_calls) { + contentBlocks.push({ + type: "tool_use", + id: tc.id, + name: tc.function.name, + input: JSON.parse(tc.function.arguments || "{}"), + }); + } + } + } + // Determine stop_reason based on content + const hasToolUse = contentBlocks.some((b) => b.type === "tool_use"); + return { + id: `msg-cache-${completion.id}`, + type: "message", + role: "assistant", + content: contentBlocks.length > 0 ? contentBlocks : [{ type: "text", text: "" }], + model: completion.model, + stop_reason: hasToolUse ? "tool_use" : "end_turn", + usage: { + input_tokens: completion.promptTokens, + output_tokens: completion.completionTokens, + }, + }; +} + +/** + * Build OpenAI Responses API format response from cached completion + */ +export function buildOpenAIResponsesResponse(completion: Completion): Record { + // Build output items including both messages and function_call + const outputItems: Array> = []; + for (const c of completion.completion) { + // Build content array for message + const content: Array> = []; + if (c.content) { + content.push({ type: "output_text", text: c.content }); + } + // Add message output item if there's text content + if (content.length > 0) { + outputItems.push({ + type: "message", + role: c.role || "assistant", + content, + }); + } + // Add function_call output items for tool_calls + if (c.tool_calls) { + for (const tc of c.tool_calls) { + outputItems.push({ + type: "function_call", + id: tc.id, + call_id: tc.id, + name: tc.function.name, + arguments: tc.function.arguments || "{}", + }); + } + } + } + return { + id: `resp-cache-${completion.id}`, + object: "response", + created_at: Math.floor(completion.createdAt.getTime() / 1000), + model: completion.model, + output: outputItems.length > 0 ? outputItems : [{ + type: "message", + role: "assistant", + content: [{ type: "output_text", text: "" }], + }], + usage: { + input_tokens: completion.promptTokens, + output_tokens: completion.completionTokens, + total_tokens: completion.promptTokens + completion.completionTokens, + }, + }; +} + +/** + * Build cached response based on API format + */ +export function buildCachedResponseByFormat( + completion: Completion, + format: ApiFormat, +): Record { + switch (format) { + case "openai-chat": + return buildOpenAIChatResponse(completion); + case "anthropic": + return buildAnthropicResponse(completion); + case "openai-responses": + return buildOpenAIResponsesResponse(completion); + } +} + +// ============================================================================= +// Error Finalization Helper +// ============================================================================= + +/** + * Context for ReqId request handling + */ +export interface ReqIdContext { + reqId: string; + apiKeyId: number; + preCreatedCompletionId: number; + apiFormat: ApiFormat; +} + +/** + * Finalize a pre-created completion on error + * + * Helper function to reduce duplication in error handling paths + * + * @param context - ReqId context (or null if no ReqId) + * @param begin - Request start timestamp + */ +export async function finalizeReqIdOnError( + context: ReqIdContext | null | undefined, + begin: number, +): Promise { + if (!context) { + return; + } + + await finalizeReqId(context.apiKeyId, context.reqId, context.preCreatedCompletionId, { + status: "failed", + promptTokens: 0, + completionTokens: 0, + completion: [], + ttft: -1, + duration: Date.now() - begin, + }); +} + +// ============================================================================= +// Consolidated ReqId Handling Helpers +// ============================================================================= + +/** + * Result of ReqId extraction and validation + */ +export type ReqIdExtractionResult = + | { type: "valid"; reqId: string } + | { type: "empty"; reqId: null } + | { type: "error"; status: number; body: Record }; + +/** + * Extract, validate, and return ReqId with proper error responses + * + * Consolidates the extraction + validation + error response building pattern + * + * @param headers - Request headers + * @param apiFormat - API format for error response formatting + * @returns Extraction result with reqId or error response + */ +export function extractAndValidateReqId( + headers: Headers, + apiFormat: ApiFormat, +): ReqIdExtractionResult { + const extraction = extractReqId(headers); + + if (extraction.type === "too_long" || extraction.type === "invalid_characters") { + const errorResponse = buildReqIdValidationErrorResponse(extraction, apiFormat); + return { type: "error", status: errorResponse.status, body: errorResponse.body }; + } + + if (extraction.type === "valid") { + return { type: "valid", reqId: extraction.value }; + } + + return { type: "empty", reqId: null }; +} + +/** + * Result of handling ReqId check result + */ +export type ReqIdHandleResult = + | { type: "cache_hit"; response: Record } + | { type: "in_flight"; status: 409; retryAfter: number; response: Record } + | { type: "continue"; context: ReqIdContext | null }; + +/** + * Handle ReqId check result - returns early response or context to continue + * + * Consolidates cache_hit handling, in_flight handling, and context building + * + * @param result - Result from checkReqId + * @param reqId - The extracted reqId (or null) + * @param apiKeyId - API key ID for the request + * @param apiFormat - API format for response formatting + * @returns Handle result indicating how to proceed + */ +export async function handleReqIdResult( + result: ReqIdCheckResult, + reqId: string | null, + apiKeyId: number, + apiFormat: ApiFormat, +): Promise { + // Handle cache hit - return cached response + if (result.type === "cache_hit") { + const sourceCompletion = result.completion; + + // Record the cache hit (best-effort) + try { + await recordCacheHit(sourceCompletion, apiKeyId); + } catch (error) { + logger.warn("Failed to record cache hit", error); + } + + // Return cached response if available, otherwise reconstruct + const response = sourceCompletion.cachedResponse + ? (sourceCompletion.cachedResponse.body as Record) + : buildCachedResponseByFormat(sourceCompletion, apiFormat); + + return { type: "cache_hit", response }; + } + + // Handle in-flight - return 409 Conflict + if (result.type === "in_flight") { + if (!reqId) { + throw new Error("Invariant violated: reqId is null for in_flight result"); + } + + return { + type: "in_flight", + status: 409, + retryAfter: result.retryAfter, + response: buildInFlightErrorResponse(reqId, result.inFlight, result.retryAfter, apiFormat), + }; + } + + // Build context for new_request or no_reqid + const context: ReqIdContext | null = (result.type === "new_request" && reqId) + ? { + reqId, + apiKeyId, + preCreatedCompletionId: result.completionId, + apiFormat, + } + : null; + + return { type: "continue", context }; +} + +// ============================================================================= +// ReqId Validation Error Responses +// ============================================================================= + +/** + * Build error response for invalid ReqId (too long or invalid characters) + */ +function buildReqIdValidationErrorResponse( + extraction: ExtractReqIdResult, + format: ApiFormat, +): { status: number; body: Record } { + if (extraction.type === "too_long") { + const message = `X-NexusGate-ReqId exceeds maximum length of ${REQID_MAX_LENGTH} characters (got ${extraction.length})`; + return { + status: 400, + body: buildValidationErrorBody(message, "reqid_too_long", format), + }; + } + + if (extraction.type === "invalid_characters") { + const message = "X-NexusGate-ReqId contains invalid characters. Only alphanumeric characters, hyphens, underscores, dots, colons, and forward slashes are allowed."; + return { + status: 400, + body: buildValidationErrorBody(message, "reqid_invalid_characters", format), + }; + } + + // Should not reach here, but provide a fallback + return { + status: 400, + body: buildValidationErrorBody("Invalid X-NexusGate-ReqId", "reqid_invalid", format), + }; +} + +/** + * Build validation error body in the appropriate format + */ +function buildValidationErrorBody( + message: string, + code: string, + format: ApiFormat, +): Record { + if (format === "anthropic") { + return { + type: "error", + error: { + type: "invalid_request_error", + message, + }, + }; + } + + if (format === "openai-responses") { + return { + object: "error", + error: { + type: "invalid_request_error", + message, + code, + }, + }; + } + + // openai-chat format + return { + error: { + message, + type: "invalid_request_error", + code, + }, + }; +} diff --git a/backend/src/utils/streaming-context.ts b/backend/src/utils/streaming-context.ts index 4554f46..e30f01e 100644 --- a/backend/src/utils/streaming-context.ts +++ b/backend/src/utils/streaming-context.ts @@ -4,12 +4,26 @@ */ import type { + CachedResponseType, CompletionsStatusEnumType, ToolCallType, } from "@/db/schema"; import { addCompletions, type Completion } from "@/utils/completions"; import { consumeTokens } from "@/plugins/apiKeyRateLimitPlugin"; import type { ApiKey } from "@/plugins/apiKeyPlugin"; +import { finalizeReqId } from "@/utils/reqIdHandler"; + +/** + * ReqId context for request deduplication + */ +export interface StreamingReqIdContext { + reqId: string; + apiKeyId: number; + preCreatedCompletionId: number; + apiFormat: "openai-chat" | "openai-responses" | "anthropic"; + /** Function to build the cached response from accumulated data */ + buildCachedResponse?: (completion: Completion) => CachedResponseType; +} /** * StreamingContext manages the state of a streaming response. @@ -22,6 +36,7 @@ export class StreamingContext { private begin: number; private saved = false; private signal?: AbortSignal; + private reqIdContext?: StreamingReqIdContext; // Accumulated data during streaming textParts: string[] = []; @@ -41,12 +56,14 @@ export class StreamingContext { apiKeyRecord: ApiKey | null, begin: number, signal?: AbortSignal, + reqIdContext?: StreamingReqIdContext, ) { this.completion = completion; this.bearer = bearer; this.apiKeyRecord = apiKeyRecord; this.begin = begin; this.signal = signal; + this.reqIdContext = reqIdContext; // Note: We don't save immediately on abort anymore. // Instead, we continue processing chunks from upstream and save the full @@ -112,8 +129,21 @@ export class StreamingContext { this.completion.ttft = this.ttft; this.completion.duration = Date.now() - this.begin; - // Save to database - if (error) { + // Save to database - use finalizeReqId if ReqId context is present + if (this.reqIdContext) { + // Build cached response if callback is provided + const cachedResponse = this.reqIdContext.buildCachedResponse?.(this.completion); + + await finalizeReqId( + this.reqIdContext.apiKeyId, + this.reqIdContext.reqId, + this.reqIdContext.preCreatedCompletionId, + { + ...this.completion, + cachedResponse, + }, + ); + } else if (error) { await addCompletions(this.completion, this.bearer, { level: status === "aborted" ? "info" : "error", message: `Stream ${status}: ${error}`, diff --git a/frontend/src/i18n/locales/en-US.json b/frontend/src/i18n/locales/en-US.json index 8611ccc..6a3d98a 100644 --- a/frontend/src/i18n/locales/en-US.json +++ b/frontend/src/i18n/locales/en-US.json @@ -94,6 +94,7 @@ "pages.requests.columns.Completed": "Completed", "pages.requests.columns.Failed": "Failed", "pages.requests.columns.Aborted": "Aborted", + "pages.requests.columns.CacheHit": "Cache Hit", "pages.requests.columns.Model": "Model", "pages.requests.columns.TTFT": "TTFT", "pages.requests.columns.TimeToFirstToken": "Time To First Token", @@ -120,6 +121,7 @@ "pages.requests.detail-panel.header.Completed": "Completed", "pages.requests.detail-panel.header.Failed": "Failed", "pages.requests.detail-panel.header.Aborted": "Aborted", + "pages.requests.detail-panel.header.CacheHit": "Cache Hit", "pages.requests.detail-panel.header.ClosePanel": "Close panel", "pages.requests.detail-panel.index.Close": "Close", "pages.requests.detail-panel.index.Retry": "Retry", diff --git a/frontend/src/i18n/locales/zh-CN.json b/frontend/src/i18n/locales/zh-CN.json index 61780ac..d2b3064 100644 --- a/frontend/src/i18n/locales/zh-CN.json +++ b/frontend/src/i18n/locales/zh-CN.json @@ -95,6 +95,7 @@ "pages.requests.columns.Completed": "已完成", "pages.requests.columns.Failed": "失败", "pages.requests.columns.Aborted": "已中止", + "pages.requests.columns.CacheHit": "缓存命中", "pages.requests.columns.Model": "模型", "pages.requests.columns.TTFT": "TTFT", "pages.requests.columns.TimeToFirstToken": "首 Token 返回时间", @@ -121,6 +122,7 @@ "pages.requests.detail-panel.header.Completed": "已完成", "pages.requests.detail-panel.header.Failed": "失败", "pages.requests.detail-panel.header.Aborted": "已中止", + "pages.requests.detail-panel.header.CacheHit": "缓存命中", "pages.requests.detail-panel.header.ClosePanel": "关闭面板", "pages.requests.detail-panel.index.Close": "关闭", "pages.requests.detail-panel.index.Retry": "重试", diff --git a/frontend/src/pages/requests/columns.tsx b/frontend/src/pages/requests/columns.tsx index d6eb2f5..1df9a03 100644 --- a/frontend/src/pages/requests/columns.tsx +++ b/frontend/src/pages/requests/columns.tsx @@ -44,6 +44,9 @@ export const columns: ColumnDef[] = [ .with('aborted', () => ( {i18n.t('pages.requests.columns.Aborted')} )) + .with('cache_hit', () => ( + {i18n.t('pages.requests.columns.CacheHit')} + )) .exhaustive() return (
diff --git a/frontend/src/pages/requests/detail-panel/header.tsx b/frontend/src/pages/requests/detail-panel/header.tsx index 587b65d..f44f5e8 100644 --- a/frontend/src/pages/requests/detail-panel/header.tsx +++ b/frontend/src/pages/requests/detail-panel/header.tsx @@ -67,6 +67,11 @@ function StatusIndicator({ status }: { status: ChatRequest['status'] }) { {t('pages.requests.detail-panel.header.Aborted')} )) + .with('cache_hit', () => ( + + {t('pages.requests.detail-panel.header.CacheHit')} + + )) .exhaustive() }