Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ data_file = .coverage/.coverage
parallel = true
concurrency = thread
sigterm = true
source = src
source = src/framex

# plugins = coverage_plugins.subprocess

Expand Down
1 change: 1 addition & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ addopts =
-vv
-o junit_family=legacy
env =
COVERAGE_PROCESS_START=.coveragerc
Comment thread
coderabbitai[bot] marked this conversation as resolved.
server__use_ray=false
server__enable_proxy=true
load_builtin_plugins=["echo","proxy","invoker"]
Expand Down
12 changes: 11 additions & 1 deletion src/framex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ def before_send(event, hint): # noqa
)


def _setup_ray_worker() -> None: # pragma: no cover
settings.server.use_ray = True

import framex.adapter as adapter_module

adapter_module._adapter = None
_setup_sentry()
Comment on lines +69 to +75
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Mirror the Ray-mode switch in the driver process.

_setup_ray_worker() fixes worker-side state, but run(use_ray=True) still leaves settings.server.use_ray unchanged in the parent process. Anything that reads the setting before workers start—like framex.adapter.get_adapter() and BasePlugin.__init__()—still sees local mode unless callers mutate settings themselves, which is why tests/test_use_ray_integration.py Lines 45 and 96 have to do that manually.

Suggested fix
     num_cpus = num_cpus if num_cpus is not None else settings.server.num_cpus
     use_ray = use_ray if use_ray is not None else settings.server.use_ray
+    settings.server.use_ray = use_ray
     enable_proxy = enable_proxy if enable_proxy is not None else settings.server.enable_proxy

Also applies to: 158-158

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/framex/__init__.py` around lines 69 - 75, The driver process never flips
the Ray mode flag, so calls like framex.adapter.get_adapter() and
BasePlugin.__init__ still see local mode; mirror the worker-side change by
setting settings.server.use_ray = True in the driver startup path when
run(use_ray=True) is invoked (the same place that currently prepares workers),
and also reset adapter state (adapter_module._adapter = None) there so the
driver and workers have consistent mode; apply the same update to the other
startup site noted (the other invocation that prepares Ray) so both entry paths
set settings.server.use_ray and clear adapter state.



def run(
*,
server_host: str | None = None,
Expand All @@ -87,6 +96,7 @@ def run(
dashboard_port = dashboard_port if dashboard_port is not None else settings.server.dashboard_port
num_cpus = num_cpus if num_cpus is not None else settings.server.num_cpus
use_ray = use_ray if use_ray is not None else settings.server.use_ray
settings.server.use_ray = use_ray
enable_proxy = enable_proxy if enable_proxy is not None else settings.server.enable_proxy
builtin_plugins = settings.load_builtin_plugins if load_builtin_plugins is None else load_builtin_plugins
external_plugins = settings.load_plugins if load_plugins is None else load_plugins
Expand Down Expand Up @@ -146,7 +156,7 @@ def run(
"env_vars": {
"REVERSION": reversion,
},
"worker_process_setup_hook": _setup_sentry,
"worker_process_setup_hook": _setup_ray_worker,
},
)
serve.start(
Expand Down
99 changes: 67 additions & 32 deletions src/framex/plugin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@
from framex.config import settings
from framex.consts import PROXY_PLUGIN_NAME
from framex.log import logger
from framex.plugin.manage import PluginManager
from framex.plugin.manage import _manager
from framex.plugin.model import Plugin, PluginApi
from framex.plugin.resolver import (
ApiResolver,
_set_default_api_resolver,
get_current_api_resolver,
get_current_remote_apis,
get_default_api_resolver,
)

C = TypeVar("C", bound=BaseModel)
_manager: PluginManager = PluginManager(silent=settings.test.silent)

_current_plugin: ContextVar[Optional["Plugin"]] = ContextVar("_current_plugin", default=None)
_set_default_api_resolver(ApiResolver(manager=_manager))


def get_plugin(plugin_id: str) -> Plugin | None:
Expand All @@ -40,6 +47,7 @@ def check_plugin_config_exists(plugin_name: str) -> bool:
@logger.catch()
def init_all_deployments(enable_proxy: bool) -> list[Any]:
deployments = []
all_apis = {**_manager.all_plugin_apis[ApiType.FUNC], **_manager.all_plugin_apis[ApiType.HTTP]}
for plugin in get_loaded_plugins():
for dep in plugin.deployments:
remote_apis = {
Expand All @@ -66,6 +74,7 @@ def init_all_deployments(enable_proxy: bool) -> list[Any]:
deployment = get_adapter().bind(
dep.deployment,
remote_apis=remote_apis,
api_registry=all_apis,
config=plugin.config,
)

Expand All @@ -74,18 +83,26 @@ def init_all_deployments(enable_proxy: bool) -> list[Any]:
return deployments


async def call_plugin_api(
def _resolve_plugin_api(
api_name: str | PluginApi,
interval_apis: dict[str, PluginApi] | None = None,
**kwargs: Any,
) -> Any:
resolver: ApiResolver | None = None,
) -> tuple[PluginApi, bool]:
current_remote_apis = get_current_remote_apis()
if isinstance(api_name, PluginApi):
api: PluginApi | None = api_name
elif isinstance(api_name, str):
api = interval_apis.get(api_name) if interval_apis else _manager.get_api(api_name)
use_proxy = False
if not api:
if isinstance(api_name, str) and api_name.startswith("/") and settings.server.enable_proxy:
return api_name, api_name.call_type == ApiType.PROXY

Comment on lines +90 to +93
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Apply the remote_apis guard to pre-resolved PluginApi inputs too.

Lines 91-93 return immediately for PluginApi arguments, so once current_remote_apis is set a caller can bypass the new required_remote_apis check just by passing a PluginApi instance instead of a string. That reopens undeclared local cross-plugin calls in the exact path this PR is trying to harden.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/framex/plugin/__init__.py` around lines 90 - 93, The current early return
when api_name is a PluginApi bypasses the remote-apis guard; after calling
get_current_remote_apis() you must validate pre-resolved PluginApi inputs
against the same required_remote_apis/current_remote_apis rules as string names.
Modify the branch that handles isinstance(api_name, PluginApi) (and uses
api_name.call_type / ApiType.PROXY) to first consult get_current_remote_apis()
and required_remote_apis and only allow the PluginApi if the remote_apis guard
permits it (otherwise raise or reject); ensure the same validation logic used
for string api_name paths is reused for PluginApi instances.

