-
Notifications
You must be signed in to change notification settings - Fork 17
feat(api-proxy): add WebSocket upgrade support to fix Codex /v1/responses streaming #1486
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6d0a290
12f20f7
41fe490
7008df7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -12,6 +12,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| const http = require('http'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| const https = require('https'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| const tls = require('tls'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| const { URL } = require('url'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| const { HttpsProxyAgent } = require('https-proxy-agent'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| const { generateRequestId, sanitizeForLog, logRequest } = require('./logging'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -423,6 +424,216 @@ function proxyRequest(req, res, targetHost, injectHeaders, provider, basePath = | |||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Handle a WebSocket upgrade request by tunnelling through the Squid proxy. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Flow: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * client --[HTTP Upgrade]--> proxy --[CONNECT]--> Squid:3128 --[TLS]--> upstream:443 | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Steps: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * 1. Validate the request (WebSocket upgrade only, relative URL) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * 2. Apply rate limiting (counts as one request, zero body bytes) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * 3. Open a CONNECT tunnel to targetHost:443 through Squid | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * 4. TLS-handshake the tunnel | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * 5. Replay the HTTP Upgrade request with auth headers injected | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * 6. Bidirectionally pipe the raw TCP sockets | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * No additional npm dependencies are required — only Node.js built-ins. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param {http.IncomingMessage} req - The incoming HTTP Upgrade request | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param {import('net').Socket} socket - Raw TCP socket to the WebSocket client | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param {Buffer} head - Any bytes already buffered after the upgrade headers | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param {string} targetHost - Upstream hostname (e.g. 'api.openai.com') | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param {Object} injectHeaders - Auth headers to inject (e.g. { Authorization: 'Bearer …' }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param {string} provider - Provider name for logging and metrics | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param {string} [basePath=''] - Optional base-path prefix for the upstream URL | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| function proxyWebSocket(req, socket, head, targetHost, injectHeaders, provider, basePath = '') { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| const startTime = Date.now(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| const clientRequestId = req.headers['x-request-id']; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| const requestId = isValidRequestId(clientRequestId) ? clientRequestId : generateRequestId(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| // ── Validate: only forward WebSocket upgrades ────────────────────────── | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| const upgradeType = (req.headers['upgrade'] || '').toLowerCase(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (upgradeType !== 'websocket') { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| logRequest('warn', 'websocket_upgrade_rejected', { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| request_id: requestId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| provider, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| path: sanitizeForLog(req.url), | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| reason: 'unsupported upgrade type', | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| upgrade: sanitizeForLog(req.headers['upgrade'] || ''), | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| socket.write('HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| socket.destroy(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| // ── Validate: relative path only (prevent SSRF) ──────────────────────── | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!req.url || !req.url.startsWith('/')) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| logRequest('warn', 'websocket_upgrade_rejected', { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| request_id: requestId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| provider, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| path: sanitizeForLog(req.url), | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| reason: 'URL must be a relative path', | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| socket.write('HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| socket.destroy(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| const upstreamPath = buildUpstreamPath(req.url, targetHost, basePath); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| // ── Rate limit (counts as one request, frames are not tracked) ────────── | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| const rateCheck = limiter.check(provider, 0); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!rateCheck.allowed) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| metrics.increment('rate_limit_rejected_total', { provider, limit_type: rateCheck.limitType }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| logRequest('warn', 'rate_limited', { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| request_id: requestId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| provider, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| limit_type: rateCheck.limitType, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| limit: rateCheck.limit, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| retry_after: rateCheck.retryAfter, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| socket.write( | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| `HTTP/1.1 429 Too Many Requests\r\nRetry-After: ${rateCheck.retryAfter}\r\nConnection: close\r\n\r\n` | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| socket.destroy(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| logRequest('info', 'websocket_upgrade_start', { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| request_id: requestId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| provider, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| path: sanitizeForLog(req.url), | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| upstream_host: targetHost, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| metrics.gaugeInc('active_requests', { provider }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| // finalize() must be called exactly once when the WebSocket session ends. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| let finalized = false; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| function finalize(isError, description) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (finalized) return; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| finalized = true; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| const duration = Date.now() - startTime; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| metrics.gaugeDec('active_requests', { provider }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (isError) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| metrics.increment('requests_errors_total', { provider }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| logRequest('error', 'websocket_upgrade_failed', { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| request_id: requestId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| provider, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| path: sanitizeForLog(req.url), | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| duration_ms: duration, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| error: sanitizeForLog(String(description || 'unknown error')), | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| metrics.increment('requests_total', { provider, method: 'GET', status_class: '1xx' }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| metrics.observe('request_duration_ms', duration, { provider }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| logRequest('info', 'websocket_upgrade_complete', { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| request_id: requestId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| provider, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| path: sanitizeForLog(req.url), | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| duration_ms: duration, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| // abort(): called before the socket pipe is established (pre-TLS errors). | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Sends a 502 to the client and finalizes with an error. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| function abort(reason, ...extra) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| finalize(true, reason); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!socket.destroyed && socket.writable) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| socket.write('HTTP/1.1 502 Bad Gateway\r\nConnection: close\r\n\r\n'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| socket.destroy(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (const s of extra) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (s && !s.destroyed) s.destroy(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| // ── Require Squid proxy ──────────────────────────────────────────────── | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!HTTPS_PROXY) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| abort('No Squid proxy configured (HTTPS_PROXY not set)'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| let proxyUrl; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| proxyUrl = new URL(HTTPS_PROXY); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (err) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| abort(`Invalid proxy URL: ${err.message}`); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| const proxyHost = proxyUrl.hostname; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| const proxyPort = parseInt(proxyUrl.port, 10) || 3128; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| // ── Step 1: CONNECT tunnel through Squid to targetHost:443 ──────────── | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| const connectReq = http.request({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| host: proxyHost, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| port: proxyPort, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| method: 'CONNECT', | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| path: `${targetHost}:443`, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| headers: { 'Host': `${targetHost}:443` }, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| connectReq.once('error', (err) => abort(`CONNECT error: ${err.message}`)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| connectReq.once('connect', (connectRes, tunnel) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (connectRes.statusCode !== 200) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| abort(`CONNECT failed: HTTP ${connectRes.statusCode}`, tunnel); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| // ── Step 2: TLS-upgrade the raw tunnel ────────────────────────────── | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| const tlsSocket = tls.connect({ socket: tunnel, servername: targetHost, rejectUnauthorized: true }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Pre-TLS error handler: removed once TLS is established. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| const onTlsError = (err) => abort(`TLS handshake error: ${err.message}`, tunnel); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| tlsSocket.once('error', onTlsError); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| tlsSocket.once('secureConnect', () => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| // TLS connected — swap to post-connection teardown error handlers. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| tlsSocket.removeListener('error', onTlsError); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| // ── Step 3: Replay the HTTP Upgrade request with auth injected ──── | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| const forwardHeaders = {}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (const [name, value] of Object.entries(req.headers)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!shouldStripHeader(name)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| forwardHeaders[name] = value; | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| Object.assign(forwardHeaders, injectHeaders); | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| Object.assign(forwardHeaders, injectHeaders); | |
| Object.assign(forwardHeaders, injectHeaders); | |
| forwardHeaders['x-request-id'] = requestId; // Ensure consistent tracing for WebSocket upgrades |
Copilot
AI
Mar 28, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
finalize(false) is triggered solely by socket close events, so an upstream handshake rejection (e.g. 401/403/400 instead of 101 Switching Protocols) will still be logged/recorded as websocket_upgrade_complete with status_class: '1xx'. If you want metrics/logs to reflect actual upgrade success, consider parsing the first upstream response line/headers to confirm status=101 before marking success; otherwise treat early close as an upgrade failure.
This issue also appears on line 624 of the same file.
| // Finalize once when either side closes; destroy the other side. | |
| socket.once('close', () => { finalize(false); tlsSocket.destroy(); }); | |
| tlsSocket.once('close', () => { finalize(false); socket.destroy(); }); | |
| // Track whether the upstream actually completed the HTTP 101 upgrade. | |
| let upgradeEstablished = false; | |
| const inspectStatusLine = (chunk) => { | |
| // Only need the first response chunk to determine if the upgrade succeeded. | |
| try { | |
| const str = chunk.toString('ascii'); | |
| const firstLineEnd = str.indexOf('\r\n'); | |
| const statusLine = firstLineEnd === -1 ? str : str.slice(0, firstLineEnd); | |
| const match = /^HTTP\/1\.[01]\s+(\d{3})/.exec(statusLine); | |
| if (match && match[1] === '101') { | |
| upgradeEstablished = true; | |
| } | |
| } finally { | |
| // Ensure we don't keep inspecting further data chunks. | |
| tlsSocket.removeListener('data', inspectStatusLine); | |
| } | |
| }; | |
| tlsSocket.once('data', inspectStatusLine); | |
| // Finalize once when either side closes; destroy the other side. | |
| socket.once('close', () => { finalize(!upgradeEstablished); tlsSocket.destroy(); }); | |
| tlsSocket.once('close', () => { finalize(!upgradeEstablished); socket.destroy(); }); |
Copilot
AI
Mar 28, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the OpenCode server, the new WebSocket upgrade path uses provider: 'opencode', but the existing HTTP handler just above still calls proxyRequest(...) without passing a provider (so metrics/rate limiting labels differ and may end up as provider=undefined). Consider updating the HTTP path to pass 'opencode' (and basePath if relevant) so both HTTP and WS requests are attributed consistently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The CONNECT and TLS handshake path has no explicit timeouts (
connectReqor the TLS socket). If Squid/upstream stalls mid-handshake, the sockets (andactive_requestsgauge) can hang indefinitely. Consider setting reasonable timeouts for CONNECT and TLS negotiation and callingabort(...)on timeout to ensure resources/metrics are released.