Skip to content
40 changes: 34 additions & 6 deletions astrbot/core/provider/sources/anthropic_source.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import base64
import json
from collections.abc import AsyncGenerator
from typing import Literal
from typing import Any, Literal

import anthropic
import httpx
Expand Down Expand Up @@ -104,15 +104,35 @@ def _init_api_key(self, provider_config: dict) -> None:
api_key=self.chosen_api_key,
timeout=self.timeout,
base_url=self.base_url,
default_headers=self.custom_headers,
http_client=self._create_http_client(provider_config),
)

def _create_http_client(self, provider_config: dict) -> httpx.AsyncClient | None:
    """Create an HTTP client with optional proxy and system SSL trust store.

    Returns ``None`` when no proxy is configured, letting the Anthropic SDK
    build its own default client.

    The Anthropic SDK validates ``http_client`` with
    ``isinstance(..., httpx.AsyncClient)`` against its own ``httpx`` import.
    When multiple ``httpx`` installations are present on ``sys.path``
    (e.g. bundled Python + system Python), constructing the client from a
    different ``httpx`` module makes that check fail. We therefore prefer
    the SDK's own ``httpx`` module when available.

    Args:
        provider_config: Provider configuration dict; only the ``"proxy"``
            key is read here.

    Returns:
        A proxied ``httpx.AsyncClient`` built via ``create_proxy_client``,
        or ``None`` when ``provider_config["proxy"]`` is empty/missing.
    """
    proxy = provider_config.get("proxy", "")
    if not proxy:
        # No proxy configured: let the SDK construct its default client.
        return None
    httpx_module: Any = httpx
    try:
        # Prefer the httpx module the Anthropic SDK itself imported, so the
        # SDK's isinstance check on http_client passes even with duplicate
        # httpx installations on sys.path.
        from anthropic import _base_client as anthropic_base_client

        httpx_module = getattr(anthropic_base_client, "httpx", httpx)
    except ImportError:
        # SDK internals unavailable (layout changed); fall back to global httpx.
        pass
    return create_proxy_client(
        "Anthropic",
        proxy,
        headers=self.custom_headers,
        httpx_module=httpx_module,
    )

