From 01a3e1b6a1169339b40ed9ce51b16ba3f6d6f0e0 Mon Sep 17 00:00:00 2001 From: bbbugg Date: Thu, 29 Jan 2026 15:45:56 +0800 Subject: [PATCH] feat: implement keep-alive mechanism for OpenAI fake stream mode --- src/core/RequestHandler.js | 143 +++++++++++++++++++++++-------------- 1 file changed, 90 insertions(+), 53 deletions(-) diff --git a/src/core/RequestHandler.js b/src/core/RequestHandler.js index 9e38851..d9d1361 100644 --- a/src/core/RequestHandler.js +++ b/src/core/RequestHandler.js @@ -518,53 +518,85 @@ class RequestHandler { this.logger.info(`[Adapter] OpenAI streaming response (Real Mode) started...`); await this._streamOpenAIResponse(messageQueue, res, model); } else { - const result = await this._executeRequestWithRetries(proxyRequest, messageQueue); + // OpenAI Fake Stream / Non-Stream mode + // Set up keep-alive timer for fake stream mode to prevent client timeout + let connectionMaintainer; + if (isOpenAIStream) { + const scheduleNextKeepAlive = () => { + const randomInterval = 12000 + Math.floor(Math.random() * 6000); // 12 - 18 seconds + connectionMaintainer = setTimeout(() => { + if (!res.headersSent) { + res.status(200).set({ + "Cache-Control": "no-cache", + Connection: "keep-alive", + "Content-Type": "text/event-stream", + }); + } + if (!res.writableEnded) { + res.write(": keep-alive\n\n"); + scheduleNextKeepAlive(); + } + }, randomInterval); + }; + scheduleNextKeepAlive(); + } - if (!result.success) { - // Send standard HTTP error response for both streaming and non-streaming - // if the error happens before the stream starts. - this._sendErrorResponse(res, result.error.status || 500, result.error.message); + try { + const result = await this._executeRequestWithRetries(proxyRequest, messageQueue); - // Handle account switch without sending callback to client - await this.authSwitcher.handleRequestFailureAndSwitch(result.error, null); - return; - } + if (!result.success) { + // Send standard HTTP error response for both streaming and non-streaming + if (connectionMaintainer) clearTimeout(connectionMaintainer); + this._sendErrorResponse(res, result.error.status || 500, result.error.message); - if (this.authSwitcher.failureCount > 0) { - this.logger.info(`✅ [Auth] OpenAI interface request successful - failure count reset to 0`); - this.authSwitcher.failureCount = 0; - } + // Handle account switch without sending callback to client + await this.authSwitcher.handleRequestFailureAndSwitch(result.error, null); + return; + } - if (isOpenAIStream) { - // Fake stream - res.status(200).set({ - "Cache-Control": "no-cache", - Connection: "keep-alive", - "Content-Type": "text/event-stream", - }); - this.logger.info(`[Adapter] OpenAI streaming response (Fake Mode) started...`); - let fullBody = ""; - let streaming = true; - while (streaming) { - const message = await messageQueue.dequeue(); - if (message.type === "STREAM_END") { - streaming = false; - break; + if (this.authSwitcher.failureCount > 0) { + this.logger.info(`✅ [Auth] OpenAI interface request successful - failure count reset to 0`); + this.authSwitcher.failureCount = 0; + } + + if (isOpenAIStream) { + // Fake stream - ensure headers are set before sending data + if (!res.headersSent) { + res.status(200).set({ + "Cache-Control": "no-cache", + Connection: "keep-alive", + "Content-Type": "text/event-stream", + }); } - if (message.data) fullBody += message.data; + // Clear keep-alive timer as we are about to send real data + if (connectionMaintainer) clearTimeout(connectionMaintainer); + + this.logger.info(`[Adapter] OpenAI streaming response (Fake Mode) started...`); + let fullBody = ""; + let streaming = true; + while (streaming) { + const message = await messageQueue.dequeue(); + if (message.type === "STREAM_END") { + streaming = false; + break; + } + if (message.data) fullBody += message.data; + } + const streamState = {}; + const translatedChunk = this.formatConverter.translateGoogleToOpenAIStream( + fullBody, + model, + streamState + ); + if (translatedChunk) res.write(translatedChunk); + res.write("data: [DONE]\n\n"); + this.logger.info("[Adapter] Fake mode: Complete content sent at once."); + } else { + // Non-stream + await this._sendOpenAINonStreamResponse(messageQueue, res, model); } - const streamState = {}; - const translatedChunk = this.formatConverter.translateGoogleToOpenAIStream( - fullBody, - model, - streamState - ); - if (translatedChunk) res.write(translatedChunk); - res.write("data: [DONE]\n\n"); - this.logger.info("[Adapter] Fake mode: Complete content sent at once."); - } else { - // Non-stream - await this._sendOpenAINonStreamResponse(messageQueue, res, model); + } finally { + if (connectionMaintainer) clearTimeout(connectionMaintainer); } } } catch (error) { @@ -617,6 +649,8 @@ class RequestHandler { const result = await this._executeRequestWithRetries(proxyRequest, messageQueue); if (!result.success) { + clearTimeout(connectionMaintainer); + if (result.error.message?.includes("The user aborted a request")) { this.logger.info( `[Request] Request #${proxyRequest.request_id} was properly cancelled by user, not counted in failure statistics.` @@ -642,6 +676,14 @@ class RequestHandler { this.authSwitcher.failureCount = 0; } + if (!res.headersSent) { + res.setHeader("Content-Type", "text/event-stream"); + res.setHeader("Cache-Control", "no-cache"); + res.setHeader("Connection", "keep-alive"); + } + // Clear the keep-alive timer as we are about to send real data + clearTimeout(connectionMaintainer); + // Read all data chunks until STREAM_END to handle potential fragmentation let fullData = ""; let streaming = true; @@ -781,6 +823,10 @@ class RequestHandler { } this._setResponseHeaders(res, headerMessage, req); + // Fallback: Ensure Content-Type is set for streaming response + if (!res.get("Content-Type")) { + res.type("text/event-stream"); + } this.logger.info("[Request] Starting streaming transmission..."); try { let lastChunk = ""; @@ -1066,18 +1112,9 @@ class RequestHandler { if ((lowerName === "x-goog-upload-url" || lowerName === "location") && value.includes("googleapis.com")) { try { const urlObj = new URL(value); - // Construct local proxy URL using configured host/port - // Note: The client (build.js) might have already embedded the original host in __proxy_host__ - // But wait, headerMessage comes from the BROWSER. - // If the Browser sends back the header as received from Google, then it's the GOOGLE URL. - // If the Browser rewrote it, it's the LOCALHOST URL. - // build.js `_transmitHeaders` rewrites it! - - // So `value` is `http://localhost:xxxx/...&__proxy_host__=google.com` (from Browser) - // We just need to ensure it points to *our* current listener address. - - // Use the Host header from the request to support remote clients (e.g. Docker IPs) - // If req.headers.host exists (standard), use it. Otherwise fallback to config. + // Rewrite upload/redirect URLs to point to this proxy server + // build.js already rewrote the URL to localhost with __proxy_host__ param + // Here we just ensure it matches the client's request host (for Docker/remote access) let newAuthority; if (req && req.headers && req.headers.host) { newAuthority = req.headers.host;