active_resolver = resolver or get_current_api_resolver() or get_default_api_resolver()
if current_remote_apis is not None:
api = active_resolver.coerce_plugin_api(current_remote_apis.get(api_name))
else:
api = active_resolver.resolve(api_name, None)
Comment on lines +94 to +98
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -name "__init__.py" -path "*/framex/plugin/*" | head -20

Repository: touale/FrameX-kit

Length of output: 92


🏁 Script executed:

cat -n src/framex/plugin/__init__.py | sed -n '85,110p'

Repository: touale/FrameX-kit

Length of output: 1260


🏁 Script executed:

rg -A 3 "def get_current_api_resolver|def get_default_api_resolver" --type py

Repository: touale/FrameX-kit

Length of output: 484


🏁 Script executed:

rg -A 5 "def _resolve_plugin_api" src/framex/plugin/__init__.py

Repository: touale/FrameX-kit

Length of output: 278


🏁 Script executed:

rg -B 2 -A 10 "class ApiResolver" --type py | head -50

Repository: touale/FrameX-kit

Length of output: 832


🏁 Script executed:

rg "_manager" src/framex/plugin/__init__.py

Repository: touale/FrameX-kit

Length of output: 496


Ensure active_resolver is always non-None before dereferencing it.

Line 94 produces ApiResolver | None since both get_current_api_resolver() and get_default_api_resolver() return optional types. Lines 96 and 98 will raise AttributeError if both resolver functions return None. Add a fallback to instantiate a default resolver:

Suggested fix
 def _resolve_plugin_api(
     api_name: str | PluginApi,
     resolver: ApiResolver | None = None,
 ) -> tuple[PluginApi, bool]:
     current_remote_apis = get_current_remote_apis()
+    active_resolver = (
+        resolver
+        or get_current_api_resolver()
+        or get_default_api_resolver()
+        or ApiResolver(manager=_manager)
+    )
     if isinstance(api_name, PluginApi):
         return api_name, api_name.call_type == ApiType.PROXY
 
-    active_resolver = resolver or get_current_api_resolver() or get_default_api_resolver()
     if current_remote_apis is not None:
-        api = active_resolver.coerce_plugin_api(current_remote_apis.get(api_name))
+        api = ApiResolver.coerce_plugin_api(current_remote_apis.get(api_name))
     else:
         api = active_resolver.resolve(api_name, None)
🧰 Tools
🪛 GitHub Actions: Test Project

[error] 96-96: mypy error: Item "None" of "ApiResolver | None" has no attribute "coerce_plugin_api" [union-attr]


