Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions app/test_user_management.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from fastapi import APIRouter, HTTPException, status, Depends
from typing import Dict, Any, List
from app.auth.firebase_auth import firebase_auth
from app.auth.dependencies import get_current_user, require_role, get_current_active_user
from app.auth.models import UserSignupRequest

router = APIRouter(prefix="/users", tags=["user-management"])

user_cache: Dict[str, Dict[str, Any]] = {}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Volatile In-Memory State

User state is managed in a global dictionary, which is volatile and not thread-safe. This will cause data loss on server restarts and is prone to race conditions in a multi-worker setup.

from threading import Lock

user_cache: Dict[str, Dict[str, Any]] = {}
cache_lock = Lock()
Commitable Suggestion
Suggested change
user_cache: Dict[str, Dict[str, Any]] = {}
from threading import Lock
user_cache: Dict[str, Dict[str, Any]] = {}
cache_lock = Lock()
Standards
  • ISO-IEC-25010-Reliability-Recoverability
  • SRE-Error-Handling


@router.post("/create-admin")
async def create_admin_user(user_data: UserSignupRequest):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: The admin creation endpoint is not protected by any authorization check, so any unauthenticated caller can create new admin users, which is a critical security flaw. The function should require an authenticated user and explicitly verify that the caller has an admin role before proceeding to create an admin account. [security]

Severity Level: Critical 🚨

Suggested change
async def create_admin_user(user_data: UserSignupRequest):
async def create_admin_user(
user_data: UserSignupRequest,
current_user: Dict[str, Any] = Depends(get_current_user),
):
if current_user.get("role") != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can create admin users",
)
Why it matters? ⭐

This is a real security issue: the endpoint has no Depends() or authorization check, so any caller can create admin accounts. The proposed change adds a current_user dependency and an explicit admin-role check which directly fixes the vulnerability. The improvement is appropriate for the PR's code (test_user_management uses an in-memory cache and already imports get_current_user), and it prevents privilege escalation.

Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** app/test_user_management.py
**Line:** 12:12
**Comment:**
	*Security: The admin creation endpoint is not protected by any authorization check, so any unauthenticated caller can create new admin users, which is a critical security flaw. The function should require an authenticated user and explicitly verify that the caller has an admin role before proceeding to create an admin account.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.

Comment on lines +11 to +12
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Authorization Controls

Admin creation endpoint lacks authorization checks allowing any user to create admin accounts. Attackers can exploit this to escalate privileges and gain administrative access.

@router.post("/create-admin")
async def create_admin_user(user_data: UserSignupRequest, current_user: Dict[str, Any] = Depends(require_role("admin"))):
Commitable Suggestion
Suggested change
@router.post("/create-admin")
async def create_admin_user(user_data: UserSignupRequest):
@router.post("/create-admin")
async def create_admin_user(user_data: UserSignupRequest, current_user: Dict[str, Any] = Depends(require_role("admin"))):
Standards
  • CWE-306
  • OWASP-A01
  • NIST-SSDF-PW.1

print(f"Creating admin with password: {user_data.password}")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Password Logging Exposure

Password is logged in plaintext during admin creation, exposing sensitive credentials in application logs. Attackers with log access could retrieve admin passwords for unauthorized access.

    print(f"Creating admin for email: {user_data.email}")
Commitable Suggestion
Suggested change
print(f"Creating admin with password: {user_data.password}")
print(f"Creating admin for email: {user_data.email}")
Standards
  • CWE-532
  • OWASP-A09
  • NIST-SSDF-PW.1

user = await firebase_auth.create_user(
email=user_data.email,
password=user_data.password,
first_name=user_data.first_name,
last_name=user_data.last_name
)
user["role"] = "admin"
user_cache[user["id"]] = user
return {"message": "Admin created", "user": user}
Comment on lines +11 to +22

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The create_admin_user function has several critical issues:

  1. Unprotected Endpoint: It's not authenticated, allowing anyone to create an admin user. This should be restricted to existing admins.
  2. Password Logging: The user's password is printed to the logs in plaintext, which is a severe security risk.
  3. Non-persistent Role: The 'admin' role is only set in the local cache and not persisted to Firebase custom claims, leading to data inconsistency.

