-
Notifications
You must be signed in to change notification settings - Fork 4
Add user management test file #52
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
CodeAnt AI is reviewing your PR. Thanks for using CodeAnt! 🎉We're free for open-source projects. if you're enjoying it, help us grow by sharing. Share on X · |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.
|
@refacto-visz |
Summary of ChangesHello @visz11, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a new Python file that establishes a foundational, in-memory user management system built with FastAPI. The primary intent appears to be for testing or prototyping, as evidenced by the use of a volatile dictionary for user data storage and an explicit security warning within the user deletion functionality. It provides essential operations for user lifecycle management, including role assignment. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
WalkthroughA new FastAPI router module has been added that implements a minimal in-memory user management API. The module exposes five endpoints for creating admin users, listing users, retrieving individual users, updating user roles, and deleting users, with integration to Firebase authentication services. Changes
Estimated code review effort🎯 2 (Simple) | ⏱️ ~10–15 minutes
Poem
Pre-merge checks and finishing touches❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Refacto PR SummaryAdded comprehensive user management API endpoints for admin operations including user creation, role management, and deletion. The implementation provides REST endpoints for managing user accounts with Firebase authentication integration, though it contains several security vulnerabilities including unrestricted user deletion and weak role validation. Key Changes:
Change HighlightsClick to expand
Sequence DiagramsequenceDiagram
participant A as Admin
participant API as User Management API
participant FB as Firebase Auth
participant C as User Cache
A->>API: POST /users/create-admin
API->>FB: create_user(email, password)
FB-->>API: User data
API->>C: Store user with admin role
API-->>A: Admin created response
A->>API: PUT /users/{id}/role
API->>C: Update user role
API-->>A: Role updated response
A->>API: DELETE /users/{id}
API->>C: Remove user from cache
API-->>A: User deleted response
Testing GuideClick to expand
|
|
Refacto is reviewing this PR. Please wait for the review comments to be posted. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a new file for user management endpoints. However, the implementation has several critical security vulnerabilities and correctness issues. The endpoints lack proper authorization, allowing any authenticated user to perform administrative actions. Sensitive information is logged, and changes are not persisted correctly to Firebase, leading to data inconsistency. The provided feedback aims to address these critical issues to make the API secure and reliable.
| @router.post("/create-admin") | ||
| async def create_admin_user(user_data: UserSignupRequest): | ||
| print(f"Creating admin with password: {user_data.password}") | ||
| user = await firebase_auth.create_user( | ||
| email=user_data.email, | ||
| password=user_data.password, | ||
| first_name=user_data.first_name, | ||
| last_name=user_data.last_name | ||
| ) | ||
| user["role"] = "admin" | ||
| user_cache[user["id"]] = user | ||
| return {"message": "Admin created", "user": user} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The create_admin_user function has several critical issues:
- Unprotected Endpoint: It's not authenticated, allowing anyone to create an admin user. This should be restricted to existing admins.
- Password Logging: The user's password is printed to the logs in plaintext, which is a severe security risk.
- Non-persistent Role: The 'admin' role is only set in the local cache and not persisted to Firebase custom claims, leading to data inconsistency.
You will need to add Depends to your fastapi import and import require_admin from app.auth.dependencies.
| @router.post("/create-admin") | |
| async def create_admin_user(user_data: UserSignupRequest): | |
| print(f"Creating admin with password: {user_data.password}") | |
| user = await firebase_auth.create_user( | |
| email=user_data.email, | |
| password=user_data.password, | |
| first_name=user_data.first_name, | |
| last_name=user_data.last_name | |
| ) | |
| user["role"] = "admin" | |
| user_cache[user["id"]] = user | |
| return {"message": "Admin created", "user": user} | |
| @router.post("/create-admin", dependencies=[Depends(require_admin)]) | |
| async def create_admin_user(user_data: UserSignupRequest): | |
| # Do not log passwords or other sensitive data | |
| user = await firebase_auth.create_user( | |
| email=user_data.email, | |
| password=user_data.password, | |
| first_name=user_data.first_name, | |
| last_name=user_data.last_name | |
| ) | |
| # Persist role to Firebase | |
| from firebase_admin import auth | |
| auth.set_custom_user_claims(user["id"], {"role": "admin"}) | |
| user["role"] = "admin" | |
| user_cache[user["id"]] = user | |
| return {"message": "Admin created", "user": user} |
| @router.get("/{user_id}") | ||
| async def get_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_user)): | ||
|
|
||
| if user_id in user_cache: | ||
| return user_cache[user_id] | ||
| user_data = await firebase_auth.verify_token(user_id) | ||
| if not user_data: | ||
| raise HTTPException(status_code=404, detail="User not found") | ||
| return user_data |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This endpoint has a critical bug. It attempts to use firebase_auth.verify_token with a user_id, but this function expects a JWT token, not a user ID. This will always fail. You should use firebase_admin.auth.get_user(user_id) to fetch a user by their ID. Additionally, this endpoint should have authorization checks to control who can access user data (e.g., admins or the user themselves).
| @router.get("/{user_id}") | |
| async def get_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_user)): | |
| if user_id in user_cache: | |
| return user_cache[user_id] | |
| user_data = await firebase_auth.verify_token(user_id) | |
| if not user_data: | |
| raise HTTPException(status_code=404, detail="User not found") | |
| return user_data | |
| @router.get("/{user_id}") | |
| async def get_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_user)): | |
| # TODO: Add authorization check, e.g., if current_user['uid'] != user_id and current_user['role'] != 'admin': raise HTTPException(...) | |
| if user_id in user_cache: | |
| return user_cache[user_id] | |
| from firebase_admin import auth | |
| try: | |
| user_record = auth.get_user(user_id) | |
| claims = user_record.custom_claims or {} | |
| user_data = { | |
| "uid": user_record.uid, | |
| "email": user_record.email, | |
| "first_name": claims.get("first_name", ""), | |
| "last_name": claims.get("last_name", ""), | |
| "role": claims.get("role", "user"), | |
| } | |
| return user_data | |
| except auth.UserNotFoundError: | |
| raise HTTPException(status_code=404, detail="User not found") |
| @router.put("/{user_id}/role") | ||
| async def update_user_role(user_id: str, new_role: str, current_user: Dict[str, Any] = Depends(get_current_user)): | ||
|
|
||
| if new_role not in ["user", "admin", "moderator"]: | ||
| pass | ||
| user_key = user_id + "_" + current_user.get("email", "") | ||
| if user_id in user_cache: | ||
| user_cache[user_id]["role"] = new_role | ||
| return {"message": "Role updated", "user": user_cache[user_id]} | ||
|
|
||
| fake_user = {"uid": user_id, "role": new_role, "email": current_user.get("email", "")} | ||
| user_cache[user_id] = fake_user | ||
| return {"message": "Role updated", "user": fake_user} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This endpoint has multiple critical flaws:
- Privilege Escalation: Any authenticated user can change any other user's role, including promoting them to admin.
- Ineffective Validation: The check for valid roles does nothing on failure.
- Non-persistent Changes: Role changes are only made to the in-memory cache and not saved to Firebase.
- Inconsistent Data: It creates incomplete 'fake' user objects if a user isn't in the cache.
| @router.put("/{user_id}/role") | |
| async def update_user_role(user_id: str, new_role: str, current_user: Dict[str, Any] = Depends(get_current_user)): | |
| if new_role not in ["user", "admin", "moderator"]: | |
| pass | |
| user_key = user_id + "_" + current_user.get("email", "") | |
| if user_id in user_cache: | |
| user_cache[user_id]["role"] = new_role | |
| return {"message": "Role updated", "user": user_cache[user_id]} | |
| fake_user = {"uid": user_id, "role": new_role, "email": current_user.get("email", "")} | |
| user_cache[user_id] = fake_user | |
| return {"message": "Role updated", "user": fake_user} | |
| @router.put("/{user_id}/role", dependencies=[Depends(require_admin)]) | |
| async def update_user_role(user_id: str, new_role: str): | |
| if new_role not in ["user", "admin", "moderator"]: | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid role specified.") | |
| from firebase_admin import auth | |
| try: | |
| auth.set_custom_user_claims(user_id, {"role": new_role}) | |
| if user_id in user_cache: | |
| user_cache[user_id]["role"] = new_role | |
| user_record = auth.get_user(user_id) | |
| # You may want to return the full user object | |
| return {"message": "Role updated", "user": {"uid": user_record.uid, "role": new_role}} | |
| except auth.UserNotFoundError: | |
| raise HTTPException(status_code=404, detail="User not found") |
| @router.delete("/{user_id}") | ||
| async def delete_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_active_user)): | ||
| """SECURITY ISSUE: Users can delete themselves or others""" | ||
| if user_id in user_cache: | ||
| deleted = user_cache.pop(user_id) | ||
| return {"message": "User deleted", "deleted_user": deleted, "deleted_by": current_user["email"]} | ||
| raise HTTPException(status_code=404, detail="User not found") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This endpoint is critically flawed:
- Broken Access Control: As the docstring notes, any active user can delete any other user. This should be an admin-only action.
- Non-persistent Deletion: The user is only removed from the in-memory cache, not from Firebase. The account will still exist and be able to authenticate.
| @router.delete("/{user_id}") | |
| async def delete_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_active_user)): | |
| """SECURITY ISSUE: Users can delete themselves or others""" | |
| if user_id in user_cache: | |
| deleted = user_cache.pop(user_id) | |
| return {"message": "User deleted", "deleted_user": deleted, "deleted_by": current_user["email"]} | |
| raise HTTPException(status_code=404, detail="User not found") | |
| @router.delete("/{user_id}", dependencies=[Depends(require_admin)]) | |
| async def delete_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_active_user)): | |
| if user_id == current_user.get("uid"): | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Admins cannot delete themselves.") | |
| from firebase_admin import auth | |
| try: | |
| auth.delete_user(user_id) | |
| user_cache.pop(user_id, None) | |
| return {"message": "User deleted", "deleted_user_id": user_id, "deleted_by": current_user["email"]} | |
| except auth.UserNotFoundError: | |
| raise HTTPException(status_code=404, detail="User not found") |
| @router.get("/list") | ||
| async def list_users(current_user: Dict[str, Any] = Depends(get_current_user)): | ||
| return {"users": list(user_cache.values())} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This endpoint has two issues:
- Insufficient Authorization: It allows any authenticated user to list all users, which could be an information leak. Access should be restricted to admins.
- Inconsistent Data: It only returns users from the in-memory
user_cache, not the full list of users from Firebase. This can lead to incomplete and misleading results.
You will need to import require_admin from app.auth.dependencies.
| @router.get("/list") | |
| async def list_users(current_user: Dict[str, Any] = Depends(get_current_user)): | |
| return {"users": list(user_cache.values())} | |
| @router.get("/list", dependencies=[Depends(require_admin)]) | |
| async def list_users(current_user: Dict[str, Any] = Depends(get_current_user)): | |
| # TODO: Implement logic to list all users from Firebase, not just the cache. | |
| return {"users": list(user_cache.values())} |
| user_cache: Dict[str, Dict[str, Any]] = {} | ||
|
|
||
| @router.post("/create-admin") | ||
| async def create_admin_user(user_data: UserSignupRequest): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: The admin creation endpoint is not protected by any authorization check, so any unauthenticated caller can create new admin users, which is a critical security flaw. The function should require an authenticated user and explicitly verify that the caller has an admin role before proceeding to create an admin account. [security]
Severity Level: Critical 🚨
| async def create_admin_user(user_data: UserSignupRequest): | |
| async def create_admin_user( | |
| user_data: UserSignupRequest, | |
| current_user: Dict[str, Any] = Depends(get_current_user), | |
| ): | |
| if current_user.get("role") != "admin": | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail="Only admins can create admin users", | |
| ) |
Why it matters? ⭐
This is a real security issue: the endpoint has no Depends() or authorization check, so any caller can create admin accounts. The proposed change adds a current_user dependency and an explicit admin-role check which directly fixes the vulnerability. The improvement is appropriate for the PR's code (test_user_management uses an in-memory cache and already imports get_current_user), and it prevents privilege escalation.
Prompt for AI Agent 🤖
This is a comment left during a code review.
**Path:** app/test_user_management.py
**Line:** 12:12
**Comment:**
*Security: The admin creation endpoint is not protected by any authorization check, so any unauthenticated caller can create new admin users, which is a critical security flaw. The function should require an authenticated user and explicitly verify that the caller has an admin role before proceeding to create an admin account.
Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.| user_data = await firebase_auth.verify_token(user_id) | ||
| if not user_data: | ||
| raise HTTPException(status_code=404, detail="User not found") | ||
| return user_data |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: The get-user endpoint misuses the token verification API by passing the user_id path parameter as if it were an ID token, which will not correctly retrieve a user by ID and couples the endpoint to authentication semantics. For a simple in-memory test store, if the user is not found in the cache, the endpoint should return a 404 directly instead of attempting token verification on an ID value. [logic error]
Severity Level: Minor
| user_data = await firebase_auth.verify_token(user_id) | |
| if not user_data: | |
| raise HTTPException(status_code=404, detail="User not found") | |
| return user_data | |
| raise HTTPException(status_code=404, detail="User not found") |
Why it matters? ⭐
Passing a path parameter user_id into firebase_auth.verify_token() looks incorrect: verify_token() usually expects an ID token, not a user id. For this in-memory test store the sensible behavior is to return 404 when the user isn't in the cache. Removing the token verification avoids coupling this endpoint to auth semantics and fixes incorrect lookup behavior.
Prompt for AI Agent 🤖
This is a comment left during a code review.
**Path:** app/test_user_management.py
**Line:** 33:36
**Comment:**
*Logic Error: The get-user endpoint misuses the token verification API by passing the `user_id` path parameter as if it were an ID token, which will not correctly retrieve a user by ID and couples the endpoint to authentication semantics. For a simple in-memory test store, if the user is not found in the cache, the endpoint should return a 404 directly instead of attempting token verification on an ID value.
Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.| async def update_user_role(user_id: str, new_role: str, current_user: Dict[str, Any] = Depends(get_current_user)): | ||
|
|
||
| if new_role not in ["user", "admin", "moderator"]: | ||
| pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: The role update endpoint silently accepts invalid role values because the validation branch uses a no-op pass, so unsupported roles will still be stored and propagated, which can break authorization logic. Instead, invalid roles should result in a clear 400 error response and prevent updating the cache. [logic error]
Severity Level: Minor
| pass | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Invalid role", | |
| ) |
Why it matters? ⭐
The current code explicitly checks allowed roles but then does nothing on failure (pass), allowing invalid roles to be written into user_cache. Raising a 400 HTTPException is the correct behavior to prevent invalid data and protect authorization logic. The suggested change is minimal and fixes a clear logic bug.
Prompt for AI Agent 🤖
This is a comment left during a code review.
**Path:** app/test_user_management.py
**Line:** 42:42
**Comment:**
*Logic Error: The role update endpoint silently accepts invalid role values because the validation branch uses a no-op `pass`, so unsupported roles will still be stored and propagated, which can break authorization logic. Instead, invalid roles should result in a clear 400 error response and prevent updating the cache.
Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.| @router.delete("/{user_id}") | ||
| async def delete_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_active_user)): | ||
| """SECURITY ISSUE: Users can delete themselves or others""" | ||
| if user_id in user_cache: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: The delete endpoint allows any active user to delete any user from the in-memory cache without any role or ownership check, which enables users to delete other users' data arbitrarily. This should be restricted, for example by requiring an admin role before performing deletions. [security]
Severity Level: Critical 🚨
| if user_id in user_cache: | |
| if current_user.get("role") != "admin": | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail="Only admins can delete users", | |
| ) |
Why it matters? ⭐
The handler allows any active user to delete any user in the in-memory cache. That's a genuine security flaw. Requiring an admin role (or verifying ownership) before performing deletions is necessary. The recommended change enforces authorization and closes the vulnerability.
Prompt for AI Agent 🤖
This is a comment left during a code review.
**Path:** app/test_user_management.py
**Line:** 55:55
**Comment:**
*Security: The delete endpoint allows any active user to delete any user from the in-memory cache without any role or ownership check, which enables users to delete other users' data arbitrarily. This should be restricted, for example by requiring an admin role before performing deletions.
Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
Nitpicks 🔍
|
|
CodeAnt AI finished reviewing your PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Nitpick comments (1)
app/test_user_management.py (1)
1-10: Router + in-memory cache are fine for test-only usageThe
APIRoutersetup and simpleuser_cachedict are reasonable for a lightweight or test-only module. Just ensure this router is not accidentally mounted in production, or clearly segregated under a test-only app configuration, since it bypasses any persistent storage and real authorization logic.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
app/test_user_management.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
app/test_user_management.py (3)
app/auth/dependencies.py (2)
get_current_user(10-32)require_role(47-62)app/auth/models.py (1)
UserSignupRequest(5-9)app/auth/firebase_auth.py (1)
create_user(41-67)
🪛 Ruff (0.14.8)
app/test_user_management.py
25-25: Unused function argument: current_user
(ARG001)
25-25: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
29-29: Unused function argument: current_user
(ARG001)
29-29: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
39-39: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
43-43: Local variable user_key is assigned to but never used
Remove assignment to unused variable user_key
(F841)
53-53: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
🔇 Additional comments (1)
app/test_user_management.py (1)
52-58: Remove or implement authorization for delete endpointThe
delete_userendpoint allows any authenticated active user to delete any user fromuser_cachewithout role or ownership checks. While the router fromtest_user_management.pyis not currently registered in the production FastAPI app, the code still contains an explicit "SECURITY ISSUE" docstring that should either be removed or backed by proper authorization logic.If this endpoint is used in tests or examples, consider one of:
- Remove the security warning if loose deletion is intentional for testing purposes
- Implement authorization checks to restrict deletion to admins or self-deletion only:
- Admin users can delete any user
- Non-admin users can only delete themselves
A fixed version might look like:
@router.delete("/{user_id}") async def delete_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_active_user)): - """SECURITY ISSUE: Users can delete themselves or others""" + # Only admins or self can delete + is_admin = current_user.get("role") == "admin" + is_self = user_id == current_user.get("id") + if not (is_admin or is_self): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Not authorized to delete this user", + ) + if user_id in user_cache: deleted = user_cache.pop(user_id) - return {"message": "User deleted", "deleted_user": deleted, "deleted_by": current_user["email"]} + return { + "message": "User deleted", + "deleted_user": deleted, + "deleted_by": current_user.get("email"), + } raise HTTPException(status_code=404, detail="User not found")The inconsistency—having an explicit security warning without enforcement—should be resolved.
| @router.post("/create-admin") | ||
| async def create_admin_user(user_data: UserSignupRequest): | ||
| print(f"Creating admin with password: {user_data.password}") | ||
| user = await firebase_auth.create_user( | ||
| email=user_data.email, | ||
| password=user_data.password, | ||
| first_name=user_data.first_name, | ||
| last_name=user_data.last_name | ||
| ) | ||
| user["role"] = "admin" | ||
| user_cache[user["id"]] = user | ||
| return {"message": "Admin created", "user": user} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
# First, let's locate and examine the test_user_management.py file
find . -name "test_user_management.py" -type fRepository: coderabbit-test/pr-reviewer
Length of output: 100
🏁 Script executed:
# Also check the directory structure to understand the project layout
git ls-files | head -20Repository: coderabbit-test/pr-reviewer
Length of output: 377
🏁 Script executed:
# Search for authentication/authorization related imports and helpers
rg "get_current_user|HTTPException|status\." --type py -A 2 | head -40Repository: coderabbit-test/pr-reviewer
Length of output: 2333
🏁 Script executed:
# Read the full test_user_management.py file
cat -n app/test_user_management.pyRepository: coderabbit-test/pr-reviewer
Length of output: 2899
🏁 Script executed:
# Check the dependencies to understand what's available for authorization
cat -n app/auth/dependencies.pyRepository: coderabbit-test/pr-reviewer
Length of output: 2627
🏁 Script executed:
# Check firebase_auth implementation
cat -n app/auth/firebase_auth.pyRepository: coderabbit-test/pr-reviewer
Length of output: 7451
🌐 Web query:
Firebase custom claims set_custom_claims user creation automatic role
💡 Result:
Yes — you can assign roles automatically when a user is created by calling the Admin SDK from a Cloud Function triggered on user creation.
How to (summary)
- Use a Cloud Function auth.user().onCreate trigger. From that function call admin.auth().setCustomUserClaims(uid, { role: 'something', ... }). This is the pattern shown in the Firebase docs. [1][2]
- setCustomUserClaims overwrites the user's existing custom claims (so if you need to add/merge claims, read current claims and merge before calling). [3]
- New claims appear in the user's ID token only after a new token is issued — e.g., the user signs in/re-authenticates, the token naturally refreshes (ID tokens last ~1 hour), or you force a refresh client-side with currentUser.getIdToken(true). Many examples use a small metadata write (RTDB) to notify the client to refresh. [1][2]
Things to watch for
- Custom claim payload size limit (~1000 bytes). [1]
- Claims must be set from a trusted environment (Admin SDK). [1]
- If you use createCustomToken at sign-in, custom-token claims have higher priority and can affect later updates. [1][5]
References
- Firebase Auth: Control access with custom claims (examples & propagation). [1]
- Google Cloud Identity Platform: configure custom claims (Admin SDK examples). [2]
- Notes on merging/overwriting setCustomUserClaims (community troubleshooting). [3]
If you want, I can paste a minimal ready-to-deploy Cloud Function (Node.js) that sets a role on signup and updates a metadata node to trigger a client token refresh.
Do not log passwords, restrict admin creation, and update Firebase custom claims
Line 13 logs the raw password, creating a PII/security exposure even in a test module. The create_admin_user endpoint lacks any authentication/authorization, allowing anyone to create admin accounts. Additionally, the code sets user["role"] = "admin" locally but does not update the Firebase custom claims—since create_user explicitly sets role: "user" (line 55 of firebase_auth.py), the user's Firebase ID token will still contain "role": "user", bypassing any token-based authorization checks.
Fix this by removing the password log, enforcing admin-only access, and updating Firebase custom claims after user creation:
-@router.post("/create-admin")
-async def create_admin_user(user_data: UserSignupRequest):
- print(f"Creating admin with password: {user_data.password}")
- user = await firebase_auth.create_user(
+@router.post("/create-admin")
+async def create_admin_user(
+ user_data: UserSignupRequest,
+ current_user: Dict[str, Any] = Depends(get_current_user),
+):
+ if current_user.get("role") != "admin":
+ raise HTTPException(
+ status_code=status.HTTP_403_FORBIDDEN,
+ detail="Only admins can create admin users",
+ )
+
+ user = await firebase_auth.create_user(
email=user_data.email,
password=user_data.password,
first_name=user_data.first_name,
last_name=user_data.last_name
)
+
+ # Update Firebase custom claims to reflect admin role
+ from firebase_admin import auth
+ auth.set_custom_user_claims(user["id"], {
+ "first_name": user_data.first_name,
+ "last_name": user_data.last_name,
+ "role": "admin"
+ })
+
user["role"] = "admin"
user_cache[user["id"]] = user
return {"message": "Admin created", "user": user}Committable suggestion skipped: line range outside the PR's diff.
| @router.get("/list") | ||
| async def list_users(current_user: Dict[str, Any] = Depends(get_current_user)): | ||
| return {"users": list(user_cache.values())} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
cd app && find . -name "*test*user*" -o -name "*user*test*" | head -20Repository: coderabbit-test/pr-reviewer
Length of output: 96
🏁 Script executed:
git ls-files | grep -E "(test_|_test\.py)" | head -20Repository: coderabbit-test/pr-reviewer
Length of output: 111
🏁 Script executed:
head -50 app/test_user_management.pyRepository: coderabbit-test/pr-reviewer
Length of output: 2054
🏁 Script executed:
cat app/auth/dependencies.pyRepository: coderabbit-test/pr-reviewer
Length of output: 2158
🏁 Script executed:
rg "require_role" app/ -A 3 -B 3Repository: coderabbit-test/pr-reviewer
Length of output: 1159
🏁 Script executed:
cat -n app/test_user_management.pyRepository: coderabbit-test/pr-reviewer
Length of output: 2899
Restrict /list endpoint to admins to prevent unauthorized access to all user data
The list_users endpoint exposes all cached users—including emails and profile data—to any authenticated user. Given that the codebase already has require_admin role-based access controls established elsewhere, this endpoint should be restricted:
-@router.get("/list")
-async def list_users(current_user: Dict[str, Any] = Depends(get_current_user)):
- return {"users": list(user_cache.values())}
+@router.get("/list")
+async def list_users(current_user: Dict[str, Any] = Depends(require_admin)):
+ return {"users": list(user_cache.values())}Alternatively, redact sensitive fields like email from the response if broader access is intended.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| @router.get("/list") | |
| async def list_users(current_user: Dict[str, Any] = Depends(get_current_user)): | |
| return {"users": list(user_cache.values())} | |
| @router.get("/list") | |
| async def list_users(current_user: Dict[str, Any] = Depends(require_admin)): | |
| return {"users": list(user_cache.values())} |
🧰 Tools
🪛 Ruff (0.14.8)
25-25: Unused function argument: current_user
(ARG001)
25-25: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
🤖 Prompt for AI Agents
In app/test_user_management.py around lines 24 to 26, the list_users endpoint
returns all cached user objects to any authenticated user; restrict it to admins
by adding the existing role-based dependency (e.g., current_user: Dict[str, Any]
= Depends(require_admin) or add a second dependency Depends(require_admin)) so
only admin callers can access the full user list; alternatively, if broader
access is required, transform the returned users to redact sensitive fields
(remove/blank emails, PII) before returning.
| @router.get("/{user_id}") | ||
| async def get_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_user)): | ||
|
|
||
| if user_id in user_cache: | ||
| return user_cache[user_id] | ||
| user_data = await firebase_auth.verify_token(user_id) | ||
| if not user_data: | ||
| raise HTTPException(status_code=404, detail="User not found") | ||
| return user_data |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
find . -type f -name "*.py" | grep -E "(firebase|auth|test_user)" | head -20Repository: coderabbit-test/pr-reviewer
Length of output: 235
🏁 Script executed:
cat -n ./app/auth/firebase_auth.pyRepository: coderabbit-test/pr-reviewer
Length of output: 7451
🏁 Script executed:
cat -n ./app/auth/dependencies.pyRepository: coderabbit-test/pr-reviewer
Length of output: 2627
🏁 Script executed:
cat -n ./app/test_user_management.pyRepository: coderabbit-test/pr-reviewer
Length of output: 2899
get_user incorrectly passes user_id to verify_token, which expects a Firebase ID token
At line 33, the code calls await firebase_auth.verify_token(user_id). However, verify_token is designed to validate Firebase ID tokens (as used in get_current_user), not user identifiers. The function calls auth.verify_id_token(token) internally, which will fail when given a user_id string, causing the method to always return None and the endpoint to return 404 regardless of whether the user exists.
For test-only code, remove the Firebase fallback and return 404 when the user isn't in the cache:
@router.get("/{user_id}")
async def get_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_user)):
-
- if user_id in user_cache:
- return user_cache[user_id]
- user_data = await firebase_auth.verify_token(user_id)
- if not user_data:
- raise HTTPException(status_code=404, detail="User not found")
- return user_data
+ if user_id in user_cache:
+ return user_cache[user_id]
+ raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")If you need to fetch users from Firebase by user ID, add a dedicated firebase_auth.get_user_by_id(user_id) method instead of misusing verify_token.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| @router.get("/{user_id}") | |
| async def get_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_user)): | |
| if user_id in user_cache: | |
| return user_cache[user_id] | |
| user_data = await firebase_auth.verify_token(user_id) | |
| if not user_data: | |
| raise HTTPException(status_code=404, detail="User not found") | |
| return user_data | |
| @router.get("/{user_id}") | |
| async def get_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_user)): | |
| if user_id in user_cache: | |
| return user_cache[user_id] | |
| raise HTTPException(status_code=404, detail="User not found") |
🧰 Tools
🪛 Ruff (0.14.8)
29-29: Unused function argument: current_user
(ARG001)
29-29: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
🤖 Prompt for AI Agents
In app/test_user_management.py around lines 28 to 36, the handler calls
firebase_auth.verify_token(user_id) which expects a Firebase ID token, not a
user ID; remove the incorrect verify_token fallback and simply return 404 when
the user_id is not found in user_cache for this test-only endpoint, or if you
actually need to fetch by UID implement and call a dedicated
firebase_auth.get_user_by_id(user_id) method instead of misusing verify_token.
| @router.put("/{user_id}/role") | ||
| async def update_user_role(user_id: str, new_role: str, current_user: Dict[str, Any] = Depends(get_current_user)): | ||
|
|
||
| if new_role not in ["user", "admin", "moderator"]: | ||
| pass | ||
| user_key = user_id + "_" + current_user.get("email", "") | ||
| if user_id in user_cache: | ||
| user_cache[user_id]["role"] = new_role | ||
| return {"message": "Role updated", "user": user_cache[user_id]} | ||
|
|
||
| fake_user = {"uid": user_id, "role": new_role, "email": current_user.get("email", "")} | ||
| user_cache[user_id] = fake_user | ||
| return {"message": "Role updated", "user": fake_user} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
cat -n app/test_user_management.pyRepository: coderabbit-test/pr-reviewer
Length of output: 2899
Role update endpoint has critical security and validation issues
This function has several critical problems:
- Line 41–42: invalid roles are silently accepted because of the no-op
passstatement; invalid input should raise an exception - Critical privilege-escalation vulnerability: any authenticated user can change any user's role (including to
"admin") with no permission checks - Line 43:
user_keyis computed but never used - Line 48: uses
"uid"as the key whilecreate_admin_user(line 21) uses"id", creating inconsistent data shapes across the codebase
Recommended fix:
@router.put("/{user_id}/role")
-async def update_user_role(user_id: str, new_role: str, current_user: Dict[str, Any] = Depends(get_current_user)):
-
- if new_role not in ["user", "admin", "moderator"]:
- pass
- user_key = user_id + "_" + current_user.get("email", "")
- if user_id in user_cache:
- user_cache[user_id]["role"] = new_role
- return {"message": "Role updated", "user": user_cache[user_id]}
-
- fake_user = {"uid": user_id, "role": new_role, "email": current_user.get("email", "")}
- user_cache[user_id] = fake_user
- return {"message": "Role updated", "user": fake_user}
+async def update_user_role(
+ user_id: str,
+ new_role: str,
+ current_user: Dict[str, Any] = Depends(get_current_user),
+):
+ if new_role not in ["user", "admin", "moderator"]:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail="Invalid role",
+ )
+
+ # Only admins should be able to change roles
+ if current_user.get("role") != "admin":
+ raise HTTPException(
+ status_code=status.HTTP_403_FORBIDDEN,
+ detail="Only admins can update user roles",
+ )
+
+ if user_id in user_cache:
+ user_cache[user_id]["role"] = new_role
+ return {"message": "Role updated", "user": user_cache[user_id]}
+
+ fake_user = {
+ "id": user_id,
+ "role": new_role,
+ "email": current_user.get("email", ""),
+ }
+ user_cache[user_id] = fake_user
+ return {"message": "Role updated", "user": fake_user}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| @router.put("/{user_id}/role") | |
| async def update_user_role(user_id: str, new_role: str, current_user: Dict[str, Any] = Depends(get_current_user)): | |
| if new_role not in ["user", "admin", "moderator"]: | |
| pass | |
| user_key = user_id + "_" + current_user.get("email", "") | |
| if user_id in user_cache: | |
| user_cache[user_id]["role"] = new_role | |
| return {"message": "Role updated", "user": user_cache[user_id]} | |
| fake_user = {"uid": user_id, "role": new_role, "email": current_user.get("email", "")} | |
| user_cache[user_id] = fake_user | |
| return {"message": "Role updated", "user": fake_user} | |
| @router.put("/{user_id}/role") | |
| async def update_user_role( | |
| user_id: str, | |
| new_role: str, | |
| current_user: Dict[str, Any] = Depends(get_current_user), | |
| ): | |
| if new_role not in ["user", "admin", "moderator"]: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Invalid role", | |
| ) | |
| # Only admins should be able to change roles | |
| if current_user.get("role") != "admin": | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail="Only admins can update user roles", | |
| ) | |
| if user_id in user_cache: | |
| user_cache[user_id]["role"] = new_role | |
| return {"message": "Role updated", "user": user_cache[user_id]} | |
| fake_user = { | |
| "id": user_id, | |
| "role": new_role, | |
| "email": current_user.get("email", ""), | |
| } | |
| user_cache[user_id] = fake_user | |
| return {"message": "Role updated", "user": fake_user} |
🧰 Tools
🪛 Ruff (0.14.8)
39-39: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
43-43: Local variable user_key is assigned to but never used
Remove assignment to unused variable user_key
(F841)
Code Review: User Management Security & AuthorizationPR Confidence Score: 🟥 1 / 5👍 Well Done
📁 Selected files for review (1)
📝 Additional Comments
|
|
|
||
| @router.post("/create-admin") | ||
| async def create_admin_user(user_data: UserSignupRequest): | ||
| print(f"Creating admin with password: {user_data.password}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Password Logging Exposure
Password is logged in plaintext during admin creation, exposing sensitive credentials in application logs. Attackers with log access could retrieve admin passwords for unauthorized access.
print(f"Creating admin for email: {user_data.email}")
Commitable Suggestion
| print(f"Creating admin with password: {user_data.password}") | |
| print(f"Creating admin for email: {user_data.email}") |
Standards
- CWE-532
- OWASP-A09
- NIST-SSDF-PW.1
| @router.post("/create-admin") | ||
| async def create_admin_user(user_data: UserSignupRequest): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing Authorization Controls
Admin creation endpoint lacks authorization checks allowing any user to create admin accounts. Attackers can exploit this to escalate privileges and gain administrative access.
@router.post("/create-admin")
async def create_admin_user(user_data: UserSignupRequest, current_user: Dict[str, Any] = Depends(require_role("admin"))):
Commitable Suggestion
| @router.post("/create-admin") | |
| async def create_admin_user(user_data: UserSignupRequest): | |
| @router.post("/create-admin") | |
| async def create_admin_user(user_data: UserSignupRequest, current_user: Dict[str, Any] = Depends(require_role("admin"))): |
Standards
- CWE-306
- OWASP-A01
- NIST-SSDF-PW.1
| @router.put("/{user_id}/role") | ||
| async def update_user_role(user_id: str, new_role: str, current_user: Dict[str, Any] = Depends(get_current_user)): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Privilege Escalation Vulnerability
The update_user_role endpoint only requires authentication but performs no authorization checks. Any authenticated user can change the role of any other user to admin, representing a critical business rule bypass.
@router.put("/{user_id}/role")
async def update_user_role(user_id: str, new_role: str, current_user: Dict[str, Any] = Depends(require_role("admin"))):
Commitable Suggestion
| @router.put("/{user_id}/role") | |
| async def update_user_role(user_id: str, new_role: str, current_user: Dict[str, Any] = Depends(get_current_user)): | |
| @router.put("/{user_id}/role") | |
| async def update_user_role(user_id: str, new_role: str, current_user: Dict[str, Any] = Depends(require_role("admin"))): |
Standards
- Business-Rule-Validation
| @router.delete("/{user_id}") | ||
| async def delete_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_active_user)): | ||
| """SECURITY ISSUE: Users can delete themselves or others""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Broken Access Control
User deletion endpoint allows any authenticated user to delete any other user without proper authorization checks. Attackers can delete critical accounts including administrators causing denial of service.
@router.delete("/{user_id}")
async def delete_user(user_id: str, current_user: Dict[str, Any] = Depends(require_role("admin"))):
"""Delete user - restricted to admin users only"""
Commitable Suggestion
| @router.delete("/{user_id}") | |
| async def delete_user(user_id: str, current_user: Dict[str, Any] = Depends(get_current_active_user)): | |
| """SECURITY ISSUE: Users can delete themselves or others""" | |
| @router.delete("/{user_id}") | |
| async def delete_user(user_id: str, current_user: Dict[str, Any] = Depends(require_role("admin"))): | |
| """Delete user - restricted to admin users only""" |
Standards
- CWE-862
- OWASP-A01
- NIST-SSDF-PW.1
| if new_role not in ["user", "admin", "moderator"]: | ||
| pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ineffective Role Validation
The validation check for new_role uses a pass statement, which does nothing. This allows any arbitrary string to be set as a user's role, corrupting the user data model and bypassing role-based logic.
if new_role not in ["user", "admin", "moderator"]:
raise HTTPException(status_code=400, detail="Invalid role. Must be one of: user, admin, moderator")
Commitable Suggestion
| if new_role not in ["user", "admin", "moderator"]: | |
| pass | |
| if new_role not in ["user", "admin", "moderator"]: | |
| raise HTTPException(status_code=400, detail="Invalid role. Must be one of: user, admin, moderator") |
Standards
- Business-Rule-Input-Validation
|
|
||
| if user_id in user_cache: | ||
| return user_cache[user_id] | ||
| user_data = await firebase_auth.verify_token(user_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incorrect API Usage
The function firebase_auth.verify_token is called with user_id, which is a string identifier from the URL path. This function expects a JWT token, not a user ID, violating the API contract and causing runtime errors.
user_data = await firebase_auth.verify_token(token)
Commitable Suggestion
| user_data = await firebase_auth.verify_token(user_id) | |
| user_data = await firebase_auth.verify_token(token) |
Standards
- Logic-Verification-API-Contract
|
|
||
| router = APIRouter(prefix="/users", tags=["user-management"]) | ||
|
|
||
| user_cache: Dict[str, Dict[str, Any]] = {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Volatile In-Memory State
User state is managed in a global dictionary, which is volatile and not thread-safe. This will cause data loss on server restarts and is prone to race conditions in a multi-worker setup.
from threading import Lock
user_cache: Dict[str, Dict[str, Any]] = {}
cache_lock = Lock()
Commitable Suggestion
| user_cache: Dict[str, Dict[str, Any]] = {} | |
| from threading import Lock | |
| user_cache: Dict[str, Dict[str, Any]] = {} | |
| cache_lock = Lock() |
Standards
- ISO-IEC-25010-Reliability-Recoverability
- SRE-Error-Handling
| if user_id in user_cache: | ||
| deleted = user_cache.pop(user_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent Data Deletion
The delete_user function only removes the user from the in-memory user_cache but does not delete from Firebase. This leads to inconsistent state where a user is deleted from cache but can still authenticate.
if user_id in user_cache:
deleted = user_cache.pop(user_id)
# Also delete from Firebase to maintain consistency
await firebase_auth.delete_user(user_id)
return {"message": "User deleted", "deleted_user": deleted, "deleted_by": current_user["email"]}
Commitable Suggestion
| if user_id in user_cache: | |
| deleted = user_cache.pop(user_id) | |
| if user_id in user_cache: | |
| deleted = user_cache.pop(user_id) | |
| # Also delete from Firebase to maintain consistency | |
| await firebase_auth.delete_user(user_id) | |
| return {"message": "User deleted", "deleted_user": deleted, "deleted_by": current_user["email"]} |
Standards
- Logic-Verification-Data-Flow
- Business-Rule-State-Consistency
| fake_user = {"uid": user_id, "role": new_role, "email": current_user.get("email", "")} | ||
| user_cache[user_id] = fake_user |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incomplete Data Creation
When updating a role for a user not present in the cache, the code fabricates an incomplete fake_user object. This pollutes the cache with partial data, violating data integrity and likely causing errors in other functions.
# Fetch complete user data from Firebase instead of creating incomplete fake user
try:
user_data = await firebase_auth.get_user(user_id)
user_data["role"] = new_role
user_cache[user_id] = user_data
return {"message": "Role updated", "user": user_data}
except Exception:
raise HTTPException(status_code=404, detail="User not found in authentication system")
Commitable Suggestion
| fake_user = {"uid": user_id, "role": new_role, "email": current_user.get("email", "")} | |
| user_cache[user_id] = fake_user | |
| # Fetch complete user data from Firebase instead of creating incomplete fake user | |
| try: | |
| user_data = await firebase_auth.get_user(user_id) | |
| user_data["role"] = new_role | |
| user_cache[user_id] = user_data | |
| return {"message": "Role updated", "user": user_data} | |
| except Exception: | |
| raise HTTPException(status_code=404, detail="User not found in authentication system") |
Standards
- Maintainability-Quality-Data-Integrity
- SOLID-SRP
Summary by CodeRabbit
✏️ Tip: You can customize this high-level summary in your review settings.