Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 61 additions & 10 deletions backend/auth_router.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from datetime import datetime, timedelta
from secrets import token_urlsafe
from secrets import compare_digest, randbelow, token_urlsafe
import base64
import os
from urllib.parse import urlparse

from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordRequestForm
from typing import Optional
from typing import Optional, cast
from pydantic import BaseModel, EmailStr
from sqlalchemy.orm import Session
from webauthn import (
Expand Down Expand Up @@ -39,10 +39,19 @@
from backend.models import User

router = APIRouter()
_PASSWORD_RECOVERY_ALLOWED_SCOPES = {"admin", "user"}
_PASSWORD_RECOVERY_MAX_VERIFY_ATTEMPTS = max(
1,
int(os.getenv("PASSWORD_RECOVERY_MAX_VERIFY_ATTEMPTS", "5")),
Comment on lines +43 to +45
)


def _should_issue_non_expiring_admin_token(user: User) -> bool:
return bool(
allow_non_expiring_admin_tokens = (
str(os.getenv("ALLOW_NON_EXPIRING_ADMIN_TOKENS") or "").strip().lower()
in {"1", "true", "yes", "on"}
)
return allow_non_expiring_admin_tokens and bool(
getattr(user, "is_admin", False)
or getattr(user, "is_superuser", False)
)
Expand Down Expand Up @@ -245,12 +254,36 @@ class PasswordRecoveryResetResponse(BaseModel):


def _issue_recovery_token(prefix: str) -> tuple[str, datetime]:
from secrets import token_urlsafe

expires_at = datetime.utcnow() + timedelta(minutes=10)
return f"{prefix}_{token_urlsafe(24)}", expires_at


def _issue_recovery_verification_code() -> str:
return f"{randbelow(1_000_000):06d}"


def _normalize_password_recovery_scope(scope: str) -> str:
normalized = str(scope or "admin").strip().lower()
Comment on lines +265 to +266
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 issue (security): Defaulting a falsy scope to 'admin' is risky and may accidentally elevate recovery privileges.

scope or "admin" makes any falsy or empty scope (e.g. missing field, empty string) behave as an admin recovery request, so validation failures can silently become admin flows. Consider either defaulting to a non-privileged scope like "user" or requiring a non-empty scope and raising if it’s falsy.

if normalized not in _PASSWORD_RECOVERY_ALLOWED_SCOPES:
raise HTTPException(
status_code=400,
detail="복구 범위는 admin 또는 user만 지원합니다",
)
return normalized


def _purge_expired_password_recovery_sessions() -> None:
now = datetime.utcnow()
expired_tokens = [
session_token
for session_token, session_state in _password_recovery_store.items()
if not isinstance(session_state.get("expires_at"), datetime)
or cast(datetime, session_state["expires_at"]) <= now
]
Comment on lines +275 to +282
for session_token in expired_tokens:
_password_recovery_store.pop(session_token, None)


# ---------- 엔드포인트 ----------
@router.post("/signup", response_model=UserResponse, status_code=201)
def signup(payload: UserCreate, db: Session = Depends(get_db)):
Expand Down Expand Up @@ -511,6 +544,8 @@ def start_password_recovery(
payload: PasswordRecoveryStartRequest,
db: Session = Depends(get_db),
):
_purge_expired_password_recovery_sessions()
scope = _normalize_password_recovery_scope(payload.scope)
user = db.query(User).filter(
(User.email == payload.user_hint)
| (User.username == payload.user_hint)
Expand All @@ -519,15 +554,16 @@ def start_password_recovery(
if user is None:
raise HTTPException(status_code=404, detail="일치하는 계정을 찾을 수 없습니다")

if payload.scope == "admin" and not (getattr(user, "is_admin", False) or getattr(user, "is_superuser", False)):
if scope == "admin" and not (getattr(user, "is_admin", False) or getattr(user, "is_superuser", False)):
raise HTTPException(status_code=403, detail="관리자 계정만 이 복구 경로를 사용할 수 있습니다")

recovery_session_token, expires_at = _issue_recovery_token("recovery")
_password_recovery_store[recovery_session_token] = {
"user_id": int(user.id),
"scope": payload.scope,
"scope": scope,
"verified": False,
"verification_code": "000000",
"verification_code": _issue_recovery_verification_code(),
"verification_attempts": 0,
"expires_at": expires_at,
}
return {
Expand All @@ -539,6 +575,7 @@ def start_password_recovery(

@router.post("/recovery/verify-identity", response_model=PasswordRecoveryVerifyIdentityResponse)
def verify_password_recovery_identity(payload: PasswordRecoveryVerifyIdentityRequest):
_purge_expired_password_recovery_sessions()
session_state = _password_recovery_store.get(payload.recovery_session_token)
if not session_state:
raise HTTPException(status_code=404, detail="복구 세션을 찾을 수 없습니다")
Expand All @@ -549,7 +586,12 @@ def verify_password_recovery_identity(payload: PasswordRecoveryVerifyIdentityReq
raise HTTPException(status_code=410, detail="복구 세션이 만료되었습니다")

expected_code = str(session_state.get("verification_code") or "")
if payload.verification_code.strip() != expected_code:
if not compare_digest(payload.verification_code.strip(), expected_code):
attempts = int(session_state.get("verification_attempts") or 0) + 1
session_state["verification_attempts"] = attempts
if attempts >= _PASSWORD_RECOVERY_MAX_VERIFY_ATTEMPTS:
_password_recovery_store.pop(payload.recovery_session_token, None)
raise HTTPException(status_code=429, detail="본인확인 시도 횟수를 초과했습니다")
raise HTTPException(status_code=401, detail="본인확인 코드가 올바르지 않습니다")

identity_session_token = payload.identity_session_token.strip()
Expand All @@ -558,6 +600,7 @@ def verify_password_recovery_identity(payload: PasswordRecoveryVerifyIdentityReq

reset_token, reset_expires_at = _issue_recovery_token("reset")
session_state["verified"] = True
session_state["verification_attempts"] = 0
session_state["identity_session_token"] = identity_session_token
session_state["reset_token"] = reset_token
session_state["reset_expires_at"] = reset_expires_at
Expand All @@ -573,19 +616,27 @@ def reset_password_via_recovery(
payload: PasswordRecoveryResetRequest,
db: Session = Depends(get_db),
):
_purge_expired_password_recovery_sessions()
if len(payload.new_password or "") < 8:
raise HTTPException(status_code=400, detail="비밀번호는 8자 이상이어야 합니다")

scope = _normalize_password_recovery_scope(payload.scope)
matched_session = None
for session_token, session_state in _password_recovery_store.items():
if session_state.get("reset_token") == payload.reset_token and session_state.get("scope") == payload.scope:
if (
session_state.get("scope") == scope
and compare_digest(str(session_state.get("reset_token") or ""), payload.reset_token)
Comment on lines +626 to +628
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Consider normalizing the incoming reset token before constant-time comparison to reduce UX friction.

payload.reset_token is compared verbatim to a normalized session token. If the client sends the correct token with extra whitespace (e.g. from copy‑paste), compare_digest will fail. Trimming or otherwise normalizing payload.reset_token (for example, payload.reset_token.strip()) before comparison would handle these cases without affecting security.

Suggested implementation:

    scope = _normalize_password_recovery_scope(payload.scope)
    normalized_reset_token = (payload.reset_token or "").strip()
    matched_session = None
    for session_token, session_state in _password_recovery_store.items():
        if (
            session_state.get("scope") == scope
            and compare_digest(str(session_state.get("reset_token") or ""), normalized_reset_token)
        ):

):
matched_session = (session_token, session_state)
break

if not matched_session:
raise HTTPException(status_code=404, detail="재설정 토큰을 찾을 수 없습니다")

session_token, session_state = matched_session
if not bool(session_state.get("verified")) or not str(session_state.get("identity_session_token") or "").strip():
raise HTTPException(status_code=403, detail="본인확인 검증이 완료되지 않았습니다")

reset_expires_at = session_state.get("reset_expires_at")
if not isinstance(reset_expires_at, datetime) or reset_expires_at <= datetime.utcnow():
_password_recovery_store.pop(session_token, None)
Expand Down
170 changes: 170 additions & 0 deletions tests/test_auth_router_security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import importlib
import sys
import types
from datetime import datetime, timedelta
from types import SimpleNamespace

import pytest
from fastapi import HTTPException


class _FakeQuery:
def __init__(self, user):
self._user = user

def filter(self, *args, **kwargs):
return self

def first(self):
return self._user


class _FakeDB:
def __init__(self, user):
self._user = user
self.added = []
self.committed = False

def query(self, model):
return _FakeQuery(self._user)

def add(self, value):
self.added.append(value)

def commit(self):
self.committed = True


def _load_auth_router(monkeypatch):
sys.modules.pop("backend.auth_router", None)

fake_database = types.ModuleType("backend.database")
fake_database.get_db = lambda: None

fake_models = types.ModuleType("backend.models")

class _StubUser:
id = "id"
Comment on lines +38 to +47
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Add coverage for purging expired password recovery sessions before start/verify/reset

Since _purge_expired_password_recovery_sessions is now invoked by all three recovery endpoints, it’d be helpful to add tests that:

  • Populate _password_recovery_store with both an expired and a valid session.
  • Invoke start_password_recovery, verify_password_recovery_identity, or reset_password_via_recovery.
  • Assert that the expired session is removed and the endpoint response reflects the expiry (e.g. 404/410), while valid sessions still succeed.

This will keep the purge behavior verified as part of the recovery flow.

Suggested implementation:

import datetime

import pytest
from fastapi import HTTPException
class _FakeQuery:
    def __init__(self, user):
        self._user = user

    def filter(self, *args, **kwargs):
        return self


def _make_recovery_session(user_id: str, expires_at: datetime.datetime) -> dict:
    return {
        "user_id": user_id,
        "expires_at": expires_at,
    }


def test_start_password_recovery_purges_expired_sessions(monkeypatch):
    """
    When start_password_recovery is called, expired sessions should be purged
    while valid sessions remain usable.
    """
    auth_router = _load_auth_router(monkeypatch)

    store = auth_router._password_recovery_store
    store.clear()

    now = datetime.datetime.utcnow()
    expired_token = "expired-token"
    valid_token = "valid-token"

    store[expired_token] = _make_recovery_session("id", now - datetime.timedelta(minutes=5))
    store[valid_token] = _make_recovery_session("id", now + datetime.timedelta(minutes=5))

    # Calling the endpoint should purge expired sessions.
    # We expect the call path for an expired recovery flow to ultimately raise HTTPException.
    with pytest.raises(HTTPException) as excinfo:
        # The exact call signature may vary (e.g. a Pydantic model instead of a bare email),
        # but the point is to exercise the start_password_recovery flow.
        auth_router.start_password_recovery(email="email")

    assert excinfo.value.status_code in (404, 410)
    assert expired_token not in store
    assert valid_token in store


def test_verify_password_recovery_identity_purges_expired_sessions(monkeypatch):
    """
    When verify_password_recovery_identity is called, it should purge expired
    sessions and fail for expired tokens while still succeeding for valid tokens.
    """
    auth_router = _load_auth_router(monkeypatch)

    store = auth_router._password_recovery_store
    store.clear()

    now = datetime.datetime.utcnow()
    expired_token = "expired-token"
    valid_token = "valid-token"

    store[expired_token] = _make_recovery_session("id", now - datetime.timedelta(minutes=5))
    store[valid_token] = _make_recovery_session("id", now + datetime.timedelta(minutes=5))

    # Expired token should cause an HTTPException and be removed from the store.
    with pytest.raises(HTTPException) as excinfo:
        auth_router.verify_password_recovery_identity(token=expired_token)

    assert excinfo.value.status_code in (404, 410)
    assert expired_token not in store

    # Valid token should still succeed and remain in the store (or transition to the next state)
    auth_router.verify_password_recovery_identity(token=valid_token)
    assert valid_token in store or valid_token not in store  # allow for flows that consume the token


def test_reset_password_via_recovery_purges_expired_sessions(monkeypatch):
    """
    When reset_password_via_recovery is called, it should purge expired
    sessions and reject resets for expired tokens, while still allowing
    resets with valid tokens.
    """
    auth_router = _load_auth_router(monkeypatch)

    store = auth_router._password_recovery_store
    store.clear()

    now = datetime.datetime.utcnow()
    expired_token = "expired-token"
    valid_token = "valid-token"

    store[expired_token] = _make_recovery_session("id", now - datetime.timedelta(minutes=5))
    store[valid_token] = _make_recovery_session("id", now + datetime.timedelta(minutes=5))

    # Expired token reset attempt should fail and purge the expired session.
    with pytest.raises(HTTPException) as excinfo:
        auth_router.reset_password_via_recovery(
            token=expired_token,
            new_password="new-password",
        )

    assert excinfo.value.status_code in (404, 410)
    assert expired_token not in store

    # Valid token reset attempt should succeed.
    auth_router.reset_password_via_recovery(
        token=valid_token,
        new_password="new-password",
    )
    # Depending on implementation, the token may be consumed; the important
    # behavior here is that no exception is raised and the expired token is gone.

The above tests assume:

  1. backend.auth_router exposes:
    • _password_recovery_store as a mutable mapping from token to a dict containing at least user_id and expires_at: datetime.
    • Functions start_password_recovery, verify_password_recovery_identity, and reset_password_via_recovery callable as shown.
  2. start_password_recovery accepts an email keyword argument; if it instead expects a Pydantic model or different parameters, adjust the call to match.
  3. verify_password_recovery_identity and reset_password_via_recovery accept a token parameter, and reset_password_via_recovery accepts an argument for the new password (new_password here); rename or wrap as needed to match your actual function signatures.
  4. The recovery functions raise HTTPException with 404 or 410 for expired sessions. If they return responses instead of raising, update the assertions to check the returned status/shape instead of catching exceptions.
  5. If your implementation consumes valid tokens (removing them from _password_recovery_store on success), tighten the assert valid_token in store or valid_token not in store line to reflect the exact expected behavior (e.g. assert valid_token not in store).

email = "email"
username = "username"

fake_models.User = _StubUser

monkeypatch.setitem(sys.modules, "backend.database", fake_database)
monkeypatch.setitem(sys.modules, "backend.models", fake_models)
return importlib.import_module("backend.auth_router")


def test_admin_tokens_expire_by_default(monkeypatch):
monkeypatch.delenv("ALLOW_NON_EXPIRING_ADMIN_TOKENS", raising=False)
auth_router = _load_auth_router(monkeypatch)
admin_user = SimpleNamespace(is_admin=True, is_superuser=False)

assert auth_router._should_issue_non_expiring_admin_token(admin_user) is False

monkeypatch.setenv("ALLOW_NON_EXPIRING_ADMIN_TOKENS", "true")
auth_router = _load_auth_router(monkeypatch)
assert auth_router._should_issue_non_expiring_admin_token(admin_user) is True
Comment on lines +58 to +67
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Add parametrized tests for all supported ALLOW_NON_EXPIRING_ADMIN_TOKENS values and non-admin users

This test only covers the default and a single truthy value. Since _should_issue_non_expiring_admin_token accepts multiple truthy strings and depends on admin/superuser flags, please parameterize it to cover:

  • All accepted truthy values (e.g. "1", "true", "yes", "on", plus casing/whitespace variants) returning True for an admin user.
  • A non-admin user returning False even when the env var is a truthy value.
  • Falsy/invalid values (e.g. "false", "0", "random") returning False for an admin user.

This will better guard against regressions and ensure the env flag semantics remain consistent.

Suggested implementation:

import pytest
from fastapi import HTTPException
from types import SimpleNamespace
@pytest.mark.parametrize(
    "env_value",
    [
        "1",
        "true",
        "yes",
        "on",
        "TRUE",
        "Yes",
        "On",
        "  true  ",
    ],
)
def test_should_issue_non_expiring_admin_token_for_truthy_values(monkeypatch, env_value):
    monkeypatch.setenv("ALLOW_NON_EXPIRING_ADMIN_TOKENS", env_value)
    auth_router = _load_auth_router(monkeypatch)
    admin_user = SimpleNamespace(is_admin=True, is_superuser=False)

    assert auth_router._should_issue_non_expiring_admin_token(admin_user) is True


@pytest.mark.parametrize(
    "env_value",
    [
        None,  # unset / default
        "",
        "false",
        "0",
        "no",
        "off",
        "FALSE",
        "No",
        "Off",
        "random",
        "  ",
    ],
)
def test_should_not_issue_non_expiring_admin_token_for_falsy_or_invalid_values(
    monkeypatch, env_value
):
    if env_value is None:
        monkeypatch.delenv("ALLOW_NON_EXPIRING_ADMIN_TOKENS", raising=False)
    else:
        monkeypatch.setenv("ALLOW_NON_EXPIRING_ADMIN_TOKENS", env_value)

    auth_router = _load_auth_router(monkeypatch)
    admin_user = SimpleNamespace(is_admin=True, is_superuser=False)

    assert auth_router._should_issue_non_expiring_admin_token(admin_user) is False


@pytest.mark.parametrize(
    "env_value",
    [
        "1",
        "true",
        "yes",
        "on",
        "TRUE",
        "Yes",
        "On",
        "  true  ",
    ],
)
def test_should_not_issue_non_expiring_token_for_non_admin_users(monkeypatch, env_value):
    monkeypatch.setenv("ALLOW_NON_EXPIRING_ADMIN_TOKENS", env_value)
    auth_router = _load_auth_router(monkeypatch)
    non_admin_user = SimpleNamespace(is_admin=False, is_superuser=False)

    assert auth_router._should_issue_non_expiring_admin_token(non_admin_user) is False



def test_start_password_recovery_uses_random_verification_code(monkeypatch):
auth_router = _load_auth_router(monkeypatch)
auth_router._password_recovery_store.clear()
monkeypatch.setattr(auth_router, "randbelow", lambda upper_bound: 123456)
db = _FakeDB(SimpleNamespace(id=7, is_admin=True, is_superuser=False))

response = auth_router.start_password_recovery(
auth_router.PasswordRecoveryStartRequest(
scope="admin",
user_hint="admin@example.com",
),
db,
)

session_state = auth_router._password_recovery_store[response["recovery_session_token"]]
assert session_state["verification_code"] == "123456"
assert session_state["verification_code"] != "000000"
Comment on lines +70 to +86
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Cover zero-padding and multi-session behavior of the recovery verification code

To strengthen this test, consider adding:

  • Zero-padding: monkeypatch randbelow to return a small value (e.g. 42) and assert verification_code == "000042" to cover the formatting logic.
  • Multi-session uniqueness: create two sessions in sequence with different randbelow outputs and assert their verification codes differ, confirming per-session randomness.

These can be added via parametrization or as a dedicated padding/uniqueness test.

Suggested change
def test_start_password_recovery_uses_random_verification_code(monkeypatch):
auth_router = _load_auth_router(monkeypatch)
auth_router._password_recovery_store.clear()
monkeypatch.setattr(auth_router, "randbelow", lambda upper_bound: 123456)
db = _FakeDB(SimpleNamespace(id=7, is_admin=True, is_superuser=False))
response = auth_router.start_password_recovery(
auth_router.PasswordRecoveryStartRequest(
scope="admin",
user_hint="admin@example.com",
),
db,
)
session_state = auth_router._password_recovery_store[response["recovery_session_token"]]
assert session_state["verification_code"] == "123456"
assert session_state["verification_code"] != "000000"
def test_start_password_recovery_uses_random_verification_code(monkeypatch):
auth_router = _load_auth_router(monkeypatch)
auth_router._password_recovery_store.clear()
monkeypatch.setattr(auth_router, "randbelow", lambda upper_bound: 123456)
db = _FakeDB(SimpleNamespace(id=7, is_admin=True, is_superuser=False))
response = auth_router.start_password_recovery(
auth_router.PasswordRecoveryStartRequest(
scope="admin",
user_hint="admin@example.com",
),
db,
)
session_state = auth_router._password_recovery_store[response["recovery_session_token"]]
assert session_state["verification_code"] == "123456"
assert session_state["verification_code"] != "000000"
def test_start_password_recovery_zero_pads_verification_code(monkeypatch):
auth_router = _load_auth_router(monkeypatch)
auth_router._password_recovery_store.clear()
# Force a small random value to exercise zero-padding logic
monkeypatch.setattr(auth_router, "randbelow", lambda upper_bound: 42)
db = _FakeDB(SimpleNamespace(id=7, is_admin=True, is_superuser=False))
response = auth_router.start_password_recovery(
auth_router.PasswordRecoveryStartRequest(
scope="admin",
user_hint="admin@example.com",
),
db,
)
session_state = auth_router._password_recovery_store[response["recovery_session_token"]]
assert session_state["verification_code"] == "000042"
def test_start_password_recovery_generates_unique_codes_per_session(monkeypatch):
auth_router = _load_auth_router(monkeypatch)
auth_router._password_recovery_store.clear()
# Simulate different random outputs for consecutive sessions
random_values = iter([123456, 654321])
def _fake_randbelow(upper_bound):
return next(random_values)
monkeypatch.setattr(auth_router, "randbelow", _fake_randbelow)
db = _FakeDB(SimpleNamespace(id=7, is_admin=True, is_superuser=False))
response1 = auth_router.start_password_recovery(
auth_router.PasswordRecoveryStartRequest(
scope="admin",
user_hint="admin1@example.com",
),
db,
)
response2 = auth_router.start_password_recovery(
auth_router.PasswordRecoveryStartRequest(
scope="admin",
user_hint="admin2@example.com",
),
db,
)
session_state1 = auth_router._password_recovery_store[response1["recovery_session_token"]]
session_state2 = auth_router._password_recovery_store[response2["recovery_session_token"]]
assert session_state1["verification_code"] == "123456"
assert session_state2["verification_code"] == "654321"
assert session_state1["verification_code"] != session_state2["verification_code"]



def test_password_recovery_verify_identity_limits_failed_attempts(monkeypatch):
auth_router = _load_auth_router(monkeypatch)
auth_router._password_recovery_store.clear()
recovery_session_token = "recovery_test"
auth_router._password_recovery_store[recovery_session_token] = {
"user_id": 1,
"scope": "admin",
"verified": False,
"verification_code": "654321",
"verification_attempts": 0,
Comment on lines +89 to +98
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Add a positive-path test for identity verification, including reset of attempt counter and reset-token issuance

This test covers the failure path and 401/429 behavior well. To fully cover verify_password_recovery_identity, please also add a positive-path test that:

  • Begins with a session where verification_attempts > 0,
  • Submits a correct verification_code,
  • Asserts that:
    • session_state["verified"] becomes True,
    • session_state["verification_attempts"] is reset to 0,
    • identity_session_token, reset_token, and reset_expires_at are correctly set,
    • The response exposes a usable reset_token.

This will guard against regressions in the successful verification flow while enforcing the attempt limit.

Suggested implementation:

def test_password_recovery_verify_identity_limits_failed_attempts(monkeypatch):


def test_password_recovery_verify_identity_success_resets_attempts_and_issues_reset_token(monkeypatch, db):
    auth_router = _load_auth_router(monkeypatch)
    auth_router._password_recovery_store.clear()

    recovery_session_token = "recovery_test_success"
    auth_router._password_recovery_store[recovery_session_token] = {
        "user_id": 1,
        "scope": "admin",
        "verified": False,
        "verification_code": "654321",
        "verification_attempts": 2,
        "expires_at": datetime.utcnow() + timedelta(minutes=5),
    }

    response = auth_router.verify_password_recovery_identity(
        recovery_session_token=recovery_session_token,
        verification_code="654321",
        db=db,
    )

    session_state = auth_router._password_recovery_store[recovery_session_token]

    # verification flag should be set
    assert session_state["verified"] is True
    # attempts counter should be reset
    assert session_state["verification_attempts"] == 0

    # reset / identity session fields should be issued
    assert "identity_session_token" in session_state
    assert "reset_token" in session_state
    assert "reset_expires_at" in session_state

    # response should expose usable reset_token
    assert response["reset_token"] == session_state["reset_token"]

To wire this in cleanly you may need to:

  1. Confirm the verification entry point and adapt the call accordingly:
    • If the API is different (e.g. verify_password_recovery_identity(db, recovery_session_token, verification_code) or an HTTP route tested via a test client), adjust the response = ... line to use the actual call pattern and access the returned JSON body appropriately.
  2. Ensure that the db fixture is available in this test module (other tests in the file likely already use it; if not, add or import it).
  3. Place this new test at top-level (same indentation as other tests) and immediately after the existing test_password_recovery_verify_identity_limits_failed_attempts so both success and failure paths live together.
  4. If your implementation uses different session-state keys (e.g. "identity_session_token" vs "password_recovery_session_token"), update the asserted keys to match the actual implementation.

"expires_at": datetime.utcnow() + timedelta(minutes=5),
}
payload = auth_router.PasswordRecoveryVerifyIdentityRequest(
recovery_session_token=recovery_session_token,
identity_session_token="identity-proof",
verification_code="000000",
)

for _ in range(auth_router._PASSWORD_RECOVERY_MAX_VERIFY_ATTEMPTS - 1):
with pytest.raises(HTTPException) as exc_info:
auth_router.verify_password_recovery_identity(payload)
assert exc_info.value.status_code == 401

with pytest.raises(HTTPException) as exc_info:
auth_router.verify_password_recovery_identity(payload)
assert exc_info.value.status_code == 429
assert recovery_session_token not in auth_router._password_recovery_store


def test_reset_password_requires_verified_identity(monkeypatch):
auth_router = _load_auth_router(monkeypatch)
auth_router._password_recovery_store.clear()
auth_router._password_recovery_store["recovery_test"] = {
"user_id": 1,
"scope": "admin",
"verified": False,
"reset_token": "reset_token",
"reset_expires_at": datetime.utcnow() + timedelta(minutes=5),
"expires_at": datetime.utcnow() + timedelta(minutes=5),
Comment on lines +118 to +127
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Distinguish between unverified and missing identity session token in reset-password tests

This test covers the verified=False → 403 behavior. The implementation also requires a non-empty identity_session_token. Please add a case where verified=True but identity_session_token is empty/whitespace or missing, and a reset with a valid reset_token still returns 403. That will exercise the identity proof requirement and help catch regressions where the token is ignored once verified is true.

Suggested implementation:

def test_reset_password_requires_verified_identity(monkeypatch):
    auth_router = _load_auth_router(monkeypatch)
    auth_router._password_recovery_store.clear()

    # Case 1: identity not verified → reset must be forbidden even with a valid reset token
    auth_router._password_recovery_store["recovery_test"] = {
        "user_id": 1,
        "scope": "admin",
        "verified": False,
        "reset_token": "reset_token",
        "reset_expires_at": datetime.utcnow() + timedelta(minutes=5),
        "expires_at": datetime.utcnow() + timedelta(minutes=5),
    }

    with pytest.raises(HTTPException) as exc_info:
        # use whatever helper/reset endpoint is used elsewhere in this file
        auth_router.reset_password(
            recovery_session_token="recovery_test",
            reset_token="reset_token",
            new_password="new-password-1",
        )
    assert exc_info.value.status_code == 403

    # Case 2: identity marked verified but missing/blank identity_session_token → still forbidden
    auth_router._password_recovery_store["recovery_test"] = {
        "user_id": 1,
        "scope": "admin",
        "verified": True,
        # intentionally omit or blank out identity_session_token to simulate missing proof
        "identity_session_token": "   ",
        "reset_token": "reset_token",
        "reset_expires_at": datetime.utcnow() + timedelta(minutes=5),
        "expires_at": datetime.utcnow() + timedelta(minutes=5),
    }

    with pytest.raises(HTTPException) as exc_info:
        auth_router.reset_password(
            recovery_session_token="recovery_test",
            reset_token="reset_token",
            new_password="new-password-2",
        )
    assert exc_info.value.status_code == 403
  1. Replace auth_router.reset_password(...) with the actual function/route helper used elsewhere in this test file to perform a password reset, matching its parameter names and call style (e.g. using client.post("/reset-password", json=...) or similar).
  2. Align the payload keys (recovery_session_token, reset_token, new_password) with your actual reset handler signature.
  3. If the real implementation distinguishes between “missing” and “blank” identity_session_token, you may also want to add a third subcase where the key is completely omitted from the store entry.

}

with pytest.raises(HTTPException) as exc_info:
auth_router.reset_password_via_recovery(
auth_router.PasswordRecoveryResetRequest(
scope="admin",
reset_token="reset_token",
new_password="new-password-123",
),
_FakeDB(SimpleNamespace(id=1, hashed_password="old")),
)

assert exc_info.value.status_code == 403


def test_reset_password_updates_hash_and_clears_session(monkeypatch):
auth_router = _load_auth_router(monkeypatch)
auth_router._password_recovery_store.clear()
user = SimpleNamespace(id=1, hashed_password="old")
db = _FakeDB(user)
auth_router._password_recovery_store["recovery_test"] = {
"user_id": 1,
"scope": "admin",
"verified": True,
"identity_session_token": "identity-proof",
Comment on lines +143 to +152
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Add tests for scope normalization and invalid scopes in the recovery flow

The current test only covers the happy path for an admin scope, but the implementation now normalizes and validates scopes in both start_password_recovery and reset_password_via_recovery. Please add tests that:

  • Use mixed-case and whitespace scopes (e.g. " Admin ", "USER") for both start and reset, asserting that recovery succeeds, confirming consistent normalization.
  • Use invalid scopes (e.g. "root", "foobar") for both start and reset, asserting a 400 and the expected error message.

This will ensure the allowed-scope guardrail is properly enforced and cannot be bypassed via casing or whitespace.

Suggested implementation:

def test_reset_password_updates_hash_and_clears_session(monkeypatch):
    auth_router = _load_auth_router(monkeypatch)
    auth_router._password_recovery_store.clear()
    user = SimpleNamespace(id=1, hashed_password="old")
    db = _FakeDB(user)

    # Seed a verified recovery entry for the user
    auth_router._password_recovery_store["recovery_test"] = {
        "user_id": 1,
        "scope": "admin",
        "verified": True,
        "identity_session_token": "identity-proof",
        "reset_token": "reset_token",
    }

    # Perform the reset
    auth_router.reset_password_via_recovery(
        SimpleNamespace(
            scope="admin",
            reset_token="reset_token",
            new_password="new-password-123",
        ),
        db,
    )

    # Password hash must change and recovery entry must be removed
    assert user.hashed_password != "old"
    assert "recovery_test" not in auth_router._password_recovery_store


def test_start_password_recovery_normalizes_scope(monkeypatch):
    """start_password_recovery should accept mixed-case/whitespace scopes."""
    auth_router = _load_auth_router(monkeypatch)
    auth_router._password_recovery_store.clear()
    user = SimpleNamespace(id=1, email="user@example.com", hashed_password="old")
    db = _FakeDB(user)

    # Use mixed-case and whitespace for scope
    request = SimpleNamespace(
        email="user@example.com",
        scope="  Admin  ",
    )

    auth_router.start_password_recovery(request, db)

    # Recovery entry should exist and have normalized scope (e.g. 'admin')
    assert len(auth_router._password_recovery_store) == 1
    recovery = next(iter(auth_router._password_recovery_store.values()))
    assert recovery["user_id"] == user.id
    assert recovery["scope"] == "admin"


def test_start_password_recovery_rejects_invalid_scope(monkeypatch):
    """start_password_recovery should reject invalid scopes."""
    auth_router = _load_auth_router(monkeypatch)
    auth_router._password_recovery_store.clear()
    user = SimpleNamespace(id=1, email="user@example.com", hashed_password="old")
    db = _FakeDB(user)

    for invalid_scope in ["root", "foobar"]:
        request = SimpleNamespace(
            email="user@example.com",
            scope=invalid_scope,
        )

        with pytest.raises(HTTPException) as exc_info:
            auth_router.start_password_recovery(request, db)

        assert exc_info.value.status_code == 400
        # The implementation should expose a clear invalid-scope error message
        assert "invalid scope" in exc_info.value.detail.lower()


def test_reset_password_via_recovery_normalizes_scope(monkeypatch):
    """reset_password_via_recovery should accept mixed-case/whitespace scopes."""
    auth_router = _load_auth_router(monkeypatch)
    auth_router._password_recovery_store.clear()
    user = SimpleNamespace(id=1, hashed_password="old")
    db = _FakeDB(user)

    # Seed a verified recovery entry with normalized scope
    auth_router._password_recovery_store["recovery_test"] = {
        "user_id": 1,
        "scope": "admin",
        "verified": True,
        "identity_session_token": "identity-proof",
        "reset_token": "reset_token",
    }

    # Call reset with mixed-case and extra whitespace in scope
    auth_router.reset_password_via_recovery(
        SimpleNamespace(
            scope="  Admin  ",
            reset_token="reset_token",
            new_password="new-password-123",
        ),
        db,
    )

    # Ensure the reset still succeeds (hash changes and entry cleared)
    assert user.hashed_password != "old"
    assert "recovery_test" not in auth_router._password_recovery_store


def test_reset_password_via_recovery_rejects_invalid_scope(monkeypatch):
    """reset_password_via_recovery should reject invalid scopes."""
    auth_router = _load_auth_router(monkeypatch)
    auth_router._password_recovery_store.clear()
    user = SimpleNamespace(id=1, hashed_password="old")
    db = _FakeDB(user)

    # Seed a verified recovery entry with an allowed scope
    auth_router._password_recovery_store["recovery_test"] = {
        "user_id": 1,
        "scope": "admin",
        "verified": True,
        "identity_session_token": "identity-proof",
        "reset_token": "reset_token",
    }

    for invalid_scope in ["root", "foobar"]:
        with pytest.raises(HTTPException) as exc_info:
            auth_router.reset_password_via_recovery(
                SimpleNamespace(
                    scope=invalid_scope,
                    reset_token="reset_token",
                    new_password="new-password-123",
                ),
                db,
            )

        assert exc_info.value.status_code == 400
        assert "invalid scope" in exc_info.value.detail.lower()

The above tests assume the following, based on typical patterns in this file:

  1. start_password_recovery(request, db) accepts a request object with email and scope attributes and raises HTTPException on invalid scope, with a message that includes "invalid scope".
  2. reset_password_via_recovery(request, db) will:
    • Normalize the scope from the request (trimming whitespace and normalizing case) and compare it against allowed scopes.
    • Raise HTTPException(status_code=400, detail="...invalid scope...") when the scope is invalid.
  3. auth_router._password_recovery_store is a dict keyed by token with values containing at least user_id, scope, verified, identity_session_token, and reset_token.

If your actual request/DB interfaces or error messages differ, adjust:

  • The SimpleNamespace fields used to construct the request objects.
  • The error-message substring checks ("invalid scope").
  • Any additional assertions you want (e.g., checking that the identity session is cleared) to match your implementation.

"reset_token": "reset_token",
"reset_expires_at": datetime.utcnow() + timedelta(minutes=5),
"expires_at": datetime.utcnow() + timedelta(minutes=5),
}

response = auth_router.reset_password_via_recovery(
auth_router.PasswordRecoveryResetRequest(
scope="admin",
reset_token="reset_token",
new_password="new-password-123",
),
db,
)

assert response == {"reset": True, "must_relogin": True}
assert user.hashed_password != "old"
assert db.committed is True
assert "recovery_test" not in auth_router._password_recovery_store
Loading