From 2344e7f26e2443f379572ffb46097332b6bb8288 Mon Sep 17 00:00:00 2001
From: touale <136764239@qq.com>
Date: Thu, 16 Apr 2026 14:34:55 +0800
Subject: [PATCH 1/6] feat: add plugin release check and config view
---
README.md | 2 +-
src/framex/config.py | 60 ++++++
src/framex/driver/application.py | 35 +++-
src/framex/plugin/on.py | 1 +
src/framex/plugins/proxy/__init__.py | 1 +
src/framex/repository/__init__.py | 3 +
src/framex/repository/providers/__init__.py | 17 ++
src/framex/repository/providers/base.py | 66 ++++++
src/framex/repository/providers/github.py | 44 ++++
src/framex/repository/providers/gitlab.py | 46 ++++
src/framex/repository/versioning.py | 37 ++++
src/framex/utils.py | 219 +++++++++++++++++++-
tests/api/test_proxy.py | 36 ++++
tests/conftest.py | 11 +-
tests/mock.py | 11 +
tests/test_config.py | 58 +++++-
tests/test_utils.py | 185 ++++++++++++++++-
17 files changed, 815 insertions(+), 17 deletions(-)
create mode 100644 src/framex/repository/__init__.py
create mode 100644 src/framex/repository/providers/__init__.py
create mode 100644 src/framex/repository/providers/base.py
create mode 100644 src/framex/repository/providers/github.py
create mode 100644 src/framex/repository/providers/gitlab.py
create mode 100644 src/framex/repository/versioning.py
diff --git a/README.md b/README.md
index 42066ef..a6f4ad3 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,7 @@
FrameX
- Build modular Python services with plug-and-play plugins, clear team boundaries, and transparent API integration.
+ 🚀 Build scalable Python services with plugins — like FastAPI + Ray, but modular by design.
diff --git a/src/framex/config.py b/src/framex/config.py
index 64ba37b..96c02d3 100644
--- a/src/framex/config.py
+++ b/src/framex/config.py
@@ -84,6 +84,65 @@ def model_post_init(self, context: Any) -> None:
self.jwt_secret = secrets.token_urlsafe(32)
class RepositoryProviderAuthConfig(BaseModel):
    """Shared authentication settings for a repository hosting provider."""

    # API token; an empty string means requests go out unauthenticated.
    token: str = ""
    # Header carrying the token, and the optional scheme prefix ("Bearer").
    token_header: str = "Authorization"  # noqa
    token_scheme: str = "Bearer"  # noqa

    def build_headers(self) -> dict[str, str]:
        """Return the auth header mapping; empty when no token is configured."""
        if not self.token:
            return {}
        header_value = f"{self.token_scheme} {self.token}" if self.token_scheme else self.token
        return {self.token_header: header_value}
+
+
class GitLabRepositoryAuthEndpointConfig(RepositoryProviderAuthConfig):
    """Auth override scoped to a specific GitLab host and optional path prefix."""

    host: str
    path_prefix: str = ""
    token_header: str = "PRIVATE-TOKEN"  # noqa
    token_scheme: str = ""

    def matches(self, host: str, path: str) -> bool:
        """Return True when this endpoint applies to the given host/path pair."""
        if self.host.lower() != host.lower():
            return False
        prefix = self.normalized_path_prefix
        return not prefix or path.startswith(prefix)

    @property
    def normalized_path_prefix(self) -> str:
        """Path prefix guaranteed to start with '/', or '' when unset."""
        prefix = self.path_prefix
        if not prefix:
            return ""
        return prefix if prefix.startswith("/") else f"/{prefix}"
+
+
class GitLabRepositoryAuthConfig(RepositoryProviderAuthConfig):
    """GitLab auth defaults plus optional per-host/per-path endpoint overrides."""

    token_header: str = "PRIVATE-TOKEN"  # noqa
    token_scheme: str = ""
    endpoints: list[GitLabRepositoryAuthEndpointConfig] = Field(default_factory=list)

    def configured_hosts(self) -> set[str]:
        """Lower-cased hosts that have an explicit endpoint entry."""
        return {entry.host.lower() for entry in self.endpoints}

    def build_headers_for_url(self, host: str, path: str) -> dict[str, str]:
        """Headers for a URL: a matching endpoint's override, else the defaults."""
        endpoint = self.resolve_endpoint(host, path)
        return endpoint.build_headers() if endpoint else self.build_headers()

    def resolve_endpoint(self, host: str, path: str) -> GitLabRepositoryAuthEndpointConfig | None:
        """Return the most specific matching endpoint (longest path prefix wins)."""
        candidates = [entry for entry in self.endpoints if entry.matches(host, path)]
        if not candidates:
            return None
        return max(candidates, key=lambda entry: len(entry.normalized_path_prefix))
+
+
class RepositoryAuthConfig(BaseModel):
    """Per-provider authentication settings used by release lookups."""

    github: RepositoryProviderAuthConfig = Field(default_factory=RepositoryProviderAuthConfig)
    gitlab: GitLabRepositoryAuthConfig = Field(default_factory=GitLabRepositoryAuthConfig)


class RepositoryConfig(BaseModel):
    """Top-level repository integration settings (currently auth only)."""

    auth: RepositoryAuthConfig = Field(default_factory=RepositoryAuthConfig)
+
+
class AuthConfig(BaseModel):
oauth: OauthConfig | None = Field(default=None)
rules: dict[str, list[str]] = Field(default_factory=dict)
@@ -133,6 +192,7 @@ class Settings(BaseSettings):
test: TestConfig = Field(default_factory=TestConfig)
sentry: SentryConfig = Field(default_factory=SentryConfig)
auth: AuthConfig = Field(default_factory=AuthConfig)
+ repository: RepositoryConfig = Field(default_factory=RepositoryConfig)
model_config = SettingsConfigDict(
# `.env.prod` takes priority over `.env`
diff --git a/src/framex/driver/application.py b/src/framex/driver/application.py
index 1c730ea..951fcc6 100644
--- a/src/framex/driver/application.py
+++ b/src/framex/driver/application.py
@@ -24,7 +24,9 @@
from framex.config import settings
from framex.consts import API_PRE_STR, DOCS_URL, OPENAPI_URL, PROJECT_NAME, REDOC_URL, VERSION
from framex.driver.auth import authenticate, oauth_callback
-from framex.utils import build_swagger_ui_html, format_uptime, safe_error_message
+from framex.plugin import get_plugin
+from framex.repository import get_latest_repository_version, has_newer_release_version
+from framex.utils import build_plugin_config_html, build_swagger_ui_html, format_uptime, safe_error_message
FRAME_START_TIME = datetime.now(tz=UTC)
SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
@@ -109,6 +111,37 @@ async def _on_start(deployment: Any) -> None:
async def get_documentation(_: Annotated[str, Depends(authenticate)]) -> HTMLResponse:
return build_swagger_ui_html(openapi_url=OPENAPI_URL, title="FrameX Docs")
+ @application.get("/docs/plugin-config", include_in_schema=False)
+ async def get_plugin_config_documentation(
+ plugin: str,
+ _: Annotated[str, Depends(authenticate)],
+ ) -> HTMLResponse:
+ loaded_plugin = get_plugin(plugin)
+ if loaded_plugin is not None and loaded_plugin.config is not None:
+ return build_plugin_config_html(loaded_plugin.config.model_dump())
+
+ if config_data := settings.plugins.get(plugin):
+ return build_plugin_config_html(config_data)
+
+ raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Plugin config not found: {plugin}")
+
+ @application.get("/docs/plugin-release", include_in_schema=False)
+ async def get_plugin_release_documentation(
+ plugin: str,
+ _: Annotated[str, Depends(authenticate)],
+ ) -> dict[str, Any]:
+ loaded_plugin = get_plugin(plugin)
+ if loaded_plugin is None or loaded_plugin.metadata is None:
+ raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Plugin not found: {plugin}")
+
+ current_version = loaded_plugin.metadata.version
+ current_version = current_version if current_version.startswith("v") else f"v{current_version}"
+ repo_url = loaded_plugin.metadata.url
+ latest_version = get_latest_repository_version(repo_url)
+ if not latest_version or not has_newer_release_version(current_version, latest_version):
+ return {"has_update": False, "latest_version": None, "repo_url": repo_url}
+ return {"has_update": True, "latest_version": latest_version, "repo_url": repo_url}
+
@application.get(REDOC_URL, include_in_schema=False)
async def get_redoc_documentation(_: Annotated[str, Depends(authenticate)]) -> HTMLResponse:
return get_redoc_html(openapi_url=OPENAPI_URL, title="FrameX Redoc")
diff --git a/src/framex/plugin/on.py b/src/framex/plugin/on.py
index d418a1f..a4c9880 100644
--- a/src/framex/plugin/on.py
+++ b/src/framex/plugin/on.py
@@ -57,6 +57,7 @@ def decorator(cls: type) -> type:
version,
plugin.module.__plugin_meta__.description,
plugin.module.__plugin_meta__.url,
+ plugin.name,
)
plugin_apis.append(
diff --git a/src/framex/plugins/proxy/__init__.py b/src/framex/plugins/proxy/__init__.py
index dfe2c63..78d1aa2 100644
--- a/src/framex/plugins/proxy/__init__.py
+++ b/src/framex/plugins/proxy/__init__.py
@@ -174,6 +174,7 @@ async def _parse_openai_docs(self, url: str) -> None:
f"v{__plugin_meta__.version}",
__plugin_meta__.description,
__plugin_meta__.url,
+ __plugin_meta__.name,
)
await adapter.call_func(
plugin_api,
diff --git a/src/framex/repository/__init__.py b/src/framex/repository/__init__.py
new file mode 100644
index 0000000..f3dc4d0
--- /dev/null
+++ b/src/framex/repository/__init__.py
@@ -0,0 +1,3 @@
+from .versioning import get_latest_repository_version, has_newer_release_version
+
+__all__ = ["get_latest_repository_version", "has_newer_release_version"]
diff --git a/src/framex/repository/providers/__init__.py b/src/framex/repository/providers/__init__.py
new file mode 100644
index 0000000..e724da5
--- /dev/null
+++ b/src/framex/repository/providers/__init__.py
@@ -0,0 +1,17 @@
+"""Repository hosting providers used by version lookup."""
+
+from .base import RepositoryVersionProvider
+from .github import GITHUB_PROVIDER
+from .gitlab import GITLAB_PROVIDER
+
# Ordered provider chain: the first provider whose matches() accepts a parsed
# URL handles the lookup (see versioning.get_latest_repository_version).
REPOSITORY_VERSION_PROVIDERS: tuple[RepositoryVersionProvider, ...] = (
    GITHUB_PROVIDER,
    GITLAB_PROVIDER,
)

__all__ = [
    "GITHUB_PROVIDER",
    "GITLAB_PROVIDER",
    "REPOSITORY_VERSION_PROVIDERS",
    "RepositoryVersionProvider",
]
diff --git a/src/framex/repository/providers/base.py b/src/framex/repository/providers/base.py
new file mode 100644
index 0000000..7d9a5bb
--- /dev/null
+++ b/src/framex/repository/providers/base.py
@@ -0,0 +1,66 @@
+"""Base types and shared helpers for repository version providers."""
+
+from abc import ABC, abstractmethod
+from typing import Any
+from urllib.parse import ParseResult
+
+import httpx
+
+DEFAULT_HTTP_TIMEOUT = 2.0
+
+
+class RepositoryVersionProvider(ABC):
+ """Abstract interface for repository hosting providers."""
+
+ name: str
+
+ @abstractmethod
+ def matches(self, parsed_url: ParseResult) -> bool:
+ """Return whether this provider can handle the parsed repository URL."""
+
+ @abstractmethod
+ def get_latest_version(self, parsed_url: ParseResult) -> str | None:
+ """Return the latest published version for the repository URL."""
+
+ @staticmethod
+ def extract_repository_parts(parsed_url: ParseResult) -> list[str]:
+ """Split a repository path into normalized URL parts."""
+
+ parts = [part for part in parsed_url.path.split("/") if part]
+ if parts:
+ parts[-1] = parts[-1].removesuffix(".git")
+ return parts
+
+ @staticmethod
+ def fetch_json(url: str, headers: dict[str, str] | None = None) -> dict[str, Any] | None:
+ """Fetch a JSON object and return `None` when it cannot be consumed."""
+
+ try:
+ with httpx.Client(timeout=DEFAULT_HTTP_TIMEOUT, headers=headers) as client:
+ response = client.get(url, follow_redirects=True)
+ except httpx.HTTPError:
+ return None
+
+ if response.status_code != 200:
+ return None
+
+ try:
+ payload = response.json()
+ except ValueError:
+ return None
+
+ return payload if isinstance(payload, dict) else None
+
+ @staticmethod
+ def extract_version(payload: dict[str, Any] | None) -> str | None:
+ """Read a version-like string from a release payload."""
+
+ if payload is None:
+ return None
+
+ latest_version = payload.get("tag_name") or payload.get("name")
+ if not isinstance(latest_version, str):
+ return None
+
+ latest_version = latest_version.strip()
+ return latest_version or None
diff --git a/src/framex/repository/providers/github.py b/src/framex/repository/providers/github.py
new file mode 100644
index 0000000..f93beda
--- /dev/null
+++ b/src/framex/repository/providers/github.py
@@ -0,0 +1,44 @@
+"""GitHub repository version provider."""
+
+from urllib.parse import ParseResult
+
+from framex.config import settings
+
+from .base import RepositoryVersionProvider
+
+GITHUB_HOSTS = frozenset({"github.com", "www.github.com"})
+GITHUB_API_HEADERS = {
+ "Accept": "application/vnd.github+json",
+ "User-Agent": "framex-docs",
+}
+
+
class GitHubRepositoryVersionProvider(RepositoryVersionProvider):
    """Resolve latest release versions for GitHub repositories."""

    name = "github"

    def matches(self, parsed_url: ParseResult) -> bool:
        """Handle URLs whose host is a known GitHub host."""
        return parsed_url.netloc in GITHUB_HOSTS

    def get_latest_version(self, parsed_url: ParseResult) -> str | None:
        """Query the GitHub releases API for the repository's latest tag."""
        owner_repo = self._extract_owner_and_repository(parsed_url)
        if owner_repo is None:
            return None

        owner, repo = owner_repo
        # Configured auth headers override/extend the static API headers.
        request_headers = dict(GITHUB_API_HEADERS)
        request_headers.update(settings.repository.auth.github.build_headers())
        payload = self.fetch_json(
            f"https://api.github.com/repos/{owner}/{repo}/releases/latest",
            headers=request_headers,
        )
        return self.extract_version(payload)

    def _extract_owner_and_repository(self, parsed_url: ParseResult) -> tuple[str, str] | None:
        """Return the first two path segments as (owner, repo), or None."""
        segments = self.extract_repository_parts(parsed_url)
        return (segments[0], segments[1]) if len(segments) >= 2 else None


GITHUB_PROVIDER = GitHubRepositoryVersionProvider()

__all__ = ["GITHUB_PROVIDER", "GitHubRepositoryVersionProvider"]
diff --git a/src/framex/repository/providers/gitlab.py b/src/framex/repository/providers/gitlab.py
new file mode 100644
index 0000000..a409a84
--- /dev/null
+++ b/src/framex/repository/providers/gitlab.py
@@ -0,0 +1,46 @@
+"""GitLab repository version provider."""
+
+from urllib.parse import ParseResult, quote
+
+from framex.config import settings
+
+from .base import RepositoryVersionProvider
+
+GITLAB_PRIMARY_HOST = "gitlab.com"
+GITLAB_HOST_SUFFIX = ".gitlab.com"
+
+
class GitLabRepositoryVersionProvider(RepositoryVersionProvider):
    """Resolve latest release versions for GitLab repositories."""

    name = "gitlab"

    def matches(self, parsed_url: ParseResult) -> bool:
        """Handle gitlab.com, *.gitlab.com, and explicitly configured hosts."""
        host = parsed_url.netloc.lower()
        if host == GITLAB_PRIMARY_HOST or host.endswith(GITLAB_HOST_SUFFIX):
            return True
        return host in settings.repository.auth.gitlab.configured_hosts()

    def get_latest_version(self, parsed_url: ParseResult) -> str | None:
        """Query the GitLab permalink API for the project's latest release."""
        project_path = self._extract_project_path(parsed_url)
        if project_path is None:
            return None

        # GitLab addresses projects by their URL-encoded namespaced path.
        encoded_id = quote(project_path, safe="")
        endpoint = (
            f"{parsed_url.scheme}://{parsed_url.netloc}"
            f"/api/v4/projects/{encoded_id}/releases/permalink/latest"
        )
        auth_headers = settings.repository.auth.gitlab.build_headers_for_url(parsed_url.netloc, parsed_url.path)
        payload = self.fetch_json(endpoint, headers=auth_headers or None)
        return self.extract_version(payload)

    def _extract_project_path(self, parsed_url: ParseResult) -> str | None:
        """Return the full namespaced project path ('group/sub/project'), or None."""
        segments = self.extract_repository_parts(parsed_url)
        return "/".join(segments) if len(segments) >= 2 else None


GITLAB_PROVIDER = GitLabRepositoryVersionProvider()

__all__ = ["GITLAB_PROVIDER", "GitLabRepositoryVersionProvider"]
diff --git a/src/framex/repository/versioning.py b/src/framex/repository/versioning.py
new file mode 100644
index 0000000..b4d3912
--- /dev/null
+++ b/src/framex/repository/versioning.py
@@ -0,0 +1,37 @@
+"""Repository version lookup entrypoints."""
+
+import re
+from functools import lru_cache
+from urllib.parse import urlparse
+
+from .providers import REPOSITORY_VERSION_PROVIDERS
+
+VERSION_PATTERN = re.compile(r"\d+(?:\.\d+)*")
+
+
# Cache only successful lookups. The previous @lru_cache memoized failures too:
# one transient network error would pin a repository to "no release found" for
# the lifetime of the process.
_version_cache: dict[str, str] = {}


def get_latest_repository_version(repo_url: str) -> str | None:
    """Return the latest released version for ``repo_url``, or None.

    Successful results are memoized per URL; failed lookups (None) are
    retried on the next call instead of being cached forever.
    """
    cached = _version_cache.get(repo_url)
    if cached is not None:
        return cached

    parsed_url = urlparse(repo_url)
    for provider in REPOSITORY_VERSION_PROVIDERS:
        if provider.matches(parsed_url):
            latest = provider.get_latest_version(parsed_url)
            if latest is not None:
                _version_cache[repo_url] = latest
            return latest
    # No provider recognizes this host.
    return None
+
+
+def has_newer_release_version(current_version: str, latest_version: str) -> bool:
+ current_parts = _normalize_version(current_version)
+ latest_parts = _normalize_version(latest_version)
+ if current_parts is None or latest_parts is None:
+ return False
+
+ max_length = max(len(current_parts), len(latest_parts))
+ current_padded = current_parts + (0,) * (max_length - len(current_parts))
+ latest_padded = latest_parts + (0,) * (max_length - len(latest_parts))
+ return latest_padded > current_padded
+
+
+def _normalize_version(version: str) -> tuple[int, ...] | None:
+ match = VERSION_PATTERN.search(version)
+ if match is None:
+ return None
+ return tuple(int(part) for part in match.group(0).split("."))
diff --git a/src/framex/utils.py b/src/framex/utils.py
index 044018f..061c81e 100644
--- a/src/framex/utils.py
+++ b/src/framex/utils.py
@@ -1,4 +1,5 @@
import base64
+import html
import importlib
import inspect
import json
@@ -10,6 +11,7 @@
from itertools import cycle
from pathlib import Path
from typing import Any
+from urllib.parse import quote
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
@@ -240,7 +242,6 @@ def build_swagger_ui_html(openapi_url: str, title: str) -> HTMLResponse:
box-shadow: var(--fx-shadow);
}}
- /* 外层三列: tag | description | arrow */
.swagger-ui .opblock-tag {{
display: grid !important;
grid-template-columns: 420px minmax(0, 1fr) 28px;
@@ -256,7 +257,6 @@ def build_swagger_ui_html(openapi_url: str, title: str) -> HTMLResponse:
background: #fafbfc;
}}
- /* tag 标题 */
.swagger-ui .opblock-tag .nostyle {{
grid-column: 1;
min-width: 0;
@@ -270,7 +270,6 @@ def build_swagger_ui_html(openapi_url: str, title: str) -> HTMLResponse:
color: var(--fx-text) !important;
}}
- /* description 容器 */
.swagger-ui .opblock-tag small {{
grid-column: 2;
display: block !important;
@@ -293,7 +292,6 @@ def build_swagger_ui_html(openapi_url: str, title: str) -> HTMLResponse:
padding: 0 !important;
}}
- /* 第一行 description */
.swagger-ui .opblock-tag small .markdown p:first-child {{
margin-bottom: 3px !important;
color: var(--fx-text) !important;
@@ -308,14 +306,12 @@ def build_swagger_ui_html(openapi_url: str, title: str) -> HTMLResponse:
color: var(--fx-text) !important;
}}
- /* 第二行: 作者、版本、Repo */
.swagger-ui .opblock-tag small .markdown p:last-child {{
color: var(--fx-text-soft) !important;
font-size: 12px !important;
line-height: 1.4 !important;
}}
- /* Repo 链接 */
.swagger-ui .opblock-tag small a {{
color: var(--fx-link);
text-decoration: none;
@@ -327,7 +323,6 @@ def build_swagger_ui_html(openapi_url: str, title: str) -> HTMLResponse:
text-decoration: underline;
}}
- /* 右侧展开箭头 */
.swagger-ui .opblock-tag > button {{
grid-column: 3 !important;
justify-self: end !important;
@@ -370,7 +365,6 @@ def build_swagger_ui_html(openapi_url: str, title: str) -> HTMLResponse:
word-break: break-word;
}}
- /* 新增: 按钮容器, 放在第一个 tag 上方 */
.swagger-ui .tag-toolbar {{
display: flex;
justify-content: flex-end;
@@ -394,6 +388,10 @@ def build_swagger_ui_html(openapi_url: str, title: str) -> HTMLResponse:
background: #f9fafb;
}}
+ .swagger-ui .opblock-tag small a[href*="/docs/plugin-config?payload="] {{
+ font-weight: 600;
+ }}
+
@media (max-width: 1400px) {{
.swagger-ui .opblock-tag {{
grid-template-columns: 360px minmax(0, 1fr) 28px;
@@ -495,6 +493,61 @@ def build_swagger_ui_html(openapi_url: str, title: str) -> HTMLResponse:
syncToolbarText();
}}
+ function getTagDescriptionLink(target) {{
+ const link = target.closest(".swagger-ui .opblock-tag small a");
+ return link instanceof HTMLAnchorElement ? link : null;
+ }}
+
+ function hydrateLatestReleaseLinks() {{
+ const releaseLinks = document.querySelectorAll('.swagger-ui .opblock-tag small a[href*="/docs/plugin-release?plugin="]');
+ releaseLinks.forEach((link) => {{
+ if (!(link instanceof HTMLAnchorElement) || link.dataset.releaseHydrated === "true") {{
+ return;
+ }}
+
+ link.dataset.releaseHydrated = "true";
+ fetch(link.href, {{ credentials: "same-origin" }})
+ .then((response) => response.ok ? response.json() : null)
+ .then((data) => {{
+ if (!data || !data.has_update || !data.latest_version) {{
+ link.remove();
+ return;
+ }}
+
+ link.textContent = "⬆️ " + data.latest_version;
+ if (data.repo_url) {{
+ link.href = data.repo_url;
+ }}
+ }})
+ .catch(() => {{
+ link.remove();
+ }});
+ }});
+ }}
+
+ document.addEventListener("pointerdown", (event) => {{
+ const link = getTagDescriptionLink(event.target);
+ if (!link) return;
+
+ event.stopPropagation();
+ if (link.href.includes("/docs/plugin-config?plugin=")) {{
+ event.preventDefault();
+ }}
+ }}, true);
+
+ document.addEventListener("click", (event) => {{
+ const link = getTagDescriptionLink(event.target);
+ if (!link) return;
+
+ event.stopPropagation();
+ if (!link.href.includes("/docs/plugin-config?plugin=")) {{
+ return;
+ }}
+
+ event.preventDefault();
+ window.open(link.href, "_blank", "noopener,noreferrer");
+ }}, true);
+
window.ui = SwaggerUIBundle({{
url: "{openapi_url}",
dom_id: "#swagger-ui",
@@ -509,11 +562,13 @@ def build_swagger_ui_html(openapi_url: str, title: str) -> HTMLResponse:
layout: "BaseLayout",
onComplete: function() {{
insertToolbar();
+ hydrateLatestReleaseLinks();
}}
}});
const observer = new MutationObserver(() => {{
insertToolbar();
+ hydrateLatestReleaseLinks();
}});
observer.observe(document.body, {{
@@ -527,10 +582,156 @@ def build_swagger_ui_html(openapi_url: str, title: str) -> HTMLResponse:
) # roqa
+def _format_toml_key(key: str) -> str:
+ if re.fullmatch(r"[A-Za-z0-9_-]+", key):
+ return key
+ return json.dumps(key, ensure_ascii=False)
+
+
+def _format_toml_value(value: Any) -> str:
+ if isinstance(value, bool):
+ return str(value).lower()
+ if isinstance(value, str):
+ return json.dumps(value, ensure_ascii=False)
+ if isinstance(value, int | float):
+ return str(value)
+ if value is None:
+ return '""'
+ if isinstance(value, list):
+ return f"[{', '.join(_format_toml_value(item) for item in value)}]"
+ if isinstance(value, dict):
+ items = ", ".join(f"{_format_toml_key(str(key))} = {_format_toml_value(item)}" for key, item in value.items())
+ return f"{{ {items} }}"
+ return json.dumps(value, ensure_ascii=False)
+
+
def _dump_toml_table(data: dict[str, Any], prefix: tuple[str, ...] = ()) -> list[str]:
    """Render a dict as TOML lines: plain key/value pairs first, then nested tables.

    ``prefix`` carries the dotted section path of the enclosing table, so nested
    dicts emit ``[a.b]`` headers and lists of dicts emit ``[[a.b]]`` arrays-of-tables.
    """
    lines: list[str] = []
    nested_items: list[tuple[str, Any]] = []

    for key, value in data.items():
        # Defer dicts (tables) and non-empty lists of dicts (arrays-of-tables)
        # so that scalar key/value pairs stay attached to the current header.
        if isinstance(value, dict):
            nested_items.append((key, value))
            continue
        if isinstance(value, list) and value and all(isinstance(item, dict) for item in value):
            nested_items.append((key, value))
            continue
        lines.append(f"{_format_toml_key(str(key))} = {_format_toml_value(value)}")

    for key, value in nested_items:
        section_name = ".".join([*prefix, _format_toml_key(str(key))])
        if isinstance(value, dict):
            if lines:
                # Blank line separating a new section from previous content.
                lines.append("")
            lines.append(f"[{section_name}]")
            lines.extend(_dump_toml_table(value, (*prefix, _format_toml_key(str(key)))))
            continue

        # List of dicts: one [[section]] header per element.
        for item in value:
            if lines:
                lines.append("")
            lines.append(f"[[{section_name}]]")
            lines.extend(_dump_toml_table(item, (*prefix, _format_toml_key(str(key)))))

    return lines
+
+
def _format_plugin_config_toml(config_data: Any) -> str:
    """Serialize plugin config data to TOML text (inline value if not a dict)."""
    if isinstance(config_data, dict):
        return "\n".join(_dump_toml_table(config_data))
    return _format_toml_value(config_data)
+
+
+def build_plugin_config_html(config_data: Any) -> HTMLResponse:
+ escaped_toml = html.escape(_format_plugin_config_toml(config_data))
+ return HTMLResponse(
+ f"""
+
+
+
+
+
+ Plugin Config
+
+
+
+
+
+
+ """
+ )
+
+
+def _format_plugin_release_view(plugin_name: str | None = None) -> str:
+ if not plugin_name:
+ return ""
+
+ plugin_query = quote(plugin_name)
+ return f" [](/docs/plugin-release?plugin={plugin_query})"
+
+
+def _format_plugin_config_view(plugin_name: str | None = None) -> str:
+ if not plugin_name:
+ return ""
+
+ plugin_query = quote(plugin_name)
+ return f"[⚙️ View Config](/docs/plugin-config?plugin={plugin_query})"
+
+
def build_plugin_description(
    author: str,
    version: str,
    description: str,
    repo: str,
    plugin_name: str | None = None,
) -> str:
    """Build the markdown tag description shown in the Swagger UI.

    When ``plugin_name`` is given, a lazy release-check placeholder link and a
    config-view link are embedded; without it, neither link is emitted.
    """
    release_view = _format_plugin_release_view(plugin_name)
    meta_parts = [f"👤 {author}", f"🧩 {version}", f"[🔗 Repo]({repo})"]
    # Append the config link only when present: the previous unconditional
    # " · {config_view}" left a dangling " · " when plugin_name was None.
    if config_view := _format_plugin_config_view(plugin_name):
        meta_parts.append(config_view)
    return f"**{description}**{release_view}\n\n\n" + " · ".join(meta_parts)
diff --git a/tests/api/test_proxy.py b/tests/api/test_proxy.py
index 1e5b160..416d65d 100644
--- a/tests/api/test_proxy.py
+++ b/tests/api/test_proxy.py
@@ -60,6 +60,42 @@ def test_get_proxy_upload(client: TestClient):
}
def test_openapi_tag_description_shows_lazy_release_view(client: TestClient):
    """Tag descriptions embed the lazy release-check link for the proxy plugin."""
    data = client.get("/api/v1/openapi.json").json()

    descriptions = [tag.get("description") or "" for tag in data.get("tags", [])]
    assert any("/docs/plugin-release?plugin=proxy" in description for description in descriptions)


def test_openapi_tag_description_shows_plugin_config(client: TestClient):
    """Tag descriptions embed the config-view link for the proxy plugin."""
    data = client.get("/api/v1/openapi.json").json()

    descriptions = [tag.get("description") or "" for tag in data.get("tags", [])]
    assert any("View Config" in description for description in descriptions)
    assert any("/docs/plugin-config?plugin=proxy" in description for description in descriptions)


def test_get_plugin_release_documentation(client: TestClient, monkeypatch):
    """/docs/plugin-release reports an update when the repo has a newer tag."""
    # Bypass the real network lookup; the endpoint resolves the name at call time.
    monkeypatch.setattr("framex.driver.application.get_latest_repository_version", lambda _: "v9.9.9")

    response = client.get("/docs/plugin-release", params={"plugin": "proxy"})

    assert response.status_code == 200
    assert response.json() == {
        "has_update": True,
        "latest_version": "v9.9.9",
        "repo_url": "https://github.com/touale/FrameX-kit",
    }


def test_get_plugin_config_documentation(client: TestClient):
    """/docs/plugin-config renders the proxy plugin config as TOML-styled HTML."""
    response = client.get("/docs/plugin-config", params={"plugin": "proxy"})

    assert response.status_code == 200
    assert "Plugin Config (TOML)" in response.text
    assert "proxy_urls" in response.text
+
+
def test_get_proxy_upload_openapi(client: TestClient):
data = client.get("/api/v1/openapi.json").json()
post = data["paths"]["/proxy/mock/upload"]["post"]
diff --git a/tests/conftest.py b/tests/conftest.py
index b0c4a06..9086741 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -9,7 +9,7 @@
import framex
from framex.config import settings
-from tests.mock import mock_get, mock_request
+from tests.mock import mock_get, mock_repository_fetch_json, mock_request
@pytest.fixture(autouse=True)
@@ -58,13 +58,16 @@ def before_record_response(response):
@pytest.fixture(scope="session", autouse=True)
def test_app() -> Generator:
- plugins = framex.load_plugins(str(Path(__file__).parent / "plugins"))
- assert len(plugins) == len(["invoker", "export", "alias_model"])
-
with (
+ patch(
+ "framex.repository.providers.base.RepositoryVersionProvider.fetch_json",
+ new=staticmethod(mock_repository_fetch_json),
+ ),
patch("httpx.AsyncClient.get", new=mock_get),
patch("httpx.AsyncClient.request", new=mock_request),
):
+ plugins = framex.load_plugins(str(Path(__file__).parent / "plugins"))
+ assert len(plugins) == len(["invoker", "export", "alias_model"])
yield framex.run(test_mode=True) # type: ignore[return-value]
diff --git a/tests/mock.py b/tests/mock.py
index 5cbfb04..0a50f1f 100644
--- a/tests/mock.py
+++ b/tests/mock.py
@@ -106,3 +106,14 @@ async def mock_request(_, method: str, url: str, **kwargs: Any):
raise AssertionError(f"Unexpected request: {method} {url}")
return resp
+
+
+def mock_repository_fetch_json(url: str, headers: dict[str, str] | None = None):
+ headers = headers or {}
+ if url.endswith("/releases/latest") and "api.github.com/repos/" in url:
+ return {"tag_name": "v9.9.9"}
+ if url.endswith("/releases/permalink/latest") and "/api/v4/projects/" in url:
+ if headers.get("PRIVATE-TOKEN") == "gitlab-private-token":
+ return {"tag_name": "v8.8.8"}
+ return None
+ raise AssertionError(f"Unexpected repository metadata request: {url}")
diff --git a/tests/test_config.py b/tests/test_config.py
index ca8dea7..47f823f 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -1,4 +1,4 @@
-from framex.config import OauthConfig
+from framex.config import OauthConfig, RepositoryConfig
def test_config():
@@ -60,3 +60,59 @@ def test_proxy_config():
assert not proxy_config.is_white_url("http://localhost:10000", "/health")
assert proxy_config.is_white_url("http://localhost:10000", "/echo")
assert proxy_config.is_white_url("http://localhost:10001", "/health")
+
+
def test_repository_auth_config_default_headers():
    """Without tokens configured, neither provider emits auth headers."""
    cfg = RepositoryConfig()

    assert cfg.auth.github.build_headers() == {}
    assert cfg.auth.gitlab.build_headers() == {}


def test_repository_auth_config_builds_provider_headers():
    """GitHub uses Bearer Authorization; GitLab uses a bare PRIVATE-TOKEN header."""
    cfg = RepositoryConfig(
        auth={
            "github": {"token": "gh-secret"},
            "gitlab": {"token": "gl-secret"},
        }
    )

    assert cfg.auth.github.build_headers() == {"Authorization": "Bearer gh-secret"}
    assert cfg.auth.gitlab.build_headers() == {"PRIVATE-TOKEN": "gl-secret"}


def test_gitlab_repository_auth_config_uses_matching_endpoint_headers():
    """Endpoint overrides are selected by host and path prefix."""
    cfg = RepositoryConfig(
        auth={
            "gitlab": {
                "endpoints": [
                    {"host": "gitlab.company.internal", "token": "team-a-token", "path_prefix": "/team-a"},
                    {"host": "gitlab.company.internal", "token": "team-b-token", "path_prefix": "/team-b"},
                ]
            }
        }
    )

    assert cfg.auth.gitlab.build_headers_for_url("gitlab.company.internal", "/team-a/repo") == {
        "PRIVATE-TOKEN": "team-a-token"
    }
    assert cfg.auth.gitlab.build_headers_for_url("gitlab.company.internal", "/team-b/repo") == {
        "PRIVATE-TOKEN": "team-b-token"
    }


def test_gitlab_repository_auth_config_prefers_longest_path_prefix():
    """When several endpoints match, the most specific (longest) prefix wins."""
    cfg = RepositoryConfig(
        auth={
            "gitlab": {
                "endpoints": [
                    {"host": "gitlab.company.internal", "token": "group-token", "path_prefix": "/team"},
                    {"host": "gitlab.company.internal", "token": "project-token", "path_prefix": "/team/project"},
                ]
            }
        }
    )

    assert cfg.auth.gitlab.build_headers_for_url("gitlab.company.internal", "/team/project/repo") == {
        "PRIVATE-TOKEN": "project-token"
    }
diff --git a/tests/test_utils.py b/tests/test_utils.py
index f09d13d..11c412f 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -1,3 +1,5 @@
+import html
+import importlib
import json
from datetime import datetime, timedelta
from typing import Any
@@ -5,9 +7,12 @@
import pytest
from pydantic import BaseModel
-from framex.config import AuthConfig
+from framex.config import AuthConfig, GitLabRepositoryAuthEndpointConfig, settings
+from framex.repository import get_latest_repository_version, has_newer_release_version
from framex.utils import (
StreamEnventType,
+ build_plugin_config_html,
+ build_plugin_description,
cache_decode,
cache_encode,
format_uptime,
@@ -181,3 +186,181 @@ def test_safe_error_message_fallback():
e = Exception()
e.args = ()
assert safe_error_message(e) == "Internal Server Error"
+
+
+def test_build_plugin_description_shows_lazy_release_view():
+ description = build_plugin_description(
+ author="tester",
+ version="v0.3.4",
+ description="demo plugin",
+ repo="https://github.com/example/repo",
+ plugin_name="demo",
+ )
+
+ assert "/docs/plugin-release?plugin=demo" in description
+
+
+def test_build_plugin_description_shows_config_view():
+ class DemoConfig(BaseModel):
+ enabled: bool = True
+ name: str = "demo"
+
+ description = build_plugin_description(
+ author="tester",
+ version="v0.3.4",
+ description="demo plugin",
+ repo="https://github.com/example/repo",
+ plugin_name="demo",
+ )
+
+ assert "View Config" in description
+ assert "/docs/plugin-config?plugin=demo" in description
+
+
+def test_build_plugin_config_html_uses_toml_format():
+ response = build_plugin_config_html(
+ {
+ "enabled": True,
+ "name": "demo",
+ "proxy_urls": ["https://example.com"],
+ "nested": {"timeout": 30},
+ "endpoints": [{"host": "gitlab.example.com", "token": "demo-token"}],
+ }
+ )
+
+ body = html.unescape(response.body.decode()) # type: ignore
+ assert "Plugin Config (TOML)" in body
+ assert "enabled = true" in body
+ assert 'name = "demo"' in body
+ assert 'proxy_urls = ["https://example.com"]' in body
+ assert "[nested]" in body
+ assert "timeout = 30" in body
+ assert "[[endpoints]]" in body
+ assert 'host = "gitlab.example.com"' in body
+
+
+def test_build_plugin_description_skips_lazy_release_view_without_plugin_name():
+ description = build_plugin_description(
+ author="tester",
+ version="v0.3.4",
+ description="demo plugin",
+ repo="https://github.com/example/repo",
+ )
+
+ assert "/docs/plugin-release?plugin=" not in description
+ assert "⬆️" not in description
+
+
+def test_has_newer_release_version():
+ assert has_newer_release_version("v0.3.4", "v0.3.5")
+ assert not has_newer_release_version("v0.3.4", "v0.3.4")
+ assert not has_newer_release_version("v0.3.4", "invalid")
+
+
+def test_get_latest_repository_version_uses_github_auth_token(monkeypatch):
+ get_latest_repository_version.cache_clear()
+ monkeypatch.setattr(settings.repository.auth.github, "token", "gh-private-token")
+
+ captured_headers: dict[str, str | None] = {}
+
+ def fake_fetch_json(url: str, headers: dict[str, str] | None = None):
+ captured_headers["authorization"] = (headers or {}).get("Authorization")
+ return {"tag_name": "v1.2.3"}
+
+ monkeypatch.setattr(
+ "framex.repository.providers.base.RepositoryVersionProvider.fetch_json",
+ staticmethod(fake_fetch_json),
+ )
+
+ version = get_latest_repository_version("https://github.com/example/private-repo")
+
+ assert version == "v1.2.3"
+ assert captured_headers["authorization"] == "Bearer gh-private-token"
+ get_latest_repository_version.cache_clear()
+
+
+def test_get_latest_repository_version_uses_gitlab_private_token(monkeypatch):
+ get_latest_repository_version.cache_clear()
+ monkeypatch.setattr(settings.repository.auth.gitlab, "token", "gitlab-private-token")
+
+ captured_headers: dict[str, str | None] = {}
+
+ def fake_fetch_json(url: str, headers: dict[str, str] | None = None):
+ captured_headers["private_token"] = (headers or {}).get("PRIVATE-TOKEN")
+ return {"tag_name": "v2.0.0"}
+
+ monkeypatch.setattr(
+ "framex.repository.providers.base.RepositoryVersionProvider.fetch_json",
+ staticmethod(fake_fetch_json),
+ )
+
+ version = get_latest_repository_version("https://gitlab.com/example/private-repo")
+
+ assert version == "v2.0.0"
+ assert captured_headers["private_token"] == "gitlab-private-token" # noqa
+ get_latest_repository_version.cache_clear()
+
+
+def test_get_latest_repository_version_uses_gitlab_endpoint_token(monkeypatch):
+ get_latest_repository_version.cache_clear()
+ monkeypatch.setattr(
+ settings.repository.auth.gitlab,
+ "endpoints",
+ [
+ GitLabRepositoryAuthEndpointConfig(
+ host="gitlab.company.internal",
+ path_prefix="/team-a",
+ token="team-a-token", # noqa
+ )
+ ],
+ )
+
+ captured_headers: dict[str, str | None] = {}
+
+ def fake_fetch_json(url: str, headers: dict[str, str] | None = None):
+ captured_headers["private_token"] = (headers or {}).get("PRIVATE-TOKEN")
+ return {"tag_name": "v3.0.0"}
+
+ monkeypatch.setattr(
+ "framex.repository.providers.base.RepositoryVersionProvider.fetch_json",
+ staticmethod(fake_fetch_json),
+ )
+
+ version = get_latest_repository_version("https://gitlab.company.internal/team-a/private-repo")
+
+ assert version == "v3.0.0"
+ assert captured_headers["private_token"] == "team-a-token" # noqa
+ get_latest_repository_version.cache_clear()
+
+
+def test_repository_fetch_json_follows_redirects(monkeypatch):
+ import framex.repository.providers.base as base_module
+
+ base_module = importlib.reload(base_module)
+ captured: dict[str, bool] = {}
+
+ class FakeClient:
+ def __init__(self, *args, **kwargs):
+ pass
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc, tb):
+ return False
+
+ def get(self, url: str, follow_redirects: bool = False): # noqa
+ captured["follow_redirects"] = follow_redirects
+ response = type("Response", (), {})()
+ response.status_code = 200
+ response.json = lambda: {"tag_name": "v0.0.15"}
+ return response
+
+ monkeypatch.setattr(base_module.httpx, "Client", FakeClient)
+
+ payload = base_module.RepositoryVersionProvider.fetch_json(
+ "https://gitlab.company.internal/api/v4/projects/184/releases/permalink/latest"
+ )
+
+ assert payload == {"tag_name": "v0.0.15"}
+ assert captured["follow_redirects"] is True
From 85fe10d36b1a2e2d4a9c9882be40a850f5a5db25 Mon Sep 17 00:00:00 2001
From: touale <136764239@qq.com>
Date: Thu, 16 Apr 2026 16:42:05 +0800
Subject: [PATCH 2/6] feat: add embedded config files support in plugin config
---
src/framex/config.py | 5 +
src/framex/driver/application.py | 30 ++-
src/framex/utils/__init__.py | 41 +++
src/framex/utils/cache.py | 80 ++++++
src/framex/utils/common.py | 86 ++++++
src/framex/utils/config_docs.py | 357 +++++++++++++++++++++++++
src/framex/{utils.py => utils/docs.py} | 293 --------------------
tests/api/test_proxy.py | 52 ++++
tests/test_utils.py | 133 ++++++++-
9 files changed, 768 insertions(+), 309 deletions(-)
create mode 100644 src/framex/utils/__init__.py
create mode 100644 src/framex/utils/cache.py
create mode 100644 src/framex/utils/common.py
create mode 100644 src/framex/utils/config_docs.py
rename src/framex/{utils.py => utils/docs.py} (60%)
diff --git a/src/framex/config.py b/src/framex/config.py
index 96c02d3..3708f98 100644
--- a/src/framex/config.py
+++ b/src/framex/config.py
@@ -143,6 +143,10 @@ class RepositoryConfig(BaseModel):
auth: RepositoryAuthConfig = Field(default_factory=RepositoryAuthConfig)
+class DocsConfig(BaseModel):
+ embedded_config_file_whitelist: list[str] = Field(default_factory=list)
+
+
class AuthConfig(BaseModel):
oauth: OauthConfig | None = Field(default=None)
rules: dict[str, list[str]] = Field(default_factory=dict)
@@ -190,6 +194,7 @@ class Settings(BaseSettings):
load_builtin_plugins: list[str] = Field(default_factory=list)
test: TestConfig = Field(default_factory=TestConfig)
+ docs: DocsConfig = Field(default_factory=DocsConfig)
sentry: SentryConfig = Field(default_factory=SentryConfig)
auth: AuthConfig = Field(default_factory=AuthConfig)
repository: RepositoryConfig = Field(default_factory=RepositoryConfig)
diff --git a/src/framex/driver/application.py b/src/framex/driver/application.py
index 951fcc6..03fced3 100644
--- a/src/framex/driver/application.py
+++ b/src/framex/driver/application.py
@@ -5,6 +5,7 @@
from collections.abc import Callable
from contextlib import asynccontextmanager
from datetime import UTC, datetime
+from pathlib import Path
from typing import Annotated, Any
from zoneinfo import ZoneInfo
@@ -26,7 +27,13 @@
from framex.driver.auth import authenticate, oauth_callback
from framex.plugin import get_plugin
from framex.repository import get_latest_repository_version, has_newer_release_version
-from framex.utils import build_plugin_config_html, build_swagger_ui_html, format_uptime, safe_error_message
+from framex.utils import (
+ build_plugin_config_html,
+ build_swagger_ui_html,
+ collect_embedded_config_files,
+ format_uptime,
+ safe_error_message,
+)
FRAME_START_TIME = datetime.now(tz=UTC)
SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
@@ -118,10 +125,25 @@ async def get_plugin_config_documentation(
) -> HTMLResponse:
loaded_plugin = get_plugin(plugin)
if loaded_plugin is not None and loaded_plugin.config is not None:
- return build_plugin_config_html(loaded_plugin.config.model_dump())
+ config_data = loaded_plugin.config.model_dump()
+ return build_plugin_config_html(
+ config_data,
+ collect_embedded_config_files(
+ config_data,
+ workspace_root=Path.cwd().resolve(),
+ whitelist=settings.docs.embedded_config_file_whitelist,
+ ),
+ )
- if config_data := settings.plugins.get(plugin):
- return build_plugin_config_html(config_data)
+ if config_data := settings.plugins.get(plugin): # type: ignore
+ return build_plugin_config_html(
+ config_data,
+ collect_embedded_config_files(
+ config_data,
+ workspace_root=Path.cwd().resolve(),
+ whitelist=settings.docs.embedded_config_file_whitelist,
+ ),
+ )
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Plugin config not found: {plugin}")
diff --git a/src/framex/utils/__init__.py b/src/framex/utils/__init__.py
new file mode 100644
index 0000000..05c8324
--- /dev/null
+++ b/src/framex/utils/__init__.py
@@ -0,0 +1,41 @@
+from .cache import cache_decode, cache_encode
+from .common import (
+ StreamEnventType,
+ escape_tag,
+ extract_method_params,
+ format_uptime,
+ make_stream_event,
+ path_to_module_name,
+ plugin_to_deployment_name,
+ safe_error_message,
+ shorten_str,
+)
+from .config_docs import (
+ build_plugin_config_html,
+ collect_embedded_config_files,
+ mask_sensitive_config_data,
+ mask_sensitive_config_text,
+ mask_sensitive_embedded_config_content,
+)
+from .docs import build_plugin_description, build_swagger_ui_html
+
+__all__ = [
+ "StreamEnventType",
+ "build_plugin_config_html",
+ "build_plugin_description",
+ "build_swagger_ui_html",
+ "cache_decode",
+ "cache_encode",
+ "collect_embedded_config_files",
+ "escape_tag",
+ "extract_method_params",
+ "format_uptime",
+ "make_stream_event",
+ "mask_sensitive_config_data",
+ "mask_sensitive_config_text",
+ "mask_sensitive_embedded_config_content",
+ "path_to_module_name",
+ "plugin_to_deployment_name",
+ "safe_error_message",
+ "shorten_str",
+]
diff --git a/src/framex/utils/cache.py b/src/framex/utils/cache.py
new file mode 100644
index 0000000..fd4a9e1
--- /dev/null
+++ b/src/framex/utils/cache.py
@@ -0,0 +1,80 @@
+import base64
+import importlib
+import json
+import zlib
+from datetime import datetime
+from enum import Enum
+from itertools import cycle
+from typing import Any
+
+
+def xor_crypt(data: bytes, key: str = "01234567890abcdefghijklmnopqrstuvwxyz") -> bytes:
+ return bytes(a ^ b for a, b in zip(data, cycle(key.encode())))
+
+
+def cache_encode(data: Any) -> str:
+ def transform(obj: Any) -> Any:
+ if hasattr(obj, "__dict__"):
+ raw_attributes = {k: transform(v) for k, v in obj.__dict__.items() if not k.startswith("_")}
+ return {
+ "__type__": "dynamic_obj",
+ "__module__": obj.__class__.__module__,
+ "__class__": obj.__class__.__name__,
+ "data": raw_attributes,
+ }
+ if isinstance(obj, list):
+ return [transform(i) for i in obj]
+ if isinstance(obj, dict):
+ return {k: transform(v) for k, v in obj.items()}
+ if isinstance(obj, datetime):
+ return obj.isoformat()
+ if isinstance(obj, Enum):
+ return obj.value
+ return obj
+
+ json_str = json.dumps(transform(data), ensure_ascii=False)
+ compressed = zlib.compress(json_str.encode("utf-8"))
+ encrypted = xor_crypt(compressed)
+ return base64.b64encode(encrypted).decode("ascii")
+
+
+def cache_decode(res: Any) -> Any:
+ current = res
+ while isinstance(current, str):
+ try:
+ decoded_bytes = base64.b64decode(current, validate=True)
+ current = zlib.decompress(xor_crypt(decoded_bytes)).decode("utf-8")
+ except Exception:
+ try:
+ temp = json.loads(current)
+ if temp == current:
+ break
+ current = temp
+ except Exception:
+ break
+
+ def restore_models(item: Any) -> Any:
+ if isinstance(item, list):
+ return [restore_models(i) for i in item]
+
+ if isinstance(item, dict):
+ if item.get("__type__") == "dynamic_obj":
+ try:
+ module = importlib.import_module(item["__module__"])
+ cls = getattr(module, item["__class__"])
+
+ cleaned_data = {k: restore_models(v) for k, v in item["data"].items()}
+
+ if hasattr(cls, "model_validate"):
+ return cls.model_validate(cleaned_data)
+ return cls(**cleaned_data)
+ except Exception:
+ from types import SimpleNamespace
+
+ return SimpleNamespace(**{k: restore_models(v) for k, v in item["data"].items()})
+
+ return {k: restore_models(v) for k, v in item.items()}
+
+ return item
+
+ return restore_models(current)
diff --git a/src/framex/utils/common.py b/src/framex/utils/common.py
new file mode 100644
index 0000000..66cf271
--- /dev/null
+++ b/src/framex/utils/common.py
@@ -0,0 +1,86 @@
+import inspect
+import json
+import re
+from collections.abc import Callable
+from datetime import timedelta
+from enum import StrEnum
+from pathlib import Path
+from typing import Any
+
+from pydantic import BaseModel
+
+
+def plugin_to_deployment_name(plugin_name: str, obj_name: str) -> str:
+ return f"{plugin_name}.{obj_name}"
+
+
+def path_to_module_name(path: Path) -> str:
+ """Convert path to module name."""
+ rel_path = path.resolve().relative_to(Path.cwd().resolve())
+ if rel_path.stem == "__init__":
+ module_name = ".".join(rel_path.parts[:-1])
+ else:
+ module_name = ".".join([*rel_path.parts[:-1], rel_path.stem]) # type: ignore[arg-type]
+ return module_name.removeprefix("src.")
+
+
+def escape_tag(s: str) -> str:
+    """Escape ``<tag>``-like markers used in colored logs."""
+    return re.sub(r"</?((?:[fb]g\s)?[^<>\s]*)>", r"\\\g<0>", s)
+
+
+def extract_method_params(func: Callable) -> list[tuple[str, Any]]:
+ sig = inspect.signature(func)
+ params: list[tuple[str, Any]] = []
+ for param in sig.parameters.values():
+ if param.name == "self":
+ continue
+ params.append((param.name, param.annotation))
+ return params
+
+
+class StreamEnventType(StrEnum):
+ MESSAGE_CHUNK = "message_chunk"
+ FINISH = "finish"
+ ERROR = "error"
+ DEBUG = "debug"
+
+
+def make_stream_event(event_type: StreamEnventType | str, data: str | dict[str, Any] | BaseModel | None = None) -> str:
+ if not data:
+ data = {}
+ elif isinstance(data, BaseModel):
+ data = data.model_dump()
+ elif isinstance(data, str):
+ data = {"content": data}
+ return f"event: {event_type}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
+
+
+def format_uptime(delta: timedelta) -> str:
+ days = delta.days
+ hours, remainder = divmod(delta.seconds, 3600)
+ minutes, seconds = divmod(remainder, 60)
+
+ parts: list[str] = []
+ if days:
+ parts.append(f"{days}d")
+ if hours:
+ parts.append(f"{hours}h")
+ if minutes:
+ parts.append(f"{minutes}m")
+ if seconds or not parts:
+ parts.append(f"{seconds}s")
+
+ return " ".join(parts)
+
+
+def safe_error_message(e: Exception) -> str:
+ if hasattr(e, "cause") and e.cause:
+ return str(e.cause)
+ if e.args:
+ return str(e.args[0])
+ return "Internal Server Error"
+
+
+def shorten_str(data: str, max_len: int = 45) -> str:
+ return data if len(data) <= max_len else data[: max_len - 3] + "..."
diff --git a/src/framex/utils/config_docs.py b/src/framex/utils/config_docs.py
new file mode 100644
index 0000000..b321515
--- /dev/null
+++ b/src/framex/utils/config_docs.py
@@ -0,0 +1,357 @@
+import html
+import json
+import re
+import tomllib
+from collections.abc import Sequence
+from fnmatch import fnmatch
+from pathlib import Path
+from typing import Any
+
+import yaml
+from fastapi.responses import HTMLResponse
+
+SUPPORTED_EMBEDDED_CONFIG_SUFFIXES = (".yaml", ".yml", ".toml")
+
+SENSITIVE_CONFIG_KEYWORDS = (
+ "token",
+ "secret",
+ "password",
+ "passwd",
+ "authorization",
+ "api_key",
+ "apikey",
+ "access_key",
+ "private_key",
+ "client_secret",
+ "cookie",
+ "session",
+ "credential",
+)
+
+
+def _normalize_whitelist_pattern(pattern: str) -> str:
+ return pattern.strip().lstrip("/")
+
+
+def _is_whitelisted_embedded_config_path(candidate: Path, workspace_root: Path, whitelist: Sequence[str]) -> bool:
+ if not whitelist:
+ return False
+
+ relative_path = candidate.relative_to(workspace_root).as_posix()
+ return any(
+ fnmatch(relative_path, _normalize_whitelist_pattern(pattern)) for pattern in whitelist if pattern.strip()
+ )
+
+
+def _resolve_embedded_config_path(
+ path_value: str,
+ workspace_root: Path,
+ whitelist: Sequence[str],
+) -> Path | None:
+ candidate = Path(path_value).expanduser()
+ if not candidate.is_absolute(): # noqa
+ candidate = (workspace_root / candidate).resolve()
+ else:
+ candidate = candidate.resolve()
+
+ if candidate.suffix.lower() not in SUPPORTED_EMBEDDED_CONFIG_SUFFIXES:
+ return None
+ if not candidate.is_file():
+ return None
+
+ try:
+ candidate.relative_to(workspace_root)
+ except ValueError:
+ return None
+
+ if not _is_whitelisted_embedded_config_path(candidate, workspace_root, whitelist):
+ return None
+
+ return candidate
+
+
+def collect_embedded_config_files(
+ config_data: Any,
+ workspace_root: Path | None = None,
+ whitelist: Sequence[str] = (),
+) -> list[tuple[str, str]]:
+ found_files: list[tuple[str, str]] = []
+ visited_paths: set[Path] = set()
+ resolved_workspace_root = (workspace_root or Path.cwd()).resolve()
+
+ def walk(value: Any) -> None:
+ if isinstance(value, dict):
+ for nested_value in value.values():
+ walk(nested_value)
+ return
+ if isinstance(value, list):
+ for item in value:
+ walk(item)
+ return
+ if not isinstance(value, str):
+ return
+
+ resolved_path = _resolve_embedded_config_path(value, resolved_workspace_root, whitelist)
+ if resolved_path is None or resolved_path in visited_paths:
+ return
+
+ visited_paths.add(resolved_path)
+ found_files.append((str(resolved_path), resolved_path.read_text(encoding="utf-8")))
+
+ walk(config_data)
+ return found_files
+
+
+def _format_toml_key(key: str) -> str:
+ if re.fullmatch(r"[A-Za-z0-9_-]+", key):
+ return key
+ return json.dumps(key, ensure_ascii=False)
+
+
+def _mask_sensitive_string(value: str) -> str:
+ if not value:
+ return value
+ if len(value) <= 4:
+ return "****"
+ return f"{value[:2]}{'*' * max(len(value) - 4, 4)}{value[-2:]}"
+
+
+def _is_sensitive_config_key(key: str) -> bool:
+ normalized_key = key.lower().replace("-", "_")
+ return any(keyword in normalized_key for keyword in SENSITIVE_CONFIG_KEYWORDS)
+
+
+def _should_mask_config_path(key_path: tuple[str, ...]) -> bool:
+ if any(_is_sensitive_config_key(segment) for segment in key_path):
+ return True
+ if len(key_path) >= 2 and key_path[-2] == "rules" and "auth" in key_path: # noqa
+ return True
+ return False
+
+
+def mask_sensitive_config_data(config_data: Any, key_path: tuple[str, ...] = ()) -> Any:
+ if isinstance(config_data, dict):
+ return {
+ key: mask_sensitive_config_data(value, key_path=(*key_path, str(key)))
+ for key, value in config_data.items()
+ }
+ if isinstance(config_data, list):
+ return [mask_sensitive_config_data(item, key_path=key_path) for item in config_data]
+ if isinstance(config_data, str) and _should_mask_config_path(key_path):
+ return _mask_sensitive_string(config_data)
+ return config_data
+
+
+def mask_sensitive_config_text(content: str) -> str:
+ lines: list[str] = []
+    pattern = re.compile(
+        r"^(?P<prefix>\s*(?:-\s*)?[\"']?(?P<key>[A-Za-z0-9_.-]+)[\"']?\s*(?::|=)\s*)(?P<value>.*?)(?P<suffix>\s*(?:#.*)?)$"
+    )
+
+ for line in content.splitlines():
+ match = pattern.match(line)
+ if not match or not _is_sensitive_config_key(match.group("key")):
+ lines.append(line)
+ continue
+
+ raw_value = match.group("value").strip()
+ if not raw_value:
+ lines.append(line)
+ continue
+
+ quote_char = ""
+ if raw_value[0] in {'"', "'"} and raw_value[-1] == raw_value[0]:
+ quote_char = raw_value[0]
+ inner_value = raw_value[1:-1]
+ else:
+ inner_value = raw_value
+
+ masked_value = _mask_sensitive_string(inner_value)
+ rendered_value = f"{quote_char}{masked_value}{quote_char}" if quote_char else masked_value
+ lines.append(f"{match.group('prefix')}{rendered_value}{match.group('suffix')}")
+
+ return "\n".join(lines)
+
+
+def _format_toml_value(value: Any) -> str:
+ if isinstance(value, bool):
+ return str(value).lower()
+ if isinstance(value, str):
+ return json.dumps(value, ensure_ascii=False)
+ if isinstance(value, int | float):
+ return str(value)
+ if value is None:
+ return '""'
+ if isinstance(value, list):
+ return f"[{', '.join(_format_toml_value(item) for item in value)}]"
+ if isinstance(value, dict):
+ items = ", ".join(f"{_format_toml_key(str(key))} = {_format_toml_value(item)}" for key, item in value.items())
+ return f"{{ {items} }}"
+ return json.dumps(value, ensure_ascii=False)
+
+
+def _dump_toml_table(data: dict[str, Any], prefix: tuple[str, ...] = ()) -> list[str]:
+ lines: list[str] = []
+ nested_items: list[tuple[str, Any]] = []
+
+ for key, value in data.items():
+ if isinstance(value, dict):
+ nested_items.append((key, value))
+ continue
+ if isinstance(value, list) and value and all(isinstance(item, dict) for item in value):
+ nested_items.append((key, value))
+ continue
+ lines.append(f"{_format_toml_key(str(key))} = {_format_toml_value(value)}")
+
+ for key, value in nested_items:
+ section_name = ".".join([*prefix, _format_toml_key(str(key))])
+ if isinstance(value, dict):
+ if lines:
+ lines.append("")
+ lines.append(f"[{section_name}]")
+ lines.extend(_dump_toml_table(value, (*prefix, _format_toml_key(str(key)))))
+ continue
+
+ for item in value:
+ if lines:
+ lines.append("")
+ lines.append(f"[[{section_name}]]")
+ lines.extend(_dump_toml_table(item, (*prefix, _format_toml_key(str(key)))))
+
+ return lines
+
+
+def _format_plugin_config_toml(config_data: Any) -> str:
+ if not isinstance(config_data, dict):
+ return _format_toml_value(config_data)
+ return "\n".join(_dump_toml_table(config_data))
+
+
+def _normalize_display_config_paths(config_data: Any, workspace_root: Path | None = None) -> Any:
+ resolved_workspace_root = (workspace_root or Path.cwd()).resolve()
+ if isinstance(config_data, dict):
+ return {
+ key: _normalize_display_config_paths(value, workspace_root=resolved_workspace_root)
+ for key, value in config_data.items()
+ }
+ if isinstance(config_data, list):
+ return [_normalize_display_config_paths(item, workspace_root=resolved_workspace_root) for item in config_data]
+ if isinstance(config_data, str):
+ candidate = Path(config_data).expanduser()
+ if candidate.suffix.lower() not in SUPPORTED_EMBEDDED_CONFIG_SUFFIXES:
+ return config_data
+ if not candidate.is_absolute():
+ candidate = (resolved_workspace_root / candidate).resolve()
+ else:
+ candidate = candidate.resolve()
+ if candidate.is_file():
+ return _to_display_embedded_config_path(str(candidate), workspace_root=resolved_workspace_root)
+ return config_data
+
+
+def mask_sensitive_embedded_config_content(file_path: str, content: str) -> str:
+ suffix = Path(file_path).suffix.lower()
+
+ try:
+ if suffix == ".toml":
+ parsed = tomllib.loads(content)
+ return _format_plugin_config_toml(mask_sensitive_config_data(parsed))
+ if suffix in {".yaml", ".yml"}:
+ parsed = yaml.safe_load(content)
+ masked = mask_sensitive_config_data(parsed)
+ return yaml.safe_dump(masked, allow_unicode=True, sort_keys=False).rstrip()
+ except Exception:
+ return mask_sensitive_config_text(content)
+
+ return mask_sensitive_config_text(content)
+
+
+def _to_display_embedded_config_path(file_path: str, workspace_root: Path | None = None) -> str:
+ resolved_workspace_root = (workspace_root or Path.cwd()).resolve()
+ resolved_file_path = Path(file_path).resolve()
+ try:
+ return resolved_file_path.relative_to(resolved_workspace_root).as_posix()
+ except ValueError:
+ return resolved_file_path.as_posix()
+
+
+def build_plugin_config_html(config_data: Any, embedded_files: list[tuple[str, str]] | None = None) -> HTMLResponse:
+ normalized_config_data = _normalize_display_config_paths(config_data)
+ masked_config_data = mask_sensitive_config_data(normalized_config_data)
+ escaped_toml = html.escape(_format_plugin_config_toml(masked_config_data))
+ masked_embedded_files = [
+ (
+ _to_display_embedded_config_path(file_path),
+ mask_sensitive_embedded_config_content(file_path, file_content),
+ )
+ for file_path, file_content in (embedded_files or [])
+ ]
+ embedded_sections = "".join(
+ f"""
+        <details>\n        <summary>{html.escape(file_path)}</summary>\n        <pre>{html.escape(file_content)}</pre>\n        </details>
+ """
+ for file_path, file_content in masked_embedded_files
+ )
+    return HTMLResponse(
+        f"""
+        <!DOCTYPE html>
+        <html lang="en">
+        <head>
+        <meta charset="utf-8">
+        <meta name="viewport" content="width=device-width, initial-scale=1">
+        <title>Plugin Config</title>
+        </head>
+        <body>
+        <h2>Plugin Config (TOML)</h2>
+        <pre>{escaped_toml}</pre>
+        {embedded_sections}
+        </body>
+        </html>
+        """
+    )
diff --git a/src/framex/utils.py b/src/framex/utils/docs.py
similarity index 60%
rename from src/framex/utils.py
rename to src/framex/utils/docs.py
index 061c81e..af9d187 100644
--- a/src/framex/utils.py
+++ b/src/framex/utils/docs.py
@@ -1,168 +1,6 @@
-import base64
-import html
-import importlib
-import inspect
-import json
-import re
-import zlib
-from collections.abc import Callable
-from datetime import datetime, timedelta
-from enum import Enum, StrEnum
-from itertools import cycle
-from pathlib import Path
-from typing import Any
from urllib.parse import quote
from fastapi.responses import HTMLResponse
-from pydantic import BaseModel
-
-
-def plugin_to_deployment_name(plugin_name: str, obj_name: str) -> str:
- return f"{plugin_name}.{obj_name}"
-
-
-def path_to_module_name(path: Path) -> str:
- """Convert path to module name"""
- rel_path = path.resolve().relative_to(Path.cwd().resolve())
- if rel_path.stem == "__init__":
- module_name = ".".join(rel_path.parts[:-1])
- else:
- module_name = ".".join([*rel_path.parts[:-1], rel_path.stem]) # type: ignore
- return module_name.removeprefix("src.")
-
-
-def escape_tag(s: str) -> str:
-    """Used to escape `<tag>` type special tags when recording color logs"""
-    return re.sub(r"</?((?:[fb]g\s)?[^<>\s]*)>", r"\\\g<0>", s)
-
-
-def extract_method_params(func: Callable) -> list[tuple[str, Any]]:
- sig = inspect.signature(func)
- params = []
- for param in sig.parameters.values():
- if param.name == "self":
- continue
- params.append((param.name, param.annotation))
- return params
-
-
-class StreamEnventType(StrEnum):
- MESSAGE_CHUNK = "message_chunk"
- FINISH = "finish"
- ERROR = "error"
- DEBUG = "debug"
-
-
-def make_stream_event(event_type: StreamEnventType | str, data: str | dict[str, Any] | BaseModel | None = None) -> str:
- if not data:
- data = {}
- elif isinstance(data, BaseModel):
- data = data.model_dump()
- elif isinstance(data, str):
- data = {"content": data}
- return f"event: {event_type}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
-
-
-def xor_crypt(data: bytes, key: str = "01234567890abcdefghijklmnopqrstuvwxyz") -> bytes:
- return bytes(a ^ b for a, b in zip(data, cycle(key.encode())))
-
-
-def cache_encode(data: Any) -> str:
- def transform(obj: Any) -> Any:
- if hasattr(obj, "__dict__"):
- raw_attributes = {k: transform(v) for k, v in obj.__dict__.items() if not k.startswith("_")}
- return {
- "__type__": "dynamic_obj",
- "__module__": obj.__class__.__module__,
- "__class__": obj.__class__.__name__,
- "data": raw_attributes,
- }
- if isinstance(obj, list):
- return [transform(i) for i in obj]
- if isinstance(obj, dict):
- return {k: transform(v) for k, v in obj.items()}
- if isinstance(obj, datetime):
- return obj.isoformat()
- if isinstance(obj, Enum):
- return obj.value
- return obj
-
- json_str = json.dumps(transform(data), ensure_ascii=False)
- compressed = zlib.compress(json_str.encode("utf-8"))
- encrypted = xor_crypt(compressed)
- return base64.b64encode(encrypted).decode("ascii")
-
-
-def cache_decode(res: Any) -> Any:
- current = res
- while isinstance(current, str):
- try:
- decoded_bytes = base64.b64decode(current, validate=True)
- current = zlib.decompress(xor_crypt(decoded_bytes)).decode("utf-8")
- except Exception:
- try:
- temp = json.loads(current)
- if temp == current:
- break
- current = temp
- except Exception:
- break
-
- def restore_models(item: Any) -> Any:
- if isinstance(item, list):
- return [restore_models(i) for i in item]
-
- if isinstance(item, dict):
- if item.get("__type__") == "dynamic_obj":
- try:
- module = importlib.import_module(item["__module__"])
- cls = getattr(module, item["__class__"])
-
- cleaned_data = {k: restore_models(v) for k, v in item["data"].items()}
-
- if hasattr(cls, "model_validate"):
- return cls.model_validate(cleaned_data)
- return cls(**cleaned_data)
- except Exception:
- from types import SimpleNamespace
-
- return SimpleNamespace(**{k: restore_models(v) for k, v in item["data"].items()})
-
- return {k: restore_models(v) for k, v in item.items()}
-
- return item
-
- return restore_models(current)
-
-
-def format_uptime(delta: timedelta) -> str:
- days = delta.days
- hours, remainder = divmod(delta.seconds, 3600)
- minutes, seconds = divmod(remainder, 60)
-
- parts = []
- if days:
- parts.append(f"{days}d")
- if hours:
- parts.append(f"{hours}h")
- if minutes:
- parts.append(f"{minutes}m")
- if seconds or not parts:
- parts.append(f"{seconds}s")
-
- return " ".join(parts)
-
-
-def safe_error_message(e: Exception) -> str:
- if hasattr(e, "cause") and e.cause:
- return str(e.cause)
- if e.args:
- return str(e.args[0])
- return "Internal Server Error"
-
-
-def shorten_str(data: str, max_len: int = 45) -> str:
- return data if len(data) <= max_len else data[: max_len - 3] + "..."
def build_swagger_ui_html(openapi_url: str, title: str) -> HTMLResponse:
@@ -388,10 +226,6 @@ def build_swagger_ui_html(openapi_url: str, title: str) -> HTMLResponse:
background: #f9fafb;
}}
- .swagger-ui .opblock-tag small a[href*="/docs/plugin-config?payload="] {{
- font-weight: 600;
- }}
-
@media (max-width: 1400px) {{
.swagger-ui .opblock-tag {{
grid-template-columns: 360px minmax(0, 1fr) 28px;
@@ -577,133 +411,6 @@ def build_swagger_ui_html(openapi_url: str, title: str) -> HTMLResponse:
}});