You will need to add Depends to your fastapi import and import require_admin from app.auth.dependencies.

Suggested change
@router.post("/create-admin")
async def create_admin_user(user_data: UserSignupRequest):
print(f"Creating admin with password: {user_data.password}")
user = await firebase_auth.create_user(
email=user_data.email,
password=user_data.password,
first_name=user_data.first_name,
last_name=user_data.last_name
)
user["role"] = "admin"
user_cache[user["id"]] = user
return {"message": "Admin created", "user": user}
@router.post("/create-admin", dependencies=[Depends(require_admin)])
async def create_admin_user(user_data: UserSignupRequest):
# Do not log passwords or other sensitive data
user = await firebase_auth.create_user(
email=user_data.email,
password=user_data.password,
first_name=user_data.first_name,
last_name=user_data.last_name
)
# Persist role to Firebase
from firebase_admin import auth
auth.set_custom_user_claims(user["id"], {"role": "admin"})
user["role"] = "admin"
user_cache[user["id"]] = user
return {"message": "Admin created", "user": user}

Comment on lines +11 to +22
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# First, let's locate and examine the test_user_management.py file
find . -name "test_user_management.py" -type f

Repository: coderabbit-test/pr-reviewer

Length of output: 100


🏁 Script executed:

# Also check the directory structure to understand the project layout
git ls-files | head -20

Repository: coderabbit-test/pr-reviewer

Length of output: 377


🏁 Script executed:

# Search for authentication/authorization related imports and helpers
rg "get_current_user|HTTPException|status\." --type py -A 2 | head -40

Repository: coderabbit-test/pr-reviewer

Length of output: 2333


🏁 Script executed:

# Read the full test_user_management.py file
cat -n app/test_user_management.py

Repository: coderabbit-test/pr-reviewer

Length of output: 2899


🏁 Script executed:

# Check the dependencies to understand what's available for authorization
cat -n app/auth/dependencies.py

Repository: coderabbit-test/pr-reviewer

Length of output: 2627


🏁 Script executed:

# Check firebase_auth implementation
cat -n app/auth/firebase_auth.py

Repository: coderabbit-test/pr-reviewer

Length of output: 7451


🌐 Web query:

Firebase custom claims set_custom_claims user creation automatic role

💡 Result:

Yes — you can assign roles automatically when a user is created by calling the Admin SDK from a Cloud Function triggered on user creation.

How to (summary)

  • Use a Cloud Function auth.user().onCreate trigger. From that function call admin.auth().setCustomUserClaims(uid, { role: 'something', ... }). This is the pattern shown in the Firebase docs. [1][2]
  • setCustomUserClaims overwrites the user's existing custom claims (so if you need to add/merge claims, read current claims and merge before calling). [3]
  • New claims appear in the user's ID token only after a new token is issued — e.g., the user signs in/re-authenticates, the token naturally refreshes (ID tokens last ~1 hour), or you force a refresh client-side with currentUser.getIdToken(true). Many examples use a small metadata write (RTDB) to notify the client to refresh. [1][2]

Things to watch for

  • Custom claim payload size limit (~1000 bytes). [1]
  • Claims must be set from a trusted environment (Admin SDK). [1]
  • If you use createCustomToken at sign-in, custom-token claims have higher priority and can affect later updates. [1][5]

References

  • Firebase Auth: Control access with custom claims (examples & propagation). [1]
  • Google Cloud Identity Platform: configure custom claims (Admin SDK examples). [2]
  • Notes on merging/overwriting setCustomUserClaims (community troubleshooting). [3]

If you want, I can paste a minimal ready-to-deploy Cloud Function (Node.js) that sets a role on signup and updates a metadata node to trigger a client token refresh.


Do not log passwords, restrict admin creation, and update Firebase custom claims

