feat: Ingestion logs enhancement#1598
Conversation
WalkthroughThis PR introduces two FastAPI endpoints for ingestion job management: one listing jobs with optional query filters and one retrieving individual jobs by ID. The implementation uses Pydantic models for data validation, in-file mock data for storage, and integrates with the existing public API route registry. ChangesIngestion Jobs API
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Suggested labels
🚥 Pre-merge checks | ✅ 3 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Warning Review ran into problems🔥 ProblemsGit: Failed to clone repository. Please run the Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/api/v1/ingestion_jobs.py`:
- Around line 546-587: Add pagination to the list_ingestion_jobs endpoint:
modify list_ingestion_jobs to accept limit and offset Query params (e.g., limit
with default 50 and bounds, offset default 0), compute total =
len(filtered_jobs) after applying filters on MOCK_JOBS, then slice jobs =
jobs[offset: offset + limit], build summaries from the sliced jobs and return an
IngestionJobListResponse containing total and the paginated jobs (and optionally
include limit/offset/has_more fields in the response model if needed); update
references in the function (list_ingestion_jobs, MOCK_JOBS, IngestionJobSummary,
IngestionJobListResponse) and keep get_api_key_user_async dependency unchanged.
- Around line 546-587: The endpoint list_ingestion_jobs currently returns a raw
JSONResponse and lacks a declared response model; update the FastAPI route
signature to include response_model=IngestionJobListResponse, and return the
Pydantic model instance (IngestionJobListResponse(total=..., jobs=...)) directly
instead of JSONResponse(response.model_dump()); keep the filtering logic over
MOCK_JOBS and construction of IngestionJobSummary unchanged, but ensure the
function signature still depends on get_api_key_user_async so OpenAPI and
response validation use IngestionJobListResponse and IngestionJobSummary types.
- Around line 546-599: Both endpoints accept user: User =
Depends(get_api_key_user_async) but never enforce ownership; update
list_ingestion_jobs to filter MOCK_JOBS by job.user_id == user.user_id before
building IngestionJobSummary (so only the authenticated user's jobs are
returned) and update get_ingestion_job to verify the retrieved job's user_id
matches user.user_id and return 404 if not, using the existing MOCK_JOBS,
list_ingestion_jobs, get_ingestion_job, and User.user_id symbols to locate the
changes.
- Around line 590-599: The get_ingestion_job endpoint currently returns a
JSONResponse directly and lacks a response_model; update the function signature
for get_ingestion_job to declare the proper response_model (the Pydantic model
used for ingestion jobs) and keep the Depends(User=get_api_key_user_async) auth,
then replace the JSONResponse-not-found branch with raise
HTTPException(status_code=404, detail="Ingestion job not found") and return the
job model instance (or job.model_dump() if your response_model expects dict) so
FastAPI/OpenAPI can validate and document the response; ensure HTTPException is
imported and reference MOCK_JOBS, get_ingestion_job, and get_api_key_user_async
when making the changes.
In `@src/app/routes/public_v1.py`:
- Around line 91-103: The two route registrations that call
v1_ingestion_jobs.list_ingestion_jobs and v1_ingestion_jobs.get_ingestion_job
currently use tags=["ingestion"]; update them to use tags=["public"] to match
the rest of this registration file (or, if a separate "ingestion" grouping is
intentional, add a short comment explaining the reason). Ensure you change the
tags argument on both add_api_route calls so OpenAPI grouping is consistent.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: d90d6550-202f-492a-a72f-d67cff507ea6
📒 Files selected for processing (2)
src/api/v1/ingestion_jobs.pysrc/app/routes/public_v1.py
| async def list_ingestion_jobs( | ||
| status: Optional[str] = Query( | ||
| None, description="Filter by status: completed, failed, running, pending" | ||
| ), | ||
| component: Optional[str] = Query( | ||
| None, description="Filter by component: openrag, docling, langflow, opensearch" | ||
| ), | ||
| actionability: Optional[str] = Query( | ||
| None, | ||
| description="Filter by actionability: USER_ACTIONABLE, RETRYABLE, ADMIN_ACTIONABLE, DEVELOPER_ACTIONABLE", | ||
| ), | ||
| user: User = Depends(get_api_key_user_async), | ||
| ): | ||
| """List ingestion jobs with optional filtering.""" | ||
| jobs = list(MOCK_JOBS.values()) | ||
|
|
||
| if status: | ||
| jobs = [j for j in jobs if j.status == status] | ||
| if component: | ||
| jobs = [j for j in jobs if j.component == component] | ||
| if actionability: | ||
| jobs = [j for j in jobs if j.actionability == actionability] | ||
|
|
||
| summaries = [ | ||
| IngestionJobSummary( | ||
| ingestion_job_id=j.ingestion_job_id, | ||
| file_name=j.file_name, | ||
| source_type=j.source_type, | ||
| status=j.status, | ||
| component=j.component, | ||
| phase=j.phase, | ||
| error_code=j.error_code, | ||
| actionability=j.actionability, | ||
| retryable=j.retryable, | ||
| user_title=j.user_title, | ||
| created_at=j.created_at, | ||
| ) | ||
| for j in jobs | ||
| ] | ||
|
|
||
| response = IngestionJobListResponse(total=len(summaries), jobs=summaries) | ||
| return JSONResponse(response.model_dump()) |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major | 🏗️ Heavy lift
Consider adding pagination for scalability.
List endpoints in public APIs should typically support pagination to handle large datasets efficiently. Without pagination, this endpoint could return hundreds or thousands of jobs, leading to performance issues and poor user experience.
💡 Suggested pagination parameters
async def list_ingestion_jobs(
status: Optional[str] = Query(None, description="Filter by status: completed, failed, running, pending"),
component: Optional[str] = Query(None, description="Filter by component: openrag, docling, langflow, opensearch"),
actionability: Optional[str] = Query(None, description="Filter by actionability: USER_ACTIONABLE, RETRYABLE, ADMIN_ACTIONABLE, DEVELOPER_ACTIONABLE"),
limit: int = Query(50, ge=1, le=100, description="Maximum number of jobs to return"),
offset: int = Query(0, ge=0, description="Number of jobs to skip"),
user: User = Depends(get_api_key_user_async),
) -> IngestionJobListResponse:
"""List ingestion jobs with optional filtering and pagination."""
jobs = list(MOCK_JOBS.values())
# Apply filters
if status:
jobs = [j for j in jobs if j.status == status]
if component:
jobs = [j for j in jobs if j.component == component]
if actionability:
jobs = [j for j in jobs if j.actionability == actionability]
# Apply pagination
total = len(jobs)
jobs = jobs[offset:offset + limit]
summaries = [
IngestionJobSummary(
ingestion_job_id=j.ingestion_job_id,
file_name=j.file_name,
source_type=j.source_type,
status=j.status,
component=j.component,
phase=j.phase,
error_code=j.error_code,
actionability=j.actionability,
retryable=j.retryable,
user_title=j.user_title,
created_at=j.created_at,
)
for j in jobs
]
return IngestionJobListResponse(total=total, jobs=summaries)Note: The IngestionJobListResponse model may need additional fields like limit, offset, or has_more to support pagination metadata.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/api/v1/ingestion_jobs.py` around lines 546 - 587, Add pagination to the
list_ingestion_jobs endpoint: modify list_ingestion_jobs to accept limit and
offset Query params (e.g., limit with default 50 and bounds, offset default 0),
compute total = len(filtered_jobs) after applying filters on MOCK_JOBS, then
slice jobs = jobs[offset: offset + limit], build summaries from the sliced jobs
and return an IngestionJobListResponse containing total and the paginated jobs
(and optionally include limit/offset/has_more fields in the response model if
needed); update references in the function (list_ingestion_jobs, MOCK_JOBS,
IngestionJobSummary, IngestionJobListResponse) and keep get_api_key_user_async
dependency unchanged.
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Declare response model and return Pydantic model directly.
FastAPI endpoints should declare the response model for automatic OpenAPI documentation generation and response validation. Additionally, return the Pydantic model directly instead of manually converting to JSONResponse.
♻️ Proposed refactor
async def list_ingestion_jobs(
status: Optional[str] = Query(
None, description="Filter by status: completed, failed, running, pending"
),
component: Optional[str] = Query(
None, description="Filter by component: openrag, docling, langflow, opensearch"
),
actionability: Optional[str] = Query(
None,
description="Filter by actionability: USER_ACTIONABLE, RETRYABLE, ADMIN_ACTIONABLE, DEVELOPER_ACTIONABLE",
),
user: User = Depends(get_api_key_user_async),
-):
+) -> IngestionJobListResponse:
"""List ingestion jobs with optional filtering."""
jobs = list(MOCK_JOBS.values())
if status:
jobs = [j for j in jobs if j.status == status]
if component:
jobs = [j for j in jobs if j.component == component]
if actionability:
jobs = [j for j in jobs if j.actionability == actionability]
summaries = [
IngestionJobSummary(
ingestion_job_id=j.ingestion_job_id,
file_name=j.file_name,
source_type=j.source_type,
status=j.status,
component=j.component,
phase=j.phase,
error_code=j.error_code,
actionability=j.actionability,
retryable=j.retryable,
user_title=j.user_title,
created_at=j.created_at,
)
for j in jobs
]
- response = IngestionJobListResponse(total=len(summaries), jobs=summaries)
- return JSONResponse(response.model_dump())
+ return IngestionJobListResponse(total=len(summaries), jobs=summaries)As per coding guidelines, FastAPI routes should use proper response model typing.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/api/v1/ingestion_jobs.py` around lines 546 - 587, The endpoint
list_ingestion_jobs currently returns a raw JSONResponse and lacks a declared
response model; update the FastAPI route signature to include
response_model=IngestionJobListResponse, and return the Pydantic model instance
(IngestionJobListResponse(total=..., jobs=...)) directly instead of
JSONResponse(response.model_dump()); keep the filtering logic over MOCK_JOBS and
construction of IngestionJobSummary unchanged, but ensure the function signature
still depends on get_api_key_user_async so OpenAPI and response validation use
IngestionJobListResponse and IngestionJobSummary types.
| async def list_ingestion_jobs( | ||
| status: Optional[str] = Query( | ||
| None, description="Filter by status: completed, failed, running, pending" | ||
| ), | ||
| component: Optional[str] = Query( | ||
| None, description="Filter by component: openrag, docling, langflow, opensearch" | ||
| ), | ||
| actionability: Optional[str] = Query( | ||
| None, | ||
| description="Filter by actionability: USER_ACTIONABLE, RETRYABLE, ADMIN_ACTIONABLE, DEVELOPER_ACTIONABLE", | ||
| ), | ||
| user: User = Depends(get_api_key_user_async), | ||
| ): | ||
| """List ingestion jobs with optional filtering.""" | ||
| jobs = list(MOCK_JOBS.values()) | ||
|
|
||
| if status: | ||
| jobs = [j for j in jobs if j.status == status] | ||
| if component: | ||
| jobs = [j for j in jobs if j.component == component] | ||
| if actionability: | ||
| jobs = [j for j in jobs if j.actionability == actionability] | ||
|
|
||
| summaries = [ | ||
| IngestionJobSummary( | ||
| ingestion_job_id=j.ingestion_job_id, | ||
| file_name=j.file_name, | ||
| source_type=j.source_type, | ||
| status=j.status, | ||
| component=j.component, | ||
| phase=j.phase, | ||
| error_code=j.error_code, | ||
| actionability=j.actionability, | ||
| retryable=j.retryable, | ||
| user_title=j.user_title, | ||
| created_at=j.created_at, | ||
| ) | ||
| for j in jobs | ||
| ] | ||
|
|
||
| response = IngestionJobListResponse(total=len(summaries), jobs=summaries) | ||
| return JSONResponse(response.model_dump()) | ||
|
|
||
|
|
||
| async def get_ingestion_job( | ||
| job_id: str, | ||
| user: User = Depends(get_api_key_user_async), | ||
| ): | ||
| """Get a specific ingestion job by ID.""" | ||
| job = MOCK_JOBS.get(job_id) | ||
| if not job: | ||
| return JSONResponse({"error": "Ingestion job not found"}, status_code=404) | ||
|
|
||
| return JSONResponse(job.model_dump()) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Description: Check for user-based filtering patterns in ingestion-related code
# Search for user filtering patterns in API endpoints
rg -n -C3 'user\.id|user_id.*filter|filter.*user' --type=py src/api/
# Search for TODO/FIXME comments about user filtering or multi-tenancy
rg -n 'TODO|FIXME|HACK' --type=py src/api/v1/ingestion_jobs.py
# Check if other v1 endpoints filter by user
rg -n -B5 'Depends\(get_api_key_user_async\)' --type=py src/api/v1/ | rg -A5 'user\.'Repository: langflow-ai/openrag
Length of output: 11668
Filter ingestion jobs by authenticated user to match codebase patterns.
Both endpoints inject user: User = Depends(get_api_key_user_async) but never use it. Other API endpoints (knowledge filters, settings) filter results by user.user_id to ensure users see only their own data. Apply the same pattern here to prevent information disclosure and align with existing security practices.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/api/v1/ingestion_jobs.py` around lines 546 - 599, Both endpoints accept
user: User = Depends(get_api_key_user_async) but never enforce ownership; update
list_ingestion_jobs to filter MOCK_JOBS by job.user_id == user.user_id before
building IngestionJobSummary (so only the authenticated user's jobs are
returned) and update get_ingestion_job to verify the retrieved job's user_id
matches user.user_id and return 404 if not, using the existing MOCK_JOBS,
list_ingestion_jobs, get_ingestion_job, and User.user_id symbols to locate the
changes.
| async def get_ingestion_job( | ||
| job_id: str, | ||
| user: User = Depends(get_api_key_user_async), | ||
| ): | ||
| """Get a specific ingestion job by ID.""" | ||
| job = MOCK_JOBS.get(job_id) | ||
| if not job: | ||
| return JSONResponse({"error": "Ingestion job not found"}, status_code=404) | ||
|
|
||
| return JSONResponse(job.model_dump()) |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Use response_model annotation and raise HTTPException for errors.
FastAPI endpoints should declare response models for OpenAPI documentation and use HTTPException for error responses rather than returning JSONResponse directly.
♻️ Proposed refactor
Add this import at the top:
from fastapi import Depends, HTTPException, QueryThen refactor the endpoint:
+from fastapi import Depends, HTTPException, Query
-from fastapi import Depends, Query
async def get_ingestion_job(
job_id: str,
user: User = Depends(get_api_key_user_async),
-):
+) -> IngestionJobDetail:
"""Get a specific ingestion job by ID."""
job = MOCK_JOBS.get(job_id)
if not job:
- return JSONResponse({"error": "Ingestion job not found"}, status_code=404)
+ raise HTTPException(status_code=404, detail="Ingestion job not found")
- return JSONResponse(job.model_dump())
+ return jobAs per coding guidelines, FastAPI routes should use proper response model typing and correct HTTP status codes.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/api/v1/ingestion_jobs.py` around lines 590 - 599, The get_ingestion_job
endpoint currently returns a JSONResponse directly and lacks a response_model;
update the function signature for get_ingestion_job to declare the proper
response_model (the Pydantic model used for ingestion jobs) and keep the
Depends(User=get_api_key_user_async) auth, then replace the
JSONResponse-not-found branch with raise HTTPException(status_code=404,
detail="Ingestion job not found") and return the job model instance (or
job.model_dump() if your response_model expects dict) so FastAPI/OpenAPI can
validate and document the response; ensure HTTPException is imported and
reference MOCK_JOBS, get_ingestion_job, and get_api_key_user_async when making
the changes.
| # Ingestion jobs endpoints | ||
| app.add_api_route( | ||
| "/v1/ingestion/jobs", | ||
| v1_ingestion_jobs.list_ingestion_jobs, | ||
| methods=["GET"], | ||
| tags=["ingestion"], | ||
| ) | ||
| app.add_api_route( | ||
| "/v1/ingestion/jobs/{job_id}", | ||
| v1_ingestion_jobs.get_ingestion_job, | ||
| methods=["GET"], | ||
| tags=["ingestion"], | ||
| ) |
There was a problem hiding this comment.
Inconsistent OpenAPI tags may affect documentation and SDK generation.
The new ingestion job routes use tags=["ingestion"] while all other routes in this file use tags=["public"]. This inconsistency could:
- Create unexpected groupings in the OpenAPI/Swagger documentation
- Affect SDK generation if it's tag-based
- Confuse developers expecting uniform tagging in
public_v1.py
Consider using tags=["public"] for consistency with other routes in this registration file, or if separate tagging is intentional, document the reasoning.
🏷️ Proposed fix for consistency
# Ingestion jobs endpoints
app.add_api_route(
"/v1/ingestion/jobs",
v1_ingestion_jobs.list_ingestion_jobs,
methods=["GET"],
- tags=["ingestion"],
+ tags=["public"],
)
app.add_api_route(
"/v1/ingestion/jobs/{job_id}",
v1_ingestion_jobs.get_ingestion_job,
methods=["GET"],
- tags=["ingestion"],
+ tags=["public"],
)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| # Ingestion jobs endpoints | |
| app.add_api_route( | |
| "/v1/ingestion/jobs", | |
| v1_ingestion_jobs.list_ingestion_jobs, | |
| methods=["GET"], | |
| tags=["ingestion"], | |
| ) | |
| app.add_api_route( | |
| "/v1/ingestion/jobs/{job_id}", | |
| v1_ingestion_jobs.get_ingestion_job, | |
| methods=["GET"], | |
| tags=["ingestion"], | |
| ) | |
| # Ingestion jobs endpoints | |
| app.add_api_route( | |
| "/v1/ingestion/jobs", | |
| v1_ingestion_jobs.list_ingestion_jobs, | |
| methods=["GET"], | |
| tags=["public"], | |
| ) | |
| app.add_api_route( | |
| "/v1/ingestion/jobs/{job_id}", | |
| v1_ingestion_jobs.get_ingestion_job, | |
| methods=["GET"], | |
| tags=["public"], | |
| ) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/app/routes/public_v1.py` around lines 91 - 103, The two route
registrations that call v1_ingestion_jobs.list_ingestion_jobs and
v1_ingestion_jobs.get_ingestion_job currently use tags=["ingestion"]; update
them to use tags=["public"] to match the rest of this registration file (or, if
a separate "ingestion" grouping is intentional, add a short comment explaining
the reason). Ensure you change the tags argument on both add_api_route calls so
OpenAPI grouping is consistent.
All jobs
curl -H "Authorization: Bearer " http://localhost:8000/v1/ingestion/jobs
Only failed Docling jobs
curl -H "Authorization: Bearer " "http://localhost:8000/v1/ingestion/jobs?status=failed&component=docling"
Full detail for the password-protected scenario
curl -H "Authorization: Bearer " http://localhost:8000/v1/ingestion/jobs/job-005
404 case
curl -H "Authorization: Bearer " http://localhost:8000/v1/ingestion/jobs/job-999
Summary by CodeRabbit