diff --git a/backend/src/api/v1/completions.ts b/backend/src/api/v1/completions.ts index f25cb5b..da6849b 100644 --- a/backend/src/api/v1/completions.ts +++ b/backend/src/api/v1/completions.ts @@ -29,6 +29,7 @@ import { type FailoverConfig, } from "@/services/failover"; import { + acceptsEventStream, extractUpstreamHeaders, filterCandidates, extractContentText, @@ -569,6 +570,12 @@ export const completionsApi = new Elysia({ // Extract extra headers for passthrough const extraHeaders = extractUpstreamHeaders(reqHeaders); + // Determine streaming mode. Body wins when explicit; otherwise honor + // the client's Accept: text/event-stream negotiation hint. + if (body.stream === undefined && acceptsEventStream(reqHeaders)) { + body.stream = true; + } + // Check ReqId for deduplication (if provided) const isStream = body.stream === true; const reqIdResult = await checkReqId(reqId, { @@ -718,31 +725,51 @@ export const completionsApi = new Elysia({ extraHeaders, ); - // Return an async generator for streaming + // Return a native Response with proper SSE headers. Wrapping the + // pre-formatted SSE generator in a ReadableStream ensures Elysia + // skips its auto SSE-wrapping (which would double-prefix `data:`) + // and lets us set Content-Type: text/event-stream explicitly. const streamResponse = result.response; const streamSignal = request.signal; - return (async function* () { - try { - yield* processStreamingResponse( - streamResponse, - completion, - bearer, - providerType, - apiKeyRecord ?? null, - begin, - streamSignal, - reqIdContext ?? 
undefined, - ); - } catch (error) { - // Don't log error if it's due to client abort - if (!streamSignal.aborted) { - logger.error("Stream processing error", error); - // Note: HTTP status cannot be changed after streaming has started - // Use SSE format for error: data: {...}\n\n - yield `data: ${JSON.stringify({ error: { message: "Stream processing error", type: "server_error", code: "stream_error" } })}\n\n`; + const sseStream = new ReadableStream({ + async start(controller) { + const encoder = new TextEncoder(); + try { + for await (const chunk of processStreamingResponse( + streamResponse, + completion, + bearer, + providerType, + apiKeyRecord ?? null, + begin, + streamSignal, + reqIdContext ?? undefined, + )) { + controller.enqueue(encoder.encode(chunk)); + } + } catch (error) { + if (!streamSignal.aborted) { + logger.error("Stream processing error", error); + controller.enqueue( + encoder.encode( + `data: ${JSON.stringify({ error: { message: "Stream processing error", type: "server_error", code: "stream_error" } })}\n\n`, + ), + ); + } + } finally { + controller.close(); } - } - })(); + }, + }); + + return new Response(sseStream, { + status: 200, + headers: { + "Content-Type": "text/event-stream; charset=utf-8", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }, + }); } else { // Non-streaming request - return JSON response directly const result = await executeWithFailover( diff --git a/backend/src/api/v1/messages.ts b/backend/src/api/v1/messages.ts index c6237ba..420ebb6 100644 --- a/backend/src/api/v1/messages.ts +++ b/backend/src/api/v1/messages.ts @@ -25,6 +25,7 @@ import { type FailoverConfig, } from "@/services/failover"; import { + acceptsEventStream, extractUpstreamHeaders, filterCandidates, extractContentText, @@ -570,6 +571,12 @@ export const messagesApi = new Elysia({ // Extract extra headers for passthrough const extraHeaders = extractUpstreamHeaders(reqHeaders); + // Determine streaming mode. 
Body wins when explicit; otherwise honor + // the client's Accept: text/event-stream negotiation hint. + if (body.stream === undefined && acceptsEventStream(reqHeaders)) { + body.stream = true; + } + // Check ReqId for deduplication (if provided) const isStream = body.stream === true; const reqIdResult = await checkReqId(reqId, { @@ -722,30 +729,51 @@ export const messagesApi = new Elysia({ extraHeaders, ); - // Return an async generator for streaming + // Return a native Response with proper SSE headers. Wrapping the + // pre-formatted SSE generator in a ReadableStream ensures Elysia + // skips its auto SSE-wrapping (which would double-prefix `data:`) + // and lets us set Content-Type: text/event-stream explicitly. const streamResponse = result.response; const streamSignal = request.signal; - return (async function* () { - try { - yield* processStreamingResponse( - streamResponse, - completion, - bearer, - providerType, - apiKeyRecord ?? null, - begin, - streamSignal, - reqIdContext ?? undefined, - ); - } catch (error) { - // Don't log error if it's due to client abort - if (!streamSignal.aborted) { - logger.error("Stream processing error", error); - // Note: HTTP status cannot be changed after streaming has started - yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "Stream processing error" } })}\n\n`; + const sseStream = new ReadableStream({ + async start(controller) { + const encoder = new TextEncoder(); + try { + for await (const chunk of processStreamingResponse( + streamResponse, + completion, + bearer, + providerType, + apiKeyRecord ?? null, + begin, + streamSignal, + reqIdContext ?? 
undefined, + )) { + controller.enqueue(encoder.encode(chunk)); + } + } catch (error) { + if (!streamSignal.aborted) { + logger.error("Stream processing error", error); + controller.enqueue( + encoder.encode( + `event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "Stream processing error" } })}\n\n`, + ), + ); + } + } finally { + controller.close(); } - } - })(); + }, + }); + + return new Response(sseStream, { + status: 200, + headers: { + "Content-Type": "text/event-stream; charset=utf-8", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }, + }); } else { // Non-streaming request - return JSON response directly const result = await executeWithFailover( diff --git a/backend/src/api/v1/responses.ts b/backend/src/api/v1/responses.ts index 67a74a6..7066f98 100644 --- a/backend/src/api/v1/responses.ts +++ b/backend/src/api/v1/responses.ts @@ -24,6 +24,7 @@ import { type FailoverConfig, } from "@/services/failover"; import { + acceptsEventStream, extractUpstreamHeaders, filterCandidates, extractContentText, @@ -596,6 +597,11 @@ export const responsesApi = new Elysia({ const extraHeaders = extractUpstreamHeaders(reqHeaders); // Check ReqId for deduplication (if provided) + // Determine streaming mode. Body wins when explicit; otherwise honor + // the client's Accept: text/event-stream negotiation hint. + if (body.stream === undefined && acceptsEventStream(reqHeaders)) { + body.stream = true; + } const isStream = body.stream === true; // Convert input to messages format for storage @@ -765,30 +771,51 @@ export const responsesApi = new Elysia({ extraHeaders, ); - // Return an async generator for streaming + // Return a native Response with proper SSE headers. Wrapping the + // pre-formatted SSE generator in a ReadableStream ensures Elysia + // skips its auto SSE-wrapping (which would double-prefix `data:`) + // and lets us set Content-Type: text/event-stream explicitly. 
const streamResponse = result.response; const streamSignal = request.signal; - return (async function* () { - try { - yield* processStreamingResponse( - streamResponse, - completion, - bearer, - providerType, - apiKeyRecord ?? null, - begin, - streamSignal, - reqIdContext ?? undefined, - ); - } catch (error) { - // Don't log error if it's due to client abort - if (!streamSignal.aborted) { - logger.error("Stream processing error", error); - // Note: HTTP status cannot be changed after streaming has started - yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { code: "internal_error", message: "Stream processing error", param: null, help_url: null } })}\n\n`; + const sseStream = new ReadableStream({ + async start(controller) { + const encoder = new TextEncoder(); + try { + for await (const chunk of processStreamingResponse( + streamResponse, + completion, + bearer, + providerType, + apiKeyRecord ?? null, + begin, + streamSignal, + reqIdContext ?? undefined, + )) { + controller.enqueue(encoder.encode(chunk)); + } + } catch (error) { + if (!streamSignal.aborted) { + logger.error("Stream processing error", error); + controller.enqueue( + encoder.encode( + `event: error\ndata: ${JSON.stringify({ type: "error", error: { code: "internal_error", message: "Stream processing error", param: null, help_url: null } })}\n\n`, + ), + ); + } + } finally { + controller.close(); } - } - })(); + }, + }); + + return new Response(sseStream, { + status: 200, + headers: { + "Content-Type": "text/event-stream; charset=utf-8", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }, + }); } else { // Non-streaming request - return JSON response directly const result = await executeWithFailover( diff --git a/backend/src/utils/api-helpers.test.ts b/backend/src/utils/api-helpers.test.ts new file mode 100644 index 0000000..fe14108 --- /dev/null +++ b/backend/src/utils/api-helpers.test.ts @@ -0,0 +1,74 @@ +/** + * Unit tests for api-helpers utilities + */ + +import { 
describe, expect, test } from "bun:test"; +import { acceptsEventStream } from "./api-helpers"; + +const h = (value?: string) => new Headers(value ? { accept: value } : {}); + +describe("acceptsEventStream", () => { + test("plain text/event-stream", () => { + expect(acceptsEventStream(h("text/event-stream"))).toBe(true); + }); + + test("missing Accept header", () => { + expect(acceptsEventStream(h())).toBe(false); + }); + + test("unrelated media type", () => { + expect(acceptsEventStream(h("application/json"))).toBe(false); + }); + + test("wildcard does not opt in", () => { + expect(acceptsEventStream(h("*/*"))).toBe(false); + }); + + test("case-insensitive media type", () => { + expect(acceptsEventStream(h("TEXT/EVENT-STREAM"))).toBe(true); + }); + + test("tolerates internal whitespace", () => { + expect(acceptsEventStream(h("text/event-stream ; q = 0.5 "))).toBe(true); + }); + + test("weighted list with positive q is accepted", () => { + expect( + acceptsEventStream(h("application/json, text/event-stream;q=0.5")), + ).toBe(true); + }); + + test("explicit q=0 rejects SSE (RFC 7231 §5.3.1)", () => { + expect(acceptsEventStream(h("text/event-stream;q=0"))).toBe(false); + }); + + test("q=0.0 also rejects SSE", () => { + expect(acceptsEventStream(h("text/event-stream;q=0.0"))).toBe(false); + }); + + test("q = 0 with whitespace around = rejects SSE", () => { + expect(acceptsEventStream(h("text/event-stream ; q = 0"))).toBe(false); + }); + + test("q = 0.5 with whitespace around = is accepted", () => { + expect(acceptsEventStream(h("text/event-stream ; q = 0.5"))).toBe(true); + }); + + test("malformed empty q value is treated as not acceptable", () => { + expect(acceptsEventStream(h("text/event-stream;q="))).toBe(false); + }); + + test("q=0 in a weighted list rejects SSE", () => { + expect( + acceptsEventStream(h("text/event-stream;q=0, application/json")), + ).toBe(false); + }); + + test("structured-suffix match is rejected", () => { + 
/**
 * Syntactically valid Accept quality value: a plain decimal between 0 and 1
 * (`0`, `0.5`, `1`, `1.000`), following the RFC 7231 §5.3.1 `qvalue` shape.
 * More than three fractional digits are tolerated; signs, exponents, hex
 * literals, empty strings and values above 1 are not.
 */
const ACCEPT_QVALUE_PATTERN = /^(?:0(?:\.\d*)?|1(?:\.0*)?)$/;

/**
 * Whether the client is requesting an SSE stream via the Accept header.
 *
 * A media range opts in only when its type is exactly `text/event-stream`
 * (compared case-insensitively); wildcards such as `*\/*` and
 * structured-suffix types such as `text/event-stream+json` do not count.
 * The RFC 7231 §5.3.1 quality factor is honoured: `q=0` means "do not
 * accept", and a malformed weight is treated the same way.
 *
 * @param headers - Request headers to inspect.
 * @returns `true` when at least one entry of the Accept list names
 *   `text/event-stream` with no weight or with a valid weight above zero.
 */
export function acceptsEventStream(headers: Headers): boolean {
  const accept = headers.get("accept");
  if (!accept) {
    return false;
  }

  return accept.split(",").some((range) => {
    const [mediaType, ...params] = range
      .split(";")
      .map((segment) => segment.trim().toLowerCase());
    if (mediaType !== "text/event-stream") {
      return false;
    }

    for (const param of params) {
      const eq = param.indexOf("=");
      if (eq === -1) {
        continue;
      }
      // Whitespace around "=" is tolerated, e.g. "q = 0.5".
      if (param.slice(0, eq).trim() !== "q") {
        continue;
      }
      const weight = param.slice(eq + 1).trim();
      // Check the spelling before converting: Number() would otherwise accept
      // "0x1", "1e3", "+1" or "5" as positive weights, and rejects "" only
      // because it happens to coerce to 0.
      return ACCEPT_QVALUE_PATTERN.test(weight) && Number(weight) > 0;
    }

    // No q parameter: the weight defaults to 1.
    return true;
  });
}