Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions proxy/internal/handler/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,14 +282,23 @@ func (h *Handler) handleStreamingResponse(w http.ResponseWriter, resp *http.Resp
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
if line == "" || !strings.HasPrefix(line, "data:") {
continue

// Forward every line to the client as-is, preserving the full SSE
// protocol (event: lines, data: lines, and blank separators).
fmt.Fprintf(w, "%s\n", line)
if line == "" {
// Blank line = SSE event boundary; flush to client.
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
}

streamingChunks = append(streamingChunks, line)
fmt.Fprintf(w, "%s\n\n", line)
if f, ok := w.(http.Flusher); ok {
f.Flush()
if line != "" {
streamingChunks = append(streamingChunks, line)
}

if !strings.HasPrefix(line, "data:") {
continue
}

jsonData := strings.TrimPrefix(line, "data: ")
Expand Down
14 changes: 7 additions & 7 deletions proxy/internal/provider/openai.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,11 +602,11 @@ func transformOpenAIStreamToAnthropic(openAIStream io.ReadCloser, anthropicStrea
if data == "[DONE]" {
// Send Anthropic-style completion
if contentStarted {
fmt.Fprintf(anthropicStream, "data: {\"type\":\"content_block_stop\",\"index\":0}\n\n")
fmt.Fprintf(anthropicStream, "event: content_block_stop\ndata: {\"type\":\"content_block_stop\",\"index\":0}\n\n")
}
if messageStarted {
fmt.Fprintf(anthropicStream, "data: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"end_turn\",\"stop_sequence\":null}}\n\n")
fmt.Fprintf(anthropicStream, "data: {\"type\":\"message_stop\"}\n\n")
fmt.Fprintf(anthropicStream, "event: message_delta\ndata: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"end_turn\",\"stop_sequence\":null}}\n\n")
fmt.Fprintf(anthropicStream, "event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n")
}
break
}
Expand Down Expand Up @@ -644,7 +644,7 @@ func transformOpenAIStreamToAnthropic(openAIStream io.ReadCloser, anthropicStrea
"usage": anthropicUsage,
}
usageJSON, _ := json.Marshal(usageDelta)
fmt.Fprintf(anthropicStream, "data: %s\n\n", usageJSON)
fmt.Fprintf(anthropicStream, "event: message_delta\ndata: %s\n\n", usageJSON)
}
}

Expand Down Expand Up @@ -684,7 +684,7 @@ func transformOpenAIStreamToAnthropic(openAIStream io.ReadCloser, anthropicStrea
},
}
startJSON, _ := json.Marshal(messageStart)
fmt.Fprintf(anthropicStream, "data: %s\n\n", startJSON)
fmt.Fprintf(anthropicStream, "event: message_start\ndata: %s\n\n", startJSON)
}

// Handle content
Expand All @@ -701,7 +701,7 @@ func transformOpenAIStreamToAnthropic(openAIStream io.ReadCloser, anthropicStrea
},
}
blockStartJSON, _ := json.Marshal(blockStart)
fmt.Fprintf(anthropicStream, "data: %s\n\n", blockStartJSON)
fmt.Fprintf(anthropicStream, "event: content_block_start\ndata: %s\n\n", blockStartJSON)
}

// Send content_block_delta
Expand All @@ -714,7 +714,7 @@ func transformOpenAIStreamToAnthropic(openAIStream io.ReadCloser, anthropicStrea
},
}
deltaJSON, _ := json.Marshal(contentDelta)
fmt.Fprintf(anthropicStream, "data: %s\n\n", deltaJSON)
fmt.Fprintf(anthropicStream, "event: content_block_delta\ndata: %s\n\n", deltaJSON)
}

}
Expand Down
2 changes: 1 addition & 1 deletion web/app/components/RequestDetailContent.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ function ResponseDetails({ response }: { response: NonNullable<Request['response
// Parse streaming chunks to extract the final assembled text
const parseStreamingResponse = (chunks: string[]) => {
let assembledText = '';
let rawData = chunks.join('');
let rawData = chunks.join('\n');

try {
// Split by lines and process each SSE event
Expand Down