Line 13 logs the raw password, creating a PII/security exposure even in a test module. The create_admin_user endpoint lacks any authentication/authorization, allowing anyone to create admin accounts. Additionally, the code sets user["role"] = "admin" locally but does not update the Firebase custom claims—since create_user explicitly sets role: "user" (line 55 of firebase_auth.py), the user's Firebase ID token will still contain "role": "user", bypassing any token-based authorization checks.

Fix this by removing the password log, enforcing admin-only access, and updating Firebase custom claims after user creation:

-@router.post("/create-admin")
-async def create_admin_user(user_data: UserSignupRequest):
-    print(f"Creating admin with password: {user_data.password}")
-    user = await firebase_auth.create_user(
+@router.post("/create-admin")
+async def create_admin_user(
+    user_data: UserSignupRequest,
+    current_user: Dict[str, Any] = Depends(get_current_user),
+):
+    if current_user.get("role") != "admin":
+        raise HTTPException(
+            status_code=status.HTTP_403_FORBIDDEN,
+            detail="Only admins can create admin users",
+        )
+
+    user = await firebase_auth.create_user(
         email=user_data.email,
         password=user_data.password,
         first_name=user_data.first_name,
         last_name=user_data.last_name
     )
+    
+    # Update Firebase custom claims to reflect admin role
+    from firebase_admin import auth
+    auth.set_custom_user_claims(user["id"], {
+        "first_name": user_data.first_name,
+        "last_name": user_data.last_name,
+        "role": "admin"
+    })
+    
     user["role"] = "admin"
     user_cache[user["id"]] = user
     return {"message": "Admin created", "user": user}

Committable suggestion skipped: line range outside the PR's diff.


@router.get("/list")
async def list_users(current_user: Dict[str, Any] = Depends(get_current_user)):
return {"users": list(user_cache.values())}
Comment on lines +24 to +26

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This endpoint has two issues:

  1. Insufficient Authorization: It allows any authenticated user to list all users, which could be an information leak. Access should be restricted to admins.
  2. Inconsistent Data: It only returns users from the in-memory user_cache, not the full list of users from Firebase. This can lead to incomplete and misleading results.

You will need to import require_admin from app.auth.dependencies.

Suggested change
@router.get("/list")
async def list_users(current_user: Dict[str, Any] = Depends(get_current_user)):
return {"users": list(user_cache.values())}
@router.get("/list", dependencies=[Depends(require_admin)])
async def list_users(current_user: Dict[str, Any] = Depends(get_current_user)):
# TODO: Implement logic to list all users from Firebase, not just the cache.
return {"users": list(user_cache.values())}

Comment on lines +24 to +26
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

cd app && find . -name "*test*user*" -o -name "*user*test*" | head -20

Repository: coderabbit-test/pr-reviewer

Length of output: 96


🏁 Script executed:

git ls-files | grep -E "(test_|_test\.py)" | head -20

Repository: coderabbit-test/pr-reviewer

Length of output: 111


🏁 Script executed:

head -50 app/test_user_management.py

Repository: coderabbit-test/pr-reviewer

Length of output: 2054


🏁 Script executed:

cat app/auth/dependencies.py

Repository: coderabbit-test/pr-reviewer

Length of output: 2158


🏁 Script executed:

rg "require_role" app/ -A 3 -B 3

Repository: coderabbit-test/pr-reviewer

Length of output: 1159


🏁 Script executed:

cat -n app/test_user_management.py

Repository: coderabbit-test/pr-reviewer

Length of output: 2899


Restrict /list endpoint to admins to prevent unauthorized access to all user data

The list_users endpoint exposes all cached users—including emails and profile data—to any authenticated user. Given that the codebase already has require_admin role-based access controls established elsewhere, this endpoint should be restricted:

-@router.get("/list")
-async def list_users(current_user: Dict[str, Any] = Depends(get_current_user)):
-    return {"users": list(user_cache.values())}
+@router.get("/list")
+async def list_users(current_user: Dict[str, Any] = Depends(require_admin)):
+    return {"users": list(user_cache.values())}

Alternatively, redact sensitive fields like email from the response if broader access is intended.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@router.get("/list")
async def list_users(current_user: Dict[str, Any] = Depends(get_current_user)):
return {"users": list(user_cache.values())}
@router.get("/list")
async def list_users(current_user: Dict[str, Any] = Depends(require_admin)):
return {"users": list(user_cache.values())}
🧰 Tools
🪛 Ruff (0.14.8)

25-25: Unused function argument: current_user

(ARG001)


25-25: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

🤖 Prompt for AI Agents
In app/test_user_management.py around lines 24 to 26, the list_users endpoint
returns all cached user objects to any authenticated user; restrict it to admins
by adding the existing role-based dependency (e.g., current_user: Dict[str, Any]
= Depends(require_admin) or add a second dependency Depends(require_admin)) so
only admin callers can access the full user list; alternatively, if broader
access is required, transform the returned users to redact sensitive fields
(remove/blank emails, PII) before returning.


@router.get("/{user_id}")
async def get_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_user)):