def _apply_thinking_config(self, payloads: dict) -> None:
Expand Down Expand Up @@ -591,7 +611,11 @@ async def text_chat(

# Anthropic has a different way of handling system prompts
if system_prompt:
payloads["system"] = system_prompt
payloads["system"] = (
[{"type": "text", "text": system_prompt}]
if isinstance(system_prompt, str)
else system_prompt
)

llm_response = None
try:
Expand Down Expand Up @@ -654,7 +678,11 @@ async def text_chat_stream(

# Anthropic has a different way of handling system prompts
if system_prompt:
payloads["system"] = system_prompt
payloads["system"] = (
[{"type": "text", "text": system_prompt}]
if isinstance(system_prompt, str)
else system_prompt
)

async for llm_response in self._query_stream(payloads, func_tool):
yield llm_response
Expand Down
4 changes: 2 additions & 2 deletions astrbot/core/utils/network_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ def create_proxy_client(
provider_label: The provider name for log prefix (e.g., "OpenAI", "Gemini")
proxy: The proxy address (e.g., "http://127.0.0.1:7890"), or None/empty
headers: Optional custom headers to include in every request
verify: Optional override for TLS verification. Defaults to the shared
system SSL context when not provided.
verify: Optional override for TLS verification. Defaults to the hybrid
SSL context (system store + certifi) when not provided.
httpx_module: Optional httpx module to construct AsyncClient from. This is
useful when a provider SDK performs isinstance checks against its own
httpx import.
Expand Down
163 changes: 154 additions & 9 deletions tests/test_anthropic_kimi_code_provider.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import httpx
import builtins

import pytest

import astrbot.core.provider.sources.anthropic_source as anthropic_source
Expand All @@ -15,7 +16,7 @@ async def close(self):
return None


def test_anthropic_provider_injects_custom_headers_into_http_client(monkeypatch):
def test_anthropic_provider_passes_custom_headers_via_default_headers(monkeypatch):
monkeypatch.setattr(anthropic_source, "AsyncAnthropic", _FakeAsyncAnthropic)

provider = anthropic_source.ProviderAnthropic(
Expand All @@ -36,9 +37,13 @@ def test_anthropic_provider_injects_custom_headers_into_http_client(monkeypatch)
"User-Agent": "custom-agent/1.0",
"X-Test-Header": "123",
}
assert isinstance(provider.client.kwargs["http_client"], httpx.AsyncClient)
assert provider.client.kwargs["http_client"].headers["User-Agent"] == "custom-agent/1.0"
assert provider.client.kwargs["http_client"].headers["X-Test-Header"] == "123"
# Custom headers are forwarded via the SDK's `default_headers` parameter,
# not via a custom http_client (which is reserved for proxy configuration).
assert provider.client.kwargs["default_headers"] == {
"User-Agent": "custom-agent/1.0",
"X-Test-Header": "123",
}
assert provider.client.kwargs["http_client"] is None


def test_kimi_code_provider_sets_defaults_and_preserves_custom_headers(monkeypatch):
Expand All @@ -60,10 +65,10 @@ def test_kimi_code_provider_sets_defaults_and_preserves_custom_headers(monkeypat
"User-Agent": kimi_code_source.KIMI_CODE_USER_AGENT,
"X-Trace-Id": "trace-1",
}
assert provider.client.kwargs["http_client"].headers["User-Agent"] == (
kimi_code_source.KIMI_CODE_USER_AGENT
)
assert provider.client.kwargs["http_client"].headers["X-Trace-Id"] == "trace-1"
assert provider.client.kwargs["default_headers"] == {
"User-Agent": kimi_code_source.KIMI_CODE_USER_AGENT,
"X-Trace-Id": "trace-1",
}


def test_kimi_code_provider_restores_required_user_agent_when_blank(monkeypatch):
Expand All @@ -84,6 +89,146 @@ def test_kimi_code_provider_restores_required_user_agent_when_blank(monkeypatch)
}


def test_create_http_client_returns_none_when_no_proxy(monkeypatch):
    # Guard stub: the proxy-client factory must never run when no proxy is set.
    def _forbidden(*_args, **_kwargs):
        raise AssertionError("create_proxy_client should not be called without a proxy")

    provider = anthropic_source.ProviderAnthropic.__new__(
        anthropic_source.ProviderAnthropic
    )
    provider.custom_headers = {"X-Trace-Id": "abc"}
    monkeypatch.setattr(anthropic_source, "create_proxy_client", _forbidden)

    assert provider._create_http_client({"proxy": ""}) is None


def test_create_http_client_uses_anthropic_httpx_module(monkeypatch):
    # Record every argument the provider forwards to create_proxy_client.
    seen: dict[str, object] = {}

    def record_proxy_client(
        provider_label: str,
        proxy: str | None = None,
        headers: dict[str, str] | None = None,
        verify=None,
        httpx_module=None,
    ):
        seen.update(
            provider_label=provider_label,
            proxy=proxy,
            headers=headers,
            httpx_module=httpx_module,
        )
        return object()

    monkeypatch.setattr(
        anthropic_source, "create_proxy_client", record_proxy_client
    )

    provider = anthropic_source.ProviderAnthropic.__new__(
        anthropic_source.ProviderAnthropic
    )
    provider.custom_headers = {"X-Trace-Id": "trace-1"}
    provider._create_http_client({"proxy": "http://127.0.0.1:7890"})

    from anthropic import _base_client as anthropic_base_client

    assert seen["provider_label"] == "Anthropic"
    assert seen["proxy"] == "http://127.0.0.1:7890"
    assert seen["headers"] == {"X-Trace-Id": "trace-1"}
    assert seen["httpx_module"] is anthropic_base_client.httpx


def test_create_http_client_falls_back_to_global_httpx_module(monkeypatch):
    # Only the httpx_module kwarg matters for this test.
    recorded: dict[str, object] = {}

    def stub_proxy_client(
        provider_label: str,
        proxy: str | None = None,
        headers: dict[str, str] | None = None,
        verify=None,
        httpx_module=None,
    ):
        recorded["httpx_module"] = httpx_module
        return object()

    original_import = builtins.__import__

    def blocking_import(name, globals=None, locals=None, fromlist=(), level=0):
        # Simulate an anthropic package whose private _base_client is missing.
        if name == "anthropic" and fromlist:
            raise ImportError("missing anthropic._base_client")
        return original_import(name, globals, locals, fromlist, level)

    monkeypatch.setattr(
        anthropic_source, "create_proxy_client", stub_proxy_client
    )
    monkeypatch.setattr(builtins, "__import__", blocking_import)

    provider = anthropic_source.ProviderAnthropic.__new__(
        anthropic_source.ProviderAnthropic
    )
    provider.custom_headers = None
    provider._create_http_client({"proxy": "http://127.0.0.1:7890"})

    assert recorded["httpx_module"] is anthropic_source.httpx


@pytest.mark.asyncio
async def test_text_chat_wraps_string_system_prompt_as_list(monkeypatch):
    monkeypatch.setattr(anthropic_source, "AsyncAnthropic", _FakeAsyncAnthropic)

    provider = anthropic_source.ProviderAnthropic(
        provider_config={
            "id": "anthropic-test",
            "type": "anthropic_chat_completion",
            "model": "claude-test",
            "key": ["test-key"],
        },
        provider_settings={},
    )

    # Intercept _query to inspect the payload sent to the API.
    seen_payloads: dict[str, object] = {}

    async def record_query(payloads, tools):
        seen_payloads.update(payloads)
        return LLMResponse(role="assistant", completion_text="ok")

    monkeypatch.setattr(provider, "_query", record_query)

    await provider.text_chat(prompt="hello", system_prompt="You are helpful.")

    assert seen_payloads["system"] == [{"type": "text", "text": "You are helpful."}]


@pytest.mark.asyncio
async def test_text_chat_passes_through_list_system_prompt(monkeypatch):
    monkeypatch.setattr(anthropic_source, "AsyncAnthropic", _FakeAsyncAnthropic)

    provider = anthropic_source.ProviderAnthropic(
        provider_config={
            "id": "anthropic-test",
            "type": "anthropic_chat_completion",
            "model": "claude-test",
            "key": ["test-key"],
        },
        provider_settings={},
    )

    # Intercept _query to inspect the payload sent to the API.
    seen_payloads: dict[str, object] = {}

    async def record_query(payloads, tools):
        seen_payloads.update(payloads)
        return LLMResponse(role="assistant", completion_text="ok")

    monkeypatch.setattr(provider, "_query", record_query)

    structured_system = [
        {"type": "text", "text": "Persona block."},
        {"type": "text", "text": "Style guide."},
    ]
    await provider.text_chat(prompt="hello", system_prompt=structured_system)

    # Already-structured system blocks must pass through untouched.
    assert seen_payloads["system"] == structured_system


def test_anthropic_empty_output_raises_empty_model_output_error():
llm_response = LLMResponse(role="assistant")

Expand Down