Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 90 additions & 53 deletions src/core/RequestHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -518,53 +518,85 @@ class RequestHandler {
this.logger.info(`[Adapter] OpenAI streaming response (Real Mode) started...`);
await this._streamOpenAIResponse(messageQueue, res, model);
} else {
const result = await this._executeRequestWithRetries(proxyRequest, messageQueue);
// OpenAI Fake Stream / Non-Stream mode
// Set up keep-alive timer for fake stream mode to prevent client timeout
let connectionMaintainer;
if (isOpenAIStream) {
const scheduleNextKeepAlive = () => {
const randomInterval = 12000 + Math.floor(Math.random() * 6000); // 12 - 18 seconds
connectionMaintainer = setTimeout(() => {
if (!res.headersSent) {
res.status(200).set({
"Cache-Control": "no-cache",
Connection: "keep-alive",
"Content-Type": "text/event-stream",
});
}
if (!res.writableEnded) {
res.write(": keep-alive\n\n");
scheduleNextKeepAlive();
}
}, randomInterval);
};
scheduleNextKeepAlive();
}

if (!result.success) {
// Send standard HTTP error response for both streaming and non-streaming
// if the error happens before the stream starts.
this._sendErrorResponse(res, result.error.status || 500, result.error.message);
try {
const result = await this._executeRequestWithRetries(proxyRequest, messageQueue);

// Handle account switch without sending callback to client
await this.authSwitcher.handleRequestFailureAndSwitch(result.error, null);
return;
}
if (!result.success) {
// Send standard HTTP error response for both streaming and non-streaming
if (connectionMaintainer) clearTimeout(connectionMaintainer);
this._sendErrorResponse(res, result.error.status || 500, result.error.message);

if (this.authSwitcher.failureCount > 0) {
this.logger.info(`✅ [Auth] OpenAI interface request successful - failure count reset to 0`);
this.authSwitcher.failureCount = 0;
}
// Handle account switch without sending callback to client
await this.authSwitcher.handleRequestFailureAndSwitch(result.error, null);
return;
}

if (isOpenAIStream) {
// Fake stream
res.status(200).set({
"Cache-Control": "no-cache",
Connection: "keep-alive",
"Content-Type": "text/event-stream",
});
this.logger.info(`[Adapter] OpenAI streaming response (Fake Mode) started...`);
let fullBody = "";
let streaming = true;
while (streaming) {
const message = await messageQueue.dequeue();
if (message.type === "STREAM_END") {
streaming = false;
break;
if (this.authSwitcher.failureCount > 0) {
this.logger.info(`✅ [Auth] OpenAI interface request successful - failure count reset to 0`);
this.authSwitcher.failureCount = 0;
}

if (isOpenAIStream) {
// Fake stream - ensure headers are set before sending data
if (!res.headersSent) {
res.status(200).set({
"Cache-Control": "no-cache",
Connection: "keep-alive",
"Content-Type": "text/event-stream",
});
}
if (message.data) fullBody += message.data;
// Clear keep-alive timer as we are about to send real data
if (connectionMaintainer) clearTimeout(connectionMaintainer);

this.logger.info(`[Adapter] OpenAI streaming response (Fake Mode) started...`);
let fullBody = "";
let streaming = true;
while (streaming) {
const message = await messageQueue.dequeue();
if (message.type === "STREAM_END") {
streaming = false;
break;
}
if (message.data) fullBody += message.data;
}
const streamState = {};
const translatedChunk = this.formatConverter.translateGoogleToOpenAIStream(
fullBody,
model,
streamState
);
if (translatedChunk) res.write(translatedChunk);
res.write("data: [DONE]\n\n");
this.logger.info("[Adapter] Fake mode: Complete content sent at once.");
} else {
// Non-stream
await this._sendOpenAINonStreamResponse(messageQueue, res, model);
}
const streamState = {};
const translatedChunk = this.formatConverter.translateGoogleToOpenAIStream(
fullBody,
model,
streamState
);
if (translatedChunk) res.write(translatedChunk);
res.write("data: [DONE]\n\n");
this.logger.info("[Adapter] Fake mode: Complete content sent at once.");
} else {
// Non-stream
await this._sendOpenAINonStreamResponse(messageQueue, res, model);
} finally {
if (connectionMaintainer) clearTimeout(connectionMaintainer);
}
}
} catch (error) {
Expand Down Expand Up @@ -617,6 +649,8 @@ class RequestHandler {
const result = await this._executeRequestWithRetries(proxyRequest, messageQueue);

if (!result.success) {
clearTimeout(connectionMaintainer);

if (result.error.message?.includes("The user aborted a request")) {
this.logger.info(
`[Request] Request #${proxyRequest.request_id} was properly cancelled by user, not counted in failure statistics.`
Expand All @@ -642,6 +676,14 @@ class RequestHandler {
this.authSwitcher.failureCount = 0;
}

if (!res.headersSent) {
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
}
// Clear the keep-alive timer as we are about to send real data
clearTimeout(connectionMaintainer);

// Read all data chunks until STREAM_END to handle potential fragmentation
let fullData = "";
let streaming = true;
Expand Down Expand Up @@ -781,6 +823,10 @@ class RequestHandler {
}

this._setResponseHeaders(res, headerMessage, req);
// Fallback: Ensure Content-Type is set for streaming response
if (!res.get("Content-Type")) {
res.type("text/event-stream");
}
this.logger.info("[Request] Starting streaming transmission...");
try {
let lastChunk = "";
Expand Down Expand Up @@ -1066,18 +1112,9 @@ class RequestHandler {
if ((lowerName === "x-goog-upload-url" || lowerName === "location") && value.includes("googleapis.com")) {
try {
const urlObj = new URL(value);
// Construct local proxy URL using configured host/port
// Note: The client (build.js) might have already embedded the original host in __proxy_host__
// But wait, headerMessage comes from the BROWSER.
// If the Browser sends back the header as received from Google, then it's the GOOGLE URL.
// If the Browser rewrote it, it's the LOCALHOST URL.
// build.js `_transmitHeaders` rewrites it!

// So `value` is `http://localhost:xxxx/...&__proxy_host__=google.com` (from Browser)
// We just need to ensure it points to *our* current listener address.

// Use the Host header from the request to support remote clients (e.g. Docker IPs)
// If req.headers.host exists (standard), use it. Otherwise fallback to config.
// Rewrite upload/redirect URLs to point to this proxy server
// build.js already rewrote the URL to localhost with __proxy_host__ param
// Here we just ensure it matches the client's request host (for Docker/remote access)
let newAuthority;
if (req && req.headers && req.headers.host) {
newAuthority = req.headers.host;
Expand Down
Loading