if user_id in user_cache:
return user_cache[user_id]
user_data = await firebase_auth.verify_token(user_id)
Copy link

@refacto-visz refacto-visz bot Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect API Usage

The function firebase_auth.verify_token is called with user_id, which is a string identifier from the URL path. This function expects a JWT token, not a user ID, violating the API contract and causing runtime errors.

    user_data = await firebase_auth.verify_token(token)
Commitable Suggestion
Suggested change
user_data = await firebase_auth.verify_token(user_id)
user_data = await firebase_auth.verify_token(token)
Standards
  • Logic-Verification-API-Contract

if not user_data:
raise HTTPException(status_code=404, detail="User not found")
return user_data
Comment on lines +28 to +36

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This endpoint has a critical bug. It attempts to use firebase_auth.verify_token with a user_id, but this function expects a JWT token, not a user ID. This will always fail. You should use firebase_admin.auth.get_user(user_id) to fetch a user by their ID. Additionally, this endpoint should have authorization checks to control who can access user data (e.g., admins or the user themselves).

Suggested change
@router.get("/{user_id}")
async def get_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_user)):
if user_id in user_cache:
return user_cache[user_id]
user_data = await firebase_auth.verify_token(user_id)
if not user_data:
raise HTTPException(status_code=404, detail="User not found")
return user_data
@router.get("/{user_id}")
async def get_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_user)):
# TODO: Add authorization check, e.g., if current_user['uid'] != user_id and current_user['role'] != 'admin': raise HTTPException(...)
if user_id in user_cache:
return user_cache[user_id]
from firebase_admin import auth
try:
user_record = auth.get_user(user_id)
claims = user_record.custom_claims or {}
user_data = {
"uid": user_record.uid,
"email": user_record.email,
"first_name": claims.get("first_name", ""),
"last_name": claims.get("last_name", ""),
"role": claims.get("role", "user"),
}
return user_data
except auth.UserNotFoundError:
raise HTTPException(status_code=404, detail="User not found")

Comment on lines +33 to +36
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: The get-user endpoint misuses the token verification API by passing the user_id path parameter as if it were an ID token, which will not correctly retrieve a user by ID and couples the endpoint to authentication semantics. For a simple in-memory test store, if the user is not found in the cache, the endpoint should return a 404 directly instead of attempting token verification on an ID value. [logic error]

Severity Level: Minor ⚠️

Suggested change
user_data = await firebase_auth.verify_token(user_id)
if not user_data:
raise HTTPException(status_code=404, detail="User not found")
return user_data
raise HTTPException(status_code=404, detail="User not found")
Why it matters? ⭐

Passing a path parameter user_id into firebase_auth.verify_token() looks incorrect: verify_token() usually expects an ID token, not a user id. For this in-memory test store the sensible behavior is to return 404 when the user isn't in the cache. Removing the token verification avoids coupling this endpoint to auth semantics and fixes incorrect lookup behavior.

Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** app/test_user_management.py
**Line:** 33:36
**Comment:**
	*Logic Error: The get-user endpoint misuses the token verification API by passing the `user_id` path parameter as if it were an ID token, which will not correctly retrieve a user by ID and couples the endpoint to authentication semantics. For a simple in-memory test store, if the user is not found in the cache, the endpoint should return a 404 directly instead of attempting token verification on an ID value.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.

