From 8b9e47450819ca4fb075afb6b11b0bc04069f67a Mon Sep 17 00:00:00 2001 From: touale <136764239@qq.com> Date: Mon, 22 Dec 2025 18:56:35 +0800 Subject: [PATCH 01/10] feat: add proxy function support --- pyproject.toml | 1 + pytest.ini | 3 ++ src/framex/adapter/base.py | 2 +- src/framex/plugin/__init__.py | 12 +++-- src/framex/plugin/on.py | 47 +++++++++++++++-- src/framex/plugins/proxy/__init__.py | 76 ++++++++++++++++++++++++++++ src/framex/plugins/proxy/config.py | 13 ++++- src/framex/plugins/proxy/model.py | 14 +++++ src/framex/utils.py | 22 ++++++++ tests/mock.py | 11 ++++ tests/test_plugin.py | 13 ----- tests/test_plugins.py | 47 ++++++++++++++++- uv.lock | 17 ++++++- 13 files changed, 251 insertions(+), 27 deletions(-) create mode 100644 src/framex/plugins/proxy/model.py diff --git a/pyproject.toml b/pyproject.toml index d94dea9..29b8efc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,6 +8,7 @@ requires-python = ">=3.11" dependencies = [ "aiocache>=0.12.3", "click>=8.1.8", + "cloudpickle>=3.1.2", "fastapi>=0.115.13", "httpx>=0.28.1", "loguru>=0.7.3", diff --git a/pytest.ini b/pytest.ini index 45a0d58..c1095db 100644 --- a/pytest.ini +++ b/pytest.ini @@ -27,9 +27,12 @@ env = plugins__proxy__auth__general_auth_keys=["i_am_proxy_general_auth_keys"] plugins__proxy__auth__auth_urls=["/proxy/mock/auth/*"] plugins__proxy__auth__special_auth_keys={{"/proxy/mock/auth/sget":["i_am_proxy_special_auth_keys"]}} + plugins__proxy__proxy_functions={{"http://localhost:9527":["tests.test_plugins.remote_exchange_key_value"]}} ; plugins__proxy__force_stream_apis=[] sentry__enable=false test__silent=true auth__general_auth_keys=["i_am_general_auth_keys"] auth__auth_urls=["/api/v1/echo"] + + asyncio_mode = auto diff --git a/src/framex/adapter/base.py b/src/framex/adapter/base.py index bbd8c8a..4446644 100644 --- a/src/framex/adapter/base.py +++ b/src/framex/adapter/base.py @@ -29,7 +29,7 @@ def to_deployment(self, cls: type, **kwargs: Any) -> type: # noqa: ARG002 async def call_func(self, api: PluginApi, **kwargs: Any) -> Any: func = self.get_handle_func(api.deployment_name, api.func_name) stream = api.stream - if api.call_type == ApiType.PROXY: + if api.call_type == ApiType.PROXY and api.api: kwargs["proxy_path"] = api.api stream = await self._check_is_gen_api(api.api) if stream: diff --git a/src/framex/plugin/__init__.py b/src/framex/plugin/__init__.py index ba28e6f..ad1303e 100644 --- a/src/framex/plugin/__init__.py +++ b/src/framex/plugin/__init__.py @@ -76,14 +76,17 @@ def init_all_deployments(enable_proxy: bool) -> list[DeploymentHandle]: async def call_plugin_api( - api_name: str, + api_name: str | PluginApi, interval_apis: dict[str, PluginApi] | None = None, **kwargs: Any, ) -> Any: - api = interval_apis.get(api_name) if interval_apis else _manager.get_api(api_name) + if isinstance(api_name, PluginApi): + api: PluginApi | None = api_name + elif isinstance(api_name, str): + api = interval_apis.get(api_name) if interval_apis else _manager.get_api(api_name) use_proxy = False if not api: - if api_name.startswith("/") and settings.server.enable_proxy: + if isinstance(api_name, str) and api_name.startswith("/") and settings.server.enable_proxy: api = PluginApi( api=api_name, deployment_name=PROXY_PLUGIN_NAME, @@ -116,7 +119,8 @@ async def call_plugin_api( return result.model_dump(by_alias=True) if use_proxy: if not isinstance(result, dict): - raise RuntimeError(f"Proxy API {api_name} returned non-dict result: {type(result)}") + # logger.warning(f"Proxy API {api_name} returned non-dict result: {type(result)}") + return result if "status" not in result: raise RuntimeError(f"Proxy API {api_name} returned invalid response: missing 'status' field") res = result.get("data") diff --git a/src/framex/plugin/on.py b/src/framex/plugin/on.py index c04399d..7b08361 100644 --- a/src/framex/plugin/on.py +++ b/src/framex/plugin/on.py @@ -1,3 +1,4 @@ +import functools import inspect import types from collections.abc import Callable @@ -6,11 +7,11 @@ from pydantic import BaseModel from framex.adapter import get_adapter -from framex.consts import API_STR +from framex.consts import API_STR, PROXY_PLUGIN_NAME from framex.plugin.model import ApiType, PluginApi, PluginDeployment -from framex.utils import extract_method_params, plugin_to_deployment_name +from framex.utils import cache_decode, cache_encode, extract_method_params, plugin_to_deployment_name -from . import _current_plugin +from . import _current_plugin, call_plugin_api def on_register(**kwargs: Any) -> Callable[[type], type]: @@ -97,6 +98,46 @@ def wrapper(func: Callable) -> Callable: return wrapper +def on_proxy(proxy_only: bool = False) -> Callable: + def decorator(func: Callable) -> Callable: + from framex.config import settings + + if not settings.server.enable_proxy: + return func + + is_registered = proxy_only + full_func_name = f"{func.__module__}.{func.__name__}" + + @functools.wraps(func) + async def wrapper(*args: Any, **kwargs: Any) -> Any: + nonlocal is_registered + + if args: + raise TypeError( + f"The proxy function '{func.__name__}' only supports keyword arguments. " + f"Please call it using the syntax {func.__name__}(param=value)." + ) + + if not is_registered: + api_reg = PluginApi( + deployment_name=PROXY_PLUGIN_NAME, call_type=ApiType.PROXY, func_name="register_proxy_function" + ) + await call_plugin_api(api_reg, None, func_name=full_func_name, func_callable=func) + is_registered = True + + api_call = PluginApi( + deployment_name=PROXY_PLUGIN_NAME, call_type=ApiType.PROXY, func_name="call_proxy_function" + ) + encode_kwargs = cache_encode(kwargs) + encode_func_name = cache_encode(full_func_name) + res = await call_plugin_api(api_call, None, func_name=encode_func_name, data=encode_kwargs) + return cache_decode(res) + + return wrapper + + return decorator + + def remote() -> Callable: def wrapper(func: Callable) -> Any: adapter = get_adapter() diff --git a/src/framex/plugins/proxy/__init__.py b/src/framex/plugins/proxy/__init__.py index 178ac0a..dd7bdca 100644 --- a/src/framex/plugins/proxy/__init__.py +++ b/src/framex/plugins/proxy/__init__.py @@ -16,6 +16,8 @@ from framex.plugin.on import on_request from framex.plugins.proxy.builder import create_pydantic_model, type_map from framex.plugins.proxy.config import ProxyPluginConfig, settings +from framex.plugins.proxy.model import ProxyFunc, ProxyFuncHttpBody +from framex.utils import cache_decode, cache_encode __plugin_meta__ = PluginMetadata( name="proxy", @@ -33,7 +35,10 @@ class ProxyPlugin(BasePlugin): def __init__(self, **kwargs: Any) -> None: self.func_map: dict[str, Any] = {} + self.proxy_func_map: dict[str, ProxyFunc] = {} self.time_out = settings.timeout + self.init_proxy_func_route = False + self.proxy_func_http_path = "/proxy/remote" super().__init__(**kwargs) @override @@ -41,8 +46,16 @@ async def on_start(self) -> None: if not settings.proxy_urls: # pragma: no cover logger.opt(colors=True).warning("No url provided, skipping proxy plugin") return + for url in settings.proxy_urls: await self._parse_openai_docs(url) + + for url, funcs in settings.proxy_functions.items(): + for func in funcs: + await self._parse_proxy_function(func, url) + else: + logger.debug("No proxy functions to register") + logger.success(f"Succeeded to parse openai docs form {url}") @on_request(call_type=ApiType.FUNC) @@ -126,6 +139,45 @@ async def _parse_openai_docs(self, url: str) -> None: # Proxy api to map self.func_map[path] = func + async def register_proxy_func_route( + self, + ) -> None: + adapter: BaseAdapter = get_adapter() + # Register router + plugin_api = PluginApi( + deployment_name=BACKEND_NAME, + func_name="register_route", + ) + + handle = adapter.get_handle(PROXY_PLUGIN_NAME) + await adapter.call_func( + plugin_api, + path=self.proxy_func_http_path, + methods=["POST"], + func_name=self.call_proxy_function, + params=[("data", str), ("func_name", str)], + handle=handle, + stream=False, + direct_output=True, + tags=[__plugin_meta__.name], + ) + + async def _parse_proxy_function(self, func_name: str, url: str) -> None: + logger.opt(colors=True).debug(f"Found proxy function {url}") + + params: list[tuple[str, type]] = [("model", ProxyFuncHttpBody)] + + if auth_api_key := settings.auth.get_auth_keys(self.proxy_func_http_path): + headers = {"Authorization": auth_api_key[0]} # Use the first auth key set + logger.debug(f"Proxy func({self.proxy_func_http_path}) requires auth") + else: + headers = None + + func = self._create_dynamic_method( + func_name, "POST", params, f"{url}{self.proxy_func_http_path}", stream=False, headers=headers + ) + await self.register_proxy_function(func_name, func, is_remote=True) + async def __call__(self, proxy_path: str, **kwargs: Any) -> Any: if func := self.func_map.get(proxy_path): return await func(**kwargs) @@ -205,3 +257,27 @@ async def dynamic_method(**kwargs: Any) -> AsyncGenerator[str, None] | dict[str, @override def _post_call_remote_api_hook(self, data: Any) -> Any: return data.get("data") or data + + async def register_proxy_function( + self, func_name: str, func_callable: Callable[..., Any], is_remote: bool = False + ) -> bool: + if not self.init_proxy_func_route: + await self.register_proxy_func_route() + self.init_proxy_func_route = True + + logger.info(f"Registering proxy function: {func_name}") + self.proxy_func_map[func_name] = ProxyFunc(func=func_callable, is_remote=is_remote) + return True + + async def call_proxy_function(self, func_name: str, data: str) -> str: + decode_func_name = cache_decode(func_name) + if proxy_func := self.proxy_func_map.get(decode_func_name): + if proxy_func.is_remote: + kwargs = {"model": ProxyFuncHttpBody(data=data, func_name=func_name)} + else: + kwargs = cache_decode(data) + tag = "remote" if proxy_func.is_remote else "local" + logger.info(f"Calling proxy function[{tag}]: {decode_func_name}, kwargs: {kwargs}") + res = await proxy_func.func(**kwargs) + return cache_encode(res) + raise RuntimeError(f"Proxy function({decode_func_name}) not registered") diff --git a/src/framex/plugins/proxy/config.py b/src/framex/plugins/proxy/config.py index 1b37b4d..aa7324e 100644 --- a/src/framex/plugins/proxy/config.py +++ b/src/framex/plugins/proxy/config.py @@ -1,6 +1,6 @@ -from typing import Any +from typing import Any, Self -from pydantic import BaseModel +from pydantic import BaseModel, model_validator from framex.config import AuthConfig from framex.plugin import get_plugin_config @@ -16,6 +16,8 @@ class ProxyPluginConfig(BaseModel): auth: AuthConfig = AuthConfig() + proxy_functions: dict[str, list[str]] = {} + def is_white_url(self, url: str) -> bool: """Check if a URL is protected by any auth_urls rule.""" if self.white_list == []: # pragma: no cover @@ -27,5 +29,12 @@ def is_white_url(self, url: str) -> bool: return True return False + @model_validator(mode="after") + def validate_proxy_functions(self) -> Self: + for url in self.proxy_functions: + if url not in self.proxy_urls: + raise ValueError(f"proxy_functions url '{url}' is not covered by any proxy_urls rule") + return self + settings = get_plugin_config("proxy", ProxyPluginConfig) diff --git a/src/framex/plugins/proxy/model.py b/src/framex/plugins/proxy/model.py new file mode 100644 index 0000000..c1755d4 --- /dev/null +++ b/src/framex/plugins/proxy/model.py @@ -0,0 +1,14 @@ +from collections.abc import Callable +from typing import Any + +from pydantic import BaseModel + + +class ProxyFunc(BaseModel): + func: Callable[..., Any] + is_remote: bool = False + + +class ProxyFuncHttpBody(BaseModel): + data: str + func_name: str diff --git a/src/framex/utils.py b/src/framex/utils.py index e67808f..2fe2859 100644 --- a/src/framex/utils.py +++ b/src/framex/utils.py @@ -1,11 +1,14 @@ import inspect import json import re +import zlib from collections.abc import Callable from enum import StrEnum +from itertools import cycle from pathlib import Path from typing import Any +import cloudpickle # type: ignore[import-untyped] from pydantic import BaseModel @@ -53,3 +56,22 @@ def make_stream_event(event_type: StreamEnventType | str, data: str | dict[str, elif isinstance(data, str): data = {"content": data} return f"event: {event_type}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n" + + +def xor_crypt(data: bytes, key: str = "01234567890abcdefghijklmnopqrstuvwxyz") -> bytes: + return bytes(a ^ b for a, b in zip(data, cycle(key.encode()))) + + +def cache_encode(data: Any) -> str: + packed = cloudpickle.dumps(data) + + compressed = zlib.compress(packed) + obfuscated = xor_crypt(compressed) + return obfuscated.hex() + + +def cache_decode(data: str) -> Any: + raw = bytes.fromhex(data) + de_obfuscated = xor_crypt(raw) + decompressed = zlib.decompress(de_obfuscated) + return cloudpickle.loads(decompressed) # nosec S301 diff --git a/tests/mock.py b/tests/mock.py index 0bf5812..d4a43d1 100644 --- a/tests/mock.py +++ b/tests/mock.py @@ -1,6 +1,7 @@ from typing import Any from unittest.mock import MagicMock +from framex.utils import cache_decode from tests.consts import MOCK_RESPONSE @@ -64,6 +65,16 @@ async def mock_request(_, method: str, url: str, **kwargs: Any): "method": "GET", "params": params, } + elif url.endswith("/proxy/remote") and method == "POST": + json_data = kwargs.get("json", {}) + func_name = json_data.get("func_name") + data = json_data.get("data", {}) + assert func_name + assert data + decode_func_name = cache_decode(func_name) + decode_data = cache_decode(data) + resp.json.return_value = {"result": decode_func_name, "data": decode_data} + else: raise AssertionError(f"Unexpected request: {method} {url}") diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 9377ad7..46784ab 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -238,19 +238,6 @@ async def test_call_plugin_api_with_multiple_kwargs(self): assert call_kwargs["b"] == "test" assert call_kwargs["c"] is True - @pytest.mark.asyncio - async def test_call_plugin_api_with_proxy_non_dict_result(self): - """Test proxy API call raises when result is not a dict.""" - with ( - patch("framex.plugin._manager.get_api", return_value=None), - patch("framex.plugin.settings.server.enable_proxy", True), - patch("framex.plugin.get_adapter") as mock_adapter, - patch("framex.plugin.logger"), - ): - mock_adapter.return_value.call_func = AsyncMock(return_value="not_a_dict") - with pytest.raises(RuntimeError, match="returned non-dict result"): - await call_plugin_api("/external/api") - @pytest.mark.asyncio async def test_call_plugin_api_with_proxy_missing_status(self): """Test proxy API call raises when status field is missing.""" diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 5084304..3c1d0b1 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -5,6 +5,8 @@ import pytest from pydantic import BaseModel, ConfigDict, Field +from framex.plugin.on import on_proxy + class MatchMataConfig(BaseModel): disable_duplicate_school_detect: bool = Field(False, alias="disableDuplicateSchoolDetect") @@ -148,5 +150,46 @@ def test_resolve_default(): assert "Cannot instantiate default" in str(exc_info.value) -# if __name__ == "__main__": -# test_resolve_annotation() +class SubModel(BaseModel): + id: int + name: str + + +class ExchangeModel(BaseModel): + id: str + name: int + model: SubModel + + +@on_proxy() +async def lcoal_exchange_key_value(a_str: str, b_int: int, c_model: ExchangeModel) -> Any: + return {"a_str": a_str, "b_int": b_int, "c_model": c_model} + + +@on_proxy(proxy_only=True) +async def remote_exchange_key_value(a_str: str, b_int: int, c_model: ExchangeModel) -> Any: # noqa: ARG001 + raise RuntimeError("This function should be called remotely") + + +async def test_on_proxy_local_call(): + res = await lcoal_exchange_key_value( + a_str="test", b_int=123, c_model=ExchangeModel(id="id_1", name=100, model=SubModel(id=1, name="sub_name")) + ) + assert res["a_str"] == "test" + assert res["b_int"] == 123 + assert ( + res["c_model"].model_dump() + == ExchangeModel(id="id_1", name=100, model=SubModel(id=1, name="sub_name")).model_dump() + ) + + +async def test_on_proxy_remote_call(): + res = await remote_exchange_key_value( + a_str="test", b_int=123, c_model=ExchangeModel(id="id_1", name=100, model=SubModel(id=1, name="sub_name")) + ) + assert res["result"] == "tests.test_plugins.remote_exchange_key_value" + assert res["data"] == { + "a_str": "test", + "b_int": 123, + "c_model": ExchangeModel(id="id_1", name=100, model=SubModel(id=1, name="sub_name")), + } diff --git a/uv.lock b/uv.lock index 2b09d4d..8cbd8a7 100644 --- a/uv.lock +++ b/uv.lock @@ -1,4 +1,5 @@ version = 1 +revision = 1 requires-python = ">=3.11" resolution-markers = [ "python_full_version >= '3.13' and platform_python_implementation == 'PyPy'", @@ -300,7 +301,7 @@ name = "click" version = "8.1.8" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "colorama", marker = "platform_system == 'Windows'" }, + { name = "colorama", marker = "sys_platform == 'win32'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/b9/2e/0090cbf739cee7d23781ad4b89a9894a41538e4fcf4c31dcdd705b78eb8b/click-8.1.8.tar.gz", hash = "sha256:ed53c9d8990d83c2a27deae68e4ee337473f6330c040a31d4225c9574d16096a", size = 226593 } wheels = [ @@ -319,6 +320,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/75/45/54bb2d8d4138964a94bef6e9afe48b0be4705ba66ac442ae7d8a8dc4ffef/click_option_group-0.5.9-py3-none-any.whl", hash = "sha256:ad2599248bd373e2e19bec5407967c3eec1d0d4fc4a5e77b08a0481e75991080", size = 11553 }, ] +[[package]] +name = "cloudpickle" +version = "3.1.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/27/fb/576f067976d320f5f0114a8d9fa1215425441bb35627b1993e5afd8111e5/cloudpickle-3.1.2.tar.gz", hash = "sha256:7fda9eb655c9c230dab534f1983763de5835249750e85fbcef43aaa30a9a2414", size = 22330 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/88/39/799be3f2f0f38cc727ee3b4f1445fe6d5e4133064ec2e4115069418a5bb6/cloudpickle-3.1.2-py3-none-any.whl", hash = "sha256:9acb47f6afd73f60dc1df93bb801b472f05ff42fa6c84167d25cb206be1fbf4a", size = 22228 }, +] + [[package]] name = "colorama" version = "0.4.6" @@ -333,7 +343,7 @@ name = "colorful" version = "0.5.8" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "colorama", marker = "platform_system == 'Windows'" }, + { name = "colorama", marker = "sys_platform == 'win32'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/82/31/109ef4bedeb32b4202e02ddb133162457adc4eb890a9ed9c05c9dd126ed0/colorful-0.5.8.tar.gz", hash = "sha256:bb16502b198be2f1c42ba3c52c703d5f651d826076817185f0294c1a549a7445", size = 209361 } wheels = [ @@ -493,6 +503,7 @@ source = { editable = "." } dependencies = [ { name = "aiocache" }, { name = "click" }, + { name = "cloudpickle" }, { name = "fastapi" }, { name = "httpx" }, { name = "loguru" }, @@ -530,6 +541,7 @@ dev = [ requires-dist = [ { name = "aiocache", specifier = ">=0.12.3" }, { name = "click", specifier = ">=8.1.8" }, + { name = "cloudpickle", specifier = ">=3.1.2" }, { name = "fastapi", specifier = ">=0.115.13" }, { name = "httpx", specifier = ">=0.28.1" }, { name = "loguru", specifier = ">=0.7.3" }, @@ -542,6 +554,7 @@ requires-dist = [ { name = "sentry-sdk", extras = ["fastapi"], specifier = ">=2.33.0" }, { name = "tomli", specifier = ">=2.2.1" }, ] +provides-extras = ["release"] [package.metadata.requires-dev] dev = [ From f16972742989225a94b75eff570ba6d3363a2936 Mon Sep 17 00:00:00 2001 From: touale <136764239@qq.com> Date: Tue, 23 Dec 2025 10:04:35 +0800 Subject: [PATCH 02/10] feat: enhance proxy function registration and auth --- src/framex/config.py | 4 +++ src/framex/consts.py | 2 ++ src/framex/plugin/__init__.py | 1 - src/framex/plugins/proxy/__init__.py | 52 ++++++++++++++++------------ tests/mock.py | 5 ++- tests/test_plugins.py | 4 +-- 6 files changed, 39 insertions(+), 29 deletions(-) diff --git a/src/framex/config.py b/src/framex/config.py index c51411d..8e0d865 100644 --- a/src/framex/config.py +++ b/src/framex/config.py @@ -9,6 +9,8 @@ TomlConfigSettingsSource, ) +from framex.consts import PROXY_FUNC_HTTP_PATH + class LogConfig(BaseModel): simple_log: bool = True @@ -58,6 +60,8 @@ class AuthConfig(BaseModel): @model_validator(mode="after") def validate_special_auth_urls(self) -> Self: + if PROXY_FUNC_HTTP_PATH not in self.auth_urls: + self.auth_urls.append("/proxy/remote") for special_url in self.special_auth_keys: if not self._is_url_protected(special_url): raise ValueError(f"special_auth_keys url '{special_url}' is not covered by any auth_urls rule") diff --git a/src/framex/consts.py b/src/framex/consts.py index a989db7..0b140b1 100644 --- a/src/framex/consts.py +++ b/src/framex/consts.py @@ -4,6 +4,8 @@ BACKEND_NAME = "backend" APP_NAME = "default" + PROXY_PLUGIN_NAME = "proxy.ProxyPlugin" +PROXY_FUNC_HTTP_PATH = f"{API_STR}/proxy/remote" DEFAULT_ENV = {"RAY_COLOR_PREFIX": "1", "RAY_DEDUP_LOGS": "1", "RAY_SERVE_RUN_SYNC_IN_THREADPOOL": "1"} diff --git a/src/framex/plugin/__init__.py b/src/framex/plugin/__init__.py index ad1303e..399afc8 100644 --- a/src/framex/plugin/__init__.py +++ b/src/framex/plugin/__init__.py @@ -119,7 +119,6 @@ async def call_plugin_api( return result.model_dump(by_alias=True) if use_proxy: if not isinstance(result, dict): - # logger.warning(f"Proxy API {api_name} returned non-dict result: {type(result)}") return result if "status" not in result: raise RuntimeError(f"Proxy API {api_name} returned invalid response: missing 'status' field") diff --git a/src/framex/plugins/proxy/__init__.py b/src/framex/plugins/proxy/__init__.py index dd7bdca..e7ea59d 100644 --- a/src/framex/plugins/proxy/__init__.py +++ b/src/framex/plugins/proxy/__init__.py @@ -9,7 +9,7 @@ from framex.adapter import get_adapter from framex.adapter.base import BaseAdapter -from framex.consts import BACKEND_NAME, PROXY_PLUGIN_NAME, VERSION +from framex.consts import BACKEND_NAME, PROXY_FUNC_HTTP_PATH, PROXY_PLUGIN_NAME, VERSION from framex.log import logger from framex.plugin import BasePlugin, PluginApi, PluginMetadata, on_register from framex.plugin.model import ApiType @@ -38,7 +38,6 @@ def __init__(self, **kwargs: Any) -> None: self.proxy_func_map: dict[str, ProxyFunc] = {} self.time_out = settings.timeout self.init_proxy_func_route = False - self.proxy_func_http_path = "/proxy/remote" super().__init__(**kwargs) @override @@ -50,9 +49,10 @@ async def on_start(self) -> None: for url in settings.proxy_urls: await self._parse_openai_docs(url) - for url, funcs in settings.proxy_functions.items(): - for func in funcs: - await self._parse_proxy_function(func, url) + if settings.proxy_functions: + for url, funcs in settings.proxy_functions.items(): + for func in funcs: + await self._parse_proxy_function(func, url) else: logger.debug("No proxy functions to register") @@ -63,10 +63,10 @@ async def check_is_gen_api(self, path: str) -> bool: return path in settings.force_stream_apis async def _get_openai_docs(self, url: str) -> dict[str, Any]: - clent = httpx.AsyncClient(timeout=self.time_out) - response = await clent.get(f"{url}/api/v1/openapi.json") - response.raise_for_status() - return cast(dict[str, Any], response.json()) + async with httpx.AsyncClient(timeout=self.time_out) as client: + response = await client.get(f"{url}/api/v1/openapi.json") + response.raise_for_status() + return cast(dict[str, Any], response.json()) async def _parse_openai_docs(self, url: str) -> None: adapter: BaseAdapter = get_adapter() @@ -152,7 +152,7 @@ async def register_proxy_func_route( handle = adapter.get_handle(PROXY_PLUGIN_NAME) await adapter.call_func( plugin_api, - path=self.proxy_func_http_path, + path=PROXY_FUNC_HTTP_PATH, methods=["POST"], func_name=self.call_proxy_function, params=[("data", str), ("func_name", str)], @@ -167,14 +167,14 @@ async def _parse_proxy_function(self, func_name: str, url: str) -> None: params: list[tuple[str, type]] = [("model", ProxyFuncHttpBody)] - if auth_api_key := settings.auth.get_auth_keys(self.proxy_func_http_path): + if auth_api_key := settings.auth.get_auth_keys(PROXY_FUNC_HTTP_PATH): headers = {"Authorization": auth_api_key[0]} # Use the first auth key set - logger.debug(f"Proxy func({self.proxy_func_http_path}) requires auth") + logger.debug(f"Proxy func({PROXY_FUNC_HTTP_PATH}) requires auth") else: headers = None func = self._create_dynamic_method( - func_name, "POST", params, f"{url}{self.proxy_func_http_path}", stream=False, headers=headers + func_name, "POST", params, f"{url}{PROXY_FUNC_HTTP_PATH}", stream=False, headers=headers ) await self.register_proxy_function(func_name, func, is_remote=True) @@ -188,22 +188,24 @@ async def fetch_response( stream: bool = False, **kwargs: Any, ) -> AsyncGenerator[str, None] | dict | str: - clent = httpx.AsyncClient(timeout=self.time_out) if stream: + client = httpx.AsyncClient(timeout=self.time_out) async def stream_generator() -> AsyncGenerator[str, None]: - async with clent.stream(**kwargs) as response: + async with client.stream(**kwargs) as response: response.raise_for_status() async for chunk in response.aiter_text(): yield chunk + await client.aclose() return stream_generator() - response = await clent.request(**kwargs) - response.raise_for_status() - try: - return cast(dict, response.json()) - except json.JSONDecodeError: - return response.text + async with httpx.AsyncClient(timeout=self.time_out) as client: + response = await client.request(**kwargs) + response.raise_for_status() + try: + return cast(dict, response.json()) + except json.JSONDecodeError: + return response.text def _create_dynamic_method( self, @@ -264,6 +266,9 @@ async def register_proxy_function( if not self.init_proxy_func_route: await self.register_proxy_func_route() self.init_proxy_func_route = True + if func_name in self.proxy_func_map: + logger.warning(f"Proxy function {func_name} already registered, skipping...") + return False logger.info(f"Registering proxy function: {func_name}") self.proxy_func_map[func_name] = ProxyFunc(func=func_callable, is_remote=is_remote) @@ -271,13 +276,14 @@ async def register_proxy_function( async def call_proxy_function(self, func_name: str, data: str) -> str: decode_func_name = cache_decode(func_name) + decode_kwargs = cache_decode(data) if proxy_func := self.proxy_func_map.get(decode_func_name): if proxy_func.is_remote: kwargs = {"model": ProxyFuncHttpBody(data=data, func_name=func_name)} else: - kwargs = cache_decode(data) + kwargs = decode_kwargs tag = "remote" if proxy_func.is_remote else "local" - logger.info(f"Calling proxy function[{tag}]: {decode_func_name}, kwargs: {kwargs}") + logger.info(f"Calling proxy function[{tag}]: {decode_func_name}, kwargs: {decode_kwargs}") res = await proxy_func.func(**kwargs) return cache_encode(res) raise RuntimeError(f"Proxy function({decode_func_name}) not registered") diff --git a/tests/mock.py b/tests/mock.py index d4a43d1..23639ab 100644 --- a/tests/mock.py +++ b/tests/mock.py @@ -69,12 +69,11 @@ async def mock_request(_, method: str, url: str, **kwargs: Any): json_data = kwargs.get("json", {}) func_name = json_data.get("func_name") data = json_data.get("data", {}) - assert func_name - assert data + if not func_name or not data: + raise ValueError("Missing required fields: func_name and data") decode_func_name = cache_decode(func_name) decode_data = cache_decode(data) resp.json.return_value = {"result": decode_func_name, "data": decode_data} - else: raise AssertionError(f"Unexpected request: {method} {url}") diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 3c1d0b1..f064be9 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -162,7 +162,7 @@ class ExchangeModel(BaseModel): @on_proxy() -async def lcoal_exchange_key_value(a_str: str, b_int: int, c_model: ExchangeModel) -> Any: +async def local_exchange_key_value(a_str: str, b_int: int, c_model: ExchangeModel) -> Any: return {"a_str": a_str, "b_int": b_int, "c_model": c_model} @@ -172,7 +172,7 @@ async def remote_exchange_key_value(a_str: str, b_int: int, c_model: ExchangeMod async def test_on_proxy_local_call(): - res = await lcoal_exchange_key_value( + res = await local_exchange_key_value( a_str="test", b_int=123, c_model=ExchangeModel(id="id_1", name=100, model=SubModel(id=1, name="sub_name")) ) assert res["a_str"] == "test" From 081026b89cb0e1cc0b537f6c3b03aa6acb55a4fc Mon Sep 17 00:00:00 2001 From: touale <136764239@qq.com> Date: Tue, 23 Dec 2025 10:07:36 +0800 Subject: [PATCH 03/10] refactor: simplify proxy decorator and remove proxy_only param --- src/framex/plugin/on.py | 4 ++-- tests/test_plugins.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/framex/plugin/on.py b/src/framex/plugin/on.py index 7b08361..0229209 100644 --- a/src/framex/plugin/on.py +++ b/src/framex/plugin/on.py @@ -98,14 +98,14 @@ def wrapper(func: Callable) -> Callable: return wrapper -def on_proxy(proxy_only: bool = False) -> Callable: +def on_proxy() -> Callable: def decorator(func: Callable) -> Callable: from framex.config import settings if not settings.server.enable_proxy: return func - is_registered = proxy_only + is_registered = False full_func_name = f"{func.__module__}.{func.__name__}" @functools.wraps(func) diff --git a/tests/test_plugins.py b/tests/test_plugins.py index f064be9..947a910 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -166,7 +166,7 @@ async def local_exchange_key_value(a_str: str, b_int: int, c_model: ExchangeMode return {"a_str": a_str, "b_int": b_int, "c_model": c_model} -@on_proxy(proxy_only=True) +@on_proxy() async def remote_exchange_key_value(a_str: str, b_int: int, c_model: ExchangeModel) -> Any: # noqa: ARG001 raise RuntimeError("This function should be called remotely") From ffa9438f3ebfb5ee2563a2b9f57e352f24603467 Mon Sep 17 00:00:00 2001 From: touale <136764239@qq.com> Date: Tue, 23 Dec 2025 10:24:00 +0800 Subject: [PATCH 04/10] feat: enhance proxy function auth with random key generation --- pytest.ini | 2 +- src/framex/config.py | 11 +++++++++-- src/framex/driver/ingress.py | 12 +++++++----- src/framex/log.py | 4 ++-- src/framex/plugins/proxy/__init__.py | 2 +- src/framex/plugins/proxy/config.py | 2 +- tests/mock.py | 22 ++++++++++++++-------- 7 files changed, 35 insertions(+), 20 deletions(-) diff --git a/pytest.ini b/pytest.ini index c1095db..a192d21 100644 --- a/pytest.ini +++ b/pytest.ini @@ -26,7 +26,7 @@ env = plugins__proxy__white_list=["/api/v1/proxy/mock/info","/proxy/mock/get","/proxy/mock/post","/proxy/mock/post_model","/proxy/mock/auth/*"] plugins__proxy__auth__general_auth_keys=["i_am_proxy_general_auth_keys"] plugins__proxy__auth__auth_urls=["/proxy/mock/auth/*"] - plugins__proxy__auth__special_auth_keys={{"/proxy/mock/auth/sget":["i_am_proxy_special_auth_keys"]}} + plugins__proxy__auth__special_auth_keys={{"/proxy/mock/auth/sget":["i_am_proxy_special_auth_keys"],"/api/v1/proxy/remote":["i_am_proxy_func_auth_keys"]}} plugins__proxy__proxy_functions={{"http://localhost:9527":["tests.test_plugins.remote_exchange_key_value"]}} ; plugins__proxy__force_stream_apis=[] sentry__enable=false diff --git a/src/framex/config.py b/src/framex/config.py index 8e0d865..fc3ce66 100644 --- a/src/framex/config.py +++ b/src/framex/config.py @@ -1,4 +1,5 @@ from typing import Any, Literal, Self +from uuid import uuid4 from pydantic import BaseModel, Field, model_validator from pydantic_settings import ( @@ -59,9 +60,15 @@ class AuthConfig(BaseModel): special_auth_keys: dict[str, list[str]] = Field(default_factory=dict) @model_validator(mode="after") - def validate_special_auth_urls(self) -> Self: + def normalize_and_validate(self) -> Self: if PROXY_FUNC_HTTP_PATH not in self.auth_urls: - self.auth_urls.append("/proxy/remote") + self.auth_urls.append(PROXY_FUNC_HTTP_PATH) + if not self.general_auth_keys: # pragma: no cover + from framex.log import logger + + key = str(uuid4()) + logger.warning(f"No general_auth_keys set, generate a random key: {key}") + self.general_auth_keys = [key] for special_url in self.special_auth_keys: if not self._is_url_protected(special_url): raise ValueError(f"special_auth_keys url '{special_url}' is not covered by any auth_urls rule") diff --git a/src/framex/driver/ingress.py b/src/framex/driver/ingress.py index 5c194a2..e81bcf4 100644 --- a/src/framex/driver/ingress.py +++ b/src/framex/driver/ingress.py @@ -44,9 +44,6 @@ def __init__(self, deployments: list[DeploymentHandle], plugin_apis: list["Plugi ApiType.ALL, ] ): - from framex.config import settings - - auth_keys = settings.auth.get_auth_keys(plugin_api.api) self.register_route( plugin_api.api, plugin_api.methods, @@ -56,7 +53,6 @@ def __init__(self, deployments: list[DeploymentHandle], plugin_apis: list["Plugi stream=plugin_api.stream, direct_output=False, tags=plugin_api.tags, - auth_keys=auth_keys, ) def register_route( @@ -71,10 +67,16 @@ def register_route( tags: list[str | Enum] | None = None, auth_keys: list[str] | None = None, ) -> bool: + from framex.log import logger + if tags is None: tags = ["default"] + if auth_keys is None: + from framex.config import settings + + auth_keys = settings.auth.get_auth_keys(path) + logger.debug(f"API({path}) with tags {tags} requires auth_keys {auth_keys}") adapter = get_adapter() - from framex.log import logger try: routes: list[str] = [route.path for route in app.routes if isinstance(route, Route | APIRoute)] diff --git a/src/framex/log.py b/src/framex/log.py index d1e4657..dc51bc8 100644 --- a/src/framex/log.py +++ b/src/framex/log.py @@ -5,8 +5,6 @@ import loguru -from framex.config import settings - if TYPE_CHECKING: # avoid sphinx autodoc resolve annotation failed # because loguru module do not have `Logger` class actually @@ -17,6 +15,8 @@ class LoguruHandler(logging.Handler): # pragma: no cover def emit(self, record: logging.LogRecord) -> None: + from framex.config import settings + msg = record.getMessage() if settings.log.simple_log and ( (record.name == "ray.serve" and msg.startswith(settings.log.ignored_prefixes)) or record.name == "filelock" diff --git a/src/framex/plugins/proxy/__init__.py b/src/framex/plugins/proxy/__init__.py index e7ea59d..51dc0de 100644 --- a/src/framex/plugins/proxy/__init__.py +++ b/src/framex/plugins/proxy/__init__.py @@ -53,7 +53,7 @@ async def on_start(self) -> None: for url, funcs in settings.proxy_functions.items(): for func in funcs: await self._parse_proxy_function(func, url) - else: + else: # pragma: no cover logger.debug("No proxy functions to register") logger.success(f"Succeeded to parse openai docs form {url}") diff --git a/src/framex/plugins/proxy/config.py b/src/framex/plugins/proxy/config.py index aa7324e..32218fa 100644 --- a/src/framex/plugins/proxy/config.py +++ b/src/framex/plugins/proxy/config.py @@ -32,7 +32,7 @@ def is_white_url(self, url: str) -> bool: @model_validator(mode="after") def validate_proxy_functions(self) -> Self: for url in self.proxy_functions: - if url not in self.proxy_urls: + if url not in self.proxy_urls: # pragma: no cover raise ValueError(f"proxy_functions url '{url}' is not covered by any proxy_urls rule") return self diff --git a/tests/mock.py b/tests/mock.py index 23639ab..4ea7861 100644 --- a/tests/mock.py +++ b/tests/mock.py @@ -66,14 +66,20 @@ async def mock_request(_, method: str, url: str, **kwargs: Any): "params": params, } elif url.endswith("/proxy/remote") and method == "POST": - json_data = kwargs.get("json", {}) - func_name = json_data.get("func_name") - data = json_data.get("data", {}) - if not func_name or not data: - raise ValueError("Missing required fields: func_name and data") - decode_func_name = cache_decode(func_name) - decode_data = cache_decode(data) - resp.json.return_value = {"result": decode_func_name, "data": decode_data} + if headers.get("Authorization") != "i_am_proxy_func_auth_keys": + resp.json.return_value = { + "status": 401, + "message": f"Invalid API Key({headers.get('Authorization')}) for API(/api/v1/proxy/mock/auth/get)", + } + else: + json_data = kwargs.get("json", {}) + func_name = json_data.get("func_name") + data = json_data.get("data", {}) + if not func_name or not data: + raise ValueError("Missing required fields: func_name and data") + decode_func_name = cache_decode(func_name) + decode_data = cache_decode(data) + resp.json.return_value = {"result": decode_func_name, "data": decode_data} else: raise AssertionError(f"Unexpected request: {method} {url}") From 7288013342ce81cc39dce4dbb07c668997f59723 Mon Sep 17 00:00:00 2001 From: touale <136764239@qq.com> Date: Tue, 23 Dec 2025 15:17:16 +0800 Subject: [PATCH 05/10] fix: fix proxy function serialization and error handling --- src/framex/plugin/on.py | 45 +++++++++++---- src/framex/plugins/proxy/__init__.py | 7 ++- src/framex/utils.py | 83 +++++++++++++++++++++++----- tests/test_plugins.py | 9 +-- 4 files changed, 114 insertions(+), 30 deletions(-) diff --git a/src/framex/plugin/on.py b/src/framex/plugin/on.py index 0229209..c29292b 100644 --- a/src/framex/plugin/on.py +++ b/src/framex/plugin/on.py @@ -108,29 +108,54 @@ def decorator(func: Callable) -> Callable: is_registered = False full_func_name = f"{func.__module__}.{func.__name__}" + async def safe_callable(*args: Any, **kwargs: Any) -> Any: + try: + return await func(*args, **kwargs) + except Exception: + raw = func + + if isinstance(raw, (classmethod, staticmethod)): + raw = raw.__func__ + + while hasattr(raw, "__wrapped__"): + raw = raw.__wrapped__ + + if inspect.iscoroutinefunction(raw): + return await raw(*args, **kwargs) + return raw(*args, **kwargs) + @functools.wraps(func) async def wrapper(*args: Any, **kwargs: Any) -> Any: nonlocal is_registered if args: - raise TypeError( - f"The proxy function '{func.__name__}' only supports keyword arguments. " - f"Please call it using the syntax {func.__name__}(param=value)." - ) + raise TypeError(f"The proxy function '{func.__name__}' only supports keyword arguments.") if not is_registered: api_reg = PluginApi( - deployment_name=PROXY_PLUGIN_NAME, call_type=ApiType.PROXY, func_name="register_proxy_function" + deployment_name=PROXY_PLUGIN_NAME, + call_type=ApiType.PROXY, + func_name="register_proxy_function", + ) + await call_plugin_api( + api_reg, + None, + func_name=full_func_name, + func_callable=safe_callable, ) - await call_plugin_api(api_reg, None, func_name=full_func_name, func_callable=func) is_registered = True api_call = PluginApi( - deployment_name=PROXY_PLUGIN_NAME, call_type=ApiType.PROXY, func_name="call_proxy_function" + deployment_name=PROXY_PLUGIN_NAME, + call_type=ApiType.PROXY, + func_name="call_proxy_function", + ) + res = await call_plugin_api( + api_call, + None, + func_name=cache_encode(full_func_name), + data=cache_encode(data=kwargs), ) - encode_kwargs = cache_encode(kwargs) - encode_func_name = cache_encode(full_func_name) - res = await call_plugin_api(api_call, None, func_name=encode_func_name, data=encode_kwargs) return cache_decode(res) return wrapper diff --git a/src/framex/plugins/proxy/__init__.py b/src/framex/plugins/proxy/__init__.py index 51dc0de..4fd3330 100644 --- a/src/framex/plugins/proxy/__init__.py +++ b/src/framex/plugins/proxy/__init__.py @@ -154,14 +154,17 @@ async def register_proxy_func_route( plugin_api, path=PROXY_FUNC_HTTP_PATH, methods=["POST"], - func_name=self.call_proxy_function, - params=[("data", str), ("func_name", str)], + func_name=self._proxy_func_route.__name__, + params=[("model", ProxyFuncHttpBody)], handle=handle, stream=False, direct_output=True, tags=[__plugin_meta__.name], ) + async def _proxy_func_route(self, model: ProxyFuncHttpBody) -> Any: + return await self.call_proxy_function(model.func_name, model.data) + async def _parse_proxy_function(self, func_name: str, url: str) -> None: logger.opt(colors=True).debug(f"Found proxy function {url}") diff --git a/src/framex/utils.py b/src/framex/utils.py index 2fe2859..8643df9 100644 --- a/src/framex/utils.py +++ b/src/framex/utils.py @@ -1,14 +1,16 @@ +import base64 +import importlib import inspect import json import re import zlib from collections.abc import Callable -from enum import StrEnum +from datetime import datetime +from enum import Enum, StrEnum from itertools import cycle from pathlib import Path from typing import Any -import cloudpickle # type: ignore[import-untyped] from pydantic import BaseModel @@ -63,15 +65,68 @@ def xor_crypt(data: bytes, key: str = "01234567890abcdefghijklmnopqrstuvwxyz") - def cache_encode(data: Any) -> str: - packed = cloudpickle.dumps(data) - - compressed = zlib.compress(packed) - obfuscated = xor_crypt(compressed) - return obfuscated.hex() - - -def cache_decode(data: str) -> Any: - raw = bytes.fromhex(data) - de_obfuscated = xor_crypt(raw) - decompressed = zlib.decompress(de_obfuscated) - return cloudpickle.loads(decompressed) # nosec S301 + def transform(obj: Any) -> Any: + if hasattr(obj, "__dict__"): + raw_attributes = {k: transform(v) for k, v in obj.__dict__.items() if not k.startswith("_")} + return { + "__type__": "dynamic_obj", + "__module__": obj.__class__.__module__, + "__class__": obj.__class__.__name__, + "data": raw_attributes, + } + if isinstance(obj, list): + return [transform(i) for i in obj] + if isinstance(obj, dict): + return {k: transform(v) for k, v in obj.items()} + if isinstance(obj, datetime): + return obj.isoformat() + if isinstance(obj, Enum): + return obj.value + return obj + + json_str = json.dumps(transform(data), ensure_ascii=False) + compressed = zlib.compress(json_str.encode("utf-8")) + encrypted = xor_crypt(compressed) + return base64.b64encode(encrypted).decode("ascii") + + +def cache_decode(res: Any) -> Any: + current = res + while isinstance(current, str): + try: + decoded_bytes = base64.b64decode(current, validate=True) + current = zlib.decompress(xor_crypt(decoded_bytes)).decode("utf-8") + except Exception: + try: + temp = json.loads(current) + if temp == current: + break + current = temp + except Exception: + break + + def restore_models(item: Any) -> Any: + if isinstance(item, list): + return [restore_models(i) for i in item] + + if isinstance(item, dict): + if item.get("__type__") == "dynamic_obj": + try: + module = importlib.import_module(item["__module__"]) + cls = getattr(module, item["__class__"]) + + cleaned_data = {k: restore_models(v) for k, v in item["data"].items()} + + if hasattr(cls, "model_validate"): + return cls.model_validate(cleaned_data) + return cls(**cleaned_data) + except Exception: + from types import SimpleNamespace + + return SimpleNamespace(**{k: restore_models(v) for k, v in item["data"].items()}) + + return {k: restore_models(v) for k, v in item.items()} + + return item + + return restore_models(current) diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 947a910..14e0ca3 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -177,10 +177,11 @@ async def test_on_proxy_local_call(): ) assert res["a_str"] == "test" assert res["b_int"] == 123 - assert ( - res["c_model"].model_dump() - == ExchangeModel(id="id_1", name=100, model=SubModel(id=1, name="sub_name")).model_dump() - ) + model = res["c_model"] + assert model.id == "id_1" + assert model.name == 100 + assert model.model.id == 1 + assert model.model.name == "sub_name" async def test_on_proxy_remote_call(): From 73a780fc8a5182ceff01b666767019e5363d9129 Mon Sep 17 00:00:00 2001 From: touale <136764239@qq.com> Date: Tue, 23 Dec 2025 15:20:38 +0800 Subject: [PATCH 06/10] deps: remove cloudpickle dependency --- pyproject.toml | 1 - uv.lock | 11 ----------- 2 files changed, 12 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 29b8efc..d94dea9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,6 @@ requires-python = ">=3.11" dependencies = [ "aiocache>=0.12.3", "click>=8.1.8", - "cloudpickle>=3.1.2", "fastapi>=0.115.13", "httpx>=0.28.1", "loguru>=0.7.3", diff --git a/uv.lock b/uv.lock index 8cbd8a7..3d3fd62 100644 --- a/uv.lock +++ b/uv.lock @@ -320,15 +320,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/75/45/54bb2d8d4138964a94bef6e9afe48b0be4705ba66ac442ae7d8a8dc4ffef/click_option_group-0.5.9-py3-none-any.whl", hash = "sha256:ad2599248bd373e2e19bec5407967c3eec1d0d4fc4a5e77b08a0481e75991080", size = 11553 }, ] -[[package]] -name = "cloudpickle" -version = "3.1.2" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/27/fb/576f067976d320f5f0114a8d9fa1215425441bb35627b1993e5afd8111e5/cloudpickle-3.1.2.tar.gz", hash = "sha256:7fda9eb655c9c230dab534f1983763de5835249750e85fbcef43aaa30a9a2414", size = 22330 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/88/39/799be3f2f0f38cc727ee3b4f1445fe6d5e4133064ec2e4115069418a5bb6/cloudpickle-3.1.2-py3-none-any.whl", hash = "sha256:9acb47f6afd73f60dc1df93bb801b472f05ff42fa6c84167d25cb206be1fbf4a", size = 22228 }, -] - [[package]] name = "colorama" version = "0.4.6" @@ -503,7 +494,6 @@ source = { editable = "." } dependencies = [ { name = "aiocache" }, { name = "click" }, - { name = "cloudpickle" }, { name = "fastapi" }, { name = "httpx" }, { name = "loguru" }, @@ -541,7 +531,6 @@ dev = [ requires-dist = [ { name = "aiocache", specifier = ">=0.12.3" }, { name = "click", specifier = ">=8.1.8" }, - { name = "cloudpickle", specifier = ">=3.1.2" }, { name = "fastapi", specifier = ">=0.115.13" }, { name = "httpx", specifier = ">=0.28.1" }, { name = "loguru", specifier = ">=0.7.3" }, From db98eee7b0264019821cf993080de72eb2ff385c Mon Sep 17 00:00:00 2001 From: touale <136764239@qq.com> Date: Tue, 23 Dec 2025 15:47:37 +0800 Subject: [PATCH 07/10] fix: handle negative num_cpus value in ray initialization --- src/framex/__init__.py | 2 +- src/framex/config.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/framex/__init__.py b/src/framex/__init__.py index 763038c..b074de1 100644 --- a/src/framex/__init__.py +++ b/src/framex/__init__.py @@ -98,7 +98,7 @@ def run( from ray import serve ray.init( - num_cpus=num_cpus, + num_cpus=num_cpus if num_cpus > 0 else None, dashboard_host=dashboard_host, dashboard_port=dashboard_port, configure_logging=False, diff --git a/src/framex/config.py b/src/framex/config.py index fc3ce66..a4fe0bc 100644 --- a/src/framex/config.py +++ b/src/framex/config.py @@ -44,7 +44,7 @@ class ServerConfig(BaseModel): use_ray: bool = False enable_proxy: bool = False legal_proxy_code: list[int] = [200] - num_cpus: int = 8 + num_cpus: int = -1 excluded_log_paths: list[str] = [] ingress_config: dict[str, Any] = {"max_ongoing_requests": 60} From 9488998f1b876317f25af62c6a3ee16d2df488db Mon Sep 17 00:00:00 2001 From: touale <136764239@qq.com> Date: Tue, 23 Dec 2025 16:03:12 +0800 Subject: [PATCH 08/10] test: enhance cache encode/decode and proxy exception handling --- src/framex/plugin/on.py | 2 +- tests/test_cli.py | 1 - tests/test_plugins.py | 10 ++++++ tests/test_utils.py | 74 ++++++++++++++++++++++++++++++++++++++++- 4 files changed, 84 insertions(+), 3 deletions(-) diff --git a/src/framex/plugin/on.py b/src/framex/plugin/on.py index c29292b..a72a5b5 100644 --- a/src/framex/plugin/on.py +++ b/src/framex/plugin/on.py @@ -128,7 +128,7 @@ async def safe_callable(*args: Any, **kwargs: Any) -> Any: async def wrapper(*args: Any, **kwargs: Any) -> Any: nonlocal is_registered - if args: + if args: # pragma: no cover raise TypeError(f"The proxy function '{func.__name__}' only supports keyword arguments.") if not is_registered: diff --git a/tests/test_cli.py b/tests/test_cli.py index a5f6361..7345878 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -30,4 +30,3 @@ def _fake_run(**kwargs: Any) -> None: assert called["server_port"] == 8080 assert called["dashboard_host"] == "127.0.0.1" assert called["dashboard_port"] == 8260 - assert called["num_cpus"] == 8 diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 14e0ca3..d0e8f5a 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -1,5 +1,6 @@ import importlib from collections.abc import Callable +from functools import wraps from typing import Any, Literal, Optional, Union, get_args, get_origin import pytest @@ -161,12 +162,21 @@ class ExchangeModel(BaseModel): model: SubModel +def supply_execption(func): + @wraps(func) + async def wrapper(*_: Any, **__: Any) -> None: + raise RuntimeError("I am def supply_execption(func): exception") + + return wrapper + + @on_proxy() async def local_exchange_key_value(a_str: str, b_int: int, c_model: ExchangeModel) -> Any: return {"a_str": a_str, "b_int": b_int, "c_model": c_model} @on_proxy() +@supply_execption async def remote_exchange_key_value(a_str: str, b_int: int, c_model: ExchangeModel) -> Any: # noqa: ARG001 raise RuntimeError("This function should be called remotely") diff --git a/tests/test_utils.py b/tests/test_utils.py index 22af3ee..2efc933 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,10 +1,11 @@ +import json from typing import Any import pytest from pydantic import BaseModel from framex.config import AuthConfig -from framex.utils import StreamEnventType, make_stream_event +from framex.utils import StreamEnventType, cache_decode, cache_encode, make_stream_event class StreamDataModel(BaseModel): @@ -53,3 +54,74 @@ def test_get_auth_keys_by_url(): assert auth.get_auth_keys("/api/v3/sub") == ["g"] assert auth.get_auth_keys("/api/v3/echo/1") == ["b"] assert auth.get_auth_keys("/api/v3/echo/hi") == ["c"] + + +from datetime import datetime +from typing import Any + +import pytest +from pydantic import BaseModel + + +class SubModel(BaseModel): + id: int + name: str + + +class ExchangeModel(BaseModel): + id: str + name: int + model: SubModel + created_at: datetime + + +def test_basic_types(): + data = {"a": 1, "b": "string", "c": [1, 2, 3], "d": {"nested": True}} + encoded = cache_encode(data) + decoded = cache_decode(encoded) + assert decoded == data + assert isinstance(decoded["c"], list) + + +def test_datetime_and_enum(): + now = datetime(2025, 12, 23, 10, 0, 0) + data = {"time": now} + encoded = cache_encode(data) + decoded = cache_decode(encoded) + assert "2025-12-23T10:00:00" in str(decoded["time"]) + + +def test_nested_pydantic_models(): + sub = SubModel(id=1, name="sub_name") + main = ExchangeModel(id="main_id", name=100, model=sub, created_at=datetime.now()) + + original_data = {"status": "success", "result_list": [main, main], "single_model": main} + + encoded = cache_encode(original_data) + assert isinstance(encoded, str) + + decoded = cache_decode(encoded) + + res_model = decoded["single_model"] + assert res_model.id == "main_id" + assert res_model.name == 100 + + assert isinstance(res_model.model, SubModel) + assert res_model.model.id == 1 + assert res_model.model.name == "sub_name" + + assert decoded["result_list"][0].model.id == 1 + + +def test_recovery_failure_fallback(): + fake_payload = { + "__type__": "dynamic_obj", + "__module__": "non.existent.module", + "__class__": "MissingClass", + "data": {"id": 999, "info": "test"}, + } + encoded = json.dumps(fake_payload) + decoded = cache_decode(encoded) + + assert decoded.id == 999 + assert decoded.info == "test" From 425984cdf9e53ccae19a962266ec7597752a3abe Mon Sep 17 00:00:00 2001 From: touale <136764239@qq.com> Date: Tue, 23 Dec 2025 16:22:06 +0800 Subject: [PATCH 09/10] test: add pytest-order and enhance proxy tests --- pyproject.toml | 1 + src/framex/plugin/on.py | 2 +- src/framex/plugins/proxy/__init__.py | 2 +- tests/api/test_proxy.py | 26 ++++++++++++++++++++++++++ tests/test_plugins.py | 1 + uv.lock | 14 ++++++++++++++ 6 files changed, 44 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d94dea9..6bf561f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ dev = [ "pytest-asyncio>=1.1.0", "pytest-cov>=6.2.1", "pytest-env>=1.1.5", + "pytest-order>=1.3.0", "pytest-recording>=0.13.4", "ruff==0.14.9", "types-pytz>=2025.2.0.20250516", diff --git a/src/framex/plugin/on.py b/src/framex/plugin/on.py index a72a5b5..2493062 100644 --- a/src/framex/plugin/on.py +++ b/src/framex/plugin/on.py @@ -102,7 +102,7 @@ def on_proxy() -> Callable: def decorator(func: Callable) -> Callable: from framex.config import settings - if not settings.server.enable_proxy: + if not settings.server.enable_proxy: # pragma: no cover return func is_registered = False diff --git a/src/framex/plugins/proxy/__init__.py b/src/framex/plugins/proxy/__init__.py index 4fd3330..5163351 100644 --- a/src/framex/plugins/proxy/__init__.py +++ b/src/framex/plugins/proxy/__init__.py @@ -173,7 +173,7 @@ async def _parse_proxy_function(self, func_name: str, url: str) -> None: if auth_api_key := settings.auth.get_auth_keys(PROXY_FUNC_HTTP_PATH): headers = {"Authorization": auth_api_key[0]} # Use the first auth key set logger.debug(f"Proxy func({PROXY_FUNC_HTTP_PATH}) requires auth") - else: + else: # pragma: no cover headers = None func = self._create_dynamic_method( diff --git a/tests/api/test_proxy.py b/tests/api/test_proxy.py index 0c745b8..75aa16a 100644 --- a/tests/api/test_proxy.py +++ b/tests/api/test_proxy.py @@ -1,6 +1,9 @@ +import pytest from fastapi.testclient import TestClient from framex.consts import API_STR +from framex.utils import cache_decode, cache_encode +from tests.test_plugins import ExchangeModel, SubModel def test_get_proxy_version(client: TestClient): @@ -41,3 +44,26 @@ def test_get_proxy_auth_sget(client: TestClient): params = {"message": "hello world"} res = client.get("/proxy/mock/auth/sget", params=params).json() assert res == {"method": "GET", "params": params} + + +@pytest.mark.order(2) +def test_call_proxy_func(client: TestClient): + func = cache_encode("tests.test_plugins.local_exchange_key_value") + data = cache_encode( + { + "a_str": "test", + "b_int": 123, + "c_model": ExchangeModel(id="id_1", name=100, model=SubModel(id=1, name="sub_name")), + } + ) + body = {"func_name": func, "data": data} + headers = {"Authorization": "i_am_general_auth_keys"} + res = client.post("/api/v1/proxy/remote", json=body, headers=headers).json() + res = cache_decode(res) + assert res["a_str"] == "test" + assert res["b_int"] == 123 + model = res["c_model"] + assert model.id == "id_1" + assert model.name == 100 + assert model.model.id == 1 + assert model.model.name == "sub_name" diff --git a/tests/test_plugins.py b/tests/test_plugins.py index d0e8f5a..3b6a0a6 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -181,6 +181,7 @@ async def remote_exchange_key_value(a_str: str, b_int: int, c_model: ExchangeMod raise RuntimeError("This function should be called remotely") +@pytest.mark.order(1) async def test_on_proxy_local_call(): res = await local_exchange_key_value( a_str="test", b_int=123, c_model=ExchangeModel(id="id_1", name=100, model=SubModel(id=1, name="sub_name")) diff --git a/uv.lock b/uv.lock index 3d3fd62..509b2c0 100644 --- a/uv.lock +++ b/uv.lock @@ -521,6 +521,7 @@ dev = [ { name = "pytest-asyncio" }, { name = "pytest-cov" }, { name = "pytest-env" }, + { name = "pytest-order" }, { name = "pytest-recording" }, { name = "ruff" }, { name = "types-pytz" }, @@ -555,6 +556,7 @@ dev = [ { name = "pytest-asyncio", specifier = ">=1.1.0" }, { name = "pytest-cov", specifier = ">=6.2.1" }, { name = "pytest-env", specifier = ">=1.1.5" }, + { name = "pytest-order", specifier = ">=1.3.0" }, { name = "pytest-recording", specifier = ">=0.13.4" }, { name = "ruff", specifier = "==0.14.9" }, { name = "types-pytz", specifier = ">=2025.2.0.20250516" }, @@ -1797,6 +1799,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/27/98/822b924a4a3eb58aacba84444c7439fce32680592f394de26af9c76e2569/pytest_env-1.2.0-py3-none-any.whl", hash = "sha256:d7e5b7198f9b83c795377c09feefa45d56083834e60d04767efd64819fc9da00", size = 6251 }, ] +[[package]] +name = "pytest-order" +version = "1.3.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/1d/66/02ae17461b14a52ce5a29ae2900156b9110d1de34721ccc16ccd79419876/pytest_order-1.3.0.tar.gz", hash = "sha256:51608fec3d3ee9c0adaea94daa124a5c4c1d2bb99b00269f098f414307f23dde", size = 47544 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/1b/73/59b038d1aafca89f8e9936eaa8ffa6bb6138d00459d13a32ce070be4f280/pytest_order-1.3.0-py3-none-any.whl", hash = "sha256:2cd562a21380345dd8d5774aa5fd38b7849b6ee7397ca5f6999bbe6e89f07f6e", size = 14609 }, +] + [[package]] name = "pytest-recording" version = "0.13.4" From 8a1b9db3f094b662ced812d6792487dd3bbaef41 Mon Sep 17 00:00:00 2001 From: touale <136764239@qq.com> Date: Tue, 23 Dec 2025 17:03:28 +0800 Subject: [PATCH 10/10] feat: add proxy function registration placeholder --- src/framex/plugin/load.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/framex/plugin/load.py b/src/framex/plugin/load.py index 3c08580..ac7729f 100644 --- a/src/framex/plugin/load.py +++ b/src/framex/plugin/load.py @@ -1,3 +1,5 @@ +from collections.abc import Callable + from framex.config import Settings from framex.log import logger from framex.plugin.model import Plugin @@ -36,3 +38,7 @@ def load_from_settings(settings: Settings) -> set[Plugin]: builtin_plugin_instances = load_builtin_plugins(*candidate_builtin_plugins) if candidate_builtin_plugins else set() plugin_instances = load_plugins(*settings.load_plugins) if settings.load_plugins else set() return builtin_plugin_instances | plugin_instances + + +def register_proxy_func(_: Callable) -> None: # pragma: no cover + pass