[error] 98-98: mypy error: Item "None" of "ApiResolver | None" has no attribute "resolve" [union-attr]

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/framex/plugin/__init__.py` around lines 94 - 98, active_resolver may be
None because get_current_api_resolver() and get_default_api_resolver() return
optionals; ensure active_resolver is non-None before using it by providing a
final fallback resolver instance (e.g., construct or call a known default
ApiResolver) when resolver or the two getters return None; modify the assignment
of active_resolver and subsequent uses in this block (references:
active_resolver, resolver, get_current_api_resolver, get_default_api_resolver)
so that coerce_plugin_api and resolve are only called on a guaranteed resolver
object, and keep existing branches using current_remote_apis and api_name
unchanged.


if api is None:
if current_remote_apis is not None:
raise RuntimeError(
f"API {api_name} is not declared in current plugin remote_apis; add it to required_remote_apis."
)
if api_name.startswith("/") and settings.server.enable_proxy:
api = PluginApi(
api=api_name,
deployment_name=PROXY_PLUGIN_NAME,
Expand All @@ -94,42 +111,60 @@ async def call_plugin_api(
logger.opt(colors=True).warning(
f"Api(<y>{api_name}</y>) not found, use proxy plugin({PROXY_PLUGIN_NAME}) to transfer!"
)
use_proxy = True
else:
raise RuntimeError(
f"API {api_name} is not found, please check if the plugin is loaded or the API name is correct."
)
if api.call_type == ApiType.PROXY:
use_proxy = True

return api, api.call_type == ApiType.PROXY


def _normalize_plugin_call_kwargs(api: PluginApi, kwargs: dict[str, Any]) -> dict[str, Any]:
normalized_kwargs = dict(kwargs)
param_type_map = dict(api.params)
for key, val in kwargs.items():
for key, val in normalized_kwargs.items():
if (
isinstance(val, dict)
and (expected_type := param_type_map.get(key))
and isinstance(expected_type, type)
and issubclass(expected_type, BaseModel)
):
try:
kwargs[key] = expected_type(**val)
normalized_kwargs[key] = expected_type(**val)
except Exception as e: # pragma: no cover
raise RuntimeError(f"Failed to convert '{key}' to {expected_type}") from e
result = await get_adapter().call_func(api, **kwargs)
return normalized_kwargs


def _unwrap_plugin_call_result(api_name: str | PluginApi, result: Any, use_proxy: bool) -> Any:
if isinstance(result, BaseModel):
return result.model_dump(by_alias=True)
if use_proxy:
if not isinstance(result, dict):
return result
if "status" not in result:
raise RuntimeError(f"Proxy API {api_name} returned invalid response: missing 'status' field")
res = result.get("data")
status = result.get("status")
if status not in settings.server.legal_proxy_code:
logger.opt(colors=True).error(f"<>Proxy API {api_name} call illegal: <r>{result}</r>")
raise RuntimeError(f"Proxy API {api_name} returned status {status}")
if res is None:
logger.opt(colors=True).warning(f"API {api_name} returned empty data")
return res
return result
if not use_proxy:
return result
if not isinstance(result, dict):
return result
if "status" not in result:
raise RuntimeError(f"Proxy API {api_name} returned invalid response: missing 'status' field")

res = result.get("data")
status = result.get("status")
if status not in settings.server.legal_proxy_code:
logger.opt(colors=True).error(f"<>Proxy API {api_name} call illegal: <r>{result}</r>")
raise RuntimeError(f"Proxy API {api_name} returned status {status}")
if res is None:
logger.opt(colors=True).warning(f"API {api_name} returned empty data")
return res


async def call_plugin_api(
api_name: str | PluginApi,
resolver: ApiResolver | None = None,
**kwargs: Any,
) -> Any:
api, use_proxy = _resolve_plugin_api(api_name, resolver=resolver)
normalized_kwargs = _normalize_plugin_call_kwargs(api, kwargs)
result = await get_adapter().call_func(api, **normalized_kwargs)
return _unwrap_plugin_call_result(api_name, result, use_proxy)


def get_http_plugin_apis() -> list["PluginApi"]:
Expand Down
64 changes: 63 additions & 1 deletion src/framex/plugin/base.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
import inspect
from collections.abc import Mapping
from functools import wraps
from typing import Any, final

from framex.config import settings
from framex.log import setup_logger
from framex.plugin import call_plugin_api
from framex.plugin.model import PluginApi
from framex.plugin.resolver import (
ApiResolver,
reset_current_api_resolver,
reset_current_remote_apis,
set_current_api_resolver,
set_current_remote_apis,
)


class BasePlugin:
Expand All @@ -12,6 +22,9 @@ class BasePlugin:
def __init__(self, **kwargs: Any) -> None:
setup_logger()
self.remote_apis: dict[str, PluginApi] = kwargs.get("remote_apis", {})
self.api_registry: Mapping[str, PluginApi] = kwargs.get("api_registry", {})
self.api_resolver = ApiResolver(api_registry=self.api_registry)
self._bind_api_resolver_context()
if settings.server.use_ray:
import asyncio

Expand All @@ -20,9 +33,58 @@ def __init__(self, **kwargs: Any) -> None:
async def on_start(self) -> None:
pass

def _bind_api_resolver_context(self) -> None:
for name, func in inspect.getmembers(type(self), predicate=callable):
if not getattr(func, "_on_request", False):
continue
bound = getattr(self, name)
setattr(self, name, self._wrap_with_api_resolver(bound))

def _wrap_with_api_resolver(self, func: Any) -> Any:
if inspect.isasyncgenfunction(func):

@wraps(func)
async def async_gen_wrapper(*args: Any, **kwargs: Any) -> Any:
resolver_token = set_current_api_resolver(self.api_resolver)
remote_token = set_current_remote_apis(self.remote_apis)
try:
async for chunk in func(*args, **kwargs):
yield chunk
finally:
reset_current_remote_apis(remote_token)
reset_current_api_resolver(resolver_token)

return async_gen_wrapper

if inspect.iscoroutinefunction(func):

@wraps(func)
async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
resolver_token = set_current_api_resolver(self.api_resolver)
remote_token = set_current_remote_apis(self.remote_apis)
try:
return await func(*args, **kwargs)
finally:
reset_current_remote_apis(remote_token)
reset_current_api_resolver(resolver_token)

return async_wrapper

@wraps(func)
def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
resolver_token = set_current_api_resolver(self.api_resolver)
remote_token = set_current_remote_apis(self.remote_apis)
try:
return func(*args, **kwargs)
finally:
reset_current_remote_apis(remote_token)
reset_current_api_resolver(resolver_token)

return sync_wrapper

@final
async def _call_remote_api(self, api_name: str, **kwargs: Any) -> Any:
res = await call_plugin_api(api_name, self.remote_apis, **kwargs)
res = await call_plugin_api(api_name, **kwargs)
return self._post_call_remote_api_hook(res)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

def _post_call_remote_api_hook(self, data: Any) -> Any:
Expand Down
3 changes: 3 additions & 0 deletions src/framex/plugin/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from pathlib import Path
from types import ModuleType

from framex.config import settings
from framex.log import logger
from framex.plugin.model import ApiType, Plugin, PluginApi, PluginMetadata
from framex.utils import escape_tag, path_to_module_name
Expand Down Expand Up @@ -269,3 +270,5 @@ def exec_module(self, module: ModuleType) -> None:

# Insert a custom plugin module finder into the front of the Python import system to intercept and load plugin modules
sys.meta_path.insert(0, PluginFinder())

_manager: PluginManager = PluginManager(silent=settings.test.silent)
80 changes: 80 additions & 0 deletions src/framex/plugin/resolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from collections.abc import Mapping
from contextvars import ContextVar
from typing import Any, Protocol

from framex.plugin.model import PluginApi


class SupportsApiLookup(Protocol):
def get_api(self, api_name: str) -> PluginApi | None: ...


class ApiResolver:
def __init__(
self,
manager: SupportsApiLookup | None = None,
api_registry: Mapping[str, PluginApi | dict[str, Any]] | None = None,
) -> None:
self._manager = manager
self._api_registry = api_registry or {}

@staticmethod
def coerce_plugin_api(api: PluginApi | dict[str, Any] | None) -> PluginApi | None:
if api is None or isinstance(api, PluginApi):
return api
if isinstance(api, dict):
return PluginApi.model_validate(api)
return None

def resolve(
self,
api_name: str,
api_registry: Mapping[str, PluginApi | dict[str, Any]] | None = None,
) -> PluginApi | None:
if api_registry is not None and (api := self.coerce_plugin_api(api_registry.get(api_name))):
return api
if self._manager and (api := self._manager.get_api(api_name)):
return api
return self.coerce_plugin_api(self._api_registry.get(api_name))


_current_api_resolver: ContextVar[ApiResolver | None] = ContextVar("_current_api_resolver", default=None)
_current_remote_apis: ContextVar[Mapping[str, PluginApi | dict[str, Any]] | None] = ContextVar(
"_current_remote_apis", default=None
)
_default_api_resolver: ApiResolver | None = None


def get_current_api_resolver() -> ApiResolver | None:
return _current_api_resolver.get()


def get_default_api_resolver() -> ApiResolver:
if _default_api_resolver is None:
raise RuntimeError("Default API resolver is not configured")
return _default_api_resolver


def _set_default_api_resolver(resolver: ApiResolver) -> None:
global _default_api_resolver
_default_api_resolver = resolver


def set_current_api_resolver(resolver: ApiResolver | None) -> Any:
return _current_api_resolver.set(resolver)


def reset_current_api_resolver(token: Any) -> None:
_current_api_resolver.reset(token)


def get_current_remote_apis() -> Mapping[str, PluginApi | dict[str, Any]] | None:
return _current_remote_apis.get()


def set_current_remote_apis(remote_apis: Mapping[str, PluginApi | dict[str, Any]] | None) -> Any:
return _current_remote_apis.set(remote_apis)


def reset_current_remote_apis(token: Any) -> None:
_current_remote_apis.reset(token)
Loading
Loading