diff --git a/backend/auth_router.py b/backend/auth_router.py index d104cf8..0fb4767 100644 --- a/backend/auth_router.py +++ b/backend/auth_router.py @@ -1,12 +1,12 @@ from datetime import datetime, timedelta -from secrets import token_urlsafe +from secrets import compare_digest, randbelow, token_urlsafe import base64 import os from urllib.parse import urlparse from fastapi import APIRouter, Depends, HTTPException, Request, status from fastapi.security import OAuth2PasswordRequestForm -from typing import Optional +from typing import Optional, cast from pydantic import BaseModel, EmailStr from sqlalchemy.orm import Session from webauthn import ( @@ -39,10 +39,19 @@ from backend.models import User router = APIRouter() +_PASSWORD_RECOVERY_ALLOWED_SCOPES = {"admin", "user"} +_PASSWORD_RECOVERY_MAX_VERIFY_ATTEMPTS = max( + 1, + int(os.getenv("PASSWORD_RECOVERY_MAX_VERIFY_ATTEMPTS", "5")), +) def _should_issue_non_expiring_admin_token(user: User) -> bool: - return bool( + allow_non_expiring_admin_tokens = ( + str(os.getenv("ALLOW_NON_EXPIRING_ADMIN_TOKENS") or "").strip().lower() + in {"1", "true", "yes", "on"} + ) + return allow_non_expiring_admin_tokens and bool( getattr(user, "is_admin", False) or getattr(user, "is_superuser", False) ) @@ -245,12 +254,36 @@ class PasswordRecoveryResetResponse(BaseModel): def _issue_recovery_token(prefix: str) -> tuple[str, datetime]: - from secrets import token_urlsafe - expires_at = datetime.utcnow() + timedelta(minutes=10) return f"{prefix}_{token_urlsafe(24)}", expires_at +def _issue_recovery_verification_code() -> str: + return f"{randbelow(1_000_000):06d}" + + +def _normalize_password_recovery_scope(scope: str) -> str: + normalized = str(scope or "admin").strip().lower() + if normalized not in _PASSWORD_RECOVERY_ALLOWED_SCOPES: + raise HTTPException( + status_code=400, + detail="복구 범위는 admin 또는 user만 지원합니다", + ) + return normalized + + +def _purge_expired_password_recovery_sessions() -> None: + now = datetime.utcnow() + expired_tokens = [ + session_token + for session_token, session_state in _password_recovery_store.items() + if not isinstance(session_state.get("expires_at"), datetime) + or cast(datetime, session_state["expires_at"]) <= now + ] + for session_token in expired_tokens: + _password_recovery_store.pop(session_token, None) + + # ---------- 엔드포인트 ---------- @router.post("/signup", response_model=UserResponse, status_code=201) def signup(payload: UserCreate, db: Session = Depends(get_db)): @@ -511,6 +544,8 @@ def start_password_recovery( payload: PasswordRecoveryStartRequest, db: Session = Depends(get_db), ): + _purge_expired_password_recovery_sessions() + scope = _normalize_password_recovery_scope(payload.scope) user = db.query(User).filter( (User.email == payload.user_hint) | (User.username == payload.user_hint) @@ -519,15 +554,16 @@ def start_password_recovery( if user is None: raise HTTPException(status_code=404, detail="일치하는 계정을 찾을 수 없습니다") - if payload.scope == "admin" and not (getattr(user, "is_admin", False) or getattr(user, "is_superuser", False)): + if scope == "admin" and not (getattr(user, "is_admin", False) or getattr(user, "is_superuser", False)): raise HTTPException(status_code=403, detail="관리자 계정만 이 복구 경로를 사용할 수 있습니다") recovery_session_token, expires_at = _issue_recovery_token("recovery") _password_recovery_store[recovery_session_token] = { "user_id": int(user.id), - "scope": payload.scope, + "scope": scope, "verified": False, - "verification_code": "000000", + "verification_code": _issue_recovery_verification_code(), + "verification_attempts": 0, "expires_at": expires_at, } return { @@ -539,6 +575,7 @@ def start_password_recovery( @router.post("/recovery/verify-identity", response_model=PasswordRecoveryVerifyIdentityResponse) def verify_password_recovery_identity(payload: PasswordRecoveryVerifyIdentityRequest): + _purge_expired_password_recovery_sessions() session_state = _password_recovery_store.get(payload.recovery_session_token) if not session_state: raise HTTPException(status_code=404, detail="복구 세션을 찾을 수 없습니다") @@ -549,7 +586,12 @@ def verify_password_recovery_identity(payload: PasswordRecoveryVerifyIdentityReq raise HTTPException(status_code=410, detail="복구 세션이 만료되었습니다") expected_code = str(session_state.get("verification_code") or "") - if payload.verification_code.strip() != expected_code: + if not compare_digest(payload.verification_code.strip(), expected_code): + attempts = int(session_state.get("verification_attempts") or 0) + 1 + session_state["verification_attempts"] = attempts + if attempts >= _PASSWORD_RECOVERY_MAX_VERIFY_ATTEMPTS: + _password_recovery_store.pop(payload.recovery_session_token, None) + raise HTTPException(status_code=429, detail="본인확인 시도 횟수를 초과했습니다") raise HTTPException(status_code=401, detail="본인확인 코드가 올바르지 않습니다") identity_session_token = payload.identity_session_token.strip() @@ -558,6 +600,7 @@ def verify_password_recovery_identity(payload: PasswordRecoveryVerifyIdentityReq reset_token, reset_expires_at = _issue_recovery_token("reset") session_state["verified"] = True + session_state["verification_attempts"] = 0 session_state["identity_session_token"] = identity_session_token session_state["reset_token"] = reset_token session_state["reset_expires_at"] = reset_expires_at @@ -573,12 +616,17 @@ def reset_password_via_recovery( payload: PasswordRecoveryResetRequest, db: Session = Depends(get_db), ): + _purge_expired_password_recovery_sessions() if len(payload.new_password or "") < 8: raise HTTPException(status_code=400, detail="비밀번호는 8자 이상이어야 합니다") + scope = _normalize_password_recovery_scope(payload.scope) matched_session = None for session_token, session_state in _password_recovery_store.items(): - if session_state.get("reset_token") == payload.reset_token and session_state.get("scope") == payload.scope: + if ( + session_state.get("scope") == scope + and compare_digest(str(session_state.get("reset_token") or ""), payload.reset_token) + ): matched_session = (session_token, session_state) break @@ -586,6 +634,9 @@ def reset_password_via_recovery( raise HTTPException(status_code=404, detail="재설정 토큰을 찾을 수 없습니다") session_token, session_state = matched_session + if not bool(session_state.get("verified")) or not str(session_state.get("identity_session_token") or "").strip(): + raise HTTPException(status_code=403, detail="본인확인 검증이 완료되지 않았습니다") + reset_expires_at = session_state.get("reset_expires_at") if not isinstance(reset_expires_at, datetime) or reset_expires_at <= datetime.utcnow(): _password_recovery_store.pop(session_token, None) diff --git a/tests/test_auth_router_security.py b/tests/test_auth_router_security.py new file mode 100644 index 0000000..5b861c5 --- /dev/null +++ b/tests/test_auth_router_security.py @@ -0,0 +1,170 @@ +import importlib +import sys +import types +from datetime import datetime, timedelta +from types import SimpleNamespace + +import pytest +from fastapi import HTTPException + + +class _FakeQuery: + def __init__(self, user): + self._user = user + + def filter(self, *args, **kwargs): + return self + + def first(self): + return self._user + + +class _FakeDB: + def __init__(self, user): + self._user = user + self.added = [] + self.committed = False + + def query(self, model): + return _FakeQuery(self._user) + + def add(self, value): + self.added.append(value) + + def commit(self): + self.committed = True + + +def _load_auth_router(monkeypatch): + sys.modules.pop("backend.auth_router", None) + + fake_database = types.ModuleType("backend.database") + fake_database.get_db = lambda: None + + fake_models = types.ModuleType("backend.models") + + class _StubUser: + id = "id" + email = "email" + username = "username" + + fake_models.User = _StubUser + + monkeypatch.setitem(sys.modules, "backend.database", fake_database) + monkeypatch.setitem(sys.modules, "backend.models", fake_models) + return importlib.import_module("backend.auth_router") + + +def test_admin_tokens_expire_by_default(monkeypatch): + monkeypatch.delenv("ALLOW_NON_EXPIRING_ADMIN_TOKENS", raising=False) + auth_router = _load_auth_router(monkeypatch) + admin_user = SimpleNamespace(is_admin=True, is_superuser=False) + + assert auth_router._should_issue_non_expiring_admin_token(admin_user) is False + + monkeypatch.setenv("ALLOW_NON_EXPIRING_ADMIN_TOKENS", "true") + auth_router = _load_auth_router(monkeypatch) + assert auth_router._should_issue_non_expiring_admin_token(admin_user) is True + + +def test_start_password_recovery_uses_random_verification_code(monkeypatch): + auth_router = _load_auth_router(monkeypatch) + auth_router._password_recovery_store.clear() + monkeypatch.setattr(auth_router, "randbelow", lambda upper_bound: 123456) + db = _FakeDB(SimpleNamespace(id=7, is_admin=True, is_superuser=False)) + + response = auth_router.start_password_recovery( + auth_router.PasswordRecoveryStartRequest( + scope="admin", + user_hint="admin@example.com", + ), + db, + ) + + session_state = auth_router._password_recovery_store[response["recovery_session_token"]] + assert session_state["verification_code"] == "123456" + assert session_state["verification_code"] != "000000" + + +def test_password_recovery_verify_identity_limits_failed_attempts(monkeypatch): + auth_router = _load_auth_router(monkeypatch) + auth_router._password_recovery_store.clear() + recovery_session_token = "recovery_test" + auth_router._password_recovery_store[recovery_session_token] = { + "user_id": 1, + "scope": "admin", + "verified": False, + "verification_code": "654321", + "verification_attempts": 0, + "expires_at": datetime.utcnow() + timedelta(minutes=5), + } + payload = auth_router.PasswordRecoveryVerifyIdentityRequest( + recovery_session_token=recovery_session_token, + identity_session_token="identity-proof", + verification_code="000000", + ) + + for _ in range(auth_router._PASSWORD_RECOVERY_MAX_VERIFY_ATTEMPTS - 1): + with pytest.raises(HTTPException) as exc_info: + auth_router.verify_password_recovery_identity(payload) + assert exc_info.value.status_code == 401 + + with pytest.raises(HTTPException) as exc_info: + auth_router.verify_password_recovery_identity(payload) + assert exc_info.value.status_code == 429 + assert recovery_session_token not in auth_router._password_recovery_store + + +def test_reset_password_requires_verified_identity(monkeypatch): + auth_router = _load_auth_router(monkeypatch) + auth_router._password_recovery_store.clear() + auth_router._password_recovery_store["recovery_test"] = { + "user_id": 1, + "scope": "admin", + "verified": False, + "reset_token": "reset_token", + "reset_expires_at": datetime.utcnow() + timedelta(minutes=5), + "expires_at": datetime.utcnow() + timedelta(minutes=5), + } + + with pytest.raises(HTTPException) as exc_info: + auth_router.reset_password_via_recovery( + auth_router.PasswordRecoveryResetRequest( + scope="admin", + reset_token="reset_token", + new_password="new-password-123", + ), + _FakeDB(SimpleNamespace(id=1, hashed_password="old")), + ) + + assert exc_info.value.status_code == 403 + + +def test_reset_password_updates_hash_and_clears_session(monkeypatch): + auth_router = _load_auth_router(monkeypatch) + auth_router._password_recovery_store.clear() + user = SimpleNamespace(id=1, hashed_password="old") + db = _FakeDB(user) + auth_router._password_recovery_store["recovery_test"] = { + "user_id": 1, + "scope": "admin", + "verified": True, + "identity_session_token": "identity-proof", + "reset_token": "reset_token", + "reset_expires_at": datetime.utcnow() + timedelta(minutes=5), + "expires_at": datetime.utcnow() + timedelta(minutes=5), + } + + response = auth_router.reset_password_via_recovery( + auth_router.PasswordRecoveryResetRequest( + scope="admin", + reset_token="reset_token", + new_password="new-password-123", + ), + db, + ) + + assert response == {"reset": True, "must_relogin": True} + assert user.hashed_password != "old" + assert db.committed is True + assert "recovery_test" not in auth_router._password_recovery_store