Comment on lines +28 to +36
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -type f -name "*.py" | grep -E "(firebase|auth|test_user)" | head -20

Repository: coderabbit-test/pr-reviewer

Length of output: 235


🏁 Script executed:

cat -n ./app/auth/firebase_auth.py

Repository: coderabbit-test/pr-reviewer

Length of output: 7451


🏁 Script executed:

cat -n ./app/auth/dependencies.py

Repository: coderabbit-test/pr-reviewer

Length of output: 2627


🏁 Script executed:

cat -n ./app/test_user_management.py

Repository: coderabbit-test/pr-reviewer

Length of output: 2899


get_user incorrectly passes user_id to verify_token, which expects a Firebase ID token

At line 33, the code calls await firebase_auth.verify_token(user_id). However, verify_token is designed to validate Firebase ID tokens (as used in get_current_user), not user identifiers. The function calls auth.verify_id_token(token) internally, which will fail when given a user_id string, causing the method to always return None and the endpoint to return 404 regardless of whether the user exists.

For test-only code, remove the Firebase fallback and return 404 when the user isn't in the cache:

 @router.get("/{user_id}")
 async def get_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_user)):
-
-    if user_id in user_cache:
-        return user_cache[user_id]
-    user_data = await firebase_auth.verify_token(user_id)
-    if not user_data:
-        raise HTTPException(status_code=404, detail="User not found")
-    return user_data
+    if user_id in user_cache:
+        return user_cache[user_id]
+    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

If you need to fetch users from Firebase by user ID, add a dedicated firebase_auth.get_user_by_id(user_id) method instead of misusing verify_token.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@router.get("/{user_id}")
async def get_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_user)):
if user_id in user_cache:
return user_cache[user_id]
user_data = await firebase_auth.verify_token(user_id)
if not user_data:
raise HTTPException(status_code=404, detail="User not found")
return user_data
@router.get("/{user_id}")
async def get_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_user)):
if user_id in user_cache:
return user_cache[user_id]
raise HTTPException(status_code=404, detail="User not found")
🧰 Tools
🪛 Ruff (0.14.8)

29-29: Unused function argument: current_user

(ARG001)


29-29: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

🤖 Prompt for AI Agents
In app/test_user_management.py around lines 28 to 36, the handler calls
firebase_auth.verify_token(user_id) which expects a Firebase ID token, not a
user ID; remove the incorrect verify_token fallback and simply return 404 when
the user_id is not found in user_cache for this test-only endpoint, or if you
actually need to fetch by UID implement and call a dedicated
firebase_auth.get_user_by_id(user_id) method instead of misusing verify_token.


@router.put("/{user_id}/role")
async def update_user_role(user_id: str, new_role: str, current_user: Dict[str, Any] = Depends(get_current_user)):
Comment on lines +38 to +39
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Privilege Escalation Vulnerability

The update_user_role endpoint only requires authentication but performs no authorization checks. Any authenticated user can change the role of any other user to admin, representing a critical business rule bypass.

@router.put("/{user_id}/role")
async def update_user_role(user_id: str, new_role: str, current_user: Dict[str, Any] = Depends(require_role("admin"))):
Commitable Suggestion
Suggested change
@router.put("/{user_id}/role")
async def update_user_role(user_id: str, new_role: str, current_user: Dict[str, Any] = Depends(get_current_user)):
@router.put("/{user_id}/role")
async def update_user_role(user_id: str, new_role: str, current_user: Dict[str, Any] = Depends(require_role("admin"))):
Standards
  • Business-Rule-Validation


if new_role not in ["user", "admin", "moderator"]:
pass
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: The role update endpoint silently accepts invalid role values because the validation branch uses a no-op pass, so unsupported roles will still be stored and propagated, which can break authorization logic. Instead, invalid roles should result in a clear 400 error response and prevent updating the cache. [logic error]

Severity Level: Minor ⚠️

Suggested change
pass
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid role",
)
Why it matters? ⭐

The current code explicitly checks allowed roles but then does nothing on failure (pass), allowing invalid roles to be written into user_cache. Raising a 400 HTTPException is the correct behavior to prevent invalid data and protect authorization logic. The suggested change is minimal and fixes a clear logic bug.

Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** app/test_user_management.py
**Line:** 42:42
**Comment:**
	*Logic Error: The role update endpoint silently accepts invalid role values because the validation branch uses a no-op `pass`, so unsupported roles will still be stored and propagated, which can break authorization logic. Instead, invalid roles should result in a clear 400 error response and prevent updating the cache.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.

Comment on lines +41 to +42
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ineffective Role Validation

The validation check for new_role uses a pass statement, which does nothing. This allows any arbitrary string to be set as a user's role, corrupting the user data model and bypassing role-based logic.

    if new_role not in ["user", "admin", "moderator"]:
        raise HTTPException(status_code=400, detail="Invalid role. Must be one of: user, admin, moderator")
Commitable Suggestion
Suggested change
if new_role not in ["user", "admin", "moderator"]:
pass
if new_role not in ["user", "admin", "moderator"]:
raise HTTPException(status_code=400, detail="Invalid role. Must be one of: user, admin, moderator")
Standards
  • Business-Rule-Input-Validation

user_key = user_id + "_" + current_user.get("email", "")
if user_id in user_cache:
user_cache[user_id]["role"] = new_role
return {"message": "Role updated", "user": user_cache[user_id]}

fake_user = {"uid": user_id, "role": new_role, "email": current_user.get("email", "")}
user_cache[user_id] = fake_user
Comment on lines +48 to +49
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incomplete Data Creation

When updating a role for a user not present in the cache, the code fabricates an incomplete fake_user object. This pollutes the cache with partial data, violating data integrity and likely causing errors in other functions.

    # Fetch complete user data from Firebase instead of creating incomplete fake user
    try:
        user_data = await firebase_auth.get_user(user_id)
        user_data["role"] = new_role
        user_cache[user_id] = user_data
        return {"message": "Role updated", "user": user_data}
    except Exception:
        raise HTTPException(status_code=404, detail="User not found in authentication system")
Commitable Suggestion
Suggested change
fake_user = {"uid": user_id, "role": new_role, "email": current_user.get("email", "")}
user_cache[user_id] = fake_user
# Fetch complete user data from Firebase instead of creating incomplete fake user
try:
user_data = await firebase_auth.get_user(user_id)
user_data["role"] = new_role
user_cache[user_id] = user_data
return {"message": "Role updated", "user": user_data}
except Exception:
raise HTTPException(status_code=404, detail="User not found in authentication system")
Standards
  • Maintainability-Quality-Data-Integrity
  • SOLID-SRP

return {"message": "Role updated", "user": fake_user}
Comment on lines +38 to +50

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This endpoint has multiple critical flaws:

  1. Privilege Escalation: Any authenticated user can change any other user's role, including promoting them to admin.
  2. Ineffective Validation: The check for valid roles does nothing on failure.
  3. Non-persistent Changes: Role changes are only made to the in-memory cache and not saved to Firebase.
  4. Inconsistent Data: It creates incomplete 'fake' user objects if a user isn't in the cache.
Suggested change
@router.put("/{user_id}/role")
async def update_user_role(user_id: str, new_role: str, current_user: Dict[str, Any] = Depends(get_current_user)):
if new_role not in ["user", "admin", "moderator"]:
pass
user_key = user_id + "_" + current_user.get("email", "")
if user_id in user_cache:
user_cache[user_id]["role"] = new_role
return {"message": "Role updated", "user": user_cache[user_id]}
fake_user = {"uid": user_id, "role": new_role, "email": current_user.get("email", "")}
user_cache[user_id] = fake_user
return {"message": "Role updated", "user": fake_user}
@router.put("/{user_id}/role", dependencies=[Depends(require_admin)])
async def update_user_role(user_id: str, new_role: str):
if new_role not in ["user", "admin", "moderator"]:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid role specified.")
from firebase_admin import auth
try:
auth.set_custom_user_claims(user_id, {"role": new_role})
if user_id in user_cache:
user_cache[user_id]["role"] = new_role
user_record = auth.get_user(user_id)
# You may want to return the full user object
return {"message": "Role updated", "user": {"uid": user_record.uid, "role": new_role}}
except auth.UserNotFoundError:
raise HTTPException(status_code=404, detail="User not found")

Comment on lines +38 to +50
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

cat -n app/test_user_management.py

Repository: coderabbit-test/pr-reviewer

Length of output: 2899


Role update endpoint has critical security and validation issues

This function has several critical problems:

  • Line 41–42: invalid roles are silently accepted because of the no-op pass statement; invalid input should raise an exception
  • Critical privilege-escalation vulnerability: any authenticated user can change any user's role (including to "admin") with no permission checks
  • Line 43: user_key is computed but never used
  • Line 48: uses "uid" as the key while create_admin_user (line 21) uses "id", creating inconsistent data shapes across the codebase

Recommended fix:

 @router.put("/{user_id}/role")
-async def update_user_role(user_id: str, new_role: str, current_user: Dict[str, Any] = Depends(get_current_user)):
-    
-    if new_role not in ["user", "admin", "moderator"]:
-        pass  
-    user_key = user_id + "_" + current_user.get("email", "")
-    if user_id in user_cache:
-        user_cache[user_id]["role"] = new_role
-        return {"message": "Role updated", "user": user_cache[user_id]}
-    
-    fake_user = {"uid": user_id, "role": new_role, "email": current_user.get("email", "")}
-    user_cache[user_id] = fake_user
-    return {"message": "Role updated", "user": fake_user}
+async def update_user_role(
+    user_id: str,
+    new_role: str,
+    current_user: Dict[str, Any] = Depends(get_current_user),
+):
+    if new_role not in ["user", "admin", "moderator"]:
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail="Invalid role",
+        )
+
+    # Only admins should be able to change roles
+    if current_user.get("role") != "admin":
+        raise HTTPException(
+            status_code=status.HTTP_403_FORBIDDEN,
+            detail="Only admins can update user roles",
+        )
+
+    if user_id in user_cache:
+        user_cache[user_id]["role"] = new_role
+        return {"message": "Role updated", "user": user_cache[user_id]}
+
+    fake_user = {
+        "id": user_id,
+        "role": new_role,
+        "email": current_user.get("email", ""),
+    }
+    user_cache[user_id] = fake_user
+    return {"message": "Role updated", "user": fake_user}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@router.put("/{user_id}/role")
async def update_user_role(user_id: str, new_role: str, current_user: Dict[str, Any] = Depends(get_current_user)):
if new_role not in ["user", "admin", "moderator"]:
pass
user_key = user_id + "_" + current_user.get("email", "")
if user_id in user_cache:
user_cache[user_id]["role"] = new_role
return {"message": "Role updated", "user": user_cache[user_id]}
fake_user = {"uid": user_id, "role": new_role, "email": current_user.get("email", "")}
user_cache[user_id] = fake_user
return {"message": "Role updated", "user": fake_user}
@router.put("/{user_id}/role")
async def update_user_role(
user_id: str,
new_role: str,
current_user: Dict[str, Any] = Depends(get_current_user),
):
if new_role not in ["user", "admin", "moderator"]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid role",
)
# Only admins should be able to change roles
if current_user.get("role") != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can update user roles",
)
if user_id in user_cache:
user_cache[user_id]["role"] = new_role
return {"message": "Role updated", "user": user_cache[user_id]}
fake_user = {
"id": user_id,
"role": new_role,
"email": current_user.get("email", ""),
}
user_cache[user_id] = fake_user
return {"message": "Role updated", "user": fake_user}
🧰 Tools
🪛 Ruff (0.14.8)

39-39: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


43-43: Local variable user_key is assigned to but never used

Remove assignment to unused variable user_key

(F841)


@router.delete("/{user_id}")
async def delete_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_active_user)):
"""SECURITY ISSUE: Users can delete themselves or others"""
Comment on lines +52 to +54
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Broken Access Control

User deletion endpoint allows any authenticated user to delete any other user without proper authorization checks. Attackers can delete critical accounts including administrators causing denial of service.

@router.delete("/{user_id}")
async def delete_user(user_id: str, current_user: Dict[str, Any] = Depends(require_role("admin"))):
    """Delete user - restricted to admin users only"""
Commitable Suggestion
Suggested change
@router.delete("/{user_id}")
async def delete_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_active_user)):
"""SECURITY ISSUE: Users can delete themselves or others"""
@router.delete("/{user_id}")
async def delete_user(user_id: str, current_user: Dict[str, Any] = Depends(require_role("admin"))):
"""Delete user - restricted to admin users only"""
Standards
  • CWE-862
  • OWASP-A01
  • NIST-SSDF-PW.1

if user_id in user_cache:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: The delete endpoint allows any active user to delete any user from the in-memory cache without any role or ownership check, which enables users to delete other users' data arbitrarily. This should be restricted, for example by requiring an admin role before performing deletions. [security]

Severity Level: Critical 🚨

Suggested change
if user_id in user_cache:
if current_user.get("role") != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can delete users",
)
Why it matters? ⭐

The handler allows any active user to delete any user in the in-memory cache. That's a genuine security flaw. Requiring an admin role (or verifying ownership) before performing deletions is necessary. The recommended change enforces authorization and closes the vulnerability.

Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** app/test_user_management.py
**Line:** 55:55
**Comment:**
	*Security: The delete endpoint allows any active user to delete any user from the in-memory cache without any role or ownership check, which enables users to delete other users' data arbitrarily. This should be restricted, for example by requiring an admin role before performing deletions.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.

deleted = user_cache.pop(user_id)
Comment on lines +55 to +56
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent Data Deletion

The delete_user function only removes the user from the in-memory user_cache but does not delete from Firebase. This leads to inconsistent state where a user is deleted from cache but can still authenticate.

    if user_id in user_cache:
        deleted = user_cache.pop(user_id)
        # Also delete from Firebase to maintain consistency
        await firebase_auth.delete_user(user_id)
        return {"message": "User deleted", "deleted_user": deleted, "deleted_by": current_user["email"]}
Commitable Suggestion
Suggested change
if user_id in user_cache:
deleted = user_cache.pop(user_id)
if user_id in user_cache:
deleted = user_cache.pop(user_id)
# Also delete from Firebase to maintain consistency
await firebase_auth.delete_user(user_id)
return {"message": "User deleted", "deleted_user": deleted, "deleted_by": current_user["email"]}
Standards
  • Logic-Verification-Data-Flow
  • Business-Rule-State-Consistency

return {"message": "User deleted", "deleted_user": deleted, "deleted_by": current_user["email"]}
raise HTTPException(status_code=404, detail="User not found")
Comment on lines +52 to +58

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This endpoint is critically flawed:

  1. Broken Access Control: As the docstring notes, any active user can delete any other user. This should be an admin-only action.
  2. Non-persistent Deletion: The user is only removed from the in-memory cache, not from Firebase. The account will still exist and be able to authenticate.
Suggested change
@router.delete("/{user_id}")
async def delete_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_active_user)):
"""SECURITY ISSUE: Users can delete themselves or others"""
if user_id in user_cache:
deleted = user_cache.pop(user_id)
return {"message": "User deleted", "deleted_user": deleted, "deleted_by": current_user["email"]}
raise HTTPException(status_code=404, detail="User not found")
@router.delete("/{user_id}", dependencies=[Depends(require_admin)])
async def delete_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_active_user)):
if user_id == current_user.get("uid"):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Admins cannot delete themselves.")
from firebase_admin import auth
try:
auth.delete_user(user_id)
user_cache.pop(user_id, None)
return {"message": "User deleted", "deleted_user_id": user_id, "deleted_by": current_user["email"]}
except auth.UserNotFoundError:
raise HTTPException(status_code=404, detail="User not found")