Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 266 additions & 1 deletion backend/src/api/v1/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import {
} from "@/plugins/apiKeyRateLimitPlugin";
import { rateLimitPlugin } from "@/plugins/rateLimitPlugin";
import {
DEFAULT_FAILOVER_CONFIG,
executeWithFailover,
fetchWithTimeout,
isRetriableNetworkError,
selectMultipleCandidates,
type FailoverConfig,
} from "@/services/failover";
Expand All @@ -34,7 +37,7 @@ import {
PROVIDER_HEADER,
} from "@/utils/api-helpers";
import { addCompletions, type Completion } from "@/utils/completions";
import { safeParseToolArgs } from "@/utils/json";
import { parseJsonResponse, safeParseToolArgs } from "@/utils/json";
import { createLogger } from "@/utils/logger";
import { getProviderProxy } from "@/utils/proxy-fetch";
import {
Expand Down Expand Up @@ -142,6 +145,22 @@ const tAnthropicMessageCreate = tLooseObject({
metadata: t.Optional(tAnthropicMetadata),
});

// Anthropic count_tokens API request schema. Keep this route intentionally loose
// so we can transparently forward newer content block shapes upstream.
const tAnthropicMessageCountTokens = tLooseObject({
  // System model name as exposed by this gateway; rewritten to the provider's
  // remote model id before the request is forwarded upstream.
  model: t.String(),
  messages: t.Array(
    tLooseObject({
      role: t.String(),
      // Content is either a plain string or an array of content blocks; the
      // blocks themselves are left unvalidated (t.Unknown) so newer upstream
      // block shapes pass through without schema changes.
      content: t.Union([t.String(), t.Array(t.Unknown())]),
    }),
  ),
  // Optional fields mirroring the Anthropic Messages request surface; all are
  // forwarded verbatim.
  system: t.Optional(t.Union([t.String(), t.Array(t.Unknown())])),
  tools: t.Optional(t.Array(t.Unknown())),
  tool_choice: t.Optional(t.Unknown()),
  thinking: t.Optional(t.Unknown()),
});

/**
* Build completion record for logging
*/
Expand Down Expand Up @@ -490,6 +509,96 @@ const MESSAGES_FAILOVER_CONFIG: Partial<FailoverConfig> = {
timeoutMs: 120000, // 2 minutes for messages
};

// Failover policy for the count_tokens passthrough, layered on the shared
// defaults. NOTE(review): 404/405 being retriable is unusual — presumably so a
// provider that does not expose the count_tokens path fails over to the next
// candidate; confirm this intent with the route owners.
const COUNT_TOKENS_FAILOVER_CONFIG: FailoverConfig = {
  ...DEFAULT_FAILOVER_CONFIG,
  maxProviderAttempts: 3, // try at most 3 distinct providers
  sameProviderRetries: 0, // never retry the same provider for this cheap call
  retriableStatusCodes: [404, 405, 429, 500, 502, 503, 504],
  timeoutMs: 30000, // 30s — much shorter than the 2-minute /messages timeout
};

/**
 * Normalize a provider base URL so the count_tokens path can be appended
 * uniformly: drop any trailing slashes, then drop a trailing "/v1" segment
 * (callers re-append "/v1/..." themselves).
 */
function normalizeAnthropicBaseUrl(baseUrl: string): string {
  const trimmed = baseUrl.replace(/\/+$/, "");
  return trimmed.endsWith("/v1") ? trimmed.slice(0, -"/v1".length) : trimmed;
}

/**
 * Assemble the upstream Anthropic count_tokens HTTP request for one candidate
 * provider: the target URL, the fetch init (headers + JSON body), and the
 * provider's optional proxy.
 *
 * The incoming body is forwarded as-is except that `model` is rewritten to
 * the provider's remote id. Extra headers are merged last, so they override
 * the defaults (including `x-api-key`) — same precedence as before.
 */
function buildAnthropicCountTokensRequest(
  body: Record<string, unknown>,
  provider: ModelWithProvider,
  extraHeaders?: Record<string, string>,
): {
  url: string;
  init: RequestInit;
  proxy?: string;
} {
  const { provider: upstream, model } = provider;

  const headers: Record<string, string> = {
    "Content-Type": "application/json",
    "anthropic-version": upstream.apiVersion || "2023-06-01",
    ...(upstream.apiKey ? { "x-api-key": upstream.apiKey } : {}),
    ...extraHeaders,
  };

  const payload = {
    ...body,
    model: model.remoteId ?? model.systemName,
  };

  return {
    url: `${normalizeAnthropicBaseUrl(upstream.baseUrl)}/v1/messages/count_tokens`,
    init: {
      method: "POST",
      headers,
      body: JSON.stringify(payload),
    },
    proxy: getProviderProxy(upstream),
  };
}

/**
 * Read a successful upstream response body and parse it as JSON, letting
 * `parseJsonResponse` attach the given context label to parse failures.
 */
async function parseUpstreamJsonBody(
  response: Response,
  context: string,
): Promise<Record<string, unknown>> {
  return parseJsonResponse<Record<string, unknown>>(
    await response.text(),
    context,
  );
}

async function parseUpstreamErrorBody(response: Response): Promise<
Record<string, unknown>
> {
const text = await response.text();
if (!text) {
return {
type: "error",
error: {
type: "api_error",
message: `Upstream returned HTTP ${response.status}`,
},
};
}

try {
return JSON.parse(text) as Record<string, unknown>;
} catch {
return {
type: "error",
error: {
type: "api_error",
message: text,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In API error handlers, log the full error details on the server for debugging, but return only a generic, structured error message to the client to avoid leaking internal implementation details. Returning raw upstream error bodies (even truncated) can expose internal infrastructure details or sensitive information from proxies/gateways.

Suggested change
message: text,
message: "An unexpected error occurred while processing the upstream response.",
References
  1. In API error handlers, log the full error details on the server for debugging, but return only a generic, structured error message to the client to avoid leaking internal implementation details.

code: "unparseable_error",
},
};
}
}

export const messagesApi = new Elysia({
detail: {
security: [{ apiKey: [] }],
Expand All @@ -498,6 +607,162 @@ export const messagesApi = new Elysia({
.use(apiKeyPlugin)
.use(apiKeyRateLimitPlugin)
.use(rateLimitPlugin)
.post(
  "/messages/count_tokens",
  async function ({ body, set, request }) {
    // Anthropic-compatible `count_tokens` passthrough: resolve the requested
    // model to candidate providers, then forward the request with a small
    // manual failover loop governed by COUNT_TOKENS_FAILOVER_CONFIG.
    try {
      const reqHeaders = request.headers;

      // Resolve the system model name plus an optional provider pin header.
      const { systemName, targetProvider } = parseModelProvider(
        body.model,
        reqHeaders.get(PROVIDER_HEADER),
      );

      const modelsWithProviders = await getModelsWithProviderBySystemName(
        systemName,
        "chat",
      );

      if (modelsWithProviders.length === 0) {
        set.status = 404;
        return {
          type: "error",
          error: {
            type: "not_found_error",
            message: `Model '${systemName}' not found`,
          },
        };
      }

      const filteredCandidates = filterCandidates(
        modelsWithProviders as ModelWithProvider[],
        targetProvider,
      );

      if (filteredCandidates.length === 0) {
        set.status = 404;
        return {
          type: "error",
          error: {
            type: "not_found_error",
            message: `No available provider for model '${systemName}'`,
          },
        };
      }

      const candidates = selectMultipleCandidates(
        filteredCandidates,
        COUNT_TOKENS_FAILOVER_CONFIG.maxProviderAttempts,
      );
      const extraHeaders = extractUpstreamHeaders(reqHeaders);
      const upstreamBody = body as Record<string, unknown>;

      let lastResponse: Response | undefined;
      let lastError: Error | undefined;

      for (const candidate of candidates) {
        const { url, init, proxy } = buildAnthropicCountTokensRequest(
          upstreamBody,
          candidate,
          extraHeaders,
        );

        try {
          const response = await fetchWithTimeout(
            url,
            init,
            COUNT_TOKENS_FAILOVER_CONFIG.timeoutMs,
            proxy,
          );

          if (response.ok) {
            return await parseUpstreamJsonBody(
              response,
              "Anthropic count_tokens",
            );
          }

          lastResponse = response;
          // Fail over only on retriable statuses, and never past the last
          // candidate — the final candidate always returns to the client.
          const shouldTryNext =
            COUNT_TOKENS_FAILOVER_CONFIG.retriableStatusCodes.includes(
              response.status,
            ) && candidate !== candidates[candidates.length - 1];

          logger.warn("count_tokens upstream request failed", {
            provider: candidate.provider.name,
            providerType: candidate.provider.type,
            status: response.status,
            shouldTryNext,
          });

          if (!shouldTryNext) {
            set.status = response.status;
            return await parseUpstreamErrorBody(response);
          }

          // Failing over: undici requires response bodies to be consumed or
          // canceled before the connection can be reused. Cancel the failed
          // body so the socket is released promptly instead of waiting on GC.
          await response.body?.cancel();
        } catch (error) {
          const err =
            error instanceof Error ? error : new Error(String(error));
          lastError = err;
          const shouldTryNext =
            isRetriableNetworkError(err, COUNT_TOKENS_FAILOVER_CONFIG) &&
            candidate !== candidates[candidates.length - 1];

          logger.warn("count_tokens upstream network error", {
            provider: candidate.provider.name,
            providerType: candidate.provider.type,
            error: err.message,
            shouldTryNext,
          });

          if (!shouldTryNext) {
            set.status = 502;
            return {
              type: "error",
              error: {
                type: "api_error",
                message: `Count tokens request failed: ${err.message}`,
              },
            };
          }
        }
      }

      // Defensive fallback: since the last candidate never fails over, this
      // is effectively only reachable if the candidate list was empty. A
      // canceled lastResponse body would throw here and land in the outer
      // catch, which still yields a structured 502.
      if (lastResponse) {
        set.status = lastResponse.status;
        return await parseUpstreamErrorBody(lastResponse);
      }

      set.status = 502;
      return {
        type: "error",
        error: {
          type: "api_error",
          message:
            lastError?.message ||
            "All upstream providers failed for token counting",
        },
      };
    } catch (error) {
      logger.error("count_tokens handler failed", error);
      set.status = 502;
      return {
        type: "error",
        error: {
          type: "api_error",
          message: "Failed to process count_tokens response",
        },
      };
    }
  },
  {
    body: tAnthropicMessageCountTokens,
    checkApiKey: true,
    apiKeyRateLimit: true,
    rateLimit: {
      // Rate-limit bucket keyed by the requested model name.
      identifier: (body: unknown) => (body as { model: string }).model,
    },
  },
)
.post(
"/messages",
async function ({ body, set, bearer, request, apiKeyRecord }) {
Expand Down
Loading
Loading