From 62d1235a731e2e2794596de4a0a51b270cda9b0d Mon Sep 17 00:00:00 2001 From: Koosha Paridehpour Date: Fri, 27 Feb 2026 03:40:13 -0700 Subject: [PATCH 01/12] fix(responses): prevent JSON tree corruption from literal control chars in function output Cherry-pick of upstream PR #1672. Adds containsLiteralControlChars guard to prevent sjson.SetRaw from corrupting the JSON tree when function outputs contain literal control characters. Co-Authored-By: Claude Opus 4.6 --- .../gemini_openai-responses_request.go | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/internal/translator/gemini/openai/responses/gemini_openai-responses_request.go b/internal/translator/gemini/openai/responses/gemini_openai-responses_request.go index 6ffa62ce67..72d25cf224 100644 --- a/internal/translator/gemini/openai/responses/gemini_openai-responses_request.go +++ b/internal/translator/gemini/openai/responses/gemini_openai-responses_request.go @@ -310,10 +310,17 @@ func ConvertOpenAIResponsesRequestToGemini(modelName string, inputRawJSON []byte functionResponse, _ = sjson.Set(functionResponse, "functionResponse.name", functionName) functionResponse, _ = sjson.Set(functionResponse, "functionResponse.id", callID) - // Set the raw JSON output directly (preserves string encoding) + // Set the function output into the response. + // When the output is valid JSON without literal control characters + // (newlines, carriage returns inside string values) we embed it as a + // raw JSON value so the model sees structured data. Otherwise we + // fall back to sjson.Set which safely escapes the value as a string. + // This prevents sjson.SetRaw from corrupting the JSON tree when the + // raw value contains literal newlines (common with double-encoded + // function outputs whose inner escape sequences were decoded by .Str). 
if outputRaw != "" && outputRaw != "null" { output := gjson.Parse(outputRaw) - if output.Type == gjson.JSON { + if output.Type == gjson.JSON && !containsLiteralControlChars(output.Raw) { functionResponse, _ = sjson.SetRaw(functionResponse, "functionResponse.response.result", output.Raw) } else { functionResponse, _ = sjson.Set(functionResponse, "functionResponse.response.result", outputRaw) @@ -433,3 +440,16 @@ func ConvertOpenAIResponsesRequestToGemini(modelName string, inputRawJSON []byte result = common.AttachDefaultSafetySettings(result, "safetySettings") return result } + +// containsLiteralControlChars reports whether s contains any ASCII control +// character (0x00–0x1F) other than horizontal tab (0x09). Literal newlines +// and carriage returns inside a JSON value cause sjson.SetRaw to mis-parse +// string boundaries and corrupt the surrounding JSON tree. +func containsLiteralControlChars(s string) bool { + for _, c := range s { + if c < 0x20 && c != '\t' { + return true + } + } + return false +} From aee3e5c5df921154e6ee25b41b84e7b6cb6781e3 Mon Sep 17 00:00:00 2001 From: Koosha Paridehpour Date: Fri, 27 Feb 2026 03:40:58 -0700 Subject: [PATCH 02/12] fix(auth): limit auto-refresh concurrency to prevent refresh storms Cherry-pick of upstream PR #1686. Reduces refresh check interval to 5s and adds refreshMaxConcurrency=16 constant (semaphore already in main). 
Co-Authored-By: Claude Opus 4.6 --- sdk/cliproxy/auth/conductor.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index b0ed3c0991..3dfe5faa86 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -59,7 +59,8 @@ type RefreshEvaluator interface { } const ( - refreshCheckInterval = 30 * time.Second + refreshCheckInterval = 5 * time.Second + refreshMaxConcurrency = 16 refreshPendingBackoff = time.Minute refreshFailureBackoff = 1 * time.Minute quotaBackoffBase = time.Second @@ -155,7 +156,8 @@ type Manager struct { rtProvider RoundTripperProvider // Auto refresh state - refreshCancel context.CancelFunc + refreshCancel context.CancelFunc + refreshSemaphore chan struct{} } // NewManager constructs a manager with optional custom selector and hook. @@ -173,6 +175,7 @@ func NewManager(store Store, selector Selector, hook Hook) *Manager { hook: hook, auths: make(map[string]*Auth), providerOffsets: make(map[string]int), + refreshSemaphore: make(chan struct{}, refreshMaxConcurrency), } // atomic.Value requires non-nil initial value. 
manager.runtimeConfig.Store(&internalconfig.Config{}) @@ -1869,11 +1872,25 @@ func (m *Manager) checkRefreshes(ctx context.Context) { if !m.markRefreshPending(a.ID, now) { continue } - go m.refreshAuth(ctx, a.ID) + go m.refreshAuthWithLimit(ctx, a.ID) } } } +func (m *Manager) refreshAuthWithLimit(ctx context.Context, id string) { + if m.refreshSemaphore == nil { + m.refreshAuth(ctx, id) + return + } + select { + case m.refreshSemaphore <- struct{}{}: + defer func() { <-m.refreshSemaphore }() + case <-ctx.Done(): + return + } + m.refreshAuth(ctx, id) +} + func (m *Manager) snapshotAuths() []*Auth { m.mu.RLock() defer m.mu.RUnlock() From 1eeb820b1cc94e977af93dd2f2b59d729fb8110c Mon Sep 17 00:00:00 2001 From: Koosha Paridehpour Date: Fri, 27 Feb 2026 03:41:49 -0700 Subject: [PATCH 03/12] fix(translator): correct Gemini API schema parameter naming Cherry-pick of upstream PR #1648. Renames parametersJsonSchema to parameters for Gemini API compatibility. Co-Authored-By: Claude Opus 4.6 --- .../gemini/antigravity_gemini_request.go | 7 ++- .../gemini/antigravity_gemini_request_test.go | 56 +++++++++++++++++++ 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/internal/translator/antigravity/gemini/antigravity_gemini_request.go b/internal/translator/antigravity/gemini/antigravity_gemini_request.go index 9c20bd922e..cd045eb339 100644 --- a/internal/translator/antigravity/gemini/antigravity_gemini_request.go +++ b/internal/translator/antigravity/gemini/antigravity_gemini_request.go @@ -87,9 +87,12 @@ func ConvertGeminiRequestToAntigravity(modelName string, inputRawJSON []byte, _ if functionDeclarationsResult.Exists() && functionDeclarationsResult.IsArray() { functionDeclarationsResults := functionDeclarationsResult.Array() for j := 0; j < len(functionDeclarationsResults); j++ { + // Gemini API expects function_declarations[].parameters (JSON Schema). + // Keep/normalize to "parameters" for compatibility, even if clients send "parametersJsonSchema". 
parametersResult := gjson.GetBytes(rawJSON, fmt.Sprintf("request.tools.%d.function_declarations.%d.parameters", i, j)) - if parametersResult.Exists() { - strJson, _ := util.RenameKey(string(rawJSON), fmt.Sprintf("request.tools.%d.function_declarations.%d.parameters", i, j), fmt.Sprintf("request.tools.%d.function_declarations.%d.parametersJsonSchema", i, j)) + parametersJSONSchemaResult := gjson.GetBytes(rawJSON, fmt.Sprintf("request.tools.%d.function_declarations.%d.parametersJsonSchema", i, j)) + if !parametersResult.Exists() && parametersJSONSchemaResult.Exists() { + strJson, _ := util.RenameKey(string(rawJSON), fmt.Sprintf("request.tools.%d.function_declarations.%d.parametersJsonSchema", i, j), fmt.Sprintf("request.tools.%d.function_declarations.%d.parameters", i, j)) rawJSON = []byte(strJson) } } diff --git a/internal/translator/antigravity/gemini/antigravity_gemini_request_test.go b/internal/translator/antigravity/gemini/antigravity_gemini_request_test.go index 8867a30eae..92289a6458 100644 --- a/internal/translator/antigravity/gemini/antigravity_gemini_request_test.go +++ b/internal/translator/antigravity/gemini/antigravity_gemini_request_test.go @@ -7,6 +7,62 @@ import ( "github.com/tidwall/gjson" ) +func TestConvertGeminiRequestToAntigravity_ToolParametersKeyPreserved(t *testing.T) { + inputJSON := []byte(`{ + "model": "gemini-3-pro-preview", + "tools": [ + { + "function_declarations": [ + { + "name": "test_tool", + "description": "test", + "parameters": {"type":"OBJECT","properties":{"a":{"type":"STRING"}}} + } + ] + } + ], + "contents": [{"role":"user","parts":[{"text":"hi"}]}] + }`) + + output := ConvertGeminiRequestToAntigravity("gemini-3-pro-preview", inputJSON, false) + outStr := string(output) + + if !gjson.Get(outStr, "request.tools.0.function_declarations.0.parameters").Exists() { + t.Fatalf("expected request.tools[0].function_declarations[0].parameters to exist") + } + if gjson.Get(outStr, 
"request.tools.0.function_declarations.0.parametersJsonSchema").Exists() { + t.Fatalf("expected request.tools[0].function_declarations[0].parametersJsonSchema to NOT exist") + } +} + +func TestConvertGeminiRequestToAntigravity_ToolParametersJsonSchemaNormalized(t *testing.T) { + inputJSON := []byte(`{ + "model": "gemini-3-pro-preview", + "tools": [ + { + "function_declarations": [ + { + "name": "test_tool", + "description": "test", + "parametersJsonSchema": {"type":"OBJECT","properties":{"a":{"type":"STRING"}}} + } + ] + } + ], + "contents": [{"role":"user","parts":[{"text":"hi"}]}] + }`) + + output := ConvertGeminiRequestToAntigravity("gemini-3-pro-preview", inputJSON, false) + outStr := string(output) + + if !gjson.Get(outStr, "request.tools.0.function_declarations.0.parameters").Exists() { + t.Fatalf("expected request.tools[0].function_declarations[0].parameters to exist") + } + if gjson.Get(outStr, "request.tools.0.function_declarations.0.parametersJsonSchema").Exists() { + t.Fatalf("expected request.tools[0].function_declarations[0].parametersJsonSchema to NOT exist") + } +} + func TestConvertGeminiRequestToAntigravity_PreserveValidSignature(t *testing.T) { // Valid signature on functionCall should be preserved validSignature := "abc123validSignature1234567890123456789012345678901234567890" From d1a06d64d9c216bfe0114b7bae37bbd4377effcf Mon Sep 17 00:00:00 2001 From: Koosha Paridehpour Date: Fri, 27 Feb 2026 03:41:55 -0700 Subject: [PATCH 04/12] feat: add official Termux (aarch64) build to release workflow Cherry-pick of upstream PR #1233. Adds build-termux job that builds inside a Termux container for aarch64 support. 
Co-Authored-By: Claude Opus 4.6 --- .github/workflows/release.yaml | 35 ++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 04ec21a9a5..1314f83ff5 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -37,3 +37,38 @@ jobs: VERSION: ${{ env.VERSION }} COMMIT: ${{ env.COMMIT }} BUILD_DATE: ${{ env.BUILD_DATE }} + + build-termux: + name: Build Termux (aarch64) + runs-on: ubuntu-24.04-arm + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + fetch-depth: 0 + - name: Build in Termux Container + run: | + # Prepare metadata on host + VERSION=$(git describe --tags --always --dirty | sed 's/^v//') + COMMIT=$(git rev-parse --short HEAD) + BUILD_DATE=$(date -u +%Y-%m-%dT%H:%M:%SZ) + + # Ensure the workspace is writable by the container + chmod -R 777 . + + # Run the build inside Termux container + docker run --rm -v $(pwd):/workspace -w /workspace \ + -e VERSION=$VERSION -e COMMIT=$COMMIT -e BUILD_DATE=$BUILD_DATE \ + termux/termux-docker:aarch64 bash -c " + pkg update -y && pkg upgrade -y -o Dpkg::Options::='--force-confdef' -o Dpkg::Options::='--force-confold' + pkg install -y golang build-essential tar + CGO_ENABLED=0 go build -ldflags \"-s -w -X 'main.Version=\$VERSION' -X 'main.Commit=\$COMMIT' -X 'main.BuildDate=\$BUILD_DATE'\" -o cli-proxy-api ./cmd/server + tar -czf cli-proxy-api-termux-aarch64.tar.gz cli-proxy-api LICENSE README.md README_CN.md config.example.yaml + " + - name: Upload to Release + uses: softprops/action-gh-release@v2 + if: startsWith(github.ref, 'refs/tags/') + with: + files: cli-proxy-api-termux-aarch64.tar.gz + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} From 5bd0d5efbf63e6f9702d4f55f3b3a443051b985e Mon Sep 17 00:00:00 2001 From: Koosha Paridehpour Date: Fri, 27 Feb 2026 03:42:01 -0700 Subject: [PATCH 05/12] fix(translator): fix Claude tool_use streaming for OpenAI-compat providers Cherry-pick of upstream 
PR #1579. Fixes duplicate/empty tool_use blocks in OpenAI->Claude streaming translation. Co-Authored-By: Claude Opus 4.6 --- .../openai/claude/openai_claude_response.go | 204 +++++++++++++++--- .../claude/openai_claude_response_test.go | 86 ++++++++ 2 files changed, 257 insertions(+), 33 deletions(-) create mode 100644 internal/translator/openai/claude/openai_claude_response_test.go diff --git a/internal/translator/openai/claude/openai_claude_response.go b/internal/translator/openai/claude/openai_claude_response.go index e270a57eeb..5f5f5abce2 100644 --- a/internal/translator/openai/claude/openai_claude_response.go +++ b/internal/translator/openai/claude/openai_claude_response.go @@ -10,6 +10,7 @@ import ( "context" "encoding/json" "fmt" + "sort" "strings" "github.com/kooshapari/cliproxyapi-plusplus/v6/internal/util" @@ -52,6 +53,8 @@ type ConvertOpenAIResponseToAnthropicParams struct { ThinkingContentBlockIndex int // Next available content block index NextContentBlockIndex int + // Canonical tool name map from lowercase -> declared request tool name + CanonicalToolNameByLower map[string]string } // ToolCallAccumulator holds the state for accumulating tool call data @@ -59,6 +62,7 @@ type ToolCallAccumulator struct { ID string Name string Arguments strings.Builder + Started bool } // ConvertOpenAIResponseToClaude converts OpenAI streaming response format to Anthropic API format. 
@@ -90,6 +94,7 @@ func ConvertOpenAIResponseToClaude(_ context.Context, _ string, originalRequestR TextContentBlockIndex: -1, ThinkingContentBlockIndex: -1, NextContentBlockIndex: 0, + CanonicalToolNameByLower: buildCanonicalToolNameByLower(originalRequestRawJSON), } } @@ -106,7 +111,7 @@ func ConvertOpenAIResponseToClaude(_ context.Context, _ string, originalRequestR streamResult := gjson.GetBytes(originalRequestRawJSON, "stream") if !streamResult.Exists() || (streamResult.Exists() && streamResult.Type == gjson.False) { - return convertOpenAINonStreamingToAnthropic(rawJSON) + return convertOpenAINonStreamingToAnthropic(rawJSON, (*param).(*ConvertOpenAIResponseToAnthropicParams).CanonicalToolNameByLower) } else { return convertOpenAIStreamingChunkToAnthropic(rawJSON, (*param).(*ConvertOpenAIResponseToAnthropicParams)) } @@ -268,27 +273,29 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI // Handle function name if function := toolCall.Get("function"); function.Exists() { if name := function.Get("name"); name.Exists() { - accumulator.Name = name.String() - - ensureMessageStarted() // Must send message_start before content_block_start + nameStr := strings.TrimSpace(name.String()) + if nameStr != "" { + accumulator.Name = canonicalizeToolName(nameStr, param.CanonicalToolNameByLower) + } + } + // Emit tool_use start exactly once per tool call. + // Some OpenAI-compatible streams repeat function.name="" in later chunks; + // emitting start repeatedly breaks Claude Code tool execution. 
+ if !accumulator.Started && accumulator.Name != "" { stopThinkingContentBlock(param, &results) - stopTextContentBlock(param, &results) - // Send content_block_start for tool_use - contentBlockStart := map[string]interface{}{ - "type": "content_block_start", - "index": blockIndex, - "content_block": map[string]interface{}{ - "type": "tool_use", - "id": accumulator.ID, - "name": accumulator.Name, - "input": map[string]interface{}{}, - }, + if accumulator.ID == "" { + accumulator.ID = fmt.Sprintf("call_%d", index) } - contentBlockStartJSON, _ := json.Marshal(contentBlockStart) - results = append(results, "event: content_block_start\ndata: "+string(contentBlockStartJSON)+"\n\n") + + contentBlockStartJSON := `{"type":"content_block_start","index":0,"content_block":{"type":"tool_use","id":"","name":"","input":{}}}` + contentBlockStartJSON, _ = sjson.Set(contentBlockStartJSON, "index", blockIndex) + contentBlockStartJSON, _ = sjson.Set(contentBlockStartJSON, "content_block.id", accumulator.ID) + contentBlockStartJSON, _ = sjson.Set(contentBlockStartJSON, "content_block.name", accumulator.Name) + results = append(results, "event: content_block_start\ndata: "+contentBlockStartJSON+"\n\n") + accumulator.Started = true } // Handle function arguments @@ -327,10 +334,26 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI // Send content_block_stop for any tool calls if !param.ContentBlocksStopped { - for index := range param.ToolCallsAccumulator { + for _, index := range sortedToolCallIndexes(param.ToolCallsAccumulator) { accumulator := param.ToolCallsAccumulator[index] blockIndex := param.toolContentBlockIndex(index) + if !accumulator.Started { + if strings.TrimSpace(accumulator.Name) == "" { + delete(param.ToolCallBlockIndexes, index) + continue + } + if accumulator.ID == "" { + accumulator.ID = fmt.Sprintf("call_%d", index) + } + contentBlockStartJSON := 
`{"type":"content_block_start","index":0,"content_block":{"type":"tool_use","id":"","name":"","input":{}}}` + contentBlockStartJSON, _ = sjson.Set(contentBlockStartJSON, "index", blockIndex) + contentBlockStartJSON, _ = sjson.Set(contentBlockStartJSON, "content_block.id", accumulator.ID) + contentBlockStartJSON, _ = sjson.Set(contentBlockStartJSON, "content_block.name", accumulator.Name) + results = append(results, "event: content_block_start\ndata: "+contentBlockStartJSON+"\n\n") + accumulator.Started = true + } + // Send complete input_json_delta with all accumulated arguments if accumulator.Arguments.Len() > 0 { inputDelta := map[string]interface{}{ @@ -417,10 +440,26 @@ func convertOpenAIDoneToAnthropic(param *ConvertOpenAIResponseToAnthropicParams) stopTextContentBlock(param, &results) if !param.ContentBlocksStopped { - for index := range param.ToolCallsAccumulator { + for _, index := range sortedToolCallIndexes(param.ToolCallsAccumulator) { accumulator := param.ToolCallsAccumulator[index] blockIndex := param.toolContentBlockIndex(index) + if !accumulator.Started { + if strings.TrimSpace(accumulator.Name) == "" { + delete(param.ToolCallBlockIndexes, index) + continue + } + if accumulator.ID == "" { + accumulator.ID = fmt.Sprintf("call_%d", index) + } + contentBlockStartJSON := `{"type":"content_block_start","index":0,"content_block":{"type":"tool_use","id":"","name":"","input":{}}}` + contentBlockStartJSON, _ = sjson.Set(contentBlockStartJSON, "index", blockIndex) + contentBlockStartJSON, _ = sjson.Set(contentBlockStartJSON, "content_block.id", accumulator.ID) + contentBlockStartJSON, _ = sjson.Set(contentBlockStartJSON, "content_block.name", accumulator.Name) + results = append(results, "event: content_block_start\ndata: "+contentBlockStartJSON+"\n\n") + accumulator.Started = true + } + if accumulator.Arguments.Len() > 0 { inputDelta := map[string]interface{}{ "type": "content_block_delta", @@ -466,7 +505,7 @@ func convertOpenAIDoneToAnthropic(param 
*ConvertOpenAIResponseToAnthropicParams) } // convertOpenAINonStreamingToAnthropic converts OpenAI non-streaming response to Anthropic format -func convertOpenAINonStreamingToAnthropic(rawJSON []byte) []string { +func convertOpenAINonStreamingToAnthropic(rawJSON []byte, canonicalToolNameByLower map[string]string) []string { root := gjson.ParseBytes(rawJSON) // Build Anthropic response @@ -514,10 +553,21 @@ func convertOpenAINonStreamingToAnthropic(rawJSON []byte) []string { // Handle tool calls if toolCalls := choice.Get("message.tool_calls"); toolCalls.Exists() && toolCalls.IsArray() { toolCalls.ForEach(func(_, toolCall gjson.Result) bool { - toolUseBlock := map[string]interface{}{ - "type": "tool_use", - "id": toolCall.Get("id").String(), - "name": toolCall.Get("function.name").String(), + toolUseBlock := `{"type":"tool_use","id":"","name":"","input":{}}` + toolUseBlock, _ = sjson.Set(toolUseBlock, "id", toolCall.Get("id").String()) + toolName := canonicalizeToolName(toolCall.Get("function.name").String(), canonicalToolNameByLower) + toolUseBlock, _ = sjson.Set(toolUseBlock, "name", toolName) + + argsStr := util.FixJSON(toolCall.Get("function.arguments").String()) + if argsStr != "" && gjson.Valid(argsStr) { + argsJSON := gjson.Parse(argsStr) + if argsJSON.IsObject() { + toolUseBlock, _ = sjson.SetRaw(toolUseBlock, "input", argsJSON.Raw) + } else { + toolUseBlock, _ = sjson.SetRaw(toolUseBlock, "input", "{}") + } + } else { + toolUseBlock, _ = sjson.SetRaw(toolUseBlock, "input", "{}") } // Parse arguments @@ -677,8 +727,8 @@ func stopTextContentBlock(param *ConvertOpenAIResponseToAnthropicParams, results // Returns: // - string: An Anthropic-compatible JSON response. 
func ConvertOpenAIResponseToClaudeNonStream(_ context.Context, _ string, originalRequestRawJSON, requestRawJSON, rawJSON []byte, _ *any) string { - _ = originalRequestRawJSON _ = requestRawJSON + canonicalToolNameByLower := buildCanonicalToolNameByLower(originalRequestRawJSON) root := gjson.ParseBytes(rawJSON) @@ -747,10 +797,21 @@ func ConvertOpenAIResponseToClaudeNonStream(_ context.Context, _ string, origina if toolCalls.IsArray() { toolCalls.ForEach(func(_, tc gjson.Result) bool { hasToolCall = true - toolUse := map[string]interface{}{ - "type": "tool_use", - "id": tc.Get("id").String(), - "name": tc.Get("function.name").String(), + toolUse := `{"type":"tool_use","id":"","name":"","input":{}}` + toolUse, _ = sjson.Set(toolUse, "id", tc.Get("id").String()) + toolName := canonicalizeToolName(tc.Get("function.name").String(), canonicalToolNameByLower) + toolUse, _ = sjson.Set(toolUse, "name", toolName) + + argsStr := util.FixJSON(tc.Get("function.arguments").String()) + if argsStr != "" && gjson.Valid(argsStr) { + argsJSON := gjson.Parse(argsStr) + if argsJSON.IsObject() { + toolUse, _ = sjson.SetRaw(toolUse, "input", argsJSON.Raw) + } else { + toolUse, _ = sjson.SetRaw(toolUse, "input", "{}") + } + } else { + toolUse, _ = sjson.SetRaw(toolUse, "input", "{}") } argsStr := util.FixJSON(tc.Get("function.arguments").String()) @@ -808,10 +869,21 @@ func ConvertOpenAIResponseToClaudeNonStream(_ context.Context, _ string, origina if toolCalls := message.Get("tool_calls"); toolCalls.Exists() && toolCalls.IsArray() { toolCalls.ForEach(func(_, toolCall gjson.Result) bool { hasToolCall = true - toolUseBlock := map[string]interface{}{ - "type": "tool_use", - "id": toolCall.Get("id").String(), - "name": toolCall.Get("function.name").String(), + toolUseBlock := `{"type":"tool_use","id":"","name":"","input":{}}` + toolUseBlock, _ = sjson.Set(toolUseBlock, "id", toolCall.Get("id").String()) + toolName := canonicalizeToolName(toolCall.Get("function.name").String(), 
canonicalToolNameByLower) + toolUseBlock, _ = sjson.Set(toolUseBlock, "name", toolName) + + argsStr := util.FixJSON(toolCall.Get("function.arguments").String()) + if argsStr != "" && gjson.Valid(argsStr) { + argsJSON := gjson.Parse(argsStr) + if argsJSON.IsObject() { + toolUseBlock, _ = sjson.SetRaw(toolUseBlock, "input", argsJSON.Raw) + } else { + toolUseBlock, _ = sjson.SetRaw(toolUseBlock, "input", "{}") + } + } else { + toolUseBlock, _ = sjson.SetRaw(toolUseBlock, "input", "{}") } argsStr := toolCall.Get("function.arguments").String() @@ -878,3 +950,69 @@ func ConvertOpenAIResponseToClaudeNonStream(_ context.Context, _ string, origina func ClaudeTokenCount(ctx context.Context, count int64) string { return fmt.Sprintf(`{"input_tokens":%d}`, count) } + +func extractOpenAIUsage(usage gjson.Result) (int64, int64, int64) { + if !usage.Exists() || usage.Type == gjson.Null { + return 0, 0, 0 + } + + inputTokens := usage.Get("prompt_tokens").Int() + outputTokens := usage.Get("completion_tokens").Int() + cachedTokens := usage.Get("prompt_tokens_details.cached_tokens").Int() + + if cachedTokens > 0 { + if inputTokens >= cachedTokens { + inputTokens -= cachedTokens + } else { + inputTokens = 0 + } + } + + return inputTokens, outputTokens, cachedTokens +} + +func sortedToolCallIndexes(toolCalls map[int]*ToolCallAccumulator) []int { + indexes := make([]int, 0, len(toolCalls)) + for index := range toolCalls { + indexes = append(indexes, index) + } + sort.Ints(indexes) + return indexes +} + +func buildCanonicalToolNameByLower(originalRequestRawJSON []byte) map[string]string { + tools := gjson.GetBytes(originalRequestRawJSON, "tools") + if !tools.Exists() || !tools.IsArray() { + return nil + } + + out := make(map[string]string) + tools.ForEach(func(_, tool gjson.Result) bool { + name := strings.TrimSpace(tool.Get("name").String()) + if name == "" { + return true + } + key := strings.ToLower(name) + // Preserve first declaration if collisions only differ by case. 
+ if _, exists := out[key]; !exists { + out[key] = name + } + return true + }) + + if len(out) == 0 { + return nil + } + return out +} + +func canonicalizeToolName(name string, canonicalToolNameByLower map[string]string) string { + name = strings.TrimSpace(name) + if name == "" || len(canonicalToolNameByLower) == 0 { + return name + } + if canonical, ok := canonicalToolNameByLower[strings.ToLower(name)]; ok { + return canonical + } + return name +} diff --git a/internal/translator/openai/claude/openai_claude_response_test.go b/internal/translator/openai/claude/openai_claude_response_test.go new file mode 100644 index 0000000000..e4f43d866a --- /dev/null +++ b/internal/translator/openai/claude/openai_claude_response_test.go @@ -0,0 +1,86 @@ +package claude + +import ( + "context" + "strings" + "testing" + + "github.com/tidwall/gjson" +) + +func TestConvertOpenAIResponseToClaude_StreamToolStartEmittedOnceAndNameCanonicalized(t *testing.T) { + originalRequest := `{ + "stream": true, + "tools": [ + { + "name": "Bash", + "description": "run shell", + "input_schema": {"type":"object","properties":{"command":{"type":"string"}}} + } + ] + }` + + chunks := []string{ + `data: {"id":"chatcmpl-1","model":"m","created":1,"choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"id":"call_1","function":{"name":"bash","arguments":""}}]}}]}`, + `data: {"id":"chatcmpl-1","model":"m","created":1,"choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"name":"","arguments":"{\"command\":\"pwd\"}"}}]}}]}`, + `data: {"id":"chatcmpl-1","model":"m","created":1,"choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}`, + `data: {"id":"chatcmpl-1","model":"m","created":1,"choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}],"usage":{"prompt_tokens":10,"completion_tokens":2}}`, + `data: [DONE]`, + } + + var param any + var outputs []string + for _, chunk := range chunks { + out := ConvertOpenAIResponseToClaude(context.Background(), "m", []byte(originalRequest), 
nil, []byte(chunk), &param) + outputs = append(outputs, out...) + } + + joined := strings.Join(outputs, "") + if got := strings.Count(joined, `"content_block":{"type":"tool_use"`); got != 1 { + t.Fatalf("expected exactly 1 tool_use content_block_start, got %d\noutput:\n%s", got, joined) + } + + if strings.Contains(joined, `"name":""`) { + t.Fatalf("tool_use block should not have empty name\noutput:\n%s", joined) + } + + if !strings.Contains(joined, `"name":"Bash"`) { + t.Fatalf("expected canonical tool name Bash in stream output\noutput:\n%s", joined) + } +} + +func TestConvertOpenAIResponseToClaudeNonStream_CanonicalizesToolName(t *testing.T) { + originalRequest := `{ + "tools": [ + {"name": "Bash", "input_schema": {"type":"object","properties":{"command":{"type":"string"}}}} + ] + }` + + openAIResponse := `{ + "id":"chatcmpl-1", + "model":"m", + "choices":[ + { + "finish_reason":"tool_calls", + "message":{ + "content":"", + "tool_calls":[ + {"id":"call_1","type":"function","function":{"name":"bash","arguments":"{\"command\":\"pwd\"}"}} + ] + } + } + ], + "usage":{"prompt_tokens":10,"completion_tokens":2} + }` + + var param any + out := ConvertOpenAIResponseToClaudeNonStream(context.Background(), "m", []byte(originalRequest), nil, []byte(openAIResponse), &param) + result := gjson.Parse(out) + + if got := result.Get("content.0.type").String(); got != "tool_use" { + t.Fatalf("expected first content block type tool_use, got %q", got) + } + if got := result.Get("content.0.name").String(); got != "Bash" { + t.Fatalf("expected canonical tool name %q, got %q", "Bash", got) + } +} From d484054e66edf3598fce644d4f45de74c135663e Mon Sep 17 00:00:00 2001 From: Koosha Paridehpour Date: Fri, 27 Feb 2026 03:42:08 -0700 Subject: [PATCH 06/12] feat(translator): pass through OpenAI web search annotations to all formats Cherry-pick of upstream PR #1539. Adds url_citation/annotation passthrough from OpenAI web search to Gemini and Claude response formats. 
Co-Authored-By: Claude Opus 4.6 --- .../openai/claude/openai_claude_response.go | 52 ++- .../openai/gemini/openai_gemini_response.go | 33 ++ .../openai_openai-responses_request.go | 4 +- .../openai_openai-responses_response.go | 34 ++ test/builtin_tools_translation_test.go | 12 +- test/openai_websearch_translation_test.go | 313 ++++++++++++++++++ 6 files changed, 440 insertions(+), 8 deletions(-) create mode 100644 test/openai_websearch_translation_test.go diff --git a/internal/translator/openai/claude/openai_claude_response.go b/internal/translator/openai/claude/openai_claude_response.go index 5f5f5abce2..ba40206b00 100644 --- a/internal/translator/openai/claude/openai_claude_response.go +++ b/internal/translator/openai/claude/openai_claude_response.go @@ -45,6 +45,8 @@ type ConvertOpenAIResponseToAnthropicParams struct { MessageStarted bool // Track if message_stop has been sent MessageStopSent bool + // Accumulated annotations (url_citation) from OpenAI web search + AnnotationsRaw []string // Tool call content block index mapping ToolCallBlockIndexes map[int]int // Index assigned to text content block @@ -248,6 +250,15 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI param.ContentAccumulator.WriteString(content.String()) } + // Handle annotations (url_citation from web search) + if annotations := delta.Get("annotations"); annotations.Exists() && annotations.IsArray() { + annotations.ForEach(func(_, ann gjson.Result) bool { + compacted := strings.ReplaceAll(strings.ReplaceAll(ann.Raw, "\n", ""), "\r", "") + param.AnnotationsRaw = append(param.AnnotationsRaw, compacted) + return true + }) + } + // Handle tool calls if toolCalls := delta.Get("tool_calls"); toolCalls.Exists() && toolCalls.IsArray() { if param.ToolCallsAccumulator == nil { @@ -388,9 +399,24 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI usage := root.Get("usage") var inputTokens, outputTokens int64 if usage.Exists() && usage.Type 
!= gjson.Null { - // Check if usage has actual token counts - promptTokens := usage.Get("prompt_tokens") - completionTokens := usage.Get("completion_tokens") + inputTokens, outputTokens, cachedTokens = extractOpenAIUsage(usage) + // Send message_delta with usage + messageDeltaJSON := `{"type":"message_delta","delta":{"stop_reason":"","stop_sequence":null},"usage":{"input_tokens":0,"output_tokens":0}}` + messageDeltaJSON, _ = sjson.Set(messageDeltaJSON, "delta.stop_reason", mapOpenAIFinishReasonToAnthropic(param.FinishReason)) + messageDeltaJSON, _ = sjson.Set(messageDeltaJSON, "usage.input_tokens", inputTokens) + messageDeltaJSON, _ = sjson.Set(messageDeltaJSON, "usage.output_tokens", outputTokens) + if cachedTokens > 0 { + messageDeltaJSON, _ = sjson.Set(messageDeltaJSON, "usage.cache_read_input_tokens", cachedTokens) + } + // Attach accumulated annotations as citations + if len(param.AnnotationsRaw) > 0 { + messageDeltaJSON, _ = sjson.SetRaw(messageDeltaJSON, "citations", "[]") + for _, raw := range param.AnnotationsRaw { + messageDeltaJSON, _ = sjson.SetRaw(messageDeltaJSON, "citations.-1", raw) + } + } + results = append(results, "event: message_delta\ndata: "+messageDeltaJSON+"\n\n") + param.MessageDeltaSent = true if promptTokens.Exists() && completionTokens.Exists() { inputTokens = promptTokens.Int() @@ -593,6 +619,16 @@ func convertOpenAINonStreamingToAnthropic(rawJSON []byte, canonicalToolNameByLow if finishReason := choice.Get("finish_reason"); finishReason.Exists() { response["stop_reason"] = mapOpenAIFinishReasonToAnthropic(finishReason.String()) } + + // Handle annotations (url_citation from web search) + if annotations := choice.Get("message.annotations"); annotations.Exists() && annotations.IsArray() && len(annotations.Array()) > 0 { + out, _ = sjson.SetRaw(out, "citations", "[]") + annotations.ForEach(func(_, ann gjson.Result) bool { + compacted := strings.ReplaceAll(strings.ReplaceAll(ann.Raw, "\n", ""), "\r", "") + out, _ = sjson.SetRaw(out, 
"citations.-1", compacted) + return true + }) + } } response["content"] = contentBlocks @@ -903,6 +939,16 @@ func ConvertOpenAIResponseToClaudeNonStream(_ context.Context, _ string, origina return true }) } + + // Handle annotations (url_citation from web search) + if annotations := message.Get("annotations"); annotations.Exists() && annotations.IsArray() && len(annotations.Array()) > 0 { + out, _ = sjson.SetRaw(out, "citations", "[]") + annotations.ForEach(func(_, ann gjson.Result) bool { + compacted := strings.ReplaceAll(strings.ReplaceAll(ann.Raw, "\n", ""), "\r", "") + out, _ = sjson.SetRaw(out, "citations.-1", compacted) + return true + }) + } } } diff --git a/internal/translator/openai/gemini/openai_gemini_response.go b/internal/translator/openai/gemini/openai_gemini_response.go index 040f805ce8..7196eda96b 100644 --- a/internal/translator/openai/gemini/openai_gemini_response.go +++ b/internal/translator/openai/gemini/openai_gemini_response.go @@ -24,6 +24,8 @@ type ConvertOpenAIResponseToGeminiParams struct { ContentAccumulator strings.Builder // Track if this is the first chunk IsFirstChunk bool + // Accumulated annotations (url_citation) from OpenAI web search + AnnotationsRaw []string } // ToolCallAccumulator holds the state for accumulating tool call data @@ -146,6 +148,15 @@ func ConvertOpenAIResponseToGemini(_ context.Context, _ string, originalRequestR chunkOutputs = append(chunkOutputs, contentTemplate) } + // Handle annotations (url_citation from web search) + if annotations := delta.Get("annotations"); annotations.Exists() && annotations.IsArray() { + annotations.ForEach(func(_, ann gjson.Result) bool { + compacted := strings.ReplaceAll(strings.ReplaceAll(ann.Raw, "\n", ""), "\r", "") + (*param).(*ConvertOpenAIResponseToGeminiParams).AnnotationsRaw = append((*param).(*ConvertOpenAIResponseToGeminiParams).AnnotationsRaw, compacted) + return true + }) + } + if len(chunkOutputs) > 0 { results = append(results, chunkOutputs...) 
return true @@ -224,6 +235,16 @@ func ConvertOpenAIResponseToGemini(_ context.Context, _ string, originalRequestR (*param).(*ConvertOpenAIResponseToGeminiParams).ToolCallsAccumulator = make(map[int]*ToolCallAccumulator) } + // Attach accumulated annotations as groundingMetadata + if len((*param).(*ConvertOpenAIResponseToGeminiParams).AnnotationsRaw) > 0 { + gm := `{}` + gm, _ = sjson.SetRaw(gm, "citations", "[]") + for _, raw := range (*param).(*ConvertOpenAIResponseToGeminiParams).AnnotationsRaw { + gm, _ = sjson.SetRaw(gm, "citations.-1", raw) + } + template, _ = sjson.SetRaw(template, "candidates.0.groundingMetadata", gm) + } + results = append(results, template) return true } @@ -600,6 +621,18 @@ func ConvertOpenAIResponseToGeminiNonStream(_ context.Context, _ string, origina out, _ = sjson.Set(out, "candidates.0.finishReason", geminiFinishReason) } + // Handle annotations as groundingMetadata + if annotations := message.Get("annotations"); annotations.Exists() && annotations.IsArray() && len(annotations.Array()) > 0 { + gm := `{}` + gm, _ = sjson.SetRaw(gm, "citations", "[]") + annotations.ForEach(func(_, ann gjson.Result) bool { + compacted := strings.ReplaceAll(strings.ReplaceAll(ann.Raw, "\n", ""), "\r", "") + gm, _ = sjson.SetRaw(gm, "citations.-1", compacted) + return true + }) + out, _ = sjson.SetRaw(out, "candidates.0.groundingMetadata", gm) + } + // Set index out, _ = sjson.Set(out, "candidates.0.index", choiceIdx) diff --git a/internal/translator/openai/openai/responses/openai_openai-responses_request.go b/internal/translator/openai/openai/responses/openai_openai-responses_request.go index 9a64798bd7..eb70c4d63b 100644 --- a/internal/translator/openai/openai/responses/openai_openai-responses_request.go +++ b/internal/translator/openai/openai/responses/openai_openai-responses_request.go @@ -165,8 +165,8 @@ func ConvertOpenAIResponsesRequestToOpenAIChatCompletions(modelName string, inpu // Only function tools need structural conversion because Chat 
Completions nests details under "function". toolType := tool.Get("type").String() if toolType != "" && toolType != "function" && tool.IsObject() { - // Almost all providers lack built-in tools, so we just ignore them. - // chatCompletionsTools = append(chatCompletionsTools, tool.Value()) + // Pass through built-in tools (web_search, etc.) as-is + chatCompletionsTools = append(chatCompletionsTools, tool.Value()) return true } diff --git a/internal/translator/openai/openai/responses/openai_openai-responses_response.go b/internal/translator/openai/openai/responses/openai_openai-responses_response.go index 151528526c..f13d57fa5f 100644 --- a/internal/translator/openai/openai/responses/openai_openai-responses_response.go +++ b/internal/translator/openai/openai/responses/openai_openai-responses_response.go @@ -38,6 +38,8 @@ type oaiToResponsesState struct { // function item done state FuncArgsDone map[int]bool FuncItemDone map[int]bool + // Accumulated annotations (url_citation) from OpenAI web search + AnnotationsRaw []string // usage aggregation PromptTokens int64 CachedTokens int64 @@ -232,6 +234,15 @@ func ConvertOpenAIChatCompletionsResponseToOpenAIResponses(ctx context.Context, st.MsgTextBuf[idx].WriteString(c.String()) } + // Handle annotations (url_citation from web search) + if annotations := delta.Get("annotations"); annotations.Exists() && annotations.IsArray() { + annotations.ForEach(func(_, ann gjson.Result) bool { + compacted := strings.ReplaceAll(strings.ReplaceAll(ann.Raw, "\n", ""), "\r", "") + st.AnnotationsRaw = append(st.AnnotationsRaw, compacted) + return true + }) + } + // reasoning_content (OpenAI reasoning incremental text) if rc := delta.Get("reasoning_content"); rc.Exists() && rc.String() != "" { // On first appearance, add reasoning item and part @@ -386,6 +397,11 @@ func ConvertOpenAIChatCompletionsResponseToOpenAIResponses(ctx context.Context, partDone, _ = sjson.Set(partDone, "output_index", i) partDone, _ = sjson.Set(partDone, 
"content_index", 0) partDone, _ = sjson.Set(partDone, "part.text", fullText) + if len(st.AnnotationsRaw) > 0 { + for _, raw := range st.AnnotationsRaw { + partDone, _ = sjson.SetRaw(partDone, "part.annotations.-1", raw) + } + } out = append(out, emitRespEvent("response.content_part.done", partDone)) itemDone := `{"type":"response.output_item.done","sequence_number":0,"output_index":0,"item":{"id":"","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":""}],"role":"assistant"}}` @@ -393,6 +409,11 @@ func ConvertOpenAIChatCompletionsResponseToOpenAIResponses(ctx context.Context, itemDone, _ = sjson.Set(itemDone, "output_index", i) itemDone, _ = sjson.Set(itemDone, "item.id", fmt.Sprintf("msg_%s_%d", st.ResponseID, i)) itemDone, _ = sjson.Set(itemDone, "item.content.0.text", fullText) + if len(st.AnnotationsRaw) > 0 { + for _, raw := range st.AnnotationsRaw { + itemDone, _ = sjson.SetRaw(itemDone, "item.content.0.annotations.-1", raw) + } + } out = append(out, emitRespEvent("response.output_item.done", itemDone)) st.MsgItemDone[i] = true } @@ -544,6 +565,11 @@ func ConvertOpenAIChatCompletionsResponseToOpenAIResponses(ctx context.Context, item := `{"id":"","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":""}],"role":"assistant"}` item, _ = sjson.Set(item, "id", fmt.Sprintf("msg_%s_%d", st.ResponseID, i)) item, _ = sjson.Set(item, "content.0.text", txt) + if len(st.AnnotationsRaw) > 0 { + for _, raw := range st.AnnotationsRaw { + item, _ = sjson.SetRaw(item, "content.0.annotations.-1", raw) + } + } outputsWrapper, _ = sjson.SetRaw(outputsWrapper, "arr.-1", item) } } @@ -730,6 +756,14 @@ func ConvertOpenAIChatCompletionsResponseToOpenAIResponsesNonStream(_ context.Co item := `{"id":"","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":""}],"role":"assistant"}` item, _ = sjson.Set(item, "id", 
fmt.Sprintf("msg_%s_%d", id, int(choice.Get("index").Int()))) item, _ = sjson.Set(item, "content.0.text", c.String()) + // Populate annotations from message if present + if annotations := msg.Get("annotations"); annotations.Exists() && annotations.IsArray() && len(annotations.Array()) > 0 { + annotations.ForEach(func(_, ann gjson.Result) bool { + compacted := strings.ReplaceAll(strings.ReplaceAll(ann.Raw, "\n", ""), "\r", "") + item, _ = sjson.SetRaw(item, "content.0.annotations.-1", compacted) + return true + }) + } outputsWrapper, _ = sjson.SetRaw(outputsWrapper, "arr.-1", item) } diff --git a/test/builtin_tools_translation_test.go b/test/builtin_tools_translation_test.go index d7ac09f6d0..a7b10d926b 100644 --- a/test/builtin_tools_translation_test.go +++ b/test/builtin_tools_translation_test.go @@ -33,7 +33,7 @@ func TestOpenAIToCodex_PreservesBuiltinTools(t *testing.T) { } } -func TestOpenAIResponsesToOpenAI_IgnoresBuiltinTools(t *testing.T) { +func TestOpenAIResponsesToOpenAI_PassesBuiltinTools(t *testing.T) { in := []byte(`{ "model":"gpt-5", "input":[{"role":"user","content":[{"type":"input_text","text":"hi"}]}], @@ -42,7 +42,13 @@ func TestOpenAIResponsesToOpenAI_IgnoresBuiltinTools(t *testing.T) { out := sdktranslator.TranslateRequest(sdktranslator.FormatOpenAIResponse, sdktranslator.FormatOpenAI, "gpt-5", in, false) - if got := gjson.GetBytes(out, "tools.#").Int(); got != 0 { - t.Fatalf("expected 0 tools (builtin tools not supported in Chat Completions), got %d: %s", got, string(out)) + if got := gjson.GetBytes(out, "tools.#").Int(); got != 1 { + t.Fatalf("expected 1 tool (builtin tools passed through to Chat Completions), got %d: %s", got, string(out)) + } + if got := gjson.GetBytes(out, "tools.0.type").String(); got != "web_search" { + t.Fatalf("expected tools[0].type=web_search, got %q", got) + } + if got := gjson.GetBytes(out, "tools.0.search_context_size").String(); got != "low" { + t.Fatalf("expected tools[0].search_context_size=low, got %q", got) } 
} diff --git a/test/openai_websearch_translation_test.go b/test/openai_websearch_translation_test.go new file mode 100644 index 0000000000..a0eef5c494 --- /dev/null +++ b/test/openai_websearch_translation_test.go @@ -0,0 +1,313 @@ +package test + +import ( + "context" + "testing" + + _ "github.com/kooshapari/cliproxyapi-plusplus/v6/internal/translator" + + sdktranslator "github.com/kooshapari/cliproxyapi-plusplus/v6/sdk/translator" + "github.com/tidwall/gjson" +) + +// --- Request translation tests --- + +func TestResponsesToOpenAI_PassesBuiltinWebSearchTool(t *testing.T) { + in := []byte(`{ + "model":"gpt-4o", + "input":[{"role":"user","content":[{"type":"input_text","text":"search the web"}]}], + "tools":[ + {"type":"web_search_preview"}, + {"type":"function","name":"calc","description":"Calculate","parameters":{"type":"object","properties":{}}} + ] + }`) + + out := sdktranslator.TranslateRequest(sdktranslator.FormatOpenAIResponse, sdktranslator.FormatOpenAI, "gpt-4o", in, false) + + toolCount := gjson.GetBytes(out, "tools.#").Int() + if toolCount != 2 { + t.Fatalf("expected 2 tools, got %d: %s", toolCount, string(out)) + } + + // First tool should be passed through as-is + tool0Type := gjson.GetBytes(out, "tools.0.type").String() + if tool0Type != "web_search_preview" { + t.Fatalf("expected tools[0].type=web_search_preview, got %q", tool0Type) + } + + // Second should be converted to function format + tool1Type := gjson.GetBytes(out, "tools.1.type").String() + if tool1Type != "function" { + t.Fatalf("expected tools[1].type=function, got %q", tool1Type) + } +} + +// --- OpenAI→Claude response tests --- + +func TestOpenAIToClaude_StreamAnnotationsAsCitations(t *testing.T) { + ctx := context.Background() + model := "gpt-4o" + reqJSON := []byte(`{"stream":true}`) + var param any + + // First chunk with role + sse1 := []byte(`data: 
{"id":"chatcmpl-test","object":"chat.completion.chunk","created":1700000000,"model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}`) + sdktranslator.TranslateStream(ctx, sdktranslator.FormatOpenAI, sdktranslator.FormatClaude, model, reqJSON, reqJSON, sse1, &param) + + // Content chunk + sse2 := []byte(`data: {"id":"chatcmpl-test","object":"chat.completion.chunk","created":1700000000,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"The answer is here."},"finish_reason":null}]}`) + sdktranslator.TranslateStream(ctx, sdktranslator.FormatOpenAI, sdktranslator.FormatClaude, model, reqJSON, reqJSON, sse2, &param) + + // First annotation chunk + sse3 := []byte(`data: {"id":"chatcmpl-test","object":"chat.completion.chunk","created":1700000000,"model":"gpt-4o","choices":[{"index":0,"delta":{"annotations":[{"type":"url_citation","url":"https://example.com/1","title":"First","start_index":0,"end_index":10}]},"finish_reason":null}]}`) + sdktranslator.TranslateStream(ctx, sdktranslator.FormatOpenAI, sdktranslator.FormatClaude, model, reqJSON, reqJSON, sse3, &param) + + // Second annotation chunk (tests multi-chunk accumulation) + sse3b := []byte(`data: {"id":"chatcmpl-test","object":"chat.completion.chunk","created":1700000000,"model":"gpt-4o","choices":[{"index":0,"delta":{"annotations":[{"type":"url_citation","url":"https://example.com/2","title":"Second","start_index":11,"end_index":19}]},"finish_reason":null}]}`) + sdktranslator.TranslateStream(ctx, sdktranslator.FormatOpenAI, sdktranslator.FormatClaude, model, reqJSON, reqJSON, sse3b, &param) + + // Finish + usage chunk + sse4 := []byte(`data: {"id":"chatcmpl-test","object":"chat.completion.chunk","created":1700000000,"model":"gpt-4o","choices":[{"index":0,"delta":{},"finish_reason":"stop"}],"usage":{"prompt_tokens":50,"completion_tokens":20,"total_tokens":70}}`) + results := sdktranslator.TranslateStream(ctx, sdktranslator.FormatOpenAI, sdktranslator.FormatClaude, model,
reqJSON, reqJSON, sse4, &param) + + var messageDelta string + for _, r := range results { + if gjson.Get(r, "type").String() == "message_delta" { + messageDelta = r + break + } + } + if messageDelta == "" { + t.Fatalf("expected message_delta event, got: %v", results) + } + + citCount := gjson.Get(messageDelta, "citations.#").Int() + if citCount != 2 { + t.Fatalf("expected 2 citations on message_delta, got %d: %s", citCount, messageDelta) + } + if url := gjson.Get(messageDelta, "citations.0.url").String(); url != "https://example.com/1" { + t.Fatalf("expected citations[0].url=https://example.com/1, got %q", url) + } + if url := gjson.Get(messageDelta, "citations.1.url").String(); url != "https://example.com/2" { + t.Fatalf("expected citations[1].url=https://example.com/2, got %q", url) + } +} + +func TestOpenAIToClaude_NonStreamAnnotationsAsCitations(t *testing.T) { + ctx := context.Background() + model := "gpt-4o" + reqJSON := []byte(`{}`) + + // Non-streaming response with annotations + rawJSON := []byte(`{ + "id":"chatcmpl-ns","object":"chat.completion","created":1700000000,"model":"gpt-4o", + "choices":[{ + "index":0,"message":{ + "role":"assistant","content":"The answer is here.", + "annotations":[ + {"type":"url_citation","url":"https://example.com/1","title":"First","start_index":0,"end_index":10}, + {"type":"url_citation","url":"https://example.com/2","title":"Second","start_index":11,"end_index":19} + ] + },"finish_reason":"stop" + }], + "usage":{"prompt_tokens":50,"completion_tokens":20,"total_tokens":70} + }`) + + var param any + out := sdktranslator.TranslateNonStream(ctx, sdktranslator.FormatOpenAI, sdktranslator.FormatClaude, model, reqJSON, reqJSON, rawJSON, &param) + + // Verify citations on response + citCount := gjson.Get(out, "citations.#").Int() + if citCount != 2 { + t.Fatalf("expected 2 citations, got %d: %s", citCount, out) + } + if url := gjson.Get(out, "citations.0.url").String(); url != "https://example.com/1" { + t.Fatalf("expected
citations[0].url=https://example.com/1, got %q", url) + } + if url := gjson.Get(out, "citations.1.url").String(); url != "https://example.com/2" { + t.Fatalf("expected citations[1].url=https://example.com/2, got %q", url) + } + + // Verify text content still present + textContent := gjson.Get(out, "content.0.text").String() + if textContent != "The answer is here." { + t.Fatalf("expected text content, got %q", textContent) + } +} + +// --- OpenAI→Gemini response tests --- + +func TestOpenAIToGemini_StreamAnnotationsAsGroundingMetadata(t *testing.T) { + ctx := context.Background() + model := "gpt-4o" + reqJSON := []byte(`{}`) + var param any + + // First chunk with role + sse1 := []byte(`data: {"id":"chatcmpl-gem","object":"chat.completion.chunk","created":1700000000,"model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}`) + sdktranslator.TranslateStream(ctx, sdktranslator.FormatOpenAI, sdktranslator.FormatGemini, model, reqJSON, reqJSON, sse1, &param) + + // Content chunk + sse2 := []byte(`data: {"id":"chatcmpl-gem","object":"chat.completion.chunk","created":1700000000,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"Gemini answer."},"finish_reason":null}]}`) + sdktranslator.TranslateStream(ctx, sdktranslator.FormatOpenAI, sdktranslator.FormatGemini, model, reqJSON, reqJSON, sse2, &param) + + // Annotation chunk + sse3 := []byte(`data: {"id":"chatcmpl-gem","object":"chat.completion.chunk","created":1700000000,"model":"gpt-4o","choices":[{"index":0,"delta":{"annotations":[{"type":"url_citation","url":"https://gemini.test","title":"Gemini Source","start_index":0,"end_index":14}]},"finish_reason":null}]}`) + sdktranslator.TranslateStream(ctx, sdktranslator.FormatOpenAI, sdktranslator.FormatGemini, model, reqJSON, reqJSON, sse3, &param) + + // Finish reason chunk + sse4 := []byte(`data:
{"id":"chatcmpl-gem","object":"chat.completion.chunk","created":1700000000,"model":"gpt-4o","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}`) + results := sdktranslator.TranslateStream(ctx, sdktranslator.FormatOpenAI, sdktranslator.FormatGemini, model, reqJSON, reqJSON, sse4, &param) + + // Find chunk with groundingMetadata + var finalChunk string + for _, r := range results { + if gjson.Get(r, "candidates.0.groundingMetadata").Exists() { + finalChunk = r + break + } + } + if finalChunk == "" { + t.Fatalf("expected groundingMetadata on finish chunk, got: %v", results) + } + + citCount := gjson.Get(finalChunk, "candidates.0.groundingMetadata.citations.#").Int() + if citCount != 1 { + t.Fatalf("expected 1 citation in groundingMetadata, got %d: %s", citCount, finalChunk) + } + citURL := gjson.Get(finalChunk, "candidates.0.groundingMetadata.citations.0.url").String() + if citURL != "https://gemini.test" { + t.Fatalf("expected citations[0].url=https://gemini.test, got %q", citURL) + } +} + +func TestOpenAIToGemini_NonStreamAnnotationsAsGroundingMetadata(t *testing.T) { + ctx := context.Background() + model := "gpt-4o" + reqJSON := []byte(`{}`) + + rawJSON := []byte(`{ + "id":"chatcmpl-gns","object":"chat.completion","created":1700000000,"model":"gpt-4o", + "choices":[{ + "index":0,"message":{ + "role":"assistant","content":"Gemini non-stream.", + "annotations":[ + {"type":"url_citation","url":"https://gemini-ns.test","title":"GNS Source","start_index":0,"end_index":18} + ] + },"finish_reason":"stop" + }], + "usage":{"prompt_tokens":40,"completion_tokens":15,"total_tokens":55} + }`) + + var param any + out := sdktranslator.TranslateNonStream(ctx, sdktranslator.FormatOpenAI, sdktranslator.FormatGemini, model, reqJSON, reqJSON, rawJSON, &param) + + if !gjson.Get(out, "candidates.0.groundingMetadata").Exists() { + t.Fatalf("expected groundingMetadata, got: %s", out) + } + + citCount := gjson.Get(out, "candidates.0.groundingMetadata.citations.#").Int() + if citCount != 1
{ + t.Fatalf("expected 1 citation, got %d: %s", citCount, out) + } + citURL := gjson.Get(out, "candidates.0.groundingMetadata.citations.0.url").String() + if citURL != "https://gemini-ns.test" { + t.Fatalf("expected url=https://gemini-ns.test, got %q", citURL) + } +} + +// --- OpenAI CC→Responses tests --- + +func TestOpenAIToResponses_StreamAnnotationsPopulated(t *testing.T) { + ctx := context.Background() + model := "gpt-4o" + reqJSON := []byte(`{}`) + var param any + + // First chunk + sse1 := []byte(`data: {"id":"chatcmpl-resp","object":"chat.completion.chunk","created":1700000000,"model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}`) + sdktranslator.TranslateStream(ctx, sdktranslator.FormatOpenAI, sdktranslator.FormatOpenAIResponse, model, reqJSON, reqJSON, sse1, &param) + + // Content + sse2 := []byte(`data: {"id":"chatcmpl-resp","object":"chat.completion.chunk","created":1700000000,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"Responses answer."},"finish_reason":null}]}`) + sdktranslator.TranslateStream(ctx, sdktranslator.FormatOpenAI, sdktranslator.FormatOpenAIResponse, model, reqJSON, reqJSON, sse2, &param) + + // Annotation + sse3 := []byte(`data: {"id":"chatcmpl-resp","object":"chat.completion.chunk","created":1700000000,"model":"gpt-4o","choices":[{"index":0,"delta":{"annotations":[{"type":"url_citation","url":"https://resp.test","title":"Resp Source","start_index":0,"end_index":17}]},"finish_reason":null}]}`) + sdktranslator.TranslateStream(ctx, sdktranslator.FormatOpenAI, sdktranslator.FormatOpenAIResponse, model, reqJSON, reqJSON, sse3, &param) + + // Finish + usage + sse4 := []byte(`data: {"id":"chatcmpl-resp","object":"chat.completion.chunk","created":1700000000,"model":"gpt-4o","choices":[{"index":0,"delta":{},"finish_reason":"stop"}],"usage":{"prompt_tokens":50,"completion_tokens":20,"total_tokens":70}}`) + results := sdktranslator.TranslateStream(ctx, sdktranslator.FormatOpenAI,
sdktranslator.FormatOpenAIResponse, model, reqJSON, reqJSON, sse4, &param) + + // Verify content_part.done has annotations + var partDone string + for _, r := range results { + if gjson.Get(r, "type").String() == "response.content_part.done" { + partDone = r + break + } + } + if partDone == "" { + t.Fatalf("expected response.content_part.done, got: %v", results) + } + if cnt := gjson.Get(partDone, "part.annotations.#").Int(); cnt != 1 { + t.Fatalf("expected 1 annotation on content_part.done, got %d: %s", cnt, partDone) + } + if url := gjson.Get(partDone, "part.annotations.0.url").String(); url != "https://resp.test" { + t.Fatalf("expected part.annotations[0].url=https://resp.test, got %q", url) + } + + // Verify output_item.done has annotations + var itemDone string + for _, r := range results { + if gjson.Get(r, "type").String() == "response.output_item.done" { + if gjson.Get(r, "item.type").String() == "message" { + itemDone = r + break + } + } + } + if itemDone == "" { + t.Fatalf("expected response.output_item.done with message, got: %v", results) + } + annCount := gjson.Get(itemDone, "item.content.0.annotations.#").Int() + if annCount != 1 { + t.Fatalf("expected 1 annotation on output_item.done, got %d: %s", annCount, itemDone) + } + if url := gjson.Get(itemDone, "item.content.0.annotations.0.url").String(); url != "https://resp.test" { + t.Fatalf("expected annotations[0].url=https://resp.test, got %q", url) + } +} + +func TestOpenAIToResponses_NonStreamAnnotationsPopulated(t *testing.T) { + ctx := context.Background() + model := "gpt-4o" + reqJSON := []byte(`{}`) + + rawJSON := []byte(`{ + "id":"chatcmpl-rns","object":"chat.completion","created":1700000000,"model":"gpt-4o", + "choices":[{ + "index":0,"message":{ + "role":"assistant","content":"Non-stream response.", + "annotations":[ + {"type":"url_citation","url":"https://rns.test","title":"RNS Source","start_index":0,"end_index":20} + ] + },"finish_reason":"stop" + }], +
"usage":{"prompt_tokens":40,"completion_tokens":15,"total_tokens":55} + }`) + + var param any + out := sdktranslator.TranslateNonStream(ctx, sdktranslator.FormatOpenAI, sdktranslator.FormatOpenAIResponse, model, reqJSON, reqJSON, rawJSON, &param) + + // Find message output item + annCount := gjson.Get(out, "output.#(type==\"message\").content.0.annotations.#").Int() + if annCount != 1 { + t.Fatalf("expected 1 annotation on message output, got %d: %s", annCount, out) + } + annURL := gjson.Get(out, "output.#(type==\"message\").content.0.annotations.0.url").String() + if annURL != "https://rns.test" { + t.Fatalf("expected annotations[0].url=https://rns.test, got %q", annURL) + } +} From be41f45771e6f24d4b327deedc9002075d7e4aa8 Mon Sep 17 00:00:00 2001 From: Koosha Paridehpour Date: Fri, 27 Feb 2026 03:42:14 -0700 Subject: [PATCH 07/12] feat: add sticky-round-robin routing strategy Cherry-pick of upstream PR #1673. Adds StickyRoundRobinSelector that routes requests with the same X-Session-Key to consistent auth credentials.
Co-Authored-By: Claude Opus 4.6 --- .../api/handlers/management/config_basic.go | 2 + internal/config/config.go | 2 +- sdk/api/handlers/handlers.go | 8 ++ sdk/cliproxy/auth/selector.go | 81 +++++++++++++++++++ sdk/cliproxy/builder.go | 2 + sdk/cliproxy/service.go | 4 + 6 files changed, 98 insertions(+), 1 deletion(-) diff --git a/internal/api/handlers/management/config_basic.go b/internal/api/handlers/management/config_basic.go index b71332c230..fa2648133e 100644 --- a/internal/api/handlers/management/config_basic.go +++ b/internal/api/handlers/management/config_basic.go @@ -286,6 +286,8 @@ func normalizeRoutingStrategy(strategy string) (string, bool) { return "round-robin", true case "fill-first", "fillfirst", "ff": return "fill-first", true + case "sticky-round-robin", "stickyroundrobin", "srr": + return "sticky-round-robin", true default: return "", false } diff --git a/internal/config/config.go b/internal/config/config.go index ab1bd835aa..308632c8b8 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -208,7 +208,7 @@ type QuotaExceeded struct { // RoutingConfig configures how credentials are selected for requests. type RoutingConfig struct { // Strategy selects the credential selection strategy. - // Supported values: "round-robin" (default), "fill-first". + // Supported values: "round-robin" (default), "fill-first", "sticky-round-robin". Strategy string `yaml:"strategy,omitempty" json:"strategy,omitempty"` } diff --git a/sdk/api/handlers/handlers.go b/sdk/api/handlers/handlers.go index bf9fdd4182..55287e2ad2 100644 --- a/sdk/api/handlers/handlers.go +++ b/sdk/api/handlers/handlers.go @@ -199,6 +199,14 @@ func requestExecutionMetadata(ctx context.Context) map[string]any { } meta := map[string]any{idempotencyKeyMetadataKey: key} + // Forward X-Session-Key for sticky routing. 
+ if ctx != nil { + if ginCtx, ok := ctx.Value("gin").(*gin.Context); ok && ginCtx != nil && ginCtx.Request != nil { + if sessionKey := strings.TrimSpace(ginCtx.GetHeader("X-Session-Key")); sessionKey != "" { + meta["session_key"] = sessionKey + } + } + } if pinnedAuthID := pinnedAuthIDFromContext(ctx); pinnedAuthID != "" { meta[coreexecutor.PinnedAuthMetadataKey] = pinnedAuthID } diff --git a/sdk/cliproxy/auth/selector.go b/sdk/cliproxy/auth/selector.go index 54f63a08b4..4e0f100239 100644 --- a/sdk/cliproxy/auth/selector.go +++ b/sdk/cliproxy/auth/selector.go @@ -29,6 +29,17 @@ type RoundRobinSelector struct { // rolling-window subscription caps (e.g. chat message limits). type FillFirstSelector struct{} +// StickyRoundRobinSelector assigns a session to a consistent auth credential. +// When a request carries a session key (e.g. from an X-Session-Key header), +// subsequent requests with the same key are routed to the same auth. If no +// session key is present, it falls back to standard round-robin. +type StickyRoundRobinSelector struct { + mu sync.Mutex + sessions map[string]string // session key -> auth ID + cursors map[string]int // fallback round-robin cursors + maxKeys int +} + type blockReason int const ( @@ -362,6 +373,76 @@ func (s *FillFirstSelector) Pick(ctx context.Context, provider, model string, op return available[0], nil } +const sessionKeyMetadataKey = "session_key" + +// Pick routes requests with the same session key to the same auth credential. +// When no session key is present, it falls back to round-robin selection. +func (s *StickyRoundRobinSelector) Pick(ctx context.Context, provider, model string, opts cliproxyexecutor.Options, auths []*Auth) (*Auth, error) { + now := time.Now() + available, err := getAvailableAuths(auths, provider, model, now) + if err != nil { + return nil, err + } + available = preferCodexWebsocketAuths(ctx, provider, available) + + // Extract session key from metadata. 
+ sessionKey := "" + if opts.Metadata != nil { + if raw, ok := opts.Metadata[sessionKeyMetadataKey].(string); ok { + sessionKey = strings.TrimSpace(raw) + } + } + + s.mu.Lock() + defer s.mu.Unlock() + + if s.sessions == nil { + s.sessions = make(map[string]string) + } + if s.cursors == nil { + s.cursors = make(map[string]int) + } + + // If we have a session key, try to find the sticky auth. + if sessionKey != "" { + if authID, ok := s.sessions[sessionKey]; ok { + for _, candidate := range available { + if candidate.ID == authID { + return candidate, nil + } + } + // Sticky auth is no longer available; fall through to pick a new one. + } + } + + // Round-robin fallback. + rrKey := provider + ":" + canonicalModelKey(model) + limit := s.maxKeys + if limit <= 0 { + limit = 8192 + } + if _, ok := s.cursors[rrKey]; !ok && len(s.cursors) >= limit { + s.cursors = make(map[string]int) + } + index := s.cursors[rrKey] + if index >= 2_147_483_640 { + index = 0 + } + s.cursors[rrKey] = index + 1 + selected := available[index%len(available)] + + // Record the sticky mapping. + if sessionKey != "" { + // Evict oldest entries if map is too large. 
+ if len(s.sessions) >= limit { + s.sessions = make(map[string]string) + } + s.sessions[sessionKey] = selected.ID + } + + return selected, nil +} + func isAuthBlockedForModel(auth *Auth, model string, now time.Time) (bool, blockReason, time.Time) { if auth == nil { return true, blockReasonOther, time.Time{} diff --git a/sdk/cliproxy/builder.go b/sdk/cliproxy/builder.go index 5d5738134a..d173ffc725 100644 --- a/sdk/cliproxy/builder.go +++ b/sdk/cliproxy/builder.go @@ -215,6 +215,8 @@ func (b *Builder) Build() (*Service, error) { switch strategy { case "fill-first", "fillfirst", "ff": selector = &coreauth.FillFirstSelector{} + case "sticky-round-robin", "stickyroundrobin", "srr": + selector = &coreauth.StickyRoundRobinSelector{} default: selector = &coreauth.RoundRobinSelector{} } diff --git a/sdk/cliproxy/service.go b/sdk/cliproxy/service.go index 337d02147d..847a2fad68 100644 --- a/sdk/cliproxy/service.go +++ b/sdk/cliproxy/service.go @@ -598,6 +598,8 @@ func (s *Service) Run(ctx context.Context) error { switch strategy { case "fill-first", "fill_first", "fillfirst", "ff": return "fill-first" + case "sticky-round-robin", "stickyroundrobin", "srr": + return "sticky-round-robin" default: return "round-robin" } @@ -609,6 +611,8 @@ func (s *Service) Run(ctx context.Context) error { switch nextStrategy { case "fill-first": selector = &coreauth.FillFirstSelector{} + case "sticky-round-robin": + selector = &coreauth.StickyRoundRobinSelector{} default: selector = &coreauth.RoundRobinSelector{} } From d4f2379b94aa723959b8121b89114397d926ad2e Mon Sep 17 00:00:00 2001 From: Koosha Paridehpour Date: Fri, 27 Feb 2026 03:42:39 -0700 Subject: [PATCH 08/12] fix: fall back to fill-first when no X-Session-Key header is present Follow-up for sticky-round-robin (upstream PR #1673). Uses partial eviction (evict half) instead of full map reset for better stickiness. 
Co-Authored-By: Claude Opus 4.6 --- sdk/cliproxy/auth/selector.go | 46 +++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/sdk/cliproxy/auth/selector.go b/sdk/cliproxy/auth/selector.go index 4e0f100239..ecae87c3f8 100644 --- a/sdk/cliproxy/auth/selector.go +++ b/sdk/cliproxy/auth/selector.go @@ -376,7 +376,7 @@ func (s *FillFirstSelector) Pick(ctx context.Context, provider, model string, op const sessionKeyMetadataKey = "session_key" // Pick routes requests with the same session key to the same auth credential. -// When no session key is present, it falls back to round-robin selection. +// When no session key is present, it falls back to fill-first selection. func (s *StickyRoundRobinSelector) Pick(ctx context.Context, provider, model string, opts cliproxyexecutor.Options, auths []*Auth) (*Auth, error) { now := time.Now() available, err := getAvailableAuths(auths, provider, model, now) @@ -393,29 +393,32 @@ func (s *StickyRoundRobinSelector) Pick(ctx context.Context, provider, model str } } + // No session key — fall back to fill-first (always pick the first available). + if sessionKey == "" { + return available[0], nil + } + s.mu.Lock() defer s.mu.Unlock() if s.sessions == nil { s.sessions = make(map[string]string) } - if s.cursors == nil { - s.cursors = make(map[string]int) - } - // If we have a session key, try to find the sticky auth. - if sessionKey != "" { - if authID, ok := s.sessions[sessionKey]; ok { - for _, candidate := range available { - if candidate.ID == authID { - return candidate, nil - } + // Try to find the sticky auth for this session. + if authID, ok := s.sessions[sessionKey]; ok { + for _, candidate := range available { + if candidate.ID == authID { + return candidate, nil } - // Sticky auth is no longer available; fall through to pick a new one. } + // Sticky auth is no longer available; fall through to pick a new one. } - // Round-robin fallback. 
+ // Round-robin to assign a new auth for this session. + if s.cursors == nil { + s.cursors = make(map[string]int) + } rrKey := provider + ":" + canonicalModelKey(model) limit := s.maxKeys if limit <= 0 { @@ -432,13 +435,20 @@ func (s *StickyRoundRobinSelector) Pick(ctx context.Context, provider, model str selected := available[index%len(available)] // Record the sticky mapping. - if sessionKey != "" { - // Evict oldest entries if map is too large. - if len(s.sessions) >= limit { - s.sessions = make(map[string]string) + // Evict half the entries when the map is full to avoid losing all + // stickiness at once while still bounding memory usage. + if len(s.sessions) >= limit { + evictCount := 0 + evictTarget := len(s.sessions) / 2 + for k := range s.sessions { + delete(s.sessions, k) + evictCount++ + if evictCount >= evictTarget { + break + } } - s.sessions[sessionKey] = selected.ID } + s.sessions[sessionKey] = selected.ID return selected, nil } From ef49105434a745d90e0c52adeb2f68b859c4a39b Mon Sep 17 00:00:00 2001 From: Koosha Paridehpour Date: Fri, 27 Feb 2026 03:44:17 -0700 Subject: [PATCH 09/12] fix(antigravity): keep primary model list and backfill empty auths Cherry-pick of upstream PR #1699. Caches successful model fetches and falls back to cached list when fetches fail, preventing empty model lists. 
Co-Authored-By: Claude Opus 4.6 --- .../runtime/executor/antigravity_executor.go | 88 +++++++++--- .../antigravity_executor_models_cache_test.go | 64 +++++++++ sdk/cliproxy/service.go | 53 +++++++ .../service_antigravity_backfill_test.go | 135 ++++++++++++++++++ 4 files changed, 320 insertions(+), 20 deletions(-) create mode 100644 internal/runtime/executor/antigravity_executor_models_cache_test.go create mode 100644 sdk/cliproxy/service_antigravity_backfill_test.go diff --git a/internal/runtime/executor/antigravity_executor.go b/internal/runtime/executor/antigravity_executor.go index 52e4c828da..d35fef2dab 100644 --- a/internal/runtime/executor/antigravity_executor.go +++ b/internal/runtime/executor/antigravity_executor.go @@ -54,8 +54,58 @@ const ( var ( randSource = rand.New(rand.NewSource(time.Now().UnixNano())) randSourceMutex sync.Mutex + // antigravityPrimaryModelsCache keeps the latest non-empty model list fetched + // from any antigravity auth. Empty fetches never overwrite this cache. 
+ antigravityPrimaryModelsCache struct { + mu sync.RWMutex + models []*registry.ModelInfo + } ) +func cloneAntigravityModels(models []*registry.ModelInfo) []*registry.ModelInfo { + if len(models) == 0 { + return nil + } + out := make([]*registry.ModelInfo, 0, len(models)) + for _, model := range models { + if model == nil || strings.TrimSpace(model.ID) == "" { + continue + } + clone := *model + out = append(out, &clone) + } + if len(out) == 0 { + return nil + } + return out +} + +func storeAntigravityPrimaryModels(models []*registry.ModelInfo) bool { + cloned := cloneAntigravityModels(models) + if len(cloned) == 0 { + return false + } + antigravityPrimaryModelsCache.mu.Lock() + antigravityPrimaryModelsCache.models = cloned + antigravityPrimaryModelsCache.mu.Unlock() + return true +} + +func loadAntigravityPrimaryModels() []*registry.ModelInfo { + antigravityPrimaryModelsCache.mu.RLock() + cloned := cloneAntigravityModels(antigravityPrimaryModelsCache.models) + antigravityPrimaryModelsCache.mu.RUnlock() + return cloned +} + +func fallbackAntigravityPrimaryModels() []*registry.ModelInfo { + models := loadAntigravityPrimaryModels() + if len(models) > 0 { + log.Debugf("antigravity executor: using cached primary model list (%d models)", len(models)) + } + return models +} + // AntigravityExecutor proxies requests to the antigravity upstream. 
type AntigravityExecutor struct { cfg *config.Config @@ -1006,13 +1056,8 @@ func (e *AntigravityExecutor) CountTokens(ctx context.Context, auth *cliproxyaut func FetchAntigravityModels(ctx context.Context, auth *cliproxyauth.Auth, cfg *config.Config) []*registry.ModelInfo { exec := &AntigravityExecutor{cfg: cfg} token, updatedAuth, errToken := exec.ensureAccessToken(ctx, auth) - if errToken != nil { - log.Warnf("antigravity executor: fetch models failed for %s: token error: %v", auth.ID, errToken) - return nil - } - if token == "" { - log.Warnf("antigravity executor: fetch models failed for %s: got empty token", auth.ID) - return nil + if errToken != nil || token == "" { + return fallbackAntigravityPrimaryModels() } if updatedAuth != nil { auth = updatedAuth @@ -1025,8 +1070,7 @@ func FetchAntigravityModels(ctx context.Context, auth *cliproxyauth.Auth, cfg *c modelsURL := baseURL + antigravityModelsPath httpReq, errReq := http.NewRequestWithContext(ctx, http.MethodPost, modelsURL, bytes.NewReader([]byte(`{}`))) if errReq != nil { - log.Warnf("antigravity executor: fetch models failed for %s: create request error: %v", auth.ID, errReq) - return nil + return fallbackAntigravityPrimaryModels() } httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("Authorization", "Bearer "+token) @@ -1038,15 +1082,13 @@ func FetchAntigravityModels(ctx context.Context, auth *cliproxyauth.Auth, cfg *c httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { - log.Warnf("antigravity executor: fetch models failed for %s: context canceled: %v", auth.ID, errDo) - return nil + return fallbackAntigravityPrimaryModels() } if idx+1 < len(baseURLs) { log.Debugf("antigravity executor: models request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) continue } - log.Warnf("antigravity executor: fetch models failed for %s: request error: %v", auth.ID, 
errDo) - return nil + return fallbackAntigravityPrimaryModels() } bodyBytes, errRead := io.ReadAll(httpResp.Body) @@ -1058,22 +1100,27 @@ func FetchAntigravityModels(ctx context.Context, auth *cliproxyauth.Auth, cfg *c log.Debugf("antigravity executor: models read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) continue } - log.Warnf("antigravity executor: fetch models failed for %s: read body error: %v", auth.ID, errRead) - return nil + return fallbackAntigravityPrimaryModels() } if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) { log.Debugf("antigravity executor: models request rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) continue } - log.Warnf("antigravity executor: fetch models failed for %s: unexpected status %d, body: %s", auth.ID, httpResp.StatusCode, string(bodyBytes)) - return nil + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: models request failed with status %d on base url %s, retrying with fallback base url: %s", httpResp.StatusCode, baseURL, baseURLs[idx+1]) + continue + } + return fallbackAntigravityPrimaryModels() } result := gjson.GetBytes(bodyBytes, "models") if !result.Exists() { - log.Warnf("antigravity executor: fetch models failed for %s: no models field in response, body: %s", auth.ID, string(bodyBytes)) - return nil + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: models field missing on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + return fallbackAntigravityPrimaryModels() } now := time.Now().Unix() @@ -1118,9 +1165,10 @@ func FetchAntigravityModels(ctx context.Context, auth *cliproxyauth.Auth, cfg *c } models = append(models, modelInfo) } + storeAntigravityPrimaryModels(models) return models } - return nil + return fallbackAntigravityPrimaryModels() } func 
(e *AntigravityExecutor) ensureAccessToken(ctx context.Context, auth *cliproxyauth.Auth) (string, *cliproxyauth.Auth, error) { diff --git a/internal/runtime/executor/antigravity_executor_models_cache_test.go b/internal/runtime/executor/antigravity_executor_models_cache_test.go new file mode 100644 index 0000000000..758bf2ff01 --- /dev/null +++ b/internal/runtime/executor/antigravity_executor_models_cache_test.go @@ -0,0 +1,64 @@ +package executor + +import ( + "testing" + + "github.com/kooshapari/cliproxyapi-plusplus/v6/internal/registry" +) + +func resetAntigravityPrimaryModelsCacheForTest() { + antigravityPrimaryModelsCache.mu.Lock() + antigravityPrimaryModelsCache.models = nil + antigravityPrimaryModelsCache.mu.Unlock() +} + +func TestStoreAntigravityPrimaryModels_EmptyDoesNotOverwrite(t *testing.T) { + resetAntigravityPrimaryModelsCacheForTest() + t.Cleanup(resetAntigravityPrimaryModelsCacheForTest) + + seed := []*registry.ModelInfo{ + {ID: "claude-sonnet-4-5"}, + {ID: "gemini-2.5-pro"}, + } + if updated := storeAntigravityPrimaryModels(seed); !updated { + t.Fatal("expected non-empty model list to update primary cache") + } + + if updated := storeAntigravityPrimaryModels(nil); updated { + t.Fatal("expected nil model list not to overwrite primary cache") + } + if updated := storeAntigravityPrimaryModels([]*registry.ModelInfo{}); updated { + t.Fatal("expected empty model list not to overwrite primary cache") + } + + got := loadAntigravityPrimaryModels() + if len(got) != 2 { + t.Fatalf("expected cached model count 2, got %d", len(got)) + } + if got[0].ID != "claude-sonnet-4-5" || got[1].ID != "gemini-2.5-pro" { + t.Fatalf("unexpected cached model ids: %q, %q", got[0].ID, got[1].ID) + } +} + +func TestLoadAntigravityPrimaryModels_ReturnsClone(t *testing.T) { + resetAntigravityPrimaryModelsCacheForTest() + t.Cleanup(resetAntigravityPrimaryModelsCacheForTest) + + if updated := storeAntigravityPrimaryModels([]*registry.ModelInfo{{ID: "gpt-5", DisplayName: "GPT-5"}}); 
!updated { + t.Fatal("expected model cache update") + } + + got := loadAntigravityPrimaryModels() + if len(got) != 1 { + t.Fatalf("expected one cached model, got %d", len(got)) + } + got[0].ID = "mutated-id" + + again := loadAntigravityPrimaryModels() + if len(again) != 1 { + t.Fatalf("expected one cached model after mutation, got %d", len(again)) + } + if again[0].ID != "gpt-5" { + t.Fatalf("expected cached model id to remain %q, got %q", "gpt-5", again[0].ID) + } +} diff --git a/sdk/cliproxy/service.go b/sdk/cliproxy/service.go index 847a2fad68..f403dc5e39 100644 --- a/sdk/cliproxy/service.go +++ b/sdk/cliproxy/service.go @@ -1041,6 +1041,9 @@ func (s *Service) registerModelsForAuth(a *coreauth.Auth) { key = strings.ToLower(strings.TrimSpace(a.Provider)) } GlobalModelRegistry().RegisterClient(a.ID, key, applyModelPrefixes(models, a.Prefix, s.cfg != nil && s.cfg.ForceModelPrefix)) + if provider == "antigravity" { + s.backfillAntigravityModels(a, models) + } return } @@ -1185,6 +1188,56 @@ func (s *Service) oauthExcludedModels(provider, authKind string) []string { return cfg.OAuthExcludedModels[providerKey] } +func (s *Service) backfillAntigravityModels(source *coreauth.Auth, primaryModels []*ModelInfo) { + if s == nil || s.coreManager == nil || len(primaryModels) == 0 { + return + } + + sourceID := "" + if source != nil { + sourceID = strings.TrimSpace(source.ID) + } + + reg := registry.GetGlobalRegistry() + for _, candidate := range s.coreManager.List() { + if candidate == nil || candidate.Disabled { + continue + } + candidateID := strings.TrimSpace(candidate.ID) + if candidateID == "" || candidateID == sourceID { + continue + } + if !strings.EqualFold(strings.TrimSpace(candidate.Provider), "antigravity") { + continue + } + if len(reg.GetModelsForClient(candidateID)) > 0 { + continue + } + + authKind := strings.ToLower(strings.TrimSpace(candidate.Attributes["auth_kind"])) + if authKind == "" { + if kind, _ := candidate.AccountInfo(); strings.EqualFold(kind, 
"api_key") { + authKind = "apikey" + } + } + excluded := s.oauthExcludedModels("antigravity", authKind) + if candidate.Attributes != nil { + if val, ok := candidate.Attributes["excluded_models"]; ok && strings.TrimSpace(val) != "" { + excluded = strings.Split(val, ",") + } + } + + models := applyExcludedModels(primaryModels, excluded) + models = applyOAuthModelAlias(s.cfg, "antigravity", authKind, models) + if len(models) == 0 { + continue + } + + reg.RegisterClient(candidateID, "antigravity", applyModelPrefixes(models, candidate.Prefix, s.cfg != nil && s.cfg.ForceModelPrefix)) + log.Debugf("antigravity models backfilled for auth %s using primary model list", candidateID) + } +} + func applyExcludedModels(models []*ModelInfo, excluded []string) []*ModelInfo { if len(models) == 0 || len(excluded) == 0 { return models diff --git a/sdk/cliproxy/service_antigravity_backfill_test.go b/sdk/cliproxy/service_antigravity_backfill_test.go new file mode 100644 index 0000000000..aae4b9094b --- /dev/null +++ b/sdk/cliproxy/service_antigravity_backfill_test.go @@ -0,0 +1,135 @@ +package cliproxy + +import ( + "context" + "strings" + "testing" + + "github.com/kooshapari/cliproxyapi-plusplus/v6/internal/registry" + coreauth "github.com/kooshapari/cliproxyapi-plusplus/v6/sdk/cliproxy/auth" + "github.com/kooshapari/cliproxyapi-plusplus/v6/sdk/config" +) + +func TestBackfillAntigravityModels_RegistersMissingAuth(t *testing.T) { + source := &coreauth.Auth{ + ID: "ag-backfill-source", + Provider: "antigravity", + Status: coreauth.StatusActive, + Attributes: map[string]string{ + "auth_kind": "oauth", + }, + } + target := &coreauth.Auth{ + ID: "ag-backfill-target", + Provider: "antigravity", + Status: coreauth.StatusActive, + Attributes: map[string]string{ + "auth_kind": "oauth", + }, + } + + manager := coreauth.NewManager(nil, nil, nil) + if _, err := manager.Register(context.Background(), source); err != nil { + t.Fatalf("register source auth: %v", err) + } + if _, err := 
manager.Register(context.Background(), target); err != nil { + t.Fatalf("register target auth: %v", err) + } + + service := &Service{ + cfg: &config.Config{}, + coreManager: manager, + } + + reg := registry.GetGlobalRegistry() + reg.UnregisterClient(source.ID) + reg.UnregisterClient(target.ID) + t.Cleanup(func() { + reg.UnregisterClient(source.ID) + reg.UnregisterClient(target.ID) + }) + + primary := []*ModelInfo{ + {ID: "claude-sonnet-4-5"}, + {ID: "gemini-2.5-pro"}, + } + reg.RegisterClient(source.ID, "antigravity", primary) + + service.backfillAntigravityModels(source, primary) + + got := reg.GetModelsForClient(target.ID) + if len(got) != 2 { + t.Fatalf("expected target auth to be backfilled with 2 models, got %d", len(got)) + } + + ids := make(map[string]struct{}, len(got)) + for _, model := range got { + if model == nil { + continue + } + ids[strings.ToLower(strings.TrimSpace(model.ID))] = struct{}{} + } + if _, ok := ids["claude-sonnet-4-5"]; !ok { + t.Fatal("expected backfilled model claude-sonnet-4-5") + } + if _, ok := ids["gemini-2.5-pro"]; !ok { + t.Fatal("expected backfilled model gemini-2.5-pro") + } +} + +func TestBackfillAntigravityModels_RespectsExcludedModels(t *testing.T) { + source := &coreauth.Auth{ + ID: "ag-backfill-source-excluded", + Provider: "antigravity", + Status: coreauth.StatusActive, + Attributes: map[string]string{ + "auth_kind": "oauth", + }, + } + target := &coreauth.Auth{ + ID: "ag-backfill-target-excluded", + Provider: "antigravity", + Status: coreauth.StatusActive, + Attributes: map[string]string{ + "auth_kind": "oauth", + "excluded_models": "gemini-2.5-pro", + }, + } + + manager := coreauth.NewManager(nil, nil, nil) + if _, err := manager.Register(context.Background(), source); err != nil { + t.Fatalf("register source auth: %v", err) + } + if _, err := manager.Register(context.Background(), target); err != nil { + t.Fatalf("register target auth: %v", err) + } + + service := &Service{ + cfg: &config.Config{}, + coreManager: 
manager, + } + + reg := registry.GetGlobalRegistry() + reg.UnregisterClient(source.ID) + reg.UnregisterClient(target.ID) + t.Cleanup(func() { + reg.UnregisterClient(source.ID) + reg.UnregisterClient(target.ID) + }) + + primary := []*ModelInfo{ + {ID: "claude-sonnet-4-5"}, + {ID: "gemini-2.5-pro"}, + } + reg.RegisterClient(source.ID, "antigravity", primary) + + service.backfillAntigravityModels(source, primary) + + got := reg.GetModelsForClient(target.ID) + if len(got) != 1 { + t.Fatalf("expected 1 model after exclusion, got %d", len(got)) + } + if got[0] == nil || !strings.EqualFold(strings.TrimSpace(got[0].ID), "claude-sonnet-4-5") { + t.Fatalf("expected remaining model %q, got %+v", "claude-sonnet-4-5", got[0]) + } +} From 2514f4d8410d34a7cc164711bee1c48c3a702095 Mon Sep 17 00:00:00 2001 From: Koosha Paridehpour Date: Fri, 27 Feb 2026 03:44:28 -0700 Subject: [PATCH 10/12] fix(antigravity): deep copy cached model metadata Cherry-pick of upstream PR #1699 (part 2). Ensures cached model metadata is deep-copied to prevent mutation across concurrent requests. 
Co-Authored-By: Claude Opus 4.6 --- .../runtime/executor/antigravity_executor.go | 24 ++++++++++++++-- .../antigravity_executor_models_cache_test.go | 28 ++++++++++++++++++- 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/internal/runtime/executor/antigravity_executor.go b/internal/runtime/executor/antigravity_executor.go index d35fef2dab..6eebcc5abf 100644 --- a/internal/runtime/executor/antigravity_executor.go +++ b/internal/runtime/executor/antigravity_executor.go @@ -71,8 +71,7 @@ func cloneAntigravityModels(models []*registry.ModelInfo) []*registry.ModelInfo if model == nil || strings.TrimSpace(model.ID) == "" { continue } - clone := *model - out = append(out, &clone) + out = append(out, cloneAntigravityModelInfo(model)) } if len(out) == 0 { return nil @@ -80,6 +79,27 @@ func cloneAntigravityModels(models []*registry.ModelInfo) []*registry.ModelInfo return out } +func cloneAntigravityModelInfo(model *registry.ModelInfo) *registry.ModelInfo { + if model == nil { + return nil + } + clone := *model + if len(model.SupportedGenerationMethods) > 0 { + clone.SupportedGenerationMethods = append([]string(nil), model.SupportedGenerationMethods...) + } + if len(model.SupportedParameters) > 0 { + clone.SupportedParameters = append([]string(nil), model.SupportedParameters...) + } + if model.Thinking != nil { + thinkingClone := *model.Thinking + if len(model.Thinking.Levels) > 0 { + thinkingClone.Levels = append([]string(nil), model.Thinking.Levels...) 
+ } + clone.Thinking = &thinkingClone + } + return &clone +} + func storeAntigravityPrimaryModels(models []*registry.ModelInfo) bool { cloned := cloneAntigravityModels(models) if len(cloned) == 0 { diff --git a/internal/runtime/executor/antigravity_executor_models_cache_test.go b/internal/runtime/executor/antigravity_executor_models_cache_test.go index 758bf2ff01..749d958c2c 100644 --- a/internal/runtime/executor/antigravity_executor_models_cache_test.go +++ b/internal/runtime/executor/antigravity_executor_models_cache_test.go @@ -44,7 +44,15 @@ func TestLoadAntigravityPrimaryModels_ReturnsClone(t *testing.T) { resetAntigravityPrimaryModelsCacheForTest() t.Cleanup(resetAntigravityPrimaryModelsCacheForTest) - if updated := storeAntigravityPrimaryModels([]*registry.ModelInfo{{ID: "gpt-5", DisplayName: "GPT-5"}}); !updated { + if updated := storeAntigravityPrimaryModels([]*registry.ModelInfo{{ + ID: "gpt-5", + DisplayName: "GPT-5", + SupportedGenerationMethods: []string{"generateContent"}, + SupportedParameters: []string{"temperature"}, + Thinking: &registry.ThinkingSupport{ + Levels: []string{"high"}, + }, + }}); !updated { t.Fatal("expected model cache update") } @@ -53,6 +61,15 @@ func TestLoadAntigravityPrimaryModels_ReturnsClone(t *testing.T) { t.Fatalf("expected one cached model, got %d", len(got)) } got[0].ID = "mutated-id" + if len(got[0].SupportedGenerationMethods) > 0 { + got[0].SupportedGenerationMethods[0] = "mutated-method" + } + if len(got[0].SupportedParameters) > 0 { + got[0].SupportedParameters[0] = "mutated-parameter" + } + if got[0].Thinking != nil && len(got[0].Thinking.Levels) > 0 { + got[0].Thinking.Levels[0] = "mutated-level" + } again := loadAntigravityPrimaryModels() if len(again) != 1 { @@ -61,4 +78,13 @@ func TestLoadAntigravityPrimaryModels_ReturnsClone(t *testing.T) { if again[0].ID != "gpt-5" { t.Fatalf("expected cached model id to remain %q, got %q", "gpt-5", again[0].ID) } + if len(again[0].SupportedGenerationMethods) == 0 || 
again[0].SupportedGenerationMethods[0] != "generateContent" { + t.Fatalf("expected cached generation methods to be unmutated, got %v", again[0].SupportedGenerationMethods) + } + if len(again[0].SupportedParameters) == 0 || again[0].SupportedParameters[0] != "temperature" { + t.Fatalf("expected cached supported parameters to be unmutated, got %v", again[0].SupportedParameters) + } + if again[0].Thinking == nil || len(again[0].Thinking.Levels) == 0 || again[0].Thinking.Levels[0] != "high" { + t.Fatalf("expected cached model thinking levels to be unmutated, got %v", again[0].Thinking) + } } From ffae39f159d644ed62d09877c6a12ed0dffdf3b8 Mon Sep 17 00:00:00 2001 From: Koosha Paridehpour Date: Fri, 27 Feb 2026 03:44:40 -0700 Subject: [PATCH 11/12] fix(iflow): harden 406 retry, stream fallback, and auth availability Cherry-pick of upstream PR #1650. Improves iflow executor with 406 retry handling, stream stability fixes, and better auth availability checks. Co-Authored-By: Claude Opus 4.6 --- internal/runtime/executor/iflow_executor.go | 580 +++++++++++++++--- .../runtime/executor/iflow_executor_test.go | 533 ++++++++++++++++ sdk/cliproxy/auth/conductor.go | 79 ++- sdk/cliproxy/auth/conductor_overrides_test.go | 80 +++ sdk/cliproxy/auth/selector.go | 7 +- sdk/cliproxy/auth/selector_test.go | 36 ++ 6 files changed, 1230 insertions(+), 85 deletions(-) diff --git a/internal/runtime/executor/iflow_executor.go b/internal/runtime/executor/iflow_executor.go index 8e63d1c94b..70a0f695f5 100644 --- a/internal/runtime/executor/iflow_executor.go +++ b/internal/runtime/executor/iflow_executor.go @@ -10,6 +10,7 @@ import ( "fmt" "io" "net/http" + "strconv" "strings" "time" @@ -29,6 +30,12 @@ import ( const ( iflowDefaultEndpoint = "/chat/completions" iflowUserAgent = "iFlow-Cli" + + // requestExecutionMetadata in handlers stores idempotency under this key. + iflowSessionIDMetadataKey = "idempotency_key" + + // Optional metadata key for explicit conversation binding. 
+ iflowConversationIDMetadataKey = "conversation_id" ) // IFlowExecutor executes OpenAI-compatible chat completions against the iFlow API using API keys derived from OAuth. @@ -36,6 +43,17 @@ type IFlowExecutor struct { cfg *config.Config } +type iflowRequestAttempt struct { + WithSignature bool + UserAgent string + SessionID string + Conversation string + Accept string + ContentType string + HasSignature bool + HasTimestamp bool +} + // NewIFlowExecutor constructs a new executor instance. func NewIFlowExecutor(cfg *config.Config) *IFlowExecutor { return &IFlowExecutor{cfg: cfg} } @@ -110,34 +128,10 @@ func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re body = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", body, originalTranslated, requestedModel) endpoint := strings.TrimSuffix(baseURL, "/") + iflowDefaultEndpoint - - httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) - if err != nil { - return resp, err - } - applyIFlowHeaders(httpReq, apiKey, false) - var authID, authLabel, authType, authValue string - if auth != nil { - authID = auth.ID - authLabel = auth.Label - authType, authValue = auth.AccountInfo() - } - recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ - URL: endpoint, - Method: http.MethodPost, - Headers: httpReq.Header.Clone(), - Body: body, - Provider: e.Identifier(), - AuthID: authID, - AuthLabel: authLabel, - AuthType: authType, - AuthValue: authValue, - }) - + sessionID, conversationID := iflowRequestIDs(opts, body) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - httpResp, err := httpClient.Do(httpReq) + httpResp, err := e.executeChatCompletionsRequest(ctx, httpClient, auth, endpoint, apiKey, body, false, sessionID, conversationID) if err != nil { - recordAPIResponseError(ctx, e.cfg, err) return resp, err } defer func() { @@ -161,6 +155,13 @@ func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re return resp, err } 
appendAPIResponseChunk(ctx, e.cfg, data) + if bizErr, ok := parseIFlowBusinessStatusError(data); ok { + logWithRequestID(ctx). + WithField("mapped_status", bizErr.code). + Warnf("iflow executor: upstream returned business error payload: %s", summarizeErrorBody(httpResp.Header.Get("Content-Type"), data)) + err = bizErr + return resp, err + } reporter.publish(ctx, parseOpenAIUsage(data)) // Ensure usage is recorded even if upstream omits usage metadata. reporter.ensurePublished(ctx) @@ -218,34 +219,10 @@ func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au body = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", body, originalTranslated, requestedModel) endpoint := strings.TrimSuffix(baseURL, "/") + iflowDefaultEndpoint - - httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) - if err != nil { - return nil, err - } - applyIFlowHeaders(httpReq, apiKey, true) - var authID, authLabel, authType, authValue string - if auth != nil { - authID = auth.ID - authLabel = auth.Label - authType, authValue = auth.AccountInfo() - } - recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ - URL: endpoint, - Method: http.MethodPost, - Headers: httpReq.Header.Clone(), - Body: body, - Provider: e.Identifier(), - AuthID: authID, - AuthLabel: authLabel, - AuthType: authType, - AuthValue: authValue, - }) - + sessionID, conversationID := iflowRequestIDs(opts, body) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - httpResp, err := httpClient.Do(httpReq) + httpResp, err := e.executeChatCompletionsRequest(ctx, httpClient, auth, endpoint, apiKey, body, true, sessionID, conversationID) if err != nil { - recordAPIResponseError(ctx, e.cfg, err) return nil, err } @@ -262,6 +239,7 @@ func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au } out := make(chan cliproxyexecutor.StreamChunk) + responseContentType := strings.ToLower(strings.TrimSpace(httpResp.Header.Get("Content-Type"))) go 
func() { defer close(out) defer func() { @@ -270,24 +248,96 @@ func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au } }() + var param any + emitChunks := func(chunks []string) { + for i := range chunks { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} + } + } + emitDone := func() { + doneChunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, []byte("data: [DONE]"), &param) + emitChunks(doneChunks) + if len(doneChunks) == 0 { + doneChunks = sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, []byte("[DONE]"), &param) + emitChunks(doneChunks) + } + } + + if !strings.Contains(responseContentType, "text/event-stream") { + data, errRead := io.ReadAll(httpResp.Body) + if errRead != nil { + recordAPIResponseError(ctx, e.cfg, errRead) + reporter.publishFailure(ctx) + out <- cliproxyexecutor.StreamChunk{Err: errRead} + return + } + + appendAPIResponseChunk(ctx, e.cfg, data) + if bizErr, ok := parseIFlowBusinessStatusError(data); ok { + reporter.publishFailure(ctx) + logWithRequestID(ctx). + WithField("mapped_status", bizErr.code). + Warnf("iflow executor: upstream returned business error payload on stream request: %s", summarizeErrorBody(httpResp.Header.Get("Content-Type"), data)) + out <- cliproxyexecutor.StreamChunk{Err: bizErr} + return + } + + reporter.publish(ctx, parseOpenAIUsage(data)) + logWithRequestID(ctx). + WithField("response_content_type", strings.TrimSpace(httpResp.Header.Get("Content-Type"))). 
+ Warn("iflow executor: upstream returned non-SSE response for stream request, synthesizing stream chunks") + + fallbackChunks := synthesizeOpenAIStreamingChunksFromNonStream(data) + if len(fallbackChunks) == 0 { + reporter.publishFailure(ctx) + out <- cliproxyexecutor.StreamChunk{ + Err: statusErr{ + code: http.StatusBadGateway, + msg: "iflow executor: upstream returned non-SSE response without valid choices", + }, + } + return + } + for i := range fallbackChunks { + line := append([]byte("data: "), fallbackChunks[i]...) + chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, line, &param) + emitChunks(chunks) + } + emitDone() + // Guarantee a usage record exists even if no usage metadata was emitted in stream chunks. + reporter.ensurePublished(ctx) + return + } + scanner := bufio.NewScanner(httpResp.Body) scanner.Buffer(nil, 52_428_800) // 50MB - var param any + emittedPayload := false for scanner.Scan() { line := scanner.Bytes() appendAPIResponseChunk(ctx, e.cfg, line) if detail, ok := parseOpenAIStreamUsage(line); ok { reporter.publish(ctx, detail) } + if streamErr, ok := parseOpenAIStreamNetworkErrorWithoutContent(line); ok { + reporter.publishFailure(ctx) + if emittedPayload { + logWithRequestID(ctx).Warnf("iflow executor: upstream stream ended with network_error after payload: %s", streamErr.msg) + } + out <- cliproxyexecutor.StreamChunk{Err: streamErr} + return + } chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, bytes.Clone(line), &param) - for i := range chunks { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} + if len(chunks) > 0 { + emittedPayload = true } + emitChunks(chunks) } if errScan := scanner.Err(); errScan != nil { recordAPIResponseError(ctx, e.cfg, errScan) reporter.publishFailure(ctx) out <- cliproxyexecutor.StreamChunk{Err: errScan} + } else { + emitDone() } // Guarantee a usage record exists even if the stream never emitted usage data. 
reporter.ensurePublished(ctx) @@ -452,29 +502,108 @@ func (e *IFlowExecutor) refreshOAuthBased(ctx context.Context, auth *cliproxyaut return auth, nil } -func applyIFlowHeaders(r *http.Request, apiKey string, stream bool) { +func (e *IFlowExecutor) executeChatCompletionsRequest( + ctx context.Context, + httpClient *http.Client, + auth *cliproxyauth.Auth, + endpoint, apiKey string, + body []byte, + stream bool, + sessionID, conversationID string, +) (*http.Response, error) { + send := func(withSignature bool) (*http.Response, iflowRequestAttempt, error) { + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) + if err != nil { + return nil, iflowRequestAttempt{}, err + } + applyIFlowHeaders(httpReq, apiKey, stream, withSignature, sessionID, conversationID) + attempt := snapshotIFlowRequestAttempt(httpReq, withSignature) + + var authID, authLabel, authType, authValue string + if auth != nil { + authID = auth.ID + authLabel = auth.Label + authType, authValue = auth.AccountInfo() + } + recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ + URL: endpoint, + Method: http.MethodPost, + Headers: httpReq.Header.Clone(), + Body: body, + Provider: e.Identifier(), + AuthID: authID, + AuthLabel: authLabel, + AuthType: authType, + AuthValue: authValue, + }) + + httpResp, err := httpClient.Do(httpReq) + if err != nil { + recordAPIResponseError(ctx, e.cfg, err) + return nil, iflowRequestAttempt{}, err + } + return httpResp, attempt, nil + } + + httpResp, firstAttempt, err := send(true) + if err != nil { + return nil, err + } + if httpResp.StatusCode == http.StatusForbidden { + logIFlowRejectedRequestDiagnostic(ctx, httpResp.StatusCode, firstAttempt, httpResp.Header.Clone(), "", false) + return httpResp, nil + } + if httpResp.StatusCode != http.StatusNotAcceptable { + return httpResp, nil + } + + firstBody, _ := io.ReadAll(httpResp.Body) + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("iflow executor: close response 
body error: %v", errClose) + } + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + appendAPIResponseChunk(ctx, e.cfg, firstBody) + logIFlowRejectedRequestDiagnostic( + ctx, + httpResp.StatusCode, + firstAttempt, + httpResp.Header.Clone(), + summarizeErrorBody(httpResp.Header.Get("Content-Type"), firstBody), + true, + ) + + retryResp, retryAttempt, err := send(false) + if err != nil { + return nil, err + } + if retryResp.StatusCode == http.StatusForbidden || retryResp.StatusCode == http.StatusNotAcceptable { + logIFlowRejectedRequestDiagnostic(ctx, retryResp.StatusCode, retryAttempt, retryResp.Header.Clone(), "", false) + } + return retryResp, nil +} + +func applyIFlowHeaders(r *http.Request, apiKey string, stream bool, withSignature bool, sessionID, conversationID string) { + // iFlow CLI does not set an explicit Accept header for chat/completions requests. + // The upstream decides stream format from payload.stream. + _ = stream + r.Header.Set("Content-Type", "application/json") r.Header.Set("Authorization", "Bearer "+apiKey) - r.Header.Set("User-Agent", iflowUserAgent) - - // Generate session-id - sessionID := "session-" + generateUUID() + r.Header.Set("user-agent", iflowUserAgent) r.Header.Set("session-id", sessionID) + r.Header.Set("conversation-id", conversationID) - // Generate timestamp and signature - timestamp := time.Now().UnixMilli() - r.Header.Set("x-iflow-timestamp", fmt.Sprintf("%d", timestamp)) + if withSignature { + // Generate timestamp and signature + timestamp := time.Now().UnixMilli() + r.Header.Set("x-iflow-timestamp", fmt.Sprintf("%d", timestamp)) - signature := createIFlowSignature(iflowUserAgent, sessionID, timestamp, apiKey) - if signature != "" { - r.Header.Set("x-iflow-signature", signature) - } - - if stream { - r.Header.Set("Accept", "text/event-stream") - } else { - r.Header.Set("Accept", "application/json") + signature := createIFlowSignature(iflowUserAgent, sessionID, timestamp, apiKey) + if 
signature != "" { + r.Header.Set("x-iflow-signature", signature) + } } + r.Header.Del("Accept") } // createIFlowSignature generates HMAC-SHA256 signature for iFlow API requests. @@ -494,6 +623,110 @@ func generateUUID() string { return uuid.New().String() } +func iflowRequestIDs(opts cliproxyexecutor.Options, body []byte) (sessionID, conversationID string) { + if len(opts.Metadata) > 0 { + if v, ok := opts.Metadata[iflowSessionIDMetadataKey]; ok { + sessionID = normalizeMetadataString(v) + } + if v, ok := opts.Metadata[iflowConversationIDMetadataKey]; ok { + conversationID = normalizeMetadataString(v) + } + } + + // Best-effort extraction from request payload when metadata is unavailable. + if conversationID == "" { + conversationID = strings.TrimSpace(gjson.GetBytes(body, "conversation_id").String()) + } + if conversationID == "" && len(opts.OriginalRequest) > 0 { + conversationID = strings.TrimSpace(gjson.GetBytes(opts.OriginalRequest, "conversation_id").String()) + } + if conversationID == "" && len(opts.OriginalRequest) > 0 { + conversationID = strings.TrimSpace(gjson.GetBytes(opts.OriginalRequest, "conversationId").String()) + } + + if sessionID == "" { + sessionID = strings.TrimSpace(gjson.GetBytes(body, "session_id").String()) + } + if sessionID == "" && len(opts.OriginalRequest) > 0 { + sessionID = strings.TrimSpace(gjson.GetBytes(opts.OriginalRequest, "session_id").String()) + } + if sessionID == "" && len(opts.OriginalRequest) > 0 { + sessionID = strings.TrimSpace(gjson.GetBytes(opts.OriginalRequest, "sessionId").String()) + } + + // Keep session id stable and non-empty for signature generation. 
+ if sessionID == "" && conversationID != "" { + sessionID = conversationID + } + if sessionID == "" { + sessionID = generateUUID() + } + + return sessionID, conversationID +} + +func normalizeMetadataString(v any) string { + switch typed := v.(type) { + case string: + return strings.TrimSpace(typed) + case []byte: + return strings.TrimSpace(string(typed)) + default: + return "" + } +} + +func snapshotIFlowRequestAttempt(r *http.Request, withSignature bool) iflowRequestAttempt { + if r == nil { + return iflowRequestAttempt{WithSignature: withSignature} + } + return iflowRequestAttempt{ + WithSignature: withSignature, + UserAgent: strings.TrimSpace(r.Header.Get("user-agent")), + SessionID: strings.TrimSpace(r.Header.Get("session-id")), + Conversation: strings.TrimSpace(r.Header.Get("conversation-id")), + Accept: strings.TrimSpace(r.Header.Get("Accept")), + ContentType: strings.TrimSpace(r.Header.Get("Content-Type")), + HasSignature: strings.TrimSpace(r.Header.Get("x-iflow-signature")) != "", + HasTimestamp: strings.TrimSpace(r.Header.Get("x-iflow-timestamp")) != "", + } +} + +func logIFlowRejectedRequestDiagnostic( + ctx context.Context, + status int, + attempt iflowRequestAttempt, + responseHeaders http.Header, + errorSummary string, + retryingWithoutSignature bool, +) { + if status != http.StatusForbidden && status != http.StatusNotAcceptable { + return + } + + fields := log.Fields{ + "status": status, + "with_signature": attempt.WithSignature, + "request_user_agent": attempt.UserAgent, + "request_session_id": util.HideAPIKey(attempt.SessionID), + "request_conversation_id": util.HideAPIKey(attempt.Conversation), + "request_accept": attempt.Accept, + "request_content_type": attempt.ContentType, + "request_has_signature": attempt.HasSignature, + "request_has_timestamp": attempt.HasTimestamp, + "response_content_type": strings.TrimSpace(responseHeaders.Get("Content-Type")), + "response_server": strings.TrimSpace(responseHeaders.Get("Server")), + 
"response_www_authenticate": util.MaskSensitiveHeaderValue("WWW-Authenticate", strings.TrimSpace(responseHeaders.Get("WWW-Authenticate"))), + "response_x_request_id": strings.TrimSpace(responseHeaders.Get("X-Request-Id")), + "retrying_without_signature": retryingWithoutSignature, + } + if errorSummary != "" { + fields["error_summary"] = errorSummary + } + + logWithRequestID(ctx).WithFields(fields).Warn("iflow executor: upstream rejected request") +} + func iflowCreds(a *cliproxyauth.Auth) (apiKey, baseURL string) { if a == nil { return "", "" @@ -528,6 +761,209 @@ func ensureToolsArray(body []byte) []byte { return updated } +func synthesizeOpenAIStreamingChunksFromNonStream(raw []byte) [][]byte { + root := gjson.ParseBytes(raw) + choices := root.Get("choices") + if !choices.Exists() || !choices.IsArray() || len(choices.Array()) == 0 { + return nil + } + + chunk := `{"id":"","object":"chat.completion.chunk","created":0,"model":"","choices":[]}` + chunk, _ = sjson.Set(chunk, "id", root.Get("id").String()) + chunk, _ = sjson.Set(chunk, "object", "chat.completion.chunk") + chunk, _ = sjson.Set(chunk, "created", root.Get("created").Int()) + chunk, _ = sjson.Set(chunk, "model", root.Get("model").String()) + + choices.ForEach(func(key, choice gjson.Result) bool { + index := int(choice.Get("index").Int()) + if !choice.Get("index").Exists() { + index = int(key.Int()) + } + + streamChoice := `{"index":0,"delta":{},"finish_reason":null}` + streamChoice, _ = sjson.Set(streamChoice, "index", index) + + role := strings.TrimSpace(choice.Get("message.role").String()) + if role != "" { + streamChoice, _ = sjson.Set(streamChoice, "delta.role", role) + } + + content := choice.Get("message.content") + if content.Exists() && content.Type != gjson.Null { + if content.Type == gjson.String { + streamChoice, _ = sjson.Set(streamChoice, "delta.content", content.String()) + } else { + streamChoice, _ = sjson.SetRaw(streamChoice, "delta.content", content.Raw) + } + } + + reasoning := 
choice.Get("message.reasoning_content") + if reasoning.Exists() && reasoning.Type != gjson.Null { + streamChoice, _ = sjson.SetRaw(streamChoice, "delta.reasoning_content", reasoning.Raw) + } + + toolCalls := choice.Get("message.tool_calls") + if toolCalls.Exists() && toolCalls.Type != gjson.Null { + streamChoice, _ = sjson.SetRaw(streamChoice, "delta.tool_calls", toolCalls.Raw) + } + + finishReason := choice.Get("finish_reason") + if finishReason.Exists() && finishReason.Type != gjson.Null { + streamChoice, _ = sjson.Set(streamChoice, "finish_reason", finishReason.String()) + } + + path := fmt.Sprintf("choices.%d", index) + chunk, _ = sjson.SetRaw(chunk, path, streamChoice) + return true + }) + + out := [][]byte{[]byte(chunk)} + usage := root.Get("usage") + if usage.Exists() && usage.Type != gjson.Null { + usageChunk := `{"id":"","object":"chat.completion.chunk","created":0,"model":"","choices":[],"usage":{}}` + usageChunk, _ = sjson.Set(usageChunk, "id", root.Get("id").String()) + usageChunk, _ = sjson.Set(usageChunk, "object", "chat.completion.chunk") + usageChunk, _ = sjson.Set(usageChunk, "created", root.Get("created").Int()) + usageChunk, _ = sjson.Set(usageChunk, "model", root.Get("model").String()) + usageChunk, _ = sjson.SetRaw(usageChunk, "usage", usage.Raw) + out = append(out, []byte(usageChunk)) + } + return out +} + +func parseIFlowBusinessStatusError(raw []byte) (statusErr, bool) { + root := gjson.ParseBytes(raw) + + message := strings.TrimSpace(root.Get("msg").String()) + if message == "" { + message = strings.TrimSpace(root.Get("message").String()) + } + if message == "" { + message = strings.TrimSpace(root.Get("error.message").String()) + } + + statusRaw := strings.TrimSpace(root.Get("status").String()) + statusCode := parseNumericStatus(statusRaw) + if statusCode == 0 { + statusCode = int(root.Get("status").Int()) + } + if statusCode == 0 { + statusCode = parseNumericStatus(strings.TrimSpace(root.Get("error.code").String())) + } + + normalized := 
normalizeIFlowBusinessStatus(statusCode, message) + if normalized > 0 { + if message == "" { + message = fmt.Sprintf("status %d", normalized) + } + return statusErr{code: normalized, msg: message}, true + } + + // No explicit business status. If an error object is present, treat it as a bad request. + if root.Get("error").Exists() { + if message == "" { + message = strings.TrimSpace(root.Get("error").Raw) + } + if message == "" { + message = "iflow upstream returned error payload" + } + return statusErr{code: http.StatusBadRequest, msg: message}, true + } + return statusErr{}, false +} + +func parseNumericStatus(raw string) int { + if strings.TrimSpace(raw) == "" { + return 0 + } + code, err := strconv.Atoi(strings.TrimSpace(raw)) + if err != nil { + return 0 + } + return code +} + +func normalizeIFlowBusinessStatus(statusCode int, message string) int { + if statusCode == 449 { + // iFlow business status used for rate-limiting. + return http.StatusTooManyRequests + } + if statusCode >= 400 && statusCode < 600 { + return statusCode + } + + msg := strings.ToLower(strings.TrimSpace(message)) + switch { + case strings.Contains(msg, "rate limit"), + strings.Contains(msg, "too many requests"), + strings.Contains(msg, "quota"): + return http.StatusTooManyRequests + case strings.Contains(msg, "forbidden"): + return http.StatusForbidden + case strings.Contains(msg, "unauthorized"), + strings.Contains(msg, "invalid api key"), + strings.Contains(msg, "invalid token"): + return http.StatusUnauthorized + case strings.Contains(msg, "not acceptable"): + return http.StatusNotAcceptable + case strings.Contains(msg, "timeout"): + return http.StatusRequestTimeout + default: + return 0 + } +} + +func parseOpenAIStreamNetworkErrorWithoutContent(line []byte) (statusErr, bool) { + payload := bytes.TrimSpace(line) + if len(payload) == 0 { + return statusErr{}, false + } + + // SSE data frame. 
+ if bytes.HasPrefix(payload, []byte("data:")) { + payload = bytes.TrimSpace(payload[len("data:"):]) + } + if len(payload) == 0 || bytes.Equal(payload, []byte("[DONE]")) { + return statusErr{}, false + } + if !gjson.ValidBytes(payload) { + return statusErr{}, false + } + + root := gjson.ParseBytes(payload) + choices := root.Get("choices") + if !choices.Exists() || !choices.IsArray() || len(choices.Array()) == 0 { + return statusErr{}, false + } + + hasNetworkError := false + hasContent := false + choices.ForEach(func(_, choice gjson.Result) bool { + if strings.EqualFold(strings.TrimSpace(choice.Get("finish_reason").String()), "network_error") { + hasNetworkError = true + } + + content := strings.TrimSpace(choice.Get("delta.content").String()) + reasoning := strings.TrimSpace(choice.Get("delta.reasoning_content").String()) + toolCalls := choice.Get("delta.tool_calls") + if content != "" || reasoning != "" || (toolCalls.Exists() && toolCalls.Type != gjson.Null && strings.TrimSpace(toolCalls.Raw) != "" && toolCalls.Raw != "[]") { + hasContent = true + } + return true + }) + + if !hasNetworkError || hasContent { + return statusErr{}, false + } + + model := strings.TrimSpace(root.Get("model").String()) + if model == "" { + model = "unknown" + } + msg := fmt.Sprintf("iflow upstream stream network_error for model %s", model) + return statusErr{code: http.StatusBadGateway, msg: msg}, true +} + // preserveReasoningContentInMessages checks if reasoning_content from assistant messages // is preserved in conversation history for iFlow models that support thinking. 
// This is helpful for multi-turn conversations where the model may benefit from seeing diff --git a/internal/runtime/executor/iflow_executor_test.go b/internal/runtime/executor/iflow_executor_test.go index 8ed172b7cd..66babbf15c 100644 --- a/internal/runtime/executor/iflow_executor_test.go +++ b/internal/runtime/executor/iflow_executor_test.go @@ -1,9 +1,23 @@ package executor import ( + "context" + "errors" + "io" + "net/http" + "net/http/httptest" + "strings" + "sync" "testing" + "github.com/kooshapari/cliproxyapi-plusplus/v6/internal/config" "github.com/kooshapari/cliproxyapi-plusplus/v6/internal/thinking" + _ "github.com/kooshapari/cliproxyapi-plusplus/v6/internal/translator" + cliproxyauth "github.com/kooshapari/cliproxyapi-plusplus/v6/sdk/cliproxy/auth" + cliproxyexecutor "github.com/kooshapari/cliproxyapi-plusplus/v6/sdk/cliproxy/executor" + sdktranslator "github.com/kooshapari/cliproxyapi-plusplus/v6/sdk/translator" + log "github.com/sirupsen/logrus" + logtest "github.com/sirupsen/logrus/hooks/test" ) func TestIFlowExecutorParseSuffix(t *testing.T) { @@ -65,3 +79,522 @@ func TestPreserveReasoningContentInMessages(t *testing.T) { }) } } + +func TestIFlowExecutorExecute_RetryWithoutSignatureOn406(t *testing.T) { + var ( + mu sync.Mutex + attempts int + signatures []string + sessions []string + conversations []string + conversationHeaderPresence []bool + accepts []string + timestamps []string + ) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() + attempts++ + signatures = append(signatures, r.Header.Get("x-iflow-signature")) + sessions = append(sessions, r.Header.Get("session-id")) + conversations = append(conversations, r.Header.Get("conversation-id")) + _, hasConversation := r.Header["Conversation-Id"] + conversationHeaderPresence = append(conversationHeaderPresence, hasConversation) + accepts = append(accepts, r.Header.Get("Accept")) + timestamps = append(timestamps, 
r.Header.Get("x-iflow-timestamp")) + currentAttempt := attempts + mu.Unlock() + + if r.URL.Path != "/chat/completions" { + w.WriteHeader(http.StatusNotFound) + _, _ = io.WriteString(w, `{"error":{"message":"unexpected path"}}`) + return + } + + if currentAttempt == 1 { + w.WriteHeader(http.StatusNotAcceptable) + _, _ = io.WriteString(w, `{"error":{"message":"status 406","type":"invalid_request_error"}}`) + return + } + + w.Header().Set("Content-Type", "application/json") + _, _ = io.WriteString(w, `{"id":"chatcmpl-1","object":"chat.completion","created":1,"model":"glm-5","choices":[{"index":0,"message":{"role":"assistant","content":"ok"},"finish_reason":"stop"}],"usage":{"prompt_tokens":1,"completion_tokens":1,"total_tokens":2}}`) + })) + defer server.Close() + + executor := NewIFlowExecutor(&config.Config{}) + auth := &cliproxyauth.Auth{ + ID: "iflow-test-auth", + Provider: "iflow", + Attributes: map[string]string{ + "api_key": "test-key", + "base_url": server.URL, + }, + } + req := cliproxyexecutor.Request{ + Model: "glm-5", + Payload: []byte(`{"model":"glm-5","messages":[{"role":"user","content":"hi"}]}`), + } + opts := cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("openai"), + Metadata: map[string]any{ + "idempotency_key": "sess-test-123", + }, + } + + resp, err := executor.Execute(context.Background(), auth, req, opts) + if err != nil { + t.Fatalf("Execute() unexpected error: %v", err) + } + if !strings.Contains(string(resp.Payload), `"content":"ok"`) { + t.Fatalf("Execute() response missing expected content: %s", string(resp.Payload)) + } + + mu.Lock() + defer mu.Unlock() + if attempts != 2 { + t.Fatalf("attempts = %d, want 2", attempts) + } + if signatures[0] == "" { + t.Fatalf("first attempt should include x-iflow-signature") + } + if sessions[0] == "" { + t.Fatalf("first attempt should include session-id") + } + if sessions[0] != "sess-test-123" { + t.Fatalf("first attempt session-id = %q, want %q", sessions[0], "sess-test-123") + } + 
if timestamps[0] == "" { + t.Fatalf("first attempt should include x-iflow-timestamp") + } + if signatures[1] != "" { + t.Fatalf("second attempt should not include x-iflow-signature, got %q", signatures[1]) + } + if sessions[1] != sessions[0] { + t.Fatalf("second attempt should reuse session-id, got %q vs %q", sessions[1], sessions[0]) + } + if timestamps[1] != "" { + t.Fatalf("second attempt should not include x-iflow-timestamp, got %q", timestamps[1]) + } + if conversations[0] != "" || conversations[1] != "" { + t.Fatalf("conversation-id should be empty in this test, got first=%q second=%q", conversations[0], conversations[1]) + } + if !conversationHeaderPresence[0] || !conversationHeaderPresence[1] { + t.Fatalf("conversation-id header should be present on both attempts, got first=%v second=%v", conversationHeaderPresence[0], conversationHeaderPresence[1]) + } + if accepts[0] != "" || accepts[1] != "" { + t.Fatalf("accept header should be omitted on both attempts, got first=%q second=%q", accepts[0], accepts[1]) + } +} + +func TestIFlowExecutorExecute_LogsDiagnosticOn403(t *testing.T) { + hook := logtest.NewLocal(log.StandardLogger()) + defer hook.Reset() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/chat/completions" { + w.WriteHeader(http.StatusNotFound) + _, _ = io.WriteString(w, `{"error":{"message":"unexpected path"}}`) + return + } + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Request-Id", "iflow-req-403") + w.WriteHeader(http.StatusForbidden) + _, _ = io.WriteString(w, `{"error":{"message":"status 403","type":"invalid_request_error"}}`) + })) + defer server.Close() + + executor := NewIFlowExecutor(&config.Config{}) + auth := &cliproxyauth.Auth{ + Provider: "iflow", + Attributes: map[string]string{ + "api_key": "test-key", + "base_url": server.URL, + }, + } + req := cliproxyexecutor.Request{ + Model: "glm-5", + Payload: 
[]byte(`{"model":"glm-5","messages":[{"role":"user","content":"hi"}]}`), + } + opts := cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("openai"), + Metadata: map[string]any{ + "idempotency_key": "sess-test-403", + }, + } + + _, err := executor.Execute(context.Background(), auth, req, opts) + if err == nil { + t.Fatal("Execute() expected error, got nil") + } + var sErr statusErr + if !errors.As(err, &sErr) || sErr.code != http.StatusForbidden { + t.Fatalf("Execute() error = %v, want statusErr code=%d", err, http.StatusForbidden) + } + + var diagnostic *log.Entry + for _, entry := range hook.AllEntries() { + if entry.Message == "iflow executor: upstream rejected request" { + diagnostic = entry + break + } + } + if diagnostic == nil { + t.Fatal("expected diagnostic warning log for 403, got none") + } + + if got, ok := diagnostic.Data["status"].(int); !ok || got != http.StatusForbidden { + t.Fatalf("diagnostic status = %#v, want %d", diagnostic.Data["status"], http.StatusForbidden) + } + if got, ok := diagnostic.Data["with_signature"].(bool); !ok || !got { + t.Fatalf("diagnostic with_signature = %#v, want true", diagnostic.Data["with_signature"]) + } + if got, ok := diagnostic.Data["request_has_signature"].(bool); !ok || !got { + t.Fatalf("diagnostic request_has_signature = %#v, want true", diagnostic.Data["request_has_signature"]) + } + if got, ok := diagnostic.Data["request_has_timestamp"].(bool); !ok || !got { + t.Fatalf("diagnostic request_has_timestamp = %#v, want true", diagnostic.Data["request_has_timestamp"]) + } + if got, ok := diagnostic.Data["retrying_without_signature"].(bool); !ok || got { + t.Fatalf("diagnostic retrying_without_signature = %#v, want false", diagnostic.Data["retrying_without_signature"]) + } + + maskedSession, ok := diagnostic.Data["request_session_id"].(string) + if !ok || maskedSession == "" { + t.Fatalf("diagnostic request_session_id = %#v, want masked non-empty string", diagnostic.Data["request_session_id"]) + } + if 
maskedSession == "sess-test-403" { + t.Fatalf("diagnostic request_session_id should be masked, got %q", maskedSession) + } + if got := diagnostic.Data["response_x_request_id"]; got != "iflow-req-403" { + t.Fatalf("diagnostic response_x_request_id = %#v, want %q", got, "iflow-req-403") + } + if _, exists := diagnostic.Data["request_signature"]; exists { + t.Fatalf("diagnostic should not include raw request_signature field") + } +} + +func TestIFlowExecutorExecute_NoDiagnosticOn500(t *testing.T) { + hook := logtest.NewLocal(log.StandardLogger()) + defer hook.Reset() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/chat/completions" { + w.WriteHeader(http.StatusNotFound) + _, _ = io.WriteString(w, `{"error":{"message":"unexpected path"}}`) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusInternalServerError) + _, _ = io.WriteString(w, `{"error":{"message":"status 500","type":"server_error"}}`) + })) + defer server.Close() + + executor := NewIFlowExecutor(&config.Config{}) + auth := &cliproxyauth.Auth{ + Provider: "iflow", + Attributes: map[string]string{ + "api_key": "test-key", + "base_url": server.URL, + }, + } + req := cliproxyexecutor.Request{ + Model: "glm-5", + Payload: []byte(`{"model":"glm-5","messages":[{"role":"user","content":"hi"}]}`), + } + opts := cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("openai"), + } + + _, err := executor.Execute(context.Background(), auth, req, opts) + if err == nil { + t.Fatal("Execute() expected error, got nil") + } + + for _, entry := range hook.AllEntries() { + if entry.Message == "iflow executor: upstream rejected request" { + t.Fatalf("unexpected diagnostic warning log on 500: %#v", entry.Data) + } + } +} + +func TestIFlowExecutorExecuteStream_EmitsMessageStopWhenUpstreamOmitsDone(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + 
if r.URL.Path != "/chat/completions" { + w.WriteHeader(http.StatusNotFound) + _, _ = io.WriteString(w, `{"error":{"message":"unexpected path"}}`) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + _, _ = io.WriteString(w, "data: "+`{"id":"chatcmpl-1","object":"chat.completion.chunk","created":1,"model":"glm-5","choices":[{"index":0,"delta":{"role":"assistant","content":"ok"},"finish_reason":"stop"}]}`+"\n\n") + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } + })) + defer server.Close() + + executor := NewIFlowExecutor(&config.Config{}) + auth := &cliproxyauth.Auth{ + Provider: "iflow", + Attributes: map[string]string{ + "api_key": "test-key", + "base_url": server.URL, + }, + } + req := cliproxyexecutor.Request{ + Model: "glm-5", + Payload: []byte(`{ + "model":"glm-5", + "stream":true, + "max_tokens":64, + "messages":[{"role":"user","content":[{"type":"text","text":"hi"}]}] + }`), + } + opts := cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("claude"), + OriginalRequest: req.Payload, + } + + stream, err := executor.ExecuteStream(context.Background(), auth, req, opts) + if err != nil { + t.Fatalf("ExecuteStream() unexpected error: %v", err) + } + + output := collectIFlowStreamPayload(t, stream) + if !strings.Contains(output, "event: message_stop") { + t.Fatalf("expected message_stop in stream output, got: %s", output) + } +} + +func TestIFlowExecutorExecuteStream_FallbacksWhenUpstreamReturnsJSON(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/chat/completions" { + w.WriteHeader(http.StatusNotFound) + _, _ = io.WriteString(w, `{"error":{"message":"unexpected path"}}`) + return + } + + w.Header().Set("Content-Type", "application/json") + _, _ = io.WriteString(w, 
`{"id":"chatcmpl-2","object":"chat.completion","created":2,"model":"glm-5","choices":[{"index":0,"message":{"role":"assistant","content":"fallback-ok"},"finish_reason":"stop"}],"usage":{"prompt_tokens":1,"completion_tokens":2,"total_tokens":3}}`) + })) + defer server.Close() + + executor := NewIFlowExecutor(&config.Config{}) + auth := &cliproxyauth.Auth{ + Provider: "iflow", + Attributes: map[string]string{ + "api_key": "test-key", + "base_url": server.URL, + }, + } + req := cliproxyexecutor.Request{ + Model: "glm-5", + Payload: []byte(`{ + "model":"glm-5", + "stream":true, + "max_tokens":64, + "messages":[{"role":"user","content":[{"type":"text","text":"hi"}]}] + }`), + } + opts := cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("claude"), + OriginalRequest: req.Payload, + } + + stream, err := executor.ExecuteStream(context.Background(), auth, req, opts) + if err != nil { + t.Fatalf("ExecuteStream() unexpected error: %v", err) + } + + output := collectIFlowStreamPayload(t, stream) + if !strings.Contains(output, "fallback-ok") { + t.Fatalf("expected fallback content in stream output, got: %s", output) + } + if !strings.Contains(output, "event: message_stop") { + t.Fatalf("expected message_stop in fallback stream output, got: %s", output) + } +} + +func TestIFlowExecutorExecuteStream_ReturnsRateLimitErrorWhenUpstreamReturnsBusinessJSON(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/chat/completions" { + w.WriteHeader(http.StatusNotFound) + _, _ = io.WriteString(w, `{"error":{"message":"unexpected path"}}`) + return + } + + w.Header().Set("Content-Type", "application/json") + _, _ = io.WriteString(w, `{"status":"449","msg":"You exceeded your current rate limit","body":null}`) + })) + defer server.Close() + + executor := NewIFlowExecutor(&config.Config{}) + auth := &cliproxyauth.Auth{ + Provider: "iflow", + Attributes: map[string]string{ + "api_key": "test-key", + 
"base_url": server.URL, + }, + } + req := cliproxyexecutor.Request{ + Model: "glm-5", + Payload: []byte(`{ + "model":"glm-5", + "stream":true, + "max_tokens":64, + "messages":[{"role":"user","content":[{"type":"text","text":"hi"}]}] + }`), + } + opts := cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("claude"), + OriginalRequest: req.Payload, + } + + stream, err := executor.ExecuteStream(context.Background(), auth, req, opts) + if err != nil { + t.Fatalf("ExecuteStream() unexpected error: %v", err) + } + + var gotErr error + for chunk := range stream { + if len(chunk.Payload) > 0 { + t.Fatalf("expected no payload when business error occurs, got: %s", string(chunk.Payload)) + } + if chunk.Err != nil { + gotErr = chunk.Err + } + } + if gotErr == nil { + t.Fatal("expected stream error, got nil") + } + + var sErr statusErr + if !errors.As(gotErr, &sErr) { + t.Fatalf("expected statusErr, got %T: %v", gotErr, gotErr) + } + if sErr.code != http.StatusTooManyRequests { + t.Fatalf("statusErr.code = %d, want %d", sErr.code, http.StatusTooManyRequests) + } +} + +func TestIFlowExecutorExecute_ReturnsRateLimitErrorWhenUpstreamReturnsBusinessJSON(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/chat/completions" { + w.WriteHeader(http.StatusNotFound) + _, _ = io.WriteString(w, `{"error":{"message":"unexpected path"}}`) + return + } + + w.Header().Set("Content-Type", "application/json") + _, _ = io.WriteString(w, `{"status":"449","msg":"You exceeded your current rate limit","body":null}`) + })) + defer server.Close() + + executor := NewIFlowExecutor(&config.Config{}) + auth := &cliproxyauth.Auth{ + Provider: "iflow", + Attributes: map[string]string{ + "api_key": "test-key", + "base_url": server.URL, + }, + } + req := cliproxyexecutor.Request{ + Model: "glm-5", + Payload: []byte(`{"model":"glm-5","messages":[{"role":"user","content":"hi"}]}`), + } + opts := 
cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("openai"), + } + + _, err := executor.Execute(context.Background(), auth, req, opts) + if err == nil { + t.Fatal("Execute() expected error, got nil") + } + + var sErr statusErr + if !errors.As(err, &sErr) { + t.Fatalf("expected statusErr, got %T: %v", err, err) + } + if sErr.code != http.StatusTooManyRequests { + t.Fatalf("statusErr.code = %d, want %d", sErr.code, http.StatusTooManyRequests) + } +} + +func TestIFlowExecutorExecuteStream_ReturnsErrorWhenUpstreamSSENetworkErrorWithoutContent(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/chat/completions" { + w.WriteHeader(http.StatusNotFound) + _, _ = io.WriteString(w, `{"error":{"message":"unexpected path"}}`) + return + } + + w.Header().Set("Content-Type", "text/event-stream;charset=UTF-8") + _, _ = io.WriteString(w, "data:"+`{"id":"chatcmpl-neterr","created":1,"model":"glm-5","choices":[{"index":0,"finish_reason":"network_error","delta":{"role":"assistant","content":""}}]}`+"\n\n") + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } + })) + defer server.Close() + + executor := NewIFlowExecutor(&config.Config{}) + auth := &cliproxyauth.Auth{ + Provider: "iflow", + Attributes: map[string]string{ + "api_key": "test-key", + "base_url": server.URL, + }, + } + req := cliproxyexecutor.Request{ + Model: "glm-5", + Payload: []byte(`{ + "model":"glm-5", + "stream":true, + "max_tokens":64, + "messages":[{"role":"user","content":[{"type":"text","text":"hi"}]}] + }`), + } + opts := cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("claude"), + OriginalRequest: req.Payload, + } + + stream, err := executor.ExecuteStream(context.Background(), auth, req, opts) + if err != nil { + t.Fatalf("ExecuteStream() unexpected error: %v", err) + } + + var gotErr error + for chunk := range stream { + if len(chunk.Payload) > 0 { + t.Fatalf("expected no payload 
when network_error without content occurs, got: %s", string(chunk.Payload)) + } + if chunk.Err != nil { + gotErr = chunk.Err + } + } + if gotErr == nil { + t.Fatal("expected stream error, got nil") + } + + var sErr statusErr + if !errors.As(gotErr, &sErr) { + t.Fatalf("expected statusErr, got %T: %v", gotErr, gotErr) + } + if sErr.code != http.StatusBadGateway { + t.Fatalf("statusErr.code = %d, want %d", sErr.code, http.StatusBadGateway) + } +} + +func collectIFlowStreamPayload(t *testing.T, stream <-chan cliproxyexecutor.StreamChunk) string { + t.Helper() + + var builder strings.Builder + for chunk := range stream { + if chunk.Err != nil { + t.Fatalf("stream chunk returned error: %v", chunk.Err) + } + builder.Write(chunk.Payload) + } + return builder.String() +} diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index 3dfe5faa86..879e155a17 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -15,12 +15,12 @@ import ( "time" "github.com/google/uuid" - internalconfig "github.com/router-for-me/CLIProxyAPI/v6/internal/config" - "github.com/router-for-me/CLIProxyAPI/v6/internal/logging" - "github.com/router-for-me/CLIProxyAPI/v6/internal/registry" - "github.com/router-for-me/CLIProxyAPI/v6/internal/thinking" - "github.com/router-for-me/CLIProxyAPI/v6/internal/util" - cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" + internalconfig "github.com/kooshapari/cliproxyapi-plusplus/v6/internal/config" + "github.com/kooshapari/cliproxyapi-plusplus/v6/internal/logging" + "github.com/kooshapari/cliproxyapi-plusplus/v6/internal/registry" + "github.com/kooshapari/cliproxyapi-plusplus/v6/internal/thinking" + "github.com/kooshapari/cliproxyapi-plusplus/v6/internal/util" + cliproxyexecutor "github.com/kooshapari/cliproxyapi-plusplus/v6/sdk/cliproxy/executor" log "github.com/sirupsen/logrus" ) @@ -83,6 +83,15 @@ func quotaCooldownDisabledForAuth(auth *Auth) bool { return 
quotaCooldownDisabled.Load() } +func noAuthAvailableError() *Error { + return &Error{ + Code: "auth_unavailable", + Message: "no auth available", + Retryable: true, + HTTPStatus: http.StatusServiceUnavailable, + } +} + // Result captures execution outcome used to adjust auth state. type Result struct { // AuthID references the auth that produced this result. @@ -526,7 +535,7 @@ func (m *Manager) Execute(ctx context.Context, providers []string, req cliproxye if lastErr != nil { return cliproxyexecutor.Response{}, lastErr } - return cliproxyexecutor.Response{}, &Error{Code: "auth_not_found", Message: "no auth available"} + return cliproxyexecutor.Response{}, noAuthAvailableError() } // ExecuteCount performs a non-streaming execution using the configured selector and executor. @@ -557,7 +566,7 @@ func (m *Manager) ExecuteCount(ctx context.Context, providers []string, req clip if lastErr != nil { return cliproxyexecutor.Response{}, lastErr } - return cliproxyexecutor.Response{}, &Error{Code: "auth_not_found", Message: "no auth available"} + return cliproxyexecutor.Response{}, noAuthAvailableError() } // ExecuteStream performs a streaming execution using the configured selector and executor. 
@@ -588,7 +597,7 @@ func (m *Manager) ExecuteStream(ctx context.Context, providers []string, req cli if lastErr != nil { return nil, lastErr } - return nil, &Error{Code: "auth_not_found", Message: "no auth available"} + return nil, noAuthAvailableError() } func (m *Manager) executeWithFallback( @@ -1282,7 +1291,11 @@ func (m *Manager) MarkResult(ctx context.Context, result Result) { shouldSuspendModel = true setModelQuota = true case 408, 500, 502, 503, 504: - if quotaCooldownDisabledForAuth(auth) { + providerForAvailability := strings.TrimSpace(result.Provider) + if providerForAvailability == "" { + providerForAvailability = strings.TrimSpace(auth.Provider) + } + if quotaCooldownDisabledForAuth(auth) || !m.hasAlternativeAuthForModelLocked(auth.ID, providerForAvailability, result.Model, now) { state.NextRetryAfter = time.Time{} } else { next := now.Add(1 * time.Minute) @@ -1334,6 +1347,48 @@ func ensureModelState(auth *Auth, model string) *ModelState { return state } +func (m *Manager) hasAlternativeAuthForModelLocked(currentAuthID, provider, model string, now time.Time) bool { + if m == nil { + return false + } + providerKey := strings.TrimSpace(strings.ToLower(provider)) + if providerKey == "" { + return false + } + modelKey := strings.TrimSpace(model) + if modelKey != "" { + parsed := thinking.ParseSuffix(modelKey) + if parsed.ModelName != "" { + modelKey = strings.TrimSpace(parsed.ModelName) + } + } + registryRef := registry.GetGlobalRegistry() + for _, candidate := range m.auths { + if candidate == nil || candidate.ID == currentAuthID { + continue + } + if candidate.Disabled || candidate.Status == StatusDisabled { + continue + } + candidateProvider := strings.TrimSpace(strings.ToLower(candidate.Provider)) + if candidateProvider != providerKey { + continue + } + if modelKey != "" && registryRef != nil { + models := registryRef.GetModelsForClient(candidate.ID) + if len(models) > 0 && !registryRef.ClientSupportsModel(candidate.ID, modelKey) { + continue + } + } + 
blocked, _, _ := isAuthBlockedForModel(candidate, model, now) + if blocked { + continue + } + return true + } + return false +} + func resetModelState(state *ModelState, now time.Time) { if state == nil { return @@ -1691,7 +1746,7 @@ func (m *Manager) pickNext(ctx context.Context, provider, model string, opts cli } if len(candidates) == 0 { m.mu.RUnlock() - return nil, nil, &Error{Code: "auth_not_found", Message: "no auth available"} + return nil, nil, noAuthAvailableError() } selected, errPick := m.selector.Pick(ctx, provider, model, opts, candidates) if errPick != nil { @@ -1768,7 +1823,7 @@ func (m *Manager) pickNextMixed(ctx context.Context, providers []string, model s } if len(candidates) == 0 { m.mu.RUnlock() - return nil, nil, "", &Error{Code: "auth_not_found", Message: "no auth available"} + return nil, nil, "", noAuthAvailableError() } selected, errPick := m.selector.Pick(ctx, "mixed", model, opts, candidates) if errPick != nil { diff --git a/sdk/cliproxy/auth/conductor_overrides_test.go b/sdk/cliproxy/auth/conductor_overrides_test.go index ef39ed829c..1fb2bb05cb 100644 --- a/sdk/cliproxy/auth/conductor_overrides_test.go +++ b/sdk/cliproxy/auth/conductor_overrides_test.go @@ -95,3 +95,83 @@ func TestManager_MarkResult_RespectsAuthDisableCoolingOverride(t *testing.T) { t.Fatalf("expected NextRetryAfter to be zero when disable_cooling=true, got %v", state.NextRetryAfter) } } + +func TestManager_MarkResult_TransientErrorWithoutAlternativeDoesNotCooldown(t *testing.T) { + m := NewManager(nil, nil, nil) + + auth := &Auth{ + ID: "auth-1", + Provider: "iflow", + } + if _, errRegister := m.Register(context.Background(), auth); errRegister != nil { + t.Fatalf("register auth: %v", errRegister) + } + + model := "glm-5" + m.MarkResult(context.Background(), Result{ + AuthID: "auth-1", + Provider: "iflow", + Model: model, + Success: false, + Error: &Error{HTTPStatus: 500, Message: "boom"}, + }) + + updated, ok := m.GetByID("auth-1") + if !ok || updated == nil { + 
t.Fatalf("expected auth to be present") + } + state := updated.ModelStates[model] + if state == nil { + t.Fatalf("expected model state to be present") + } + if !state.NextRetryAfter.IsZero() { + t.Fatalf("expected NextRetryAfter to be zero when no alternative auth is available, got %v", state.NextRetryAfter) + } + blocked, _, _ := isAuthBlockedForModel(updated, model, time.Now()) + if blocked { + t.Fatalf("expected auth to stay selectable when no alternative auth exists") + } +} + +func TestManager_MarkResult_TransientErrorWithAlternativeKeepsCooldown(t *testing.T) { + m := NewManager(nil, nil, nil) + + authPrimary := &Auth{ + ID: "auth-1", + Provider: "iflow", + } + authSecondary := &Auth{ + ID: "auth-2", + Provider: "iflow", + } + if _, errRegister := m.Register(context.Background(), authPrimary); errRegister != nil { + t.Fatalf("register primary auth: %v", errRegister) + } + if _, errRegister := m.Register(context.Background(), authSecondary); errRegister != nil { + t.Fatalf("register secondary auth: %v", errRegister) + } + + model := "glm-5" + m.MarkResult(context.Background(), Result{ + AuthID: "auth-1", + Provider: "iflow", + Model: model, + Success: false, + Error: &Error{HTTPStatus: 500, Message: "boom"}, + }) + + updated, ok := m.GetByID("auth-1") + if !ok || updated == nil { + t.Fatalf("expected primary auth to be present") + } + state := updated.ModelStates[model] + if state == nil { + t.Fatalf("expected model state to be present") + } + if state.NextRetryAfter.IsZero() { + t.Fatalf("expected NextRetryAfter to be set when alternative auth exists") + } + if !state.NextRetryAfter.After(time.Now()) { + t.Fatalf("expected NextRetryAfter to be in the future, got %v", state.NextRetryAfter) + } +} diff --git a/sdk/cliproxy/auth/selector.go b/sdk/cliproxy/auth/selector.go index ecae87c3f8..fb01f585d2 100644 --- a/sdk/cliproxy/auth/selector.go +++ b/sdk/cliproxy/auth/selector.go @@ -240,7 +240,12 @@ func getAvailableAuths(auths []*Auth, provider, model string, 
now time.Time) ([] } return nil, newModelCooldownError(model, providerForError, resetIn) } - return nil, &Error{Code: "auth_unavailable", Message: "no auth available"} + return nil, &Error{ + Code: "auth_unavailable", + Message: "no auth available", + Retryable: true, + HTTPStatus: http.StatusServiceUnavailable, + } } bestPriority := 0 diff --git a/sdk/cliproxy/auth/selector_test.go b/sdk/cliproxy/auth/selector_test.go index 79431a9ada..63a140333f 100644 --- a/sdk/cliproxy/auth/selector_test.go +++ b/sdk/cliproxy/auth/selector_test.go @@ -281,6 +281,42 @@ func TestSelectorPick_AllCooldownReturnsModelCooldownError(t *testing.T) { }) } +func TestSelectorPick_AllUnavailableReturnsServiceUnavailable(t *testing.T) { + t.Parallel() + + model := "test-model" + now := time.Now() + auths := []*Auth{ + { + ID: "a", + ModelStates: map[string]*ModelState{ + model: { + Status: StatusError, + Unavailable: true, + NextRetryAfter: now.Add(30 * time.Second), + }, + }, + }, + } + + selector := &FillFirstSelector{} + _, err := selector.Pick(context.Background(), "iflow", model, cliproxyexecutor.Options{}, auths) + if err == nil { + t.Fatalf("Pick() error = nil") + } + + var authErr *Error + if !errors.As(err, &authErr) { + t.Fatalf("Pick() error = %T, want *Error", err) + } + if authErr.Code != "auth_unavailable" { + t.Fatalf("error code = %q, want %q", authErr.Code, "auth_unavailable") + } + if authErr.StatusCode() != http.StatusServiceUnavailable { + t.Fatalf("StatusCode() = %d, want %d", authErr.StatusCode(), http.StatusServiceUnavailable) + } +} + func TestIsAuthBlockedForModel_UnavailableWithoutNextRetryIsNotBlocked(t *testing.T) { t.Parallel() From 02fd1927dad08da8bc486d65c744bb42d1f3847a Mon Sep 17 00:00:00 2001 From: Koosha Paridehpour Date: Fri, 27 Feb 2026 04:19:51 -0700 Subject: [PATCH 12/12] chore(iflow): address review feedback on body read and id extraction Follow-up for upstream PR #1650. 
Addresses review feedback on iflow executor body read handling and session ID extraction. Co-Authored-By: Claude Opus 4.6 --- internal/runtime/executor/iflow_executor.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/internal/runtime/executor/iflow_executor.go b/internal/runtime/executor/iflow_executor.go index 70a0f695f5..b8fc35d411 100644 --- a/internal/runtime/executor/iflow_executor.go +++ b/internal/runtime/executor/iflow_executor.go @@ -557,7 +557,14 @@ func (e *IFlowExecutor) executeChatCompletionsRequest( return httpResp, nil } - firstBody, _ := io.ReadAll(httpResp.Body) + firstBody, errRead := io.ReadAll(httpResp.Body) + if errRead != nil { + recordAPIResponseError(ctx, e.cfg, errRead) + logWithRequestID(ctx). + WithError(errRead). + Warn("iflow executor: failed to read 406 response body before unsigned retry") + firstBody = nil + } if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("iflow executor: close response body error: %v", errClose) } @@ -639,9 +646,9 @@ func iflowRequestIDs(opts cliproxyexecutor.Options, body []byte) (sessionID, con } if conversationID == "" && len(opts.OriginalRequest) > 0 { conversationID = strings.TrimSpace(gjson.GetBytes(opts.OriginalRequest, "conversation_id").String()) - } - if conversationID == "" && len(opts.OriginalRequest) > 0 { - conversationID = strings.TrimSpace(gjson.GetBytes(opts.OriginalRequest, "conversationId").String()) + if conversationID == "" { + conversationID = strings.TrimSpace(gjson.GetBytes(opts.OriginalRequest, "conversationId").String()) + } } if sessionID == "" { @@ -649,9 +656,9 @@ func iflowRequestIDs(opts cliproxyexecutor.Options, body []byte) (sessionID, con } if sessionID == "" && len(opts.OriginalRequest) > 0 { sessionID = strings.TrimSpace(gjson.GetBytes(opts.OriginalRequest, "session_id").String()) - } - if sessionID == "" && len(opts.OriginalRequest) > 0 { - sessionID = strings.TrimSpace(gjson.GetBytes(opts.OriginalRequest, 
"sessionId").String()) + if sessionID == "" { + sessionID = strings.TrimSpace(gjson.GetBytes(opts.OriginalRequest, "sessionId").String()) + } } // Keep session id stable and non-empty for signature generation.