-
Notifications
You must be signed in to change notification settings - Fork 0
Docs support config view #124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2344e7f
85fe10d
e6ab19d
b9499ac
411c241
36d58fe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -55,6 +55,7 @@ class TestConfig(BaseModel): | |
|
|
||
|
|
||
| class OauthConfig(BaseModel): | ||
| provider: str = "" | ||
| client_id: str = "" | ||
| client_secret: str = "" | ||
| authorization_url: str = "" | ||
|
|
@@ -84,6 +85,73 @@ def model_post_init(self, context: Any) -> None: | |
| self.jwt_secret = secrets.token_urlsafe(32) | ||
|
|
||
|
|
||
| class RepositoryProviderAuthConfig(BaseModel): | ||
| token: str = "" | ||
| token_header: str = "Authorization" # noqa | ||
| token_scheme: str = "Bearer" # noqa | ||
|
|
||
| def build_headers(self) -> dict[str, str]: | ||
| if not self.token: | ||
| return {} | ||
| if self.token_scheme: | ||
| return {self.token_header: f"{self.token_scheme} {self.token}"} | ||
| return {self.token_header: self.token} | ||
|
|
||
|
|
||
| class GitLabRepositoryAuthEndpointConfig(RepositoryProviderAuthConfig): | ||
| host: str | ||
| path_prefix: str = "" | ||
| token_header: str = "PRIVATE-TOKEN" # noqa | ||
| token_scheme: str = "" | ||
|
|
||
| def matches(self, host: str, path: str) -> bool: | ||
| normalized_prefix = self.normalized_path_prefix | ||
| if self.host.lower() != host.lower(): | ||
| return False | ||
| if not normalized_prefix: | ||
| return True | ||
| return path == normalized_prefix or path.startswith(f"{normalized_prefix}/") | ||
|
|
||
| @property | ||
| def normalized_path_prefix(self) -> str: | ||
| if not self.path_prefix: | ||
| return "" | ||
| return self.path_prefix if self.path_prefix.startswith("/") else f"/{self.path_prefix}" | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
|
|
||
|
|
||
| class GitLabRepositoryAuthConfig(RepositoryProviderAuthConfig): | ||
| token_header: str = "PRIVATE-TOKEN" # noqa | ||
| token_scheme: str = "" | ||
| endpoints: list[GitLabRepositoryAuthEndpointConfig] = Field(default_factory=list) | ||
|
|
||
| def configured_hosts(self) -> set[str]: | ||
| return {endpoint.host.lower() for endpoint in self.endpoints} | ||
|
Comment on lines
+122
to
+128
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Global GitLab auth cannot target a self-hosted instance by itself.
One way to model the missing host mapping class GitLabRepositoryAuthConfig(RepositoryProviderAuthConfig):
+ hosts: list[str] = Field(default_factory=list)
token_header: str = "PRIVATE-TOKEN" # noqa
token_scheme: str = ""
endpoints: list[GitLabRepositoryAuthEndpointConfig] = Field(default_factory=list)
def configured_hosts(self) -> set[str]:
- return {endpoint.host.lower() for endpoint in self.endpoints}
+ return {host.lower() for host in self.hosts} | {endpoint.host.lower() for endpoint in self.endpoints}🤖 Prompt for AI Agents |
||
|
|
||
| def build_headers_for_url(self, host: str, path: str) -> dict[str, str]: | ||
| if endpoint := self.resolve_endpoint(host, path): | ||
| return endpoint.build_headers() | ||
| return self.build_headers() | ||
|
|
||
| def resolve_endpoint(self, host: str, path: str) -> GitLabRepositoryAuthEndpointConfig | None: | ||
| matches = [endpoint for endpoint in self.endpoints if endpoint.matches(host, path)] | ||
| if not matches: | ||
| return None | ||
| return max(matches, key=lambda endpoint: len(endpoint.normalized_path_prefix)) | ||
|
|
||
|
|
||
| class RepositoryAuthConfig(BaseModel): | ||
| github: RepositoryProviderAuthConfig = Field(default_factory=RepositoryProviderAuthConfig) | ||
| gitlab: GitLabRepositoryAuthConfig = Field(default_factory=GitLabRepositoryAuthConfig) | ||
|
|
||
|
|
||
| class RepositoryConfig(BaseModel): | ||
| auth: RepositoryAuthConfig = Field(default_factory=RepositoryAuthConfig) | ||
|
|
||
|
|
||
| class DocsConfig(BaseModel): | ||
| embedded_config_file_whitelist: list[str] = Field(default_factory=list) | ||
|
|
||
|
|
||
| class AuthConfig(BaseModel): | ||
| oauth: OauthConfig | None = Field(default=None) | ||
| rules: dict[str, list[str]] = Field(default_factory=dict) | ||
|
|
@@ -131,8 +199,10 @@ class Settings(BaseSettings): | |
| load_builtin_plugins: list[str] = Field(default_factory=list) | ||
|
|
||
| test: TestConfig = Field(default_factory=TestConfig) | ||
| docs: DocsConfig = Field(default_factory=DocsConfig) | ||
| sentry: SentryConfig = Field(default_factory=SentryConfig) | ||
| auth: AuthConfig = Field(default_factory=AuthConfig) | ||
| repository: RepositoryConfig = Field(default_factory=RepositoryConfig) | ||
|
|
||
| model_config = SettingsConfigDict( | ||
| # `.env.prod` takes priority over `.env` | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,6 @@ | ||
| from datetime import UTC, datetime, timedelta | ||
| from secrets import token_urlsafe | ||
| from typing import Any | ||
|
|
||
| import httpx | ||
| import jwt | ||
|
|
@@ -11,57 +13,97 @@ | |
| from framex.consts import AUTH_COOKIE_NAME, DOCS_URL | ||
|
|
||
| api_key_header = APIKeyHeader(name="Authorization", auto_error=False) | ||
| SESSION_LIFETIME = timedelta(hours=24) | ||
| _AUTH_SESSIONS: dict[str, dict[str, Any]] = {} | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This session store only works on one process.
Also applies to: 46-55 🤖 Prompt for AI Agents |
||
|
|
||
|
|
||
| def create_jwt(payload: dict) -> str: | ||
| def _now_utc() -> datetime: | ||
| return datetime.now(UTC) | ||
|
|
||
|
|
||
| def _purge_expired_sessions(now_utc: datetime | None = None) -> None: | ||
| current = now_utc or _now_utc() | ||
| expired_session_ids = [ | ||
| session_id for session_id, payload in _AUTH_SESSIONS.items() if payload.get("expires_at", current) <= current | ||
| ] | ||
| for session_id in expired_session_ids: | ||
| _AUTH_SESSIONS.pop(session_id, None) | ||
|
|
||
|
|
||
| def create_jwt(payload: dict[str, Any]) -> str: | ||
| if not settings.auth.oauth: | ||
| raise RuntimeError("OAuth not configured") | ||
|
|
||
| now_utc = datetime.now(UTC) | ||
| now_utc = _now_utc() | ||
| token_payload = { | ||
| **payload, | ||
| "iat": int(now_utc.timestamp()), | ||
| "exp": int((now_utc + SESSION_LIFETIME).timestamp()), | ||
| } | ||
| return jwt.encode(token_payload, settings.auth.oauth.jwt_secret, algorithm=settings.auth.oauth.jwt_algorithm) | ||
|
|
||
| payload.update( | ||
| { | ||
| "iat": int(now_utc.timestamp()), | ||
| "exp": int((now_utc + timedelta(hours=24)).timestamp()), | ||
| } | ||
| ) | ||
| return jwt.encode(payload, settings.auth.oauth.jwt_secret, algorithm=settings.auth.oauth.jwt_algorithm) | ||
|
|
||
| def create_auth_session(session_payload: dict[str, Any]) -> str: | ||
| now_utc = _now_utc() | ||
| expires_at = now_utc + SESSION_LIFETIME | ||
| session_id = token_urlsafe(32) | ||
| _AUTH_SESSIONS[session_id] = { | ||
| **session_payload, | ||
| "expires_at": expires_at, | ||
| } | ||
| _purge_expired_sessions(now_utc) | ||
| return session_id | ||
|
|
||
| def auth_jwt(request: Request) -> bool: | ||
| if not settings.auth.oauth: | ||
| return False | ||
|
|
||
| token = request.cookies.get(AUTH_COOKIE_NAME) | ||
| if not token: | ||
| return False | ||
| def decode_auth_token(token: str | None) -> dict[str, Any] | None: | ||
| if not settings.auth.oauth or not token: | ||
| return None | ||
|
|
||
| try: | ||
| jwt.decode( | ||
| payload = jwt.decode( | ||
| token, | ||
| settings.auth.oauth.jwt_secret, | ||
| algorithms=[settings.auth.oauth.jwt_algorithm], | ||
| ) | ||
| return True | ||
| except (jwt.InvalidTokenError, jwt.ExpiredSignatureError): | ||
| return False | ||
| return None | ||
|
|
||
| if not isinstance(payload, dict): | ||
| return None | ||
|
|
||
| def authenticate(request: Request, api_key: str | None = Depends(api_key_header)) -> None: | ||
| if settings.auth.oauth: | ||
| if token := request.cookies.get(AUTH_COOKIE_NAME): | ||
| try: | ||
| jwt.decode( | ||
| token, | ||
| settings.auth.oauth.jwt_secret, | ||
| algorithms=[settings.auth.oauth.jwt_algorithm], | ||
| ) | ||
| return | ||
| session_id = payload.get("session_id") | ||
| if not isinstance(session_id, str) or not session_id: | ||
| return None | ||
|
|
||
| except Exception as e: | ||
| from framex.log import logger | ||
| now_utc = _now_utc() | ||
| _purge_expired_sessions(now_utc) | ||
| session_payload = _AUTH_SESSIONS.get(session_id) | ||
| if session_payload is None: | ||
| return None | ||
|
|
||
| logger.warning(f"JWT decode failed: {e}") | ||
| expires_at = session_payload.get("expires_at") | ||
| if not isinstance(expires_at, datetime) or expires_at <= now_utc: | ||
| _AUTH_SESSIONS.pop(session_id, None) | ||
| return None | ||
|
|
||
| return { | ||
| **payload, | ||
| **{key: value for key, value in session_payload.items() if key != "expires_at"}, | ||
| } | ||
|
|
||
|
|
||
| def get_auth_payload(request: Request) -> dict[str, Any] | None: | ||
| return decode_auth_token(request.cookies.get(AUTH_COOKIE_NAME)) | ||
|
|
||
|
|
||
| def auth_jwt(request: Request) -> bool: | ||
| return get_auth_payload(request) is not None | ||
|
|
||
|
|
||
| def authenticate(request: Request, api_key: str | None = Depends(api_key_header)) -> None: | ||
| if settings.auth.oauth: | ||
| if get_auth_payload(request) is not None: | ||
| return | ||
|
|
||
| if api_key and api_key in (settings.auth.get_auth_keys(request.url.path) or []): | ||
| return | ||
|
|
@@ -74,7 +116,7 @@ def authenticate(request: Request, api_key: str | None = Depends(api_key_header) | |
| f"?client_id={settings.auth.oauth.client_id}" | ||
| "&response_type=code" | ||
| f"&redirect_uri={settings.auth.oauth.call_back_url}" | ||
| "&scope=read_user" | ||
| "&scope=read_user%20read_api" | ||
| ) | ||
| }, | ||
| ) | ||
|
|
@@ -116,12 +158,23 @@ async def oauth_callback(code: str) -> Response: | |
| "message": f"Welcome {username}", | ||
| "username": username, | ||
| "email": user_resp.get("email"), | ||
| "oauth_provider": settings.auth.oauth.provider, | ||
| "oauth_access_token": auth_token, | ||
| } | ||
| session_id = create_auth_session(user_info) | ||
|
|
||
| res = RedirectResponse(url=DOCS_URL, status_code=status.HTTP_302_FOUND) | ||
| res.set_cookie( | ||
| AUTH_COOKIE_NAME, | ||
| create_jwt(user_info), | ||
| create_jwt( | ||
| { | ||
| "message": user_info["message"], | ||
| "username": username, | ||
| "email": user_info["email"], | ||
| "oauth_provider": settings.auth.oauth.provider, | ||
| "session_id": session_id, | ||
| } | ||
| ), | ||
| httponly=True, | ||
| samesite="lax", | ||
| ) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
providerdefaults to empty string — verify downstream tolerance.OauthConfig.providerdefaults to"", which meansoauth_callbackwill stamp an empty string into the session/JWT (oauth_provider).can_access_repository(..., auth_payload.get("oauth_provider"), ...)must handle empty/unknown providers gracefully (e.g., fall back to URL-based detection), otherwise misconfigured deployments silently fail the repo access check. Consider makingproviderrequired viaLiteral["github", "gitlab"]or validating it inmodel_post_init.🤖 Prompt for AI Agents