From cd819f04b58d59e871838e75b9a9cb09e3daeeb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E7=BF=94=E5=AE=87?= Date: Tue, 20 Jan 2026 05:15:31 +0800 Subject: [PATCH 1/5] feat(backend): add automatic failover and retry for multi-provider requests Implement automatic failover capability that switches to alternate providers when a request fails with retriable errors (5xx, 429, network timeouts). Key features: - Failover service with configurable retry behavior - Exponential backoff with jitter for retries - Same-provider retries for transient errors (429) - Cross-provider failover for persistent failures - Weighted random selection for failover candidates - Detailed error logging with all attempted providers Also includes the weighted load balancing fix from PR #49: - Implement proper weighted random selection in selectModel() - Add filterCandidates() helper for failover use Affected endpoints: - /v1/chat/completions - /v1/messages - /v1/responses Configuration defaults: - maxProviderAttempts: 3 - sameProviderRetries: 1 - retriableStatusCodes: [429, 500, 502, 503, 504] - timeoutMs: 120000 (2 minutes) Co-Authored-By: Claude Opus 4.5 --- backend/src/api/v1/completions.ts | 487 +++++++++++++++++------------ backend/src/api/v1/messages.ts | 492 ++++++++++++++++++------------ backend/src/api/v1/responses.ts | 492 ++++++++++++++++++------------ backend/src/services/failover.ts | 413 +++++++++++++++++++++++++ backend/src/utils/api-helpers.ts | 30 ++ 5 files changed, 1311 insertions(+), 603 deletions(-) create mode 100644 backend/src/services/failover.ts diff --git a/backend/src/api/v1/completions.ts b/backend/src/api/v1/completions.ts index db10091..ab908bd 100644 --- a/backend/src/api/v1/completions.ts +++ b/backend/src/api/v1/completions.ts @@ -23,13 +23,18 @@ import type { } from "@/db/schema"; import { extractUpstreamHeaders, - selectModel, + filterCandidates, extractContentText, extractToolCalls, parseModelProvider, PROVIDER_HEADER, } from "@/utils/api-helpers"; import { addCompletions, type Completion } from "@/utils/completions"; +import { + executeWithFailover, + selectMultipleCandidates, + type FailoverConfig, +} from "@/services/failover"; const logger = consola.withTag("completionsApi"); @@ -132,70 +137,16 @@ function buildCompletionRecord( } /** - * Handle non-streaming completion request + * Process a successful non-streaming response */ -async function handleNonStreamingRequest( - upstreamUrl: string, - upstreamInit: RequestInit, +async function processNonStreamingResponse( + resp: Response, completion: Completion, bearer: string, - set: { status?: number | string }, providerType: string, apiKeyRecord: ApiKey | null, + begin: number, ): Promise { - const begin = Date.now(); - - logger.debug("proxying completions request to upstream", { - bearer, - upstreamUrl, - providerType, - }); - - const [resp, err] = await fetch(upstreamUrl, upstreamInit) - .then((r) => [r, null] as [Response, null]) - .catch((error: unknown) => { - logger.error("fetch error", error); - return [null, error] as [null, Error]; - }); - - if (!resp) { - logger.error("upstream error", { - status: 500, - msg: "Failed to fetch upstream", - }); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Failed to fetch upstream. ${err.toString()}`, - details: { - type: "completionError", - data: { type: "fetchError", msg: err.toString() }, - }, - }).catch(() => { - logger.error("Failed to log completion after fetch failure"); - }); - set.status = 500; - return JSON.stringify({ error: "Failed to fetch upstream" }); - } - - if (!resp.ok) { - const msg = await resp.text(); - logger.error("upstream error", { status: resp.status, msg }); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Upstream error: ${msg}`, - details: { - type: "completionError", - data: { type: "upstreamError", status: resp.status, msg }, - }, - }).catch(() => { - logger.error("Failed to log completion error after upstream error"); - }); - set.status = resp.status; - return msg; - } - // Parse response using upstream adapter const upstreamAdapter = getUpstreamAdapter(providerType); const internalResponse = await upstreamAdapter.parseResponse(resp); @@ -235,92 +186,17 @@ async function handleNonStreamingRequest( } /** - * Handle streaming completion request + * Process a successful streaming response * @yields string chunks in OpenAI format */ -async function* handleStreamingRequest( - upstreamUrl: string, - upstreamInit: RequestInit, +async function* processStreamingResponse( + resp: Response, completion: Completion, bearer: string, - set: { status?: number | string }, providerType: string, apiKeyRecord: ApiKey | null, + begin: number, ): AsyncGenerator { - const begin = Date.now(); - - logger.debug("proxying stream completions request to upstream", { - userKey: bearer, - upstreamUrl, - providerType, - stream: true, - }); - - const [resp, err] = await fetch(upstreamUrl, upstreamInit) - .then((r) => [r, null] as [Response, null]) - .catch((error: unknown) => { - logger.error("fetch error", error); - return [null, error] as [null, Error]; - }); - - if (!resp) { - logger.error("upstream error", { - status: 500, - msg: "Failed to fetch upstream", - }); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Failed to fetch upstream. ${err.toString()}`, - details: { - type: "completionError", - data: { type: "fetchError", msg: err.toString() }, - }, - }).catch(() => { - logger.error("Failed to log completion after fetch failure"); - }); - set.status = 500; - yield JSON.stringify({ error: "Failed to fetch upstream" }); - return; - } - - if (!resp.ok) { - const msg = await resp.text(); - logger.error("upstream error", { status: resp.status, msg }); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Upstream error: ${msg}`, - details: { - type: "completionError", - data: { type: "upstreamError", status: resp.status, msg }, - }, - }).catch(() => { - logger.error("Failed to log completion after upstream error"); - }); - set.status = resp.status; - yield msg; - return; - } - - if (!resp.body) { - logger.error("upstream error", { status: resp.status, msg: "No body" }); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: "No body", - details: { - type: "completionError", - data: { type: "upstreamError", status: resp.status, msg: "No body" }, - }, - }).catch(() => { - logger.error("Failed to log completion after no body error"); - }); - set.status = 500; - yield JSON.stringify({ error: "No body" }); - return; - } - // Get adapters const upstreamAdapter = getUpstreamAdapter(providerType); const responseAdapter = getResponseAdapter("openai-chat"); @@ -460,24 +336,7 @@ async function* handleStreamingRequest( // Handle case where no chunks were received if (isFirstChunk) { - logger.error("upstream error: no chunk received"); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: "No chunk received", - details: { - type: "completionError", - data: { - type: "upstreamError", - status: 500, - msg: "No chunk received", - }, - }, - }).catch(() => { - logger.error("Failed to log completion after no chunk received"); - }); - set.status = 500; - yield JSON.stringify({ error: "No chunk received" }); + throw new Error("No chunk received from upstream"); } } catch (error) { logger.error("Stream processing error", error); @@ -492,11 +351,18 @@ async function* handleStreamingRequest( }).catch(() => { logger.error("Failed to log completion after stream processing error"); }); - set.status = 500; - yield JSON.stringify({ error: "Stream processing error" }); + throw error; } } +// Failover configuration for completions API +const COMPLETIONS_FAILOVER_CONFIG: Partial = { + maxProviderAttempts: 3, + sameProviderRetries: 1, + retriableStatusCodes: [429, 500, 502, 503, 504], + timeoutMs: 120000, // 2 minutes for completions +}; + export const completionsApi = new Elysia({ prefix: "/chat", detail: { @@ -516,6 +382,7 @@ export const completionsApi = new Elysia({ } const reqHeaders = request.headers; + const begin = Date.now(); // Parse model@provider format and extract provider from header const { systemName, targetProvider } = parseModelProvider( @@ -542,12 +409,13 @@ export const completionsApi = new Elysia({ return; } - // Select model/provider - const selected = selectModel( + // Filter candidates by target provider (if specified) + const filteredCandidates = filterCandidates( modelsWithProviders as ModelWithProvider[], targetProvider, ); - if (!selected) { + + if (filteredCandidates.length === 0) { set.status = 404; yield JSON.stringify({ error: { @@ -559,7 +427,11 @@ export const completionsApi = new Elysia({ return; } - const { model: modelConfig, provider } = selected; + // Select candidates for failover (weighted random order) + const candidates = selectMultipleCandidates( + filteredCandidates, + COMPLETIONS_FAILOVER_CONFIG.maxProviderAttempts || 3, + ); // Extract extra headers for passthrough const extraHeaders = extractUpstreamHeaders(reqHeaders); @@ -570,10 +442,7 @@ export const completionsApi = new Elysia({ body as Record, ); - // Update model in internal request to use remote ID - internalRequest.model = modelConfig.remoteId ?? modelConfig.systemName; - - // Add extra headers + // Add extra headers to internal request if (extraHeaders) { internalRequest.extraHeaders = { ...internalRequest.extraHeaders, @@ -581,25 +450,16 @@ export const completionsApi = new Elysia({ }; } - // Get provider type (default to openai for compatibility) - const providerType = provider.type || "openai"; + // Build request function for failover + const buildRequestForProvider = (mp: ModelWithProvider) => { + // Clone internal request and update model + const req = { ...internalRequest }; + req.model = mp.model.remoteId ?? mp.model.systemName; - // Build upstream request using adapter - const upstreamAdapter = getUpstreamAdapter(providerType); - const { url: upstreamUrl, init: upstreamInit } = - upstreamAdapter.buildRequest(internalRequest, provider); - - // Build completion record for logging (with full message data) - // tools and tool_choice are now validated by schema - const completion = buildCompletionRecord( - body.model, - modelConfig.id, - body.messages as CompletionsMessageType[], - body.tools as ToolDefinitionType[] | undefined, - body.tool_choice as ToolChoiceType | undefined, - internalRequest.extraParams, - extraHeaders, - ); + const providerType = mp.provider.type || "openai"; + const upstreamAdapter = getUpstreamAdapter(providerType); + return upstreamAdapter.buildRequest(req, mp.provider); + }; // Handle streaming vs non-streaming if (internalRequest.stream) { @@ -611,26 +471,251 @@ export const completionsApi = new Elysia({ return; } - yield* handleStreamingRequest( - upstreamUrl, - upstreamInit, - completion, - bearer, - set, - providerType, - apiKeyRecord ?? null, + // For streaming, use failover only for connection establishment + const result = await executeWithFailover( + candidates, + buildRequestForProvider, + COMPLETIONS_FAILOVER_CONFIG, ); + + if (!result.success) { + // Build completion record for logging + const completion = buildCompletionRecord( + body.model, + result.provider?.model.id ?? candidates[0]?.model.id ?? 0, + body.messages as CompletionsMessageType[], + body.tools as ToolDefinitionType[] | undefined, + body.tool_choice as ToolChoiceType | undefined, + internalRequest.extraParams, + extraHeaders, + ); + completion.status = "failed"; + + // Non-retriable HTTP error from upstream - forward the response + if (result.response) { + logger.warn("Non-retriable upstream error for streaming request", { + status: result.response.status, + provider: result.provider?.provider.name, + }); + const errorSummary = result.errors + .map((e) => `${e.providerName}: ${e.error}`) + .join("; "); + addCompletions(completion, bearer, { + level: "error", + message: `Upstream error (non-retriable): ${errorSummary}`, + details: { + type: "completionError", + data: { + type: "upstreamError", + msg: result.finalError, + }, + }, + }).catch(() => { + logger.error("Failed to log completion after upstream error"); + }); + + set.status = result.response.status; + const responseBody = await result.response.text(); + yield responseBody; + return; + } + + // All providers failed with retriable errors or network errors + logger.error("All providers failed for streaming request", { + errors: result.errors, + totalAttempts: result.totalAttempts, + }); + const errorSummary = result.errors + .map((e) => `${e.providerName}: ${e.error}`) + .join("; "); + addCompletions(completion, bearer, { + level: "error", + message: `All providers failed (${result.totalAttempts} attempts): ${errorSummary}`, + details: { + type: "completionError", + data: { + type: "failoverExhausted", + msg: result.finalError, + }, + }, + }).catch(() => { + logger.error("Failed to log completion after failover exhaustion"); + }); + + set.status = 502; + yield JSON.stringify({ + error: { + message: "All upstream providers failed", + type: "upstream_error", + code: "all_providers_failed", + }, + }); + return; + } + + if (!result.response || !result.provider) { + set.status = 500; + yield JSON.stringify({ error: "Internal server error" }); + return; + } + + // Check if response has body + if (!result.response.body) { + set.status = 500; + yield JSON.stringify({ error: "No body in response" }); + return; + } + + const providerType = result.provider.provider.type || "openai"; + const completion = buildCompletionRecord( + body.model, + result.provider.model.id, + body.messages as CompletionsMessageType[], + body.tools as ToolDefinitionType[] | undefined, + body.tool_choice as ToolChoiceType | undefined, + internalRequest.extraParams, + extraHeaders, + ); + + try { + yield* processStreamingResponse( + result.response, + completion, + bearer, + providerType, + apiKeyRecord ?? null, + begin, + ); + } catch { + set.status = 500; + yield JSON.stringify({ error: "Stream processing error" }); + } } else { - const response = await handleNonStreamingRequest( - upstreamUrl, - upstreamInit, - completion, - bearer, - set, - providerType, - apiKeyRecord ?? null, + // Non-streaming request with failover + const result = await executeWithFailover( + candidates, + buildRequestForProvider, + COMPLETIONS_FAILOVER_CONFIG, + ); + + if (!result.success) { + // Build completion record for logging + const completion = buildCompletionRecord( + body.model, + result.provider?.model.id ?? candidates[0]?.model.id ?? 0, + body.messages as CompletionsMessageType[], + body.tools as ToolDefinitionType[] | undefined, + body.tool_choice as ToolChoiceType | undefined, + internalRequest.extraParams, + extraHeaders, + ); + completion.status = "failed"; + + // Non-retriable HTTP error from upstream - forward the response + if (result.response) { + logger.warn("Non-retriable upstream error for non-streaming request", { + status: result.response.status, + provider: result.provider?.provider.name, + }); + const errorSummary = result.errors + .map((e) => `${e.providerName}: ${e.error}`) + .join("; "); + addCompletions(completion, bearer, { + level: "error", + message: `Upstream error (non-retriable): ${errorSummary}`, + details: { + type: "completionError", + data: { + type: "upstreamError", + msg: result.finalError, + }, + }, + }).catch(() => { + logger.error("Failed to log completion after upstream error"); + }); + + set.status = result.response.status; + const responseBody = await result.response.text(); + yield responseBody; + return; + } + + // All providers failed with retriable errors or network errors + logger.error("All providers failed for non-streaming request", { + errors: result.errors, + totalAttempts: result.totalAttempts, + }); + const errorSummary = result.errors + .map((e) => `${e.providerName}: ${e.error}`) + .join("; "); + addCompletions(completion, bearer, { + level: "error", + message: `All providers failed (${result.totalAttempts} attempts): ${errorSummary}`, + details: { + type: "completionError", + data: { + type: "failoverExhausted", + msg: result.finalError, + }, + }, + }).catch(() => { + logger.error("Failed to log completion after failover exhaustion"); + }); + + set.status = 502; + yield JSON.stringify({ + error: { + message: "All upstream providers failed", + type: "upstream_error", + code: "all_providers_failed", + }, + }); + return; + } + + if (!result.response || !result.provider) { + set.status = 500; + yield JSON.stringify({ error: "Internal server error" }); + return; + } + + const providerType = result.provider.provider.type || "openai"; + const completion = buildCompletionRecord( + body.model, + result.provider.model.id, + body.messages as CompletionsMessageType[], + body.tools as ToolDefinitionType[] | undefined, + body.tool_choice as ToolChoiceType | undefined, + internalRequest.extraParams, + extraHeaders, ); - yield response; + + try { + const response = await processNonStreamingResponse( + result.response, + completion, + bearer, + providerType, + apiKeyRecord ?? null, + begin, + ); + yield response; + } catch (error) { + logger.error("Failed to process response", error); + completion.status = "failed"; + addCompletions(completion, bearer, { + level: "error", + message: `Response processing error: ${String(error)}`, + details: { + type: "completionError", + data: { type: "processingError", msg: String(error) }, + }, + }).catch(() => { + logger.error("Failed to log completion after processing error"); + }); + set.status = 500; + yield JSON.stringify({ error: "Failed to process response" }); + } } }, { diff --git a/backend/src/api/v1/messages.ts b/backend/src/api/v1/messages.ts index 429bcf5..bc5e224 100644 --- a/backend/src/api/v1/messages.ts +++ b/backend/src/api/v1/messages.ts @@ -17,12 +17,17 @@ import { apiKeyRateLimitPlugin, consumeTokens } from "@/plugins/apiKeyRateLimitP import { rateLimitPlugin } from "@/plugins/rateLimitPlugin"; import { extractUpstreamHeaders, - selectModel, + filterCandidates, extractContentText, parseModelProvider, PROVIDER_HEADER, } from "@/utils/api-helpers"; import { addCompletions, type Completion } from "@/utils/completions"; +import { + executeWithFailover, + selectMultipleCandidates, + type FailoverConfig, +} from "@/services/failover"; const logger = consola.withTag("messagesApi"); @@ -146,73 +151,16 @@ function buildCompletionRecord( } /** - * Handle non-streaming message request + * Process a successful non-streaming message response */ -async function handleNonStreamingRequest( - upstreamUrl: string, - upstreamInit: RequestInit, +async function processNonStreamingResponse( + resp: Response, completion: Completion, bearer: string, - set: { status?: number | string }, providerType: string, apiKeyRecord: ApiKey | null, + begin: number, ): Promise { - const begin = Date.now(); - - logger.debug("proxying messages request to upstream", { - bearer, - upstreamUrl, - providerType, - }); - - const [resp, err] = await fetch(upstreamUrl, upstreamInit) - .then((r) => [r, null] as [Response, null]) - .catch((error: unknown) => { - logger.error("fetch error", error); - return [null, error] as [null, Error]; - }); - - if (!resp) { - logger.error("upstream error", { - status: 500, - msg: "Failed to fetch upstream", - }); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Failed to fetch upstream. ${err.toString()}`, - details: { - type: "completionError", - data: { type: "fetchError", msg: err.toString() }, - }, - }).catch(() => { - logger.error("Failed to log completion after fetch failure"); - }); - set.status = 500; - return JSON.stringify({ - type: "error", - error: { type: "api_error", message: "Failed to fetch upstream" }, - }); - } - - if (!resp.ok) { - const msg = await resp.text(); - logger.error("upstream error", { status: resp.status, msg }); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Upstream error: ${msg}`, - details: { - type: "completionError", - data: { type: "upstreamError", status: resp.status, msg }, - }, - }).catch(() => { - logger.error("Failed to log completion after upstream error"); - }); - set.status = resp.status; - return msg; - } - // Parse response using upstream adapter const upstreamAdapter = getUpstreamAdapter(providerType); const internalResponse = await upstreamAdapter.parseResponse(resp); @@ -248,92 +196,17 @@ async function handleNonStreamingRequest( } /** - * Handle streaming message request + * Process a successful streaming message response * @yields string - SSE formatted string chunks */ -async function* handleStreamingRequest( - upstreamUrl: string, - upstreamInit: RequestInit, +async function* processStreamingResponse( + resp: Response, completion: Completion, bearer: string, - set: { status?: number | string }, providerType: string, apiKeyRecord: ApiKey | null, + begin: number, ): AsyncGenerator { - const begin = Date.now(); - - logger.debug("proxying stream messages request to upstream", { - userKey: bearer, - upstreamUrl, - providerType, - stream: true, - }); - - const [resp, err] = await fetch(upstreamUrl, upstreamInit) - .then((r) => [r, null] as [Response, null]) - .catch((error: unknown) => { - logger.error("fetch error", error); - return [null, error] as [null, Error]; - }); - - if (!resp) { - logger.error("upstream error", { - status: 500, - msg: "Failed to fetch upstream", - }); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Failed to fetch upstream. ${err.toString()}`, - details: { - type: "completionError", - data: { type: "fetchError", msg: err.toString() }, - }, - }).catch(() => { - logger.error("Failed to log completion after fetch error"); - }); - set.status = 500; - yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "api_error", message: "Failed to fetch upstream" } })}\n\n`; - return; - } - - if (!resp.ok) { - const msg = await resp.text(); - logger.error("upstream error", { status: resp.status, msg }); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Upstream error: ${msg}`, - details: { - type: "completionError", - data: { type: "upstreamError", status: resp.status, msg }, - }, - }).catch(() => { - logger.error("Failed to log completion after upstream error"); - }); - set.status = resp.status; - yield msg; - return; - } - - if (!resp.body) { - logger.error("upstream error", { status: resp.status, msg: "No body" }); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: "No body", - details: { - type: "completionError", - data: { type: "upstreamError", status: resp.status, msg: "No body" }, - }, - }).catch(() => { - logger.error("Failed to log completion after no body error"); - }); - set.status = 500; - yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "api_error", message: "No body" } })}\n\n`; - return; - } - // Get adapters const upstreamAdapter = getUpstreamAdapter(providerType); const responseAdapter = getResponseAdapter("anthropic"); @@ -406,25 +279,9 @@ async function* handleStreamingRequest( await consumeTokens(apiKeyRecord.id, apiKeyRecord.tpmLimit, totalTokens); } + // Handle case where no chunks were received if (isFirstChunk) { - logger.error("upstream error: no chunk received"); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: "No chunk received", - details: { - type: "completionError", - data: { - type: "upstreamError", - status: 500, - msg: "No chunk received", - }, - }, - }).catch(() => { - logger.error("Failed to log completion after no chunk received"); - }); - set.status = 500; - yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "api_error", message: "No chunk received" } })}\n\n`; + throw new Error("No chunk received from upstream"); } } catch (error) { logger.error("Stream processing error", error); @@ -439,11 +296,18 @@ async function* handleStreamingRequest( }).catch(() => { logger.error("Failed to log completion after stream error"); }); - set.status = 500; - yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "api_error", message: "Stream processing error" } })}\n\n`; + throw error; } } +// Failover configuration for messages API +const MESSAGES_FAILOVER_CONFIG: Partial = { + maxProviderAttempts: 3, + sameProviderRetries: 1, + retriableStatusCodes: [429, 500, 502, 503, 504], + timeoutMs: 120000, // 2 minutes for messages +}; + export const messagesApi = new Elysia({ detail: { security: [{ apiKey: [] }], @@ -465,6 +329,7 @@ export const messagesApi = new Elysia({ } const reqHeaders = request.headers; + const begin = Date.now(); // Parse model@provider format and extract provider from header const { systemName, targetProvider } = parseModelProvider( @@ -491,12 +356,13 @@ export const messagesApi = new Elysia({ return; } - // Select model/provider - const selected = selectModel( + // Filter candidates by target provider (if specified) + const filteredCandidates = filterCandidates( modelsWithProviders as ModelWithProvider[], targetProvider, ); - if (!selected) { + + if (filteredCandidates.length === 0) { set.status = 404; yield JSON.stringify({ type: "error", @@ -508,7 +374,11 @@ export const messagesApi = new Elysia({ return; } - const { model: modelConfig, provider } = selected; + // Select candidates for failover (weighted random order) + const candidates = selectMultipleCandidates( + filteredCandidates, + MESSAGES_FAILOVER_CONFIG.maxProviderAttempts || 3, + ); // Extract extra headers for passthrough const extraHeaders = extractUpstreamHeaders(reqHeaders); @@ -519,10 +389,7 @@ export const messagesApi = new Elysia({ body as Record, ); - // Update model in internal request to use remote ID - internalRequest.model = modelConfig.remoteId ?? modelConfig.systemName; - - // Add extra headers + // Add extra headers to internal request if (extraHeaders) { internalRequest.extraHeaders = { ...internalRequest.extraHeaders, @@ -530,45 +397,268 @@ export const messagesApi = new Elysia({ }; } - // Get provider type (default to openai for compatibility) - const providerType = provider.type || "openai"; - - // Build upstream request using adapter - const upstreamAdapter = getUpstreamAdapter(providerType); - const { url: upstreamUrl, init: upstreamInit } = - upstreamAdapter.buildRequest(internalRequest, provider); + // Build request function for failover + const buildRequestForProvider = (mp: ModelWithProvider) => { + // Clone internal request and update model + const req = { ...internalRequest }; + req.model = mp.model.remoteId ?? mp.model.systemName; - // Build completion record for logging - const completion = buildCompletionRecord( - body.model, - modelConfig.id, - body.messages, - internalRequest.extraParams, - extraHeaders, - ); + const providerType = mp.provider.type || "openai"; + const upstreamAdapter = getUpstreamAdapter(providerType); + return upstreamAdapter.buildRequest(req, mp.provider); + }; // Handle streaming vs non-streaming if (internalRequest.stream) { - yield* handleStreamingRequest( - upstreamUrl, - upstreamInit, - completion, - bearer, - set, - providerType, - apiKeyRecord ?? null, + // For streaming, use failover only for connection establishment + const result = await executeWithFailover( + candidates, + buildRequestForProvider, + MESSAGES_FAILOVER_CONFIG, + ); + + if (!result.success) { + // Build completion record for logging + const completion = buildCompletionRecord( + body.model, + result.provider?.model.id ?? candidates[0]?.model.id ?? 0, + body.messages, + internalRequest.extraParams, + extraHeaders, + ); + completion.status = "failed"; + + // Non-retriable HTTP error from upstream - forward the response + if (result.response) { + logger.warn("Non-retriable upstream error for streaming request", { + status: result.response.status, + provider: result.provider?.provider.name, + }); + const errorSummary = result.errors + .map((e) => `${e.providerName}: ${e.error}`) + .join("; "); + addCompletions(completion, bearer, { + level: "error", + message: `Upstream error (non-retriable): ${errorSummary}`, + details: { + type: "completionError", + data: { + type: "upstreamError", + msg: result.finalError, + }, + }, + }).catch(() => { + logger.error("Failed to log completion after upstream error"); + }); + + set.status = result.response.status; + const responseBody = await result.response.text(); + yield responseBody; + return; + } + + // All providers failed with retriable errors or network errors + logger.error("All providers failed for streaming request", { + errors: result.errors, + totalAttempts: result.totalAttempts, + }); + const errorSummary = result.errors + .map((e) => `${e.providerName}: ${e.error}`) + .join("; "); + addCompletions(completion, bearer, { + level: "error", + message: `All providers failed (${result.totalAttempts} attempts): ${errorSummary}`, + details: { + type: "completionError", + data: { + type: "failoverExhausted", + msg: result.finalError, + }, + }, + }).catch(() => { + logger.error("Failed to log completion after failover exhaustion"); + }); + + set.status = 502; + yield JSON.stringify({ + type: "error", + error: { + type: "api_error", + message: "All upstream providers failed", + }, + }); + return; + } + + if (!result.response || !result.provider) { + set.status = 500; + yield JSON.stringify({ + type: "error", + error: { type: "api_error", message: "Internal server error" }, + }); + return; + } + + // Check if response has body + if (!result.response.body) { + set.status = 500; + yield JSON.stringify({ + type: "error", + error: { type: "api_error", message: "No body in response" }, + }); + return; + } + + const providerType = result.provider.provider.type || "openai"; + const completion = buildCompletionRecord( + body.model, + result.provider.model.id, + body.messages, + internalRequest.extraParams, + extraHeaders, ); + + try { + yield* processStreamingResponse( + result.response, + completion, + bearer, + providerType, + apiKeyRecord ?? null, + begin, + ); + } catch { + set.status = 500; + yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "api_error", message: "Stream processing error" } })}\n\n`; + } } else { - const response = await handleNonStreamingRequest( - upstreamUrl, - upstreamInit, - completion, - bearer, - set, - providerType, - apiKeyRecord ?? null, + // Non-streaming request with failover + const result = await executeWithFailover( + candidates, + buildRequestForProvider, + MESSAGES_FAILOVER_CONFIG, ); - yield response; + + if (!result.success) { + // Build completion record for logging + const completion = buildCompletionRecord( + body.model, + result.provider?.model.id ?? candidates[0]?.model.id ?? 0, + body.messages, + internalRequest.extraParams, + extraHeaders, + ); + completion.status = "failed"; + + // Non-retriable HTTP error from upstream - forward the response + if (result.response) { + logger.warn("Non-retriable upstream error for non-streaming request", { + status: result.response.status, + provider: result.provider?.provider.name, + }); + const errorSummary = result.errors + .map((e) => `${e.providerName}: ${e.error}`) + .join("; "); + addCompletions(completion, bearer, { + level: "error", + message: `Upstream error (non-retriable): ${errorSummary}`, + details: { + type: "completionError", + data: { + type: "upstreamError", + msg: result.finalError, + }, + }, + }).catch(() => { + logger.error("Failed to log completion after upstream error"); + }); + + set.status = result.response.status; + const responseBody = await result.response.text(); + yield responseBody; + return; + } + + // All providers failed with retriable errors or network errors + logger.error("All providers failed for non-streaming request", { + errors: result.errors, + totalAttempts: result.totalAttempts, + }); + const errorSummary = result.errors + .map((e) => `${e.providerName}: ${e.error}`) + .join("; "); + addCompletions(completion, bearer, { + level: "error", + message: `All providers failed (${result.totalAttempts} attempts): ${errorSummary}`, + details: { + type: "completionError", + data: { + type: "failoverExhausted", + msg: result.finalError, + }, + }, + }).catch(() => { + logger.error("Failed to log completion after failover exhaustion"); + }); + + set.status = 502; + yield JSON.stringify({ + type: "error", + error: { + type: "api_error", + message: "All upstream providers failed", + }, + }); + return; + } + + if (!result.response || !result.provider) { + set.status = 500; + yield JSON.stringify({ + type: "error", + error: { type: "api_error", message: "Internal server error" }, + }); + return; + } + + const providerType = result.provider.provider.type || "openai"; + const completion = buildCompletionRecord( + body.model, + result.provider.model.id, + body.messages, + internalRequest.extraParams, + extraHeaders, + ); + + try { + const response = await processNonStreamingResponse( + result.response, + completion, + bearer, + providerType, + apiKeyRecord ?? null, + begin, + ); + yield response; + } catch (error) { + logger.error("Failed to process response", error); + completion.status = "failed"; + addCompletions(completion, bearer, { + level: "error", + message: `Response processing error: ${String(error)}`, + details: { + type: "completionError", + data: { type: "processingError", msg: String(error) }, + }, + }).catch(() => { + logger.error("Failed to log completion after processing error"); + }); + set.status = 500; + yield JSON.stringify({ + type: "error", + error: { type: "api_error", message: "Failed to process response" }, + }); + } } }, { diff --git a/backend/src/api/v1/responses.ts b/backend/src/api/v1/responses.ts index 3a2072c..576a627 100644 --- a/backend/src/api/v1/responses.ts +++ b/backend/src/api/v1/responses.ts @@ -17,12 +17,17 @@ import { apiKeyRateLimitPlugin, consumeTokens } from "@/plugins/apiKeyRateLimitP import { rateLimitPlugin } from "@/plugins/rateLimitPlugin"; import { extractUpstreamHeaders, - selectModel, + filterCandidates, extractContentText, parseModelProvider, PROVIDER_HEADER, } from "@/utils/api-helpers"; import { addCompletions, type Completion } from "@/utils/completions"; +import { + executeWithFailover, + selectMultipleCandidates, + type FailoverConfig, +} from "@/services/failover"; const logger = consola.withTag("responsesApi"); @@ -157,73 +162,16 @@ function buildCompletionRecord( } /** - * Handle non-streaming response request + * Process a successful non-streaming response */ -async function handleNonStreamingRequest( - upstreamUrl: string, - upstreamInit: RequestInit, +async function processNonStreamingResponse( + resp: Response, completion: Completion, bearer: string, - set: { status?: number | string }, providerType: string, apiKeyRecord: ApiKey | null, + begin: number, ): Promise { - const begin = Date.now(); - - logger.debug("proxying responses request to upstream", { - bearer, - upstreamUrl, - providerType, - }); - - const [resp, err] = await fetch(upstreamUrl, upstreamInit) - .then((r) => [r, null] as [Response, null]) - .catch((error: unknown) => { - logger.error("fetch error", error); - return [null, error] as [null, Error]; - }); - - if (!resp) { - logger.error("upstream error", { - status: 500, - msg: "Failed to fetch upstream", - }); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Failed to fetch upstream. ${err.toString()}`, - details: { - type: "completionError", - data: { type: "fetchError", msg: err.toString() }, - }, - }).catch(() => { - logger.error("Failed to log completion error after fetch failure"); - }); - set.status = 500; - return JSON.stringify({ - object: "error", - error: { type: "server_error", message: "Failed to fetch upstream" }, - }); - } - - if (!resp.ok) { - const msg = await resp.text(); - logger.error("upstream error", { status: resp.status, msg }); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Upstream error: ${msg}`, - details: { - type: "completionError", - data: { type: "upstreamError", status: resp.status, msg }, - }, - }).catch(() => { - logger.error("Failed to log completion error after upstream error"); - }); - set.status = resp.status; - return msg; - } - // Parse response using upstream adapter const upstreamAdapter = getUpstreamAdapter(providerType); const internalResponse = await upstreamAdapter.parseResponse(resp); @@ -259,92 +207,17 @@ async function handleNonStreamingRequest( } /** - * Handle streaming response request + * Process a successful streaming response * @yields SSE formatted strings */ -async function* handleStreamingRequest( - upstreamUrl: string, - upstreamInit: RequestInit, +async function* processStreamingResponse( + resp: Response, completion: Completion, bearer: string, - set: { status?: number | string }, providerType: string, apiKeyRecord: ApiKey | null, + begin: number, ): AsyncGenerator { - const begin = Date.now(); - - logger.debug("proxying stream responses request to upstream", { - userKey: bearer, - upstreamUrl, - providerType, - stream: true, - }); - - const [resp, err] = await fetch(upstreamUrl, upstreamInit) - .then((r) => [r, null] as [Response, null]) - .catch((error: unknown) => { - logger.error("fetch error", error); - return [null, error] as [null, Error]; - }); - - if (!resp) { - logger.error("upstream error", { - status: 500, - msg: "Failed to fetch upstream", - }); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Failed to fetch upstream. ${err.toString()}`, - details: { - type: "completionError", - data: { type: "fetchError", msg: err.toString() }, - }, - }).catch(() => { - logger.error("Failed to log completion error after fetch failure"); - }); - set.status = 500; - yield `data: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "Failed to fetch upstream" } })}\n\n`; - return; - } - - if (!resp.ok) { - const msg = await resp.text(); - logger.error("upstream error", { status: resp.status, msg }); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: `Upstream error: ${msg}`, - details: { - type: "completionError", - data: { type: "upstreamError", status: resp.status, msg }, - }, - }).catch(() => { - logger.error("Failed to log completion error after upstream error"); - }); - set.status = resp.status; - yield msg; - return; - } - - if (!resp.body) { - logger.error("upstream error", { status: resp.status, msg: "No body" }); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: "No body", - details: { - type: "completionError", - data: { type: "upstreamError", status: resp.status, msg: "No body" }, - }, - }).catch(() => { - logger.error("Failed to log completion error after no body"); - }); - set.status = 500; - yield `data: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "No body" } })}\n\n`; - return; - } - // Get adapters const upstreamAdapter = getUpstreamAdapter(providerType); const responseAdapter = getResponseAdapter("openai-responses"); @@ -422,25 +295,9 @@ async function* handleStreamingRequest( await consumeTokens(apiKeyRecord.id, apiKeyRecord.tpmLimit, totalTokens); } + // Handle case where no chunks were received if (isFirstChunk) { - logger.error("upstream error: no chunk received"); - completion.status = "failed"; - addCompletions(completion, bearer, { - level: "error", - message: "No chunk received", - details: { - type: "completionError", - data: { - type: "upstreamError", - status: 500, - msg: "No chunk received", - }, - }, - }).catch(() => { - logger.error("Failed to log completion error after no chunk received"); - }); - set.status = 500; - yield `data: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "No chunk received" } })}\n\n`; + throw new Error("No chunk received from upstream"); } } catch (error) { logger.error("Stream processing error", error); @@ -455,11 +312,18 @@ async function* handleStreamingRequest( }).catch(() => { logger.error("Failed to log completion error after stream failure"); }); - set.status = 500; - yield `data: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "Stream processing error" } })}\n\n`; + throw error; } } +// Failover configuration for responses API +const RESPONSES_FAILOVER_CONFIG: Partial = { + maxProviderAttempts: 3, + sameProviderRetries: 1, + retriableStatusCodes: [429, 500, 502, 503, 504], + timeoutMs: 120000, // 2 minutes for responses +}; + export const responsesApi = new Elysia({ detail: { security: [{ apiKey: [] }], @@ -481,6 +345,7 @@ export const responsesApi = new Elysia({ } const reqHeaders = request.headers; + const begin = Date.now(); // Parse model@provider format and extract provider from header const { systemName, targetProvider } = parseModelProvider( @@ -507,12 +372,13 @@ export const responsesApi = new Elysia({ return; } - // Select model/provider - const selected = selectModel( + // Filter candidates by target provider (if specified) + const filteredCandidates = filterCandidates( modelsWithProviders as ModelWithProvider[], targetProvider, ); - if (!selected) { + + if (filteredCandidates.length === 0) { set.status = 404; yield JSON.stringify({ object: "error", @@ -524,7 +390,11 @@ export const responsesApi = new Elysia({ return; } - const { model: modelConfig, provider } = selected; + // Select candidates for failover (weighted random order) + const candidates = selectMultipleCandidates( + filteredCandidates, + RESPONSES_FAILOVER_CONFIG.maxProviderAttempts || 3, + ); // Extract extra headers for passthrough const extraHeaders = extractUpstreamHeaders(reqHeaders); @@ -535,10 +405,7 @@ export const responsesApi = new Elysia({ body as Record, ); - // Update model in internal request to use remote ID - internalRequest.model = modelConfig.remoteId ?? modelConfig.systemName; - - // Add extra headers + // Add extra headers to internal request if (extraHeaders) { internalRequest.extraHeaders = { ...internalRequest.extraHeaders, @@ -546,45 +413,268 @@ export const responsesApi = new Elysia({ }; } - // Get provider type (default to openai for compatibility) - const providerType = provider.type || "openai"; + // Build request function for failover + const buildRequestForProvider = (mp: ModelWithProvider) => { + // Clone internal request and update model + const req = { ...internalRequest }; + req.model = mp.model.remoteId ?? mp.model.systemName; - // Build upstream request using adapter - const upstreamAdapter = getUpstreamAdapter(providerType); - const { url: upstreamUrl, init: upstreamInit } = - upstreamAdapter.buildRequest(internalRequest, provider); - - // Build completion record for logging - const completion = buildCompletionRecord( - body.model, - modelConfig.id, - body.input, - internalRequest.extraParams, - extraHeaders, - ); + const providerType = mp.provider.type || "openai"; + const upstreamAdapter = getUpstreamAdapter(providerType); + return upstreamAdapter.buildRequest(req, mp.provider); + }; // Handle streaming vs non-streaming if (internalRequest.stream) { - yield* handleStreamingRequest( - upstreamUrl, - upstreamInit, - completion, - bearer, - set, - providerType, - apiKeyRecord ?? null, + // For streaming, use failover only for connection establishment + const result = await executeWithFailover( + candidates, + buildRequestForProvider, + RESPONSES_FAILOVER_CONFIG, ); + + if (!result.success) { + // Build completion record for logging + const completion = buildCompletionRecord( + body.model, + result.provider?.model.id ?? candidates[0]?.model.id ?? 0, + body.input, + internalRequest.extraParams, + extraHeaders, + ); + completion.status = "failed"; + + // Non-retriable HTTP error from upstream - forward the response + if (result.response) { + logger.warn("Non-retriable upstream error for streaming request", { + status: result.response.status, + provider: result.provider?.provider.name, + }); + const errorSummary = result.errors + .map((e) => `${e.providerName}: ${e.error}`) + .join("; "); + addCompletions(completion, bearer, { + level: "error", + message: `Upstream error (non-retriable): ${errorSummary}`, + details: { + type: "completionError", + data: { + type: "upstreamError", + msg: result.finalError, + }, + }, + }).catch(() => { + logger.error("Failed to log completion after upstream error"); + }); + + set.status = result.response.status; + const responseBody = await result.response.text(); + yield responseBody; + return; + } + + // All providers failed with retriable errors or network errors + logger.error("All providers failed for streaming request", { + errors: result.errors, + totalAttempts: result.totalAttempts, + }); + const errorSummary = result.errors + .map((e) => `${e.providerName}: ${e.error}`) + .join("; "); + addCompletions(completion, bearer, { + level: "error", + message: `All providers failed (${result.totalAttempts} attempts): ${errorSummary}`, + details: { + type: "completionError", + data: { + type: "failoverExhausted", + msg: result.finalError, + }, + }, + }).catch(() => { + logger.error("Failed to log completion after failover exhaustion"); + }); + + set.status = 502; + yield JSON.stringify({ + object: "error", + error: { + type: "server_error", + message: "All upstream providers failed", + }, + }); + return; + } + + if (!result.response || !result.provider) { + set.status = 500; + yield JSON.stringify({ + object: "error", + error: { type: "server_error", message: "Internal server error" }, + }); + return; + } + + // Check if response has body + if (!result.response.body) { + set.status = 500; + yield JSON.stringify({ + object: "error", + error: { type: "server_error", message: "No body in response" }, + }); + return; + } + + const providerType = result.provider.provider.type || "openai"; + const completion = buildCompletionRecord( + body.model, + result.provider.model.id, + body.input, + internalRequest.extraParams, + extraHeaders, + ); + + try { + yield* processStreamingResponse( + result.response, + completion, + bearer, + providerType, + apiKeyRecord ?? null, + begin, + ); + } catch { + set.status = 500; + yield `data: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "Stream processing error" } })}\n\n`; + } } else { - const response = await handleNonStreamingRequest( - upstreamUrl, - upstreamInit, - completion, - bearer, - set, - providerType, - apiKeyRecord ?? null, + // Non-streaming request with failover + const result = await executeWithFailover( + candidates, + buildRequestForProvider, + RESPONSES_FAILOVER_CONFIG, ); - yield response; + + if (!result.success) { + // Build completion record for logging + const completion = buildCompletionRecord( + body.model, + result.provider?.model.id ?? candidates[0]?.model.id ?? 0, + body.input, + internalRequest.extraParams, + extraHeaders, + ); + completion.status = "failed"; + + // Non-retriable HTTP error from upstream - forward the response + if (result.response) { + logger.warn("Non-retriable upstream error for non-streaming request", { + status: result.response.status, + provider: result.provider?.provider.name, + }); + const errorSummary = result.errors + .map((e) => `${e.providerName}: ${e.error}`) + .join("; "); + addCompletions(completion, bearer, { + level: "error", + message: `Upstream error (non-retriable): ${errorSummary}`, + details: { + type: "completionError", + data: { + type: "upstreamError", + msg: result.finalError, + }, + }, + }).catch(() => { + logger.error("Failed to log completion after upstream error"); + }); + + set.status = result.response.status; + const responseBody = await result.response.text(); + yield responseBody; + return; + } + + // All providers failed with retriable errors or network errors + logger.error("All providers failed for non-streaming request", { + errors: result.errors, + totalAttempts: result.totalAttempts, + }); + const errorSummary = result.errors + .map((e) => `${e.providerName}: ${e.error}`) + .join("; "); + addCompletions(completion, bearer, { + level: "error", + message: `All providers failed (${result.totalAttempts} attempts): ${errorSummary}`, + details: { + type: "completionError", + data: { + type: "failoverExhausted", + msg: result.finalError, + }, + }, + }).catch(() => { + logger.error("Failed to log completion after failover exhaustion"); + }); + + set.status = 502; + yield JSON.stringify({ + object: "error", + error: { + type: "server_error", + message: "All upstream providers failed", + }, + }); + return; + } + + if (!result.response || !result.provider) { + set.status = 500; + yield JSON.stringify({ + object: "error", + error: { type: "server_error", message: "Internal server error" }, + }); + return; + } + + const providerType = result.provider.provider.type || "openai"; + const completion = buildCompletionRecord( + body.model, + result.provider.model.id, + body.input, + internalRequest.extraParams, + extraHeaders, + ); + + try { + const response = await processNonStreamingResponse( + result.response, + completion, + bearer, + providerType, + apiKeyRecord ?? null, + begin, + ); + yield response; + } catch (error) { + logger.error("Failed to process response", error); + completion.status = "failed"; + addCompletions(completion, bearer, { + level: "error", + message: `Response processing error: ${String(error)}`, + details: { + type: "completionError", + data: { type: "processingError", msg: String(error) }, + }, + }).catch(() => { + logger.error("Failed to log completion after processing error"); + }); + set.status = 500; + yield JSON.stringify({ + object: "error", + error: { type: "server_error", message: "Failed to process response" }, + }); + } } }, { diff --git a/backend/src/services/failover.ts b/backend/src/services/failover.ts new file mode 100644 index 0000000..2fa039f --- /dev/null +++ b/backend/src/services/failover.ts @@ -0,0 +1,413 @@ +/** + * Failover Service + * + * Provides automatic failover and retry capabilities for upstream requests. + * When a request to one provider fails with a retriable error, it automatically + * tries the next available provider. + */ + +import { consola } from "consola"; +import type { ModelWithProvider } from "@/adapters/types"; + +const logger = consola.withTag("failover"); + +// ============================================================================= +// Configuration Types +// ============================================================================= + +export interface FailoverConfig { + /** Maximum number of different providers to try (default: 3) */ + maxProviderAttempts: number; + /** Maximum retries on the same provider for transient errors like 429 (default: 1) */ + sameProviderRetries: number; + /** HTTP status codes that should trigger a retry (default: [429, 500, 502, 503, 504]) */ + retriableStatusCodes: number[]; + /** Network error codes that should trigger a retry */ + retriableErrorCodes: string[]; + /** Base delay in ms for exponential backoff (default: 100) */ + baseDelayMs: number; + /** Maximum delay in ms for exponential backoff (default: 5000) */ + maxDelayMs: number; + /** Exponential base for backoff calculation (default: 2) */ + exponentialBase: number; + /** Jitter factor to add randomness to delays (default: 0.1) */ + jitterFactor: number; + /** Request timeout in ms (default: 60000) */ + timeoutMs: number; +} + +export const DEFAULT_FAILOVER_CONFIG: FailoverConfig = { + maxProviderAttempts: 3, + sameProviderRetries: 1, + retriableStatusCodes: [429, 500, 502, 503, 504], + retriableErrorCodes: [ + "ETIMEDOUT", + "ECONNRESET", + "ECONNREFUSED", + "ENOTFOUND", + "EAI_AGAIN", + "EPIPE", + "UND_ERR_CONNECT_TIMEOUT", + ], + baseDelayMs: 100, + maxDelayMs: 5000, + exponentialBase: 2, + jitterFactor: 0.1, + timeoutMs: 60000, +}; + +// ============================================================================= +// Error Types +// ============================================================================= + +export interface FailoverError { + providerId: number; + providerName: string; + attempt: number; + error: string; + statusCode?: number; + retriable: boolean; + timestamp: number; +} + +export interface FailoverResult { + success: boolean; + response?: T; + provider?: ModelWithProvider; + errors: FailoverError[]; + totalAttempts: number; + finalError?: string; +} + +// ============================================================================= +// Utility Functions +// ============================================================================= + +/** + * Calculate delay with exponential backoff and jitter + */ +export function calculateBackoffDelay( + attempt: number, + config: FailoverConfig, +): number { + const exponentialDelay = + config.baseDelayMs * Math.pow(config.exponentialBase, attempt); + const cappedDelay = Math.min(exponentialDelay, config.maxDelayMs); + const jitter = cappedDelay * config.jitterFactor * Math.random(); + return Math.floor(cappedDelay + jitter); +} + +/** + * Check if an HTTP status code is retriable + */ +export function isRetriableStatusCode( + statusCode: number, + config: FailoverConfig, +): boolean { + return config.retriableStatusCodes.includes(statusCode); +} + +/** + * Check if a network error is retriable + */ +export function isRetriableNetworkError( + error: Error, + config: FailoverConfig, +): boolean { + const errorCode = (error as NodeJS.ErrnoException).code; + if (errorCode && config.retriableErrorCodes.includes(errorCode)) { + return true; + } + // Check error message for common network issues + const message = error.message.toLowerCase(); + return ( + message.includes("timeout") || + message.includes("econnreset") || + message.includes("econnrefused") || + message.includes("network") || + message.includes("socket hang up") + ); +} + +/** + * Sleep for a given number of milliseconds + */ +export async function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +/** + * Fetch with timeout support + */ +export async function fetchWithTimeout( + url: string, + init: RequestInit, + timeoutMs: number, +): Promise { + const controller = new AbortController(); + const timeoutId = setTimeout(() => { + controller.abort(); + }, timeoutMs); + + try { + const response = await fetch(url, { + ...init, + signal: controller.signal, + }); + return response; + } finally { + clearTimeout(timeoutId); + } +} + +// ============================================================================= +// Failover Execution Engine +// ============================================================================= + +export interface RequestBuilder { + (provider: ModelWithProvider): { url: string; init: RequestInit }; +} + +/** + * Execute a request with automatic failover across multiple providers + * + * @param candidates - List of model/provider combinations to try + * @param buildRequest - Function to build the request for a given provider + * @param config - Failover configuration (optional, uses defaults if not provided) + * @returns FailoverResult with the response or accumulated errors + */ +export async function executeWithFailover( + candidates: ModelWithProvider[], + buildRequest: RequestBuilder, + config: Partial = {}, +): Promise> { + const cfg: FailoverConfig = { ...DEFAULT_FAILOVER_CONFIG, ...config }; + const errors: FailoverError[] = []; + const triedProviders = new Set(); + let totalAttempts = 0; + + // Limit candidates to maxProviderAttempts + const maxProviders = Math.min(candidates.length, cfg.maxProviderAttempts); + + for (let providerIndex = 0; providerIndex < maxProviders; providerIndex++) { + // Select next provider (skip already tried ones) + const provider = candidates.find( + (c) => !triedProviders.has(c.provider.id), + ); + if (!provider) { + logger.debug("No more providers to try"); + break; + } + + triedProviders.add(provider.provider.id); + const { url, init } = buildRequest(provider); + + // Try this provider with same-provider retries for transient errors + for ( + let sameProviderAttempt = 0; + sameProviderAttempt <= cfg.sameProviderRetries; + sameProviderAttempt++ + ) { + totalAttempts++; + + // Add delay for retries (not for first attempt) + if (sameProviderAttempt > 0 || providerIndex > 0) { + const delay = calculateBackoffDelay(totalAttempts - 1, cfg); + logger.debug("Waiting before retry", { + delay, + attempt: totalAttempts, + provider: provider.provider.name, + }); + await sleep(delay); + } + + try { + logger.debug("Attempting request", { + provider: provider.provider.name, + providerId: provider.provider.id, + attempt: totalAttempts, + sameProviderAttempt, + url, + }); + + const response = await fetchWithTimeout(url, init, cfg.timeoutMs); + + // Success - return immediately + if (response.ok) { + logger.debug("Request succeeded", { + provider: provider.provider.name, + attempt: totalAttempts, + status: response.status, + }); + return { + success: true, + response, + provider, + errors, + totalAttempts, + }; + } + + // Check if status code is retriable + const retriable = isRetriableStatusCode(response.status, cfg); + const errorMsg = `HTTP ${response.status}`; + + errors.push({ + providerId: provider.provider.id, + providerName: provider.provider.name, + attempt: totalAttempts, + error: errorMsg, + statusCode: response.status, + retriable, + timestamp: Date.now(), + }); + + logger.warn("Request failed with HTTP error", { + provider: provider.provider.name, + status: response.status, + retriable, + attempt: totalAttempts, + }); + + // If not retriable, return the response (let caller handle the error) + if (!retriable) { + return { + success: false, + response, + provider, + errors, + totalAttempts, + finalError: errorMsg, + }; + } + + // If retriable but we've exhausted same-provider retries, move to next provider + if (sameProviderAttempt >= cfg.sameProviderRetries) { + logger.debug("Exhausted same-provider retries, trying next provider"); + break; + } + + // Otherwise, continue with same-provider retry + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + const retriable = isRetriableNetworkError(err, cfg); + + errors.push({ + providerId: provider.provider.id, + providerName: provider.provider.name, + attempt: totalAttempts, + error: err.message, + retriable, + timestamp: Date.now(), + }); + + logger.warn("Request failed with network error", { + provider: provider.provider.name, + error: err.message, + retriable, + attempt: totalAttempts, + }); + + // If not retriable, throw immediately + if (!retriable) { + return { + success: false, + errors, + totalAttempts, + finalError: err.message, + }; + } + + // If retriable but we've exhausted same-provider retries, move to next provider + if (sameProviderAttempt >= cfg.sameProviderRetries) { + logger.debug("Exhausted same-provider retries, trying next provider"); + break; + } + } + } + } + + // All providers exhausted + const lastError = errors[errors.length - 1]; + logger.error("All providers exhausted", { + totalAttempts, + providersAttempted: triedProviders.size, + errors: errors.map((e) => ({ + provider: e.providerName, + error: e.error, + statusCode: e.statusCode, + })), + }); + + return { + success: false, + errors, + totalAttempts, + finalError: lastError?.error || "All providers exhausted", + }; +} + +/** + * Reorder candidates to put a specific provider first (if available) + * while maintaining weighted random order for the rest + */ +export function reorderCandidatesWithPreferred( + candidates: ModelWithProvider[], + preferredProviderId?: number, +): ModelWithProvider[] { + if (!preferredProviderId || candidates.length <= 1) { + return candidates; + } + + const preferred = candidates.find( + (c) => c.provider.id === preferredProviderId, + ); + if (!preferred) { + return candidates; + } + + const others = candidates.filter((c) => c.provider.id !== preferredProviderId); + return [preferred, ...others]; +} + +/** + * Select multiple candidates using weighted random selection + * Returns candidates in order of selection (first selected = highest priority) + */ +export function selectMultipleCandidates( + candidates: ModelWithProvider[], + count: number, +): ModelWithProvider[] { + if (candidates.length <= count) { + return [...candidates]; + } + + const result: ModelWithProvider[] = []; + const remaining = [...candidates]; + + for (let i = 0; i < count && remaining.length > 0; i++) { + const totalWeight = remaining.reduce((sum, c) => sum + c.model.weight, 0); + const random = Math.random() * totalWeight; + + let cumulative = 0; + let selectedIndex = 0; + for (let j = 0; j < remaining.length; j++) { + const item = remaining[j]; + if (item) { + cumulative += item.model.weight; + if (random < cumulative) { + selectedIndex = j; + break; + } + } + } + + const selected = remaining[selectedIndex]; + if (selected) { + result.push(selected); + remaining.splice(selectedIndex, 1); + } + } + + return result; +} diff --git a/backend/src/utils/api-helpers.ts b/backend/src/utils/api-helpers.ts index db6b2ef..9540d5d 100644 --- a/backend/src/utils/api-helpers.ts +++ b/backend/src/utils/api-helpers.ts @@ -80,6 +80,36 @@ export function extractUpstreamHeaders( // Model Selection // ============================================================================= +/** + * Filter candidates by target provider if specified + * Returns all matching candidates (for use with failover) + */ +export function filterCandidates( + modelsWithProviders: ModelWithProvider[], + targetProvider?: string, +): ModelWithProvider[] { + if (modelsWithProviders.length === 0) { + return []; + } + + if (!targetProvider) { + return modelsWithProviders; + } + + const filtered = modelsWithProviders.filter( + (mp) => mp.provider.name === targetProvider, + ); + + if (filtered.length > 0) { + return filtered; + } + + logger.warn( + `Provider '${targetProvider}' does not offer requested model, falling back to available providers`, + ); + return modelsWithProviders; +} + /** * Select the best model/provider combination based on target provider and weights * Uses weighted random selection for load balancing across multiple providers From d4e3e9e157c9814c19a85663fba1bd96ecf03157 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E7=BF=94=E5=AE=87?= Date: Tue, 20 Jan 2026 05:41:55 +0800 Subject: [PATCH 2/5] fix: address PR review comments for failover implementation - Change modelId fallback from ?? 0 to undefined to avoid database integrity issues - Add AbortError handling in isRetriableNetworkError for timeout scenarios - Fix SSE error format in responses.ts to match OpenAI Responses API spec - Add error capture to catch blocks for better logging - Optimize selectMultipleCandidates by calculating totalWeight once Co-Authored-By: Claude Opus 4.5 --- backend/src/api/v1/completions.ts | 9 +++++---- backend/src/api/v1/messages.ts | 9 +++++---- backend/src/api/v1/responses.ts | 11 ++++++----- backend/src/services/failover.ts | 15 +++++++++++---- 4 files changed, 27 insertions(+), 17 deletions(-) diff --git a/backend/src/api/v1/completions.ts b/backend/src/api/v1/completions.ts index ab908bd..af28619 100644 --- a/backend/src/api/v1/completions.ts +++ b/backend/src/api/v1/completions.ts @@ -109,7 +109,7 @@ const tChatCompletionCreate = t.Object( */ function buildCompletionRecord( requestedModel: string, - modelId: number, + modelId: number | undefined, messages: CompletionsMessageType[], tools?: ToolDefinitionType[], toolChoice?: ToolChoiceType, @@ -482,7 +482,7 @@ export const completionsApi = new Elysia({ // Build completion record for logging const completion = buildCompletionRecord( body.model, - result.provider?.model.id ?? candidates[0]?.model.id ?? 0, + result.provider?.model.id ?? candidates[0]?.model.id, body.messages as CompletionsMessageType[], body.tools as ToolDefinitionType[] | undefined, body.tool_choice as ToolChoiceType | undefined, @@ -586,7 +586,8 @@ export const completionsApi = new Elysia({ apiKeyRecord ?? null, begin, ); - } catch { + } catch (error) { + logger.error("Stream processing error", error); set.status = 500; yield JSON.stringify({ error: "Stream processing error" }); } @@ -602,7 +603,7 @@ export const completionsApi = new Elysia({ // Build completion record for logging const completion = buildCompletionRecord( body.model, - result.provider?.model.id ?? candidates[0]?.model.id ?? 0, + result.provider?.model.id ?? candidates[0]?.model.id, body.messages as CompletionsMessageType[], body.tools as ToolDefinitionType[] | undefined, body.tool_choice as ToolChoiceType | undefined, diff --git a/backend/src/api/v1/messages.ts b/backend/src/api/v1/messages.ts index bc5e224..7db6fc5 100644 --- a/backend/src/api/v1/messages.ts +++ b/backend/src/api/v1/messages.ts @@ -123,7 +123,7 @@ const tAnthropicMessageCreate = t.Object( */ function buildCompletionRecord( requestedModel: string, - modelId: number, + modelId: number | undefined, messages: Array<{ role: string; content: unknown }>, extraBody?: Record, extraHeaders?: Record, @@ -421,7 +421,7 @@ export const messagesApi = new Elysia({ // Build completion record for logging const completion = buildCompletionRecord( body.model, - result.provider?.model.id ?? candidates[0]?.model.id ?? 0, + result.provider?.model.id ?? candidates[0]?.model.id, body.messages, internalRequest.extraParams, extraHeaders, @@ -527,7 +527,8 @@ export const messagesApi = new Elysia({ apiKeyRecord ?? null, begin, ); - } catch { + } catch (error) { + logger.error("Stream processing error", error); set.status = 500; yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "api_error", message: "Stream processing error" } })}\n\n`; } @@ -543,7 +544,7 @@ export const messagesApi = new Elysia({ // Build completion record for logging const completion = buildCompletionRecord( body.model, - result.provider?.model.id ?? candidates[0]?.model.id ?? 0, + result.provider?.model.id ?? candidates[0]?.model.id, body.messages, internalRequest.extraParams, extraHeaders, diff --git a/backend/src/api/v1/responses.ts b/backend/src/api/v1/responses.ts index 576a627..3c9a3de 100644 --- a/backend/src/api/v1/responses.ts +++ b/backend/src/api/v1/responses.ts @@ -112,7 +112,7 @@ const tResponseApiCreate = t.Object( */ function buildCompletionRecord( requestedModel: string, - modelId: number, + modelId: number | undefined, input: unknown, extraBody?: Record, extraHeaders?: Record, @@ -437,7 +437,7 @@ export const responsesApi = new Elysia({ // Build completion record for logging const completion = buildCompletionRecord( body.model, - result.provider?.model.id ?? candidates[0]?.model.id ?? 0, + result.provider?.model.id ?? candidates[0]?.model.id, body.input, internalRequest.extraParams, extraHeaders, @@ -543,9 +543,10 @@ export const responsesApi = new Elysia({ apiKeyRecord ?? null, begin, ); - } catch { + } catch (error) { + logger.error("Stream processing error", error); set.status = 500; - yield `data: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "Stream processing error" } })}\n\n`; + yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { code: "internal_error", message: "Stream processing error", param: null, help_url: null } })}\n\n`; } } else { // Non-streaming request with failover @@ -559,7 +560,7 @@ export const responsesApi = new Elysia({ // Build completion record for logging const completion = buildCompletionRecord( body.model, - result.provider?.model.id ?? candidates[0]?.model.id ?? 0, + result.provider?.model.id ?? candidates[0]?.model.id, body.input, internalRequest.extraParams, extraHeaders, diff --git a/backend/src/services/failover.ts b/backend/src/services/failover.ts index 2fa039f..80c7b62 100644 --- a/backend/src/services/failover.ts +++ b/backend/src/services/failover.ts @@ -118,10 +118,15 @@ export function isRetriableNetworkError( if (errorCode && config.retriableErrorCodes.includes(errorCode)) { return true; } + // Handle AbortController timeout errors (AbortError) + if (error.name === "AbortError") { + return true; + } // Check error message for common network issues const message = error.message.toLowerCase(); return ( message.includes("timeout") || + message.includes("aborted") || message.includes("econnreset") || message.includes("econnrefused") || message.includes("network") || @@ -384,13 +389,15 @@ export function selectMultipleCandidates( const result: ModelWithProvider[] = []; const remaining = [...candidates]; + // Calculate total weight once and update incrementally + let totalWeight = remaining.reduce((sum, c) => sum + c.model.weight, 0); for (let i = 0; i < count && remaining.length > 0; i++) { - const totalWeight = remaining.reduce((sum, c) => sum + c.model.weight, 0); const random = Math.random() * totalWeight; let cumulative = 0; - let selectedIndex = 0; + // Default to the last element as a fallback for floating point edge cases + let selectedIndex = remaining.length - 1; for (let j = 0; j < remaining.length; j++) { const item = remaining[j]; if (item) { @@ -402,10 +409,10 @@ export function selectMultipleCandidates( } } - const selected = remaining[selectedIndex]; + const [selected] = remaining.splice(selectedIndex, 1); if (selected) { result.push(selected); - remaining.splice(selectedIndex, 1); + totalWeight -= selected.model.weight; } } From 360d6426df901ce45449c3c81158f6bc685f16c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E7=BF=94=E5=AE=87?= Date: Tue, 20 Jan 2026 05:51:10 +0800 Subject: [PATCH 3/5] refactor: extract shared failover error handling to reduce code duplication - Add processFailoverError helper function in api-helpers.ts - Refactor completions.ts, messages.ts, responses.ts to use shared helper - Reduces ~120 lines of duplicated error handling code - Maintains same behavior: forward non-retriable errors, return 502 on exhaustion Co-Authored-By: Claude Opus 4.5 --- backend/src/api/v1/completions.ts | 112 +++--------------------------- backend/src/api/v1/messages.ts | 112 +++--------------------------- backend/src/api/v1/responses.ts | 112 +++--------------------------- backend/src/utils/api-helpers.ts | 105 ++++++++++++++++++++++++++++ 4 files changed, 138 insertions(+), 303 deletions(-) diff --git a/backend/src/api/v1/completions.ts b/backend/src/api/v1/completions.ts index af28619..db36840 100644 --- a/backend/src/api/v1/completions.ts +++ b/backend/src/api/v1/completions.ts @@ -27,6 +27,7 @@ import { extractContentText, extractToolCalls, parseModelProvider, + processFailoverError, PROVIDER_HEADER, } from "@/utils/api-helpers"; import { addCompletions, type Completion } from "@/utils/completions"; @@ -479,7 +480,6 @@ export const completionsApi = new Elysia({ ); if (!result.success) { - // Build completion record for logging const completion = buildCompletionRecord( body.model, result.provider?.model.id ?? candidates[0]?.model.id, @@ -489,59 +489,15 @@ export const completionsApi = new Elysia({ internalRequest.extraParams, extraHeaders, ); - completion.status = "failed"; - // Non-retriable HTTP error from upstream - forward the response - if (result.response) { - logger.warn("Non-retriable upstream error for streaming request", { - status: result.response.status, - provider: result.provider?.provider.name, - }); - const errorSummary = result.errors - .map((e) => `${e.providerName}: ${e.error}`) - .join("; "); - addCompletions(completion, bearer, { - level: "error", - message: `Upstream error (non-retriable): ${errorSummary}`, - details: { - type: "completionError", - data: { - type: "upstreamError", - msg: result.finalError, - }, - }, - }).catch(() => { - logger.error("Failed to log completion after upstream error"); - }); - - set.status = result.response.status; - const responseBody = await result.response.text(); - yield responseBody; + const errorResult = await processFailoverError(result, completion, bearer, "streaming"); + + if (errorResult.type === "upstream_error") { + set.status = errorResult.status; + yield errorResult.body; return; } - // All providers failed with retriable errors or network errors - logger.error("All providers failed for streaming request", { - errors: result.errors, - totalAttempts: result.totalAttempts, - }); - const errorSummary = result.errors - .map((e) => `${e.providerName}: ${e.error}`) - .join("; "); - addCompletions(completion, bearer, { - level: "error", - message: `All providers failed (${result.totalAttempts} attempts): ${errorSummary}`, - details: { - type: "completionError", - data: { - type: "failoverExhausted", - msg: result.finalError, - }, - }, - }).catch(() => { - logger.error("Failed to log completion after failover exhaustion"); - }); - set.status = 502; yield JSON.stringify({ error: { @@ -559,7 +515,6 @@ export const completionsApi = new Elysia({ return; } - // Check if response has body if (!result.response.body) { set.status = 500; yield JSON.stringify({ error: "No body in response" }); @@ -600,7 +555,6 @@ export const completionsApi = new Elysia({ ); if (!result.success) { - // Build completion record for logging const completion = buildCompletionRecord( body.model, result.provider?.model.id ?? candidates[0]?.model.id, @@ -610,59 +564,15 @@ export const completionsApi = new Elysia({ internalRequest.extraParams, extraHeaders, ); - completion.status = "failed"; - // Non-retriable HTTP error from upstream - forward the response - if (result.response) { - logger.warn("Non-retriable upstream error for non-streaming request", { - status: result.response.status, - provider: result.provider?.provider.name, - }); - const errorSummary = result.errors - .map((e) => `${e.providerName}: ${e.error}`) - .join("; "); - addCompletions(completion, bearer, { - level: "error", - message: `Upstream error (non-retriable): ${errorSummary}`, - details: { - type: "completionError", - data: { - type: "upstreamError", - msg: result.finalError, - }, - }, - }).catch(() => { - logger.error("Failed to log completion after upstream error"); - }); - - set.status = result.response.status; - const responseBody = await result.response.text(); - yield responseBody; + const errorResult = await processFailoverError(result, completion, bearer, "non-streaming"); + + if (errorResult.type === "upstream_error") { + set.status = errorResult.status; + yield errorResult.body; return; } - // All providers failed with retriable errors or network errors - logger.error("All providers failed for non-streaming request", { - errors: result.errors, - totalAttempts: result.totalAttempts, - }); - const errorSummary = result.errors - .map((e) => `${e.providerName}: ${e.error}`) - .join("; "); - addCompletions(completion, bearer, { - level: "error", - message: `All providers failed (${result.totalAttempts} attempts): ${errorSummary}`, - details: { - type: "completionError", - data: { - type: "failoverExhausted", - msg: result.finalError, - }, - }, - }).catch(() => { - logger.error("Failed to log completion after failover exhaustion"); - }); - set.status = 502; yield JSON.stringify({ error: { diff --git a/backend/src/api/v1/messages.ts b/backend/src/api/v1/messages.ts index 7db6fc5..9253f8c 100644 --- a/backend/src/api/v1/messages.ts +++ b/backend/src/api/v1/messages.ts @@ -20,6 +20,7 @@ import { filterCandidates, extractContentText, parseModelProvider, + processFailoverError, PROVIDER_HEADER, } from "@/utils/api-helpers"; import { addCompletions, type Completion } from "@/utils/completions"; @@ -418,7 +419,6 @@ export const messagesApi = new Elysia({ ); if (!result.success) { - // Build completion record for logging const completion = buildCompletionRecord( body.model, result.provider?.model.id ?? candidates[0]?.model.id, @@ -426,59 +426,15 @@ export const messagesApi = new Elysia({ internalRequest.extraParams, extraHeaders, ); - completion.status = "failed"; - // Non-retriable HTTP error from upstream - forward the response - if (result.response) { - logger.warn("Non-retriable upstream error for streaming request", { - status: result.response.status, - provider: result.provider?.provider.name, - }); - const errorSummary = result.errors - .map((e) => `${e.providerName}: ${e.error}`) - .join("; "); - addCompletions(completion, bearer, { - level: "error", - message: `Upstream error (non-retriable): ${errorSummary}`, - details: { - type: "completionError", - data: { - type: "upstreamError", - msg: result.finalError, - }, - }, - }).catch(() => { - logger.error("Failed to log completion after upstream error"); - }); - - set.status = result.response.status; - const responseBody = await result.response.text(); - yield responseBody; + const errorResult = await processFailoverError(result, completion, bearer, "streaming"); + + if (errorResult.type === "upstream_error") { + set.status = errorResult.status; + yield errorResult.body; return; } - // All providers failed with retriable errors or network errors - logger.error("All providers failed for streaming request", { - errors: result.errors, - totalAttempts: result.totalAttempts, - }); - const errorSummary = result.errors - .map((e) => `${e.providerName}: ${e.error}`) - .join("; "); - addCompletions(completion, bearer, { - level: "error", - message: `All providers failed (${result.totalAttempts} attempts): ${errorSummary}`, - details: { - type: "completionError", - data: { - type: "failoverExhausted", - msg: result.finalError, - }, - }, - }).catch(() => { - logger.error("Failed to log completion after failover exhaustion"); - }); - set.status = 502; yield JSON.stringify({ type: "error", @@ -499,7 +455,6 @@ export const messagesApi = new Elysia({ return; } - // Check if response has body if (!result.response.body) { set.status = 500; yield JSON.stringify({ @@ -541,7 +496,6 @@ export const messagesApi = new Elysia({ ); if (!result.success) { - // Build completion record for logging const completion = buildCompletionRecord( body.model, result.provider?.model.id ?? candidates[0]?.model.id, @@ -549,59 +503,15 @@ export const messagesApi = new Elysia({ internalRequest.extraParams, extraHeaders, ); - completion.status = "failed"; - // Non-retriable HTTP error from upstream - forward the response - if (result.response) { - logger.warn("Non-retriable upstream error for non-streaming request", { - status: result.response.status, - provider: result.provider?.provider.name, - }); - const errorSummary = result.errors - .map((e) => `${e.providerName}: ${e.error}`) - .join("; "); - addCompletions(completion, bearer, { - level: "error", - message: `Upstream error (non-retriable): ${errorSummary}`, - details: { - type: "completionError", - data: { - type: "upstreamError", - msg: result.finalError, - }, - }, - }).catch(() => { - logger.error("Failed to log completion after upstream error"); - }); - - set.status = result.response.status; - const responseBody = await result.response.text(); - yield responseBody; + const errorResult = await processFailoverError(result, completion, bearer, "non-streaming"); + + if (errorResult.type === "upstream_error") { + set.status = errorResult.status; + yield errorResult.body; return; } - // All providers failed with retriable errors or network errors - logger.error("All providers failed for non-streaming request", { - errors: result.errors, - totalAttempts: result.totalAttempts, - }); - const errorSummary = result.errors - .map((e) => `${e.providerName}: ${e.error}`) - .join("; "); - addCompletions(completion, bearer, { - level: "error", - message: `All providers failed (${result.totalAttempts} attempts): ${errorSummary}`, - details: { - type: "completionError", - data: { - type: "failoverExhausted", - msg: result.finalError, - }, - }, - }).catch(() => { - logger.error("Failed to log completion after failover exhaustion"); - }); - set.status = 502; yield JSON.stringify({ type: "error", diff --git a/backend/src/api/v1/responses.ts b/backend/src/api/v1/responses.ts index 3c9a3de..224e060 100644 --- a/backend/src/api/v1/responses.ts +++ b/backend/src/api/v1/responses.ts @@ -20,6 +20,7 @@ import { filterCandidates, extractContentText, parseModelProvider, + processFailoverError, PROVIDER_HEADER, } from "@/utils/api-helpers"; import { addCompletions, type Completion } from "@/utils/completions"; @@ -434,7 +435,6 @@ export const responsesApi = new Elysia({ ); if (!result.success) { - // Build completion record for logging const completion = buildCompletionRecord( body.model, result.provider?.model.id ?? candidates[0]?.model.id, @@ -442,59 +442,15 @@ export const responsesApi = new Elysia({ internalRequest.extraParams, extraHeaders, ); - completion.status = "failed"; - // Non-retriable HTTP error from upstream - forward the response - if (result.response) { - logger.warn("Non-retriable upstream error for streaming request", { - status: result.response.status, - provider: result.provider?.provider.name, - }); - const errorSummary = result.errors - .map((e) => `${e.providerName}: ${e.error}`) - .join("; "); - addCompletions(completion, bearer, { - level: "error", - message: `Upstream error (non-retriable): ${errorSummary}`, - details: { - type: "completionError", - data: { - type: "upstreamError", - msg: result.finalError, - }, - }, - }).catch(() => { - logger.error("Failed to log completion after upstream error"); - }); - - set.status = result.response.status; - const responseBody = await result.response.text(); - yield responseBody; + const errorResult = await processFailoverError(result, completion, bearer, "streaming"); + + if (errorResult.type === "upstream_error") { + set.status = errorResult.status; + yield errorResult.body; return; } - // All providers failed with retriable errors or network errors - logger.error("All providers failed for streaming request", { - errors: result.errors, - totalAttempts: result.totalAttempts, - }); - const errorSummary = result.errors - .map((e) => `${e.providerName}: ${e.error}`) - .join("; "); - addCompletions(completion, bearer, { - level: "error", - message: `All providers failed (${result.totalAttempts} attempts): ${errorSummary}`, - details: { - type: "completionError", - data: { - type: "failoverExhausted", - msg: result.finalError, - }, - }, - }).catch(() => { - logger.error("Failed to log completion after failover exhaustion"); - }); - set.status = 502; yield JSON.stringify({ object: "error", @@ -515,7 +471,6 @@ export const responsesApi = new Elysia({ return; } - // Check if response has body if (!result.response.body) { set.status = 500; yield JSON.stringify({ @@ -557,7 +512,6 @@ export const responsesApi = new Elysia({ ); if (!result.success) { - // Build completion record for logging const completion = buildCompletionRecord( body.model, result.provider?.model.id ?? candidates[0]?.model.id, @@ -565,59 +519,15 @@ export const responsesApi = new Elysia({ internalRequest.extraParams, extraHeaders, ); - completion.status = "failed"; - // Non-retriable HTTP error from upstream - forward the response - if (result.response) { - logger.warn("Non-retriable upstream error for non-streaming request", { - status: result.response.status, - provider: result.provider?.provider.name, - }); - const errorSummary = result.errors - .map((e) => `${e.providerName}: ${e.error}`) - .join("; "); - addCompletions(completion, bearer, { - level: "error", - message: `Upstream error (non-retriable): ${errorSummary}`, - details: { - type: "completionError", - data: { - type: "upstreamError", - msg: result.finalError, - }, - }, - }).catch(() => { - logger.error("Failed to log completion after upstream error"); - }); - - set.status = result.response.status; - const responseBody = await result.response.text(); - yield responseBody; + const errorResult = await processFailoverError(result, completion, bearer, "non-streaming"); + + if (errorResult.type === "upstream_error") { + set.status = errorResult.status; + yield errorResult.body; return; } - // All providers failed with retriable errors or network errors - logger.error("All providers failed for non-streaming request", { - errors: result.errors, - totalAttempts: result.totalAttempts, - }); - const errorSummary = result.errors - .map((e) => `${e.providerName}: ${e.error}`) - .join("; "); - addCompletions(completion, bearer, { - level: "error", - message: `All providers failed (${result.totalAttempts} attempts): ${errorSummary}`, - details: { - type: "completionError", - data: { - type: "failoverExhausted", - msg: result.finalError, - }, - }, - }).catch(() => { - logger.error("Failed to log completion after failover exhaustion"); - }); - set.status = 502; yield JSON.stringify({ object: "error", diff --git a/backend/src/utils/api-helpers.ts b/backend/src/utils/api-helpers.ts index 9540d5d..a24fc5b 100644 --- a/backend/src/utils/api-helpers.ts +++ b/backend/src/utils/api-helpers.ts @@ -243,3 +243,108 @@ export function parseModelProvider( return { systemName, targetProvider }; } + +// ============================================================================= +// Failover Error Handling +// ============================================================================= + +import type { FailoverResult, FailoverError } from "@/services/failover"; +import { addCompletions, type Completion } from "@/utils/completions"; + +/** + * Result of processing a failover error + */ +export type FailoverErrorResult = + | { + type: "upstream_error"; + status: number; + body: string; + } + | { + type: "failover_exhausted"; + status: 502; + } + | { + type: "internal_error"; + status: 500; + }; + +/** + * Generate error summary from failover errors + */ +export function getErrorSummary(errors: FailoverError[]): string { + return errors.map((e) => `${e.providerName}: ${e.error}`).join("; "); +} + +/** + * Process failover error and update completion record + * Returns structured result indicating error type and response data + * + * @param result - The failover result (must have success=false) + * @param completion - The completion record to update (will be mutated) + * @param bearer - API key for logging + * @param requestType - "streaming" or "non-streaming" for log messages + */ +export async function processFailoverError( + result: FailoverResult, + completion: Completion, + bearer: string, + requestType: "streaming" | "non-streaming", +): Promise { + completion.status = "failed"; + const errorSummary = getErrorSummary(result.errors); + + // Non-retriable HTTP error from upstream - forward the response + if (result.response) { + logger.warn(`Non-retriable upstream error for ${requestType} request`, { + status: result.response.status, + provider: result.provider?.provider.name, + }); + + addCompletions(completion, bearer, { + level: "error", + message: `Upstream error (non-retriable): ${errorSummary}`, + details: { + type: "completionError", + data: { + type: "upstreamError", + msg: result.finalError, + }, + }, + }).catch(() => { + logger.error("Failed to log completion after upstream error"); + }); + + const responseBody = await result.response.text(); + return { + type: "upstream_error", + status: result.response.status, + body: responseBody, + }; + } + + // All providers failed with retriable errors or network errors + logger.error(`All providers failed for ${requestType} request`, { + errors: result.errors, + totalAttempts: result.totalAttempts, + }); + + addCompletions(completion, bearer, { + level: "error", + message: `All providers failed (${result.totalAttempts} attempts): ${errorSummary}`, + details: { + type: "completionError", + data: { + type: "failoverExhausted", + msg: result.finalError, + }, + }, + }).catch(() => { + logger.error("Failed to log completion after failover exhaustion"); + }); + + return { + type: "failover_exhausted", + status: 502, + }; +} From 4392023a06bebc522ba8626e33a7a18fd21109ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E7=BF=94=E5=AE=87?= Date: Tue, 20 Jan 2026 05:55:22 +0800 Subject: [PATCH 4/5] fix: address code review feedback for failover service - Use more specific error message patterns in isRetriableNetworkError to avoid false positives (e.g., "network error" instead of "network") - Simplify provider iteration loop using for...of with entries() since candidates are already unique and ordered from selectMultipleCandidates - Remove redundant triedProviders Set Co-Authored-By: Claude Opus 4.5 --- backend/src/services/failover.ts | 36 +++++++++++++------------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/backend/src/services/failover.ts b/backend/src/services/failover.ts index 80c7b62..0e6a8e3 100644 --- a/backend/src/services/failover.ts +++ b/backend/src/services/failover.ts @@ -122,15 +122,19 @@ export function isRetriableNetworkError( if (error.name === "AbortError") { return true; } - // Check error message for common network issues + // Check error message for specific transient network issues + // Use specific phrases to avoid false positives (e.g., "invalid network configuration") const message = error.message.toLowerCase(); return ( + message.includes("timed out") || message.includes("timeout") || - message.includes("aborted") || - message.includes("econnreset") || - message.includes("econnrefused") || - message.includes("network") || - message.includes("socket hang up") + message.includes("the operation was aborted") || + message.includes("connection reset") || + message.includes("connection refused") || + message.includes("network error") || + message.includes("network request failed") || + message.includes("socket hang up") || + message.includes("fetch failed") ); } @@ -188,23 +192,13 @@ export async function executeWithFailover( ): Promise> { const cfg: FailoverConfig = { ...DEFAULT_FAILOVER_CONFIG, ...config }; const errors: FailoverError[] = []; - const triedProviders = new Set(); let totalAttempts = 0; - // Limit candidates to maxProviderAttempts - const maxProviders = Math.min(candidates.length, cfg.maxProviderAttempts); - - for (let providerIndex = 0; providerIndex < maxProviders; providerIndex++) { - // Select next provider (skip already tried ones) - const provider = candidates.find( - (c) => !triedProviders.has(c.provider.id), - ); - if (!provider) { - logger.debug("No more providers to try"); - break; - } + // Limit candidates to maxProviderAttempts and iterate in order + // candidates are already unique and ordered by selectMultipleCandidates + const providersToTry = candidates.slice(0, cfg.maxProviderAttempts); - triedProviders.add(provider.provider.id); + for (const [providerIndex, provider] of providersToTry.entries()) { const { url, init } = buildRequest(provider); // Try this provider with same-provider retries for transient errors @@ -336,7 +330,7 @@ export async function executeWithFailover( const lastError = errors[errors.length - 1]; logger.error("All providers exhausted", { totalAttempts, - providersAttempted: triedProviders.size, + providersAttempted: providersToTry.length, errors: errors.map((e) => ({ provider: e.providerName, error: e.error, From b0e896d05d3d2daddcb2105e3e3be8bc0f8329c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E7=BF=94=E5=AE=87?= Date: Tue, 20 Jan 2026 06:06:09 +0800 Subject: [PATCH 5/5] fix: address additional code review feedback - Fix SSE error type in messages.ts to use "server_error" per Anthropic spec - Remove unused "internal_error" case from FailoverErrorResult type Co-Authored-By: Claude Opus 4.5 --- backend/src/api/v1/messages.ts | 2 +- backend/src/utils/api-helpers.ts | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/backend/src/api/v1/messages.ts b/backend/src/api/v1/messages.ts index 9253f8c..9b93e66 100644 --- a/backend/src/api/v1/messages.ts +++ b/backend/src/api/v1/messages.ts @@ -485,7 +485,7 @@ export const messagesApi = new Elysia({ } catch (error) { logger.error("Stream processing error", error); set.status = 500; - yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "api_error", message: "Stream processing error" } })}\n\n`; + yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "Stream processing error" } })}\n\n`; } } else { // Non-streaming request with failover diff --git a/backend/src/utils/api-helpers.ts b/backend/src/utils/api-helpers.ts index a24fc5b..5303a84 100644 --- a/backend/src/utils/api-helpers.ts +++ b/backend/src/utils/api-helpers.ts @@ -263,10 +263,6 @@ export type FailoverErrorResult = | { type: "failover_exhausted"; status: 502; - } - | { - type: "internal_error"; - status: 500; }; /**