Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 49 additions & 22 deletions backend/src/api/v1/completions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
type FailoverConfig,
} from "@/services/failover";
import {
acceptsEventStream,
extractUpstreamHeaders,
filterCandidates,
extractContentText,
Expand Down Expand Up @@ -569,6 +570,12 @@ export const completionsApi = new Elysia({
// Extract extra headers for passthrough
const extraHeaders = extractUpstreamHeaders(reqHeaders);

// Determine streaming mode. Body wins when explicit; otherwise honor
// the client's Accept: text/event-stream negotiation hint.
if (body.stream === undefined && acceptsEventStream(reqHeaders)) {
body.stream = true;
}

// Check ReqId for deduplication (if provided)
const isStream = body.stream === true;
const reqIdResult = await checkReqId(reqId, {
Expand Down Expand Up @@ -718,31 +725,51 @@ export const completionsApi = new Elysia({
extraHeaders,
);

// Return an async generator for streaming
// Return a native Response with proper SSE headers. Wrapping the
// pre-formatted SSE generator in a ReadableStream ensures Elysia
// skips its auto SSE-wrapping (which would double-prefix `data:`)
// and lets us set Content-Type: text/event-stream explicitly.
const streamResponse = result.response;
const streamSignal = request.signal;
return (async function* () {
try {
yield* processStreamingResponse(
streamResponse,
completion,
bearer,
providerType,
apiKeyRecord ?? null,
begin,
streamSignal,
reqIdContext ?? undefined,
);
} catch (error) {
// Don't log error if it's due to client abort
if (!streamSignal.aborted) {
logger.error("Stream processing error", error);
// Note: HTTP status cannot be changed after streaming has started
// Use SSE format for error: data: {...}\n\n
yield `data: ${JSON.stringify({ error: { message: "Stream processing error", type: "server_error", code: "stream_error" } })}\n\n`;
const sseStream = new ReadableStream<Uint8Array>({
async start(controller) {
const encoder = new TextEncoder();
try {
for await (const chunk of processStreamingResponse(
streamResponse,
completion,
bearer,
providerType,
apiKeyRecord ?? null,
begin,
streamSignal,
reqIdContext ?? undefined,
)) {
controller.enqueue(encoder.encode(chunk));
}
} catch (error) {
if (!streamSignal.aborted) {
logger.error("Stream processing error", error);
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ error: { message: "Stream processing error", type: "server_error", code: "stream_error" } })}\n\n`,
),
);
}
} finally {
controller.close();
}
}
})();
},
});

return new Response(sseStream, {
status: 200,
headers: {
"Content-Type": "text/event-stream; charset=utf-8",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
} else {
// Non-streaming request - return JSON response directly
const result = await executeWithFailover(
Expand Down
70 changes: 49 additions & 21 deletions backend/src/api/v1/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
type FailoverConfig,
} from "@/services/failover";
import {
acceptsEventStream,
extractUpstreamHeaders,
filterCandidates,
extractContentText,
Expand Down Expand Up @@ -570,6 +571,12 @@ export const messagesApi = new Elysia({
// Extract extra headers for passthrough
const extraHeaders = extractUpstreamHeaders(reqHeaders);

// Determine streaming mode. Body wins when explicit; otherwise honor
// the client's Accept: text/event-stream negotiation hint.
if (body.stream === undefined && acceptsEventStream(reqHeaders)) {
body.stream = true;
}

// Check ReqId for deduplication (if provided)
const isStream = body.stream === true;
const reqIdResult = await checkReqId(reqId, {
Expand Down Expand Up @@ -722,30 +729,51 @@ export const messagesApi = new Elysia({
extraHeaders,
);

// Return an async generator for streaming
// Return a native Response with proper SSE headers. Wrapping the
// pre-formatted SSE generator in a ReadableStream ensures Elysia
// skips its auto SSE-wrapping (which would double-prefix `data:`)
// and lets us set Content-Type: text/event-stream explicitly.
const streamResponse = result.response;
const streamSignal = request.signal;
return (async function* () {
try {
yield* processStreamingResponse(
streamResponse,
completion,
bearer,
providerType,
apiKeyRecord ?? null,
begin,
streamSignal,
reqIdContext ?? undefined,
);
} catch (error) {
// Don't log error if it's due to client abort
if (!streamSignal.aborted) {
logger.error("Stream processing error", error);
// Note: HTTP status cannot be changed after streaming has started
yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "Stream processing error" } })}\n\n`;
const sseStream = new ReadableStream<Uint8Array>({
async start(controller) {
const encoder = new TextEncoder();
try {
for await (const chunk of processStreamingResponse(
streamResponse,
completion,
bearer,
providerType,
apiKeyRecord ?? null,
begin,
streamSignal,
reqIdContext ?? undefined,
)) {
controller.enqueue(encoder.encode(chunk));
}
} catch (error) {
if (!streamSignal.aborted) {
logger.error("Stream processing error", error);
controller.enqueue(
encoder.encode(
`event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "Stream processing error" } })}\n\n`,
),
);
}
} finally {
controller.close();
}
}
})();
},
});

return new Response(sseStream, {
status: 200,
headers: {
"Content-Type": "text/event-stream; charset=utf-8",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
} else {
// Non-streaming request - return JSON response directly
const result = await executeWithFailover(
Expand Down
69 changes: 48 additions & 21 deletions backend/src/api/v1/responses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
type FailoverConfig,
} from "@/services/failover";
import {
acceptsEventStream,
extractUpstreamHeaders,
filterCandidates,
extractContentText,
Expand Down Expand Up @@ -596,6 +597,11 @@ export const responsesApi = new Elysia({
const extraHeaders = extractUpstreamHeaders(reqHeaders);

// Check ReqId for deduplication (if provided)
// Determine streaming mode. Body wins when explicit; otherwise honor
// the client's Accept: text/event-stream negotiation hint.
if (body.stream === undefined && acceptsEventStream(reqHeaders)) {
body.stream = true;
}
const isStream = body.stream === true;

// Convert input to messages format for storage
Expand Down Expand Up @@ -765,30 +771,51 @@ export const responsesApi = new Elysia({
extraHeaders,
);

// Return an async generator for streaming
// Return a native Response with proper SSE headers. Wrapping the
// pre-formatted SSE generator in a ReadableStream ensures Elysia
// skips its auto SSE-wrapping (which would double-prefix `data:`)
// and lets us set Content-Type: text/event-stream explicitly.
const streamResponse = result.response;
const streamSignal = request.signal;
return (async function* () {
try {
yield* processStreamingResponse(
streamResponse,
completion,
bearer,
providerType,
apiKeyRecord ?? null,
begin,
streamSignal,
reqIdContext ?? undefined,
);
} catch (error) {
// Don't log error if it's due to client abort
if (!streamSignal.aborted) {
logger.error("Stream processing error", error);
// Note: HTTP status cannot be changed after streaming has started
yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { code: "internal_error", message: "Stream processing error", param: null, help_url: null } })}\n\n`;
const sseStream = new ReadableStream<Uint8Array>({
async start(controller) {
const encoder = new TextEncoder();
try {
for await (const chunk of processStreamingResponse(
streamResponse,
completion,
bearer,
providerType,
apiKeyRecord ?? null,
begin,
streamSignal,
reqIdContext ?? undefined,
)) {
controller.enqueue(encoder.encode(chunk));
}
} catch (error) {
if (!streamSignal.aborted) {
logger.error("Stream processing error", error);
controller.enqueue(
encoder.encode(
`event: error\ndata: ${JSON.stringify({ type: "error", error: { code: "internal_error", message: "Stream processing error", param: null, help_url: null } })}\n\n`,
),
);
}
} finally {
controller.close();
}
}
})();
},
});

return new Response(sseStream, {
status: 200,
headers: {
"Content-Type": "text/event-stream; charset=utf-8",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
} else {
// Non-streaming request - return JSON response directly
const result = await executeWithFailover(
Expand Down
74 changes: 74 additions & 0 deletions backend/src/utils/api-helpers.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Unit tests for api-helpers utilities
*/

import { describe, expect, test } from "bun:test";
import { acceptsEventStream } from "./api-helpers";

const h = (value?: string) => new Headers(value ? { accept: value } : {});

describe("acceptsEventStream", () => {
test("plain text/event-stream", () => {
expect(acceptsEventStream(h("text/event-stream"))).toBe(true);
});

test("missing Accept header", () => {
expect(acceptsEventStream(h())).toBe(false);
});

test("unrelated media type", () => {
expect(acceptsEventStream(h("application/json"))).toBe(false);
});

test("wildcard does not opt in", () => {
expect(acceptsEventStream(h("*/*"))).toBe(false);
});

test("case-insensitive media type", () => {
expect(acceptsEventStream(h("TEXT/EVENT-STREAM"))).toBe(true);
});

test("tolerates internal whitespace", () => {
expect(acceptsEventStream(h("text/event-stream ; q = 0.5 "))).toBe(true);
});

test("weighted list with positive q is accepted", () => {
expect(
acceptsEventStream(h("application/json, text/event-stream;q=0.5")),
).toBe(true);
});

test("explicit q=0 rejects SSE (RFC 7231 §5.3.1)", () => {
expect(acceptsEventStream(h("text/event-stream;q=0"))).toBe(false);
});

test("q=0.0 also rejects SSE", () => {
expect(acceptsEventStream(h("text/event-stream;q=0.0"))).toBe(false);
});

test("q = 0 with whitespace around = rejects SSE", () => {
expect(acceptsEventStream(h("text/event-stream ; q = 0"))).toBe(false);
});

test("q = 0.5 with whitespace around = is accepted", () => {
expect(acceptsEventStream(h("text/event-stream ; q = 0.5"))).toBe(true);
});

test("malformed empty q value is treated as not acceptable", () => {
expect(acceptsEventStream(h("text/event-stream;q="))).toBe(false);
});

test("q=0 in a weighted list rejects SSE", () => {
expect(
acceptsEventStream(h("text/event-stream;q=0, application/json")),
).toBe(false);
});

test("structured-suffix match is rejected", () => {
expect(acceptsEventStream(h("text/event-stream+json"))).toBe(false);
});

test("malformed q value is treated as not acceptable", () => {
expect(acceptsEventStream(h("text/event-stream;q=NaN"))).toBe(false);
});
});
35 changes: 35 additions & 0 deletions backend/src/utils/api-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,41 @@ export const EXCLUDED_HEADERS = new Set([
// Header Extraction
// =============================================================================

/**
 * Report whether the client's Accept header asks for an SSE stream.
 *
 * Only an exact `text/event-stream` media type counts — structured-suffix
 * forms such as `text/event-stream+json` do not match. The RFC 7231 §5.3.1
 * quality factor is honored: an entry weighted `q=0` (or with a malformed
 * q value) means "not acceptable" and is ignored.
 *
 * @param headers - Incoming request headers.
 * @returns true when at least one acceptable `text/event-stream` entry exists.
 */
export function acceptsEventStream(headers: Headers): boolean {
  const accept = headers.get("accept");
  if (!accept) {
    return false;
  }
  for (const entry of accept.split(",")) {
    // Normalize every `;`-separated segment: trimmed, lowercased.
    const segments = entry.split(";").map((segment) => segment.trim().toLowerCase());
    if (segments[0] !== "text/event-stream") {
      continue;
    }
    // An entry with no q parameter is acceptable by default; the first q
    // parameter found decides otherwise.
    let acceptable = true;
    for (const segment of segments.slice(1)) {
      const eqIndex = segment.indexOf("=");
      if (eqIndex === -1) {
        continue;
      }
      if (segment.slice(0, eqIndex).trim() !== "q") {
        continue;
      }
      const weight = Number(segment.slice(eqIndex + 1).trim());
      // NaN / empty ("" coerces to 0) / q=0 all mean "do not accept".
      acceptable = Number.isFinite(weight) && weight > 0;
      break;
    }
    if (acceptable) {
      return true;
    }
  }
  return false;
}
Comment thread
pescn marked this conversation as resolved.

/**
* Extract headers to be forwarded to upstream
* All headers are forwarded EXCEPT:
Expand Down
Loading