661 changes: 661 additions & 0 deletions IMPLEMENTATION_SUMMARY.md

Large diffs are not rendered by default.

270 changes: 245 additions & 25 deletions api/routes.py
@@ -14,6 +14,7 @@
from services.git_utils import git_utils
from services.ranker import ranker
from services.pack_builder import pack_builder
from services.metrics import metrics_service
from config import settings
from loguru import logger

@@ -65,15 +66,19 @@ class IngestRepoRequest(BaseModel):
repo_url: Optional[str] = None
local_path: Optional[str] = None
branch: Optional[str] = "main"
mode: str = "full" # full | incremental
include_globs: list[str] = ["**/*.py", "**/*.ts", "**/*.tsx"]
exclude_globs: list[str] = ["**/node_modules/**", "**/.git/**", "**/__pycache__/**"]
since_commit: Optional[str] = None # For incremental mode: compare against this commit
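
# Example request body for an incremental ingestion run (illustrative
# values only; the route decorator sits above this hunk and is not shown):
#
#   {
#     "local_path": "/srv/repos/myproject",
#     "mode": "incremental",
#     "since_commit": "abc1234",
#     "include_globs": ["**/*.py"]
#   }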

class IngestRepoResponse(BaseModel):
"""Repository ingestion response"""
task_id: str
status: str # queued, running, done, error
message: Optional[str] = None
files_processed: Optional[int] = None
mode: Optional[str] = None # full | incremental
changed_files_count: Optional[int] = None # For incremental mode

# Related files models
class NodeSummary(BaseModel):
@@ -107,6 +112,7 @@ class ContextPack(BaseModel):
budget_limit: int
stage: str
repo_id: str
category_counts: Optional[dict] = None # {"file": N, "symbol": M}


# health check
@@ -116,15 +122,15 @@ async def health_check():
try:
# check Neo4j knowledge service status
neo4j_connected = knowledge_service._initialized if hasattr(knowledge_service, '_initialized') else False

services_status = {
"neo4j_knowledge_service": neo4j_connected,
"graph_service": graph_service._connected if hasattr(graph_service, '_connected') else False,
"task_queue": True # task queue is always available
}

overall_status = "healthy" if services_status["neo4j_knowledge_service"] else "degraded"

return HealthResponse(
status=overall_status,
services=services_status,
@@ -134,6 +140,45 @@ async def health_check():
logger.error(f"Health check failed: {e}")
raise HTTPException(status_code=500, detail=str(e))

# Prometheus metrics endpoint
@router.get("/metrics")
async def get_metrics():
"""
Prometheus metrics endpoint

Exposes metrics in Prometheus text format for monitoring and observability:
- HTTP request counts and latencies
- Repository ingestion metrics
- Graph query performance
- Neo4j health and statistics
- Context pack generation metrics
- Task queue metrics

Example:
curl http://localhost:8000/api/v1/metrics
"""
try:
# Update Neo4j metrics before generating output
await metrics_service.update_neo4j_metrics(graph_service)

# Update task queue metrics
from services.task_queue import task_queue
stats = task_queue.get_queue_stats()
metrics_service.update_task_queue_size("pending", stats.get("pending", 0))
metrics_service.update_task_queue_size("running", stats.get("running", 0))
metrics_service.update_task_queue_size("completed", stats.get("completed", 0))
metrics_service.update_task_queue_size("failed", stats.get("failed", 0))

# Generate metrics
from fastapi.responses import Response
return Response(
content=metrics_service.get_metrics(),
media_type=metrics_service.get_content_type()
)
except Exception as e:
logger.error(f"Metrics generation failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
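
# Illustrative sketch, not part of this diff: one shape services/metrics.py
# could take to satisfy the calls above, assuming the prometheus_client
# library. Only get_metrics(), get_content_type(), update_task_queue_size()
# and update_neo4j_metrics() are confirmed by this hunk; the metric names and
# the graph_service.get_stats() helper are assumptions.
from prometheus_client import (
    CONTENT_TYPE_LATEST,
    CollectorRegistry,
    Gauge,
    generate_latest,
)

class MetricsService:
    def __init__(self):
        self.registry = CollectorRegistry()
        self.task_queue_size = Gauge(
            "task_queue_size", "Tasks in the queue by status",
            ["status"], registry=self.registry,
        )
        self.neo4j_node_count = Gauge(
            "neo4j_node_count", "Nodes currently in the Neo4j graph",
            registry=self.registry,
        )

    def update_task_queue_size(self, status: str, count: int) -> None:
        self.task_queue_size.labels(status=status).set(count)

    async def update_neo4j_metrics(self, graph_service) -> None:
        stats = await graph_service.get_stats()  # hypothetical helper
        self.neo4j_node_count.set(stats.get("node_count", 0))

    def get_metrics(self) -> bytes:
        return generate_latest(self.registry)

    def get_content_type(self) -> str:
        return CONTENT_TYPE_LATEST

metrics_service = MetricsService()  # module-level singleton, as imported above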

# knowledge query interface
@router.post("/knowledge/query")
async def query_knowledge(query_request: QueryRequest):
@@ -386,50 +431,117 @@ async def ingest_repo(request: IngestRepoRequest):
repo_id = git_utils.get_repo_id_from_url(request.repo_url)
cleanup_needed = True

logger.info(f"Processing repository: {repo_id} at {repo_path}")
logger.info(f"Processing repository: {repo_id} at {repo_path} (mode={request.mode})")

# Get code ingestor
code_ingestor = get_code_ingestor(graph_service)

# Handle incremental mode
files_to_process = []
changed_files_count = 0

if request.mode == "incremental":
# Check if it's a git repository
if not git_utils.is_git_repo(repo_path):
logger.warning(f"Incremental mode requested but {repo_path} is not a git repo, falling back to full mode")
request.mode = "full"
else:
# Get changed files
changed_result = git_utils.get_changed_files(
repo_path=repo_path,
since_commit=request.since_commit,
include_untracked=True
)

if not changed_result.get("success"):
logger.warning(f"Failed to get changed files: {changed_result.get('error')}, falling back to full mode")
request.mode = "full"
else:
changed_files = changed_result.get("changed_files", [])
changed_files_count = len(changed_files)

if changed_files_count == 0:
logger.info("No files changed, skipping ingestion")
return IngestRepoResponse(
task_id=task_id,
status="done",
message="No files changed since last ingestion",
files_processed=0,
mode="incremental",
changed_files_count=0
)

# Filter changed files by glob patterns
logger.info(f"Found {changed_files_count} changed files, filtering by patterns...")

# Scan the repo with the glob patterns, then intersect with the changed set
all_files = code_ingestor.scan_files(
repo_path=repo_path,
include_globs=request.include_globs,
exclude_globs=request.exclude_globs
)

# Create a set of changed file paths for quick lookup
changed_paths = {cf['path'] for cf in changed_files}

# Keep only the scanned files whose paths appear in the changed set
files_to_process = [
f for f in all_files
if f['path'] in changed_paths
]

logger.info(f"Filtered to {len(files_to_process)} files matching patterns")

# Full mode or fallback
if request.mode == "full":
# Scan all files
files_to_process = code_ingestor.scan_files(
repo_path=repo_path,
include_globs=request.include_globs,
exclude_globs=request.exclude_globs
)

if not files_to_process:
message = "No files found matching the specified patterns" if request.mode == "full" else "No changed files match the patterns"
logger.warning(message)
return IngestRepoResponse(
task_id=task_id,
status="done",
message=message,
files_processed=0,
mode=request.mode,
changed_files_count=changed_files_count if request.mode == "incremental" else None
)

# Ingest files into Neo4j
result = code_ingestor.ingest_files(
repo_id=repo_id,
files=files_to_process
)

# Cleanup if needed
if cleanup_needed:
git_utils.cleanup_temp_repo(repo_path)

if result.get("success"):
message = f"Successfully ingested {result['files_processed']} files"
if request.mode == "incremental":
message += f" (out of {changed_files_count} changed)"

return IngestRepoResponse(
task_id=task_id,
status="done",
message=f"Successfully ingested {result['files_processed']} files",
files_processed=result["files_processed"]
message=message,
files_processed=result["files_processed"],
mode=request.mode,
changed_files_count=changed_files_count if request.mode == "incremental" else None
)
else:
return IngestRepoResponse(
task_id=task_id,
status="error",
message=result.get("error", "Failed to ingest files")
message=result.get("error", "Failed to ingest files"),
mode=request.mode
)

except Exception as e:
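
# Illustrative sketch, not part of this diff: the contract of
# git_utils.get_changed_files() as inferred from its call site above. The
# real implementation lives in services/git_utils.py; this subprocess-based
# version and the HEAD~1 fallback base are assumptions.
import subprocess
from typing import Optional

def get_changed_files(repo_path: str, since_commit: Optional[str] = None,
                      include_untracked: bool = True) -> dict:
    """Return {"success": bool, "changed_files": [{"path": str}, ...], "error": ...}."""
    try:
        base = since_commit or "HEAD~1"  # fallback base is a guess
        diff = subprocess.run(
            ["git", "-C", repo_path, "diff", "--name-only", base, "HEAD"],
            capture_output=True, text=True, check=True,
        )
        paths = [p for p in diff.stdout.splitlines() if p]
        if include_untracked:
            extra = subprocess.run(
                ["git", "-C", repo_path, "ls-files", "--others", "--exclude-standard"],
                capture_output=True, text=True, check=True,
            )
            paths.extend(p for p in extra.stdout.splitlines() if p)
        # Paths are repo-relative, assumed to match the 'path' keys from scan_files()
        return {"success": True, "changed_files": [{"path": p} for p in paths], "error": None}
    except subprocess.CalledProcessError as e:
        return {"success": False, "changed_files": [], "error": e.stderr.strip()}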
@@ -581,9 +693,117 @@ async def get_context_pack(
)

logger.info(f"Built context pack with {len(context_pack['items'])} items")

return ContextPack(**context_pack)

except Exception as e:
logger.error(f"Context pack generation failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
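
# Illustrative sketch, not part of this diff: how pack_builder might populate
# the new ContextPack.category_counts field declared above. The real logic is
# in services/pack_builder.py; this helper name and its Counter-based tally
# are assumptions.
from collections import Counter

def count_categories(items: list[dict]) -> dict:
    # Tally pack items by their "type" field, e.g. {"file": 12, "symbol": 30}.
    return dict(Counter(item["type"] for item in items))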

# Impact analysis endpoint
class ImpactNode(BaseModel):
"""A node in the impact analysis results"""
type: str # file, symbol
path: str
lang: Optional[str] = None
repoId: str
relationship: str # CALLS, IMPORTS
depth: int
score: float
ref: str
summary: str

class ImpactResponse(BaseModel):
"""Response for impact analysis endpoint"""
nodes: list[ImpactNode]
file: str
repo_id: str
depth: int

@router.get("/graph/impact", response_model=ImpactResponse)
async def get_impact_analysis(
repoId: str = Query(..., description="Repository ID"),
file: str = Query(..., description="File path to analyze"),
depth: int = Query(2, ge=1, le=5, description="Traversal depth for dependencies"),
limit: int = Query(50, ge=1, le=100, description="Maximum number of results")
):
"""
Analyze the impact of a file by finding reverse dependencies.

Returns files and symbols that depend on the specified file through:
- CALLS relationships (who calls functions/methods in this file)
- IMPORTS relationships (who imports this file)

This is useful for:
- Understanding the blast radius of changes
- Finding code that needs to be updated when modifying this file
- Identifying critical files with many dependents

Example:
GET /graph/impact?repoId=myproject&file=src/auth/token.py&depth=2&limit=50

Returns files that call functions in token.py or import from it,
up to 2 levels deep in the dependency chain.
"""
try:
# Perform impact analysis
impact_results = graph_service.impact_analysis(
repo_id=repoId,
file_path=file,
depth=depth,
limit=limit
)

if not impact_results:
logger.info(f"No reverse dependencies found for file: {file}")
return ImpactResponse(
nodes=[],
file=file,
repo_id=repoId,
depth=depth
)

# Convert to ImpactNode objects
nodes = []
for result in impact_results:
# Generate summary
summary = ranker.generate_file_summary(
path=result["path"],
lang=result.get("lang", "unknown")
)

# Add relationship context to summary
rel_type = result.get("relationship", "DEPENDS_ON")
if rel_type == "CALLS":
summary += f" (calls functions in {file.split('/')[-1]})"
elif rel_type == "IMPORTS":
summary += f" (imports {file.split('/')[-1]})"

# Generate ref handle
ref = ranker.generate_ref_handle(path=result["path"])

node = ImpactNode(
type=result.get("type", "file"),
path=result["path"],
lang=result.get("lang"),
repoId=result["repoId"],
relationship=result.get("relationship", "DEPENDS_ON"),
depth=result.get("depth", 1),
score=result.get("score", 0.5),
ref=ref,
summary=summary
)
nodes.append(node)

logger.info(f"Found {len(nodes)} reverse dependencies for {file}")

return ImpactResponse(
nodes=nodes,
file=file,
repo_id=repoId,
depth=depth
)

except Exception as e:
logger.error(f"Impact analysis failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
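
# Illustrative sketch, not part of this diff: the query shape
# graph_service.impact_analysis() could run for the endpoint above, assuming
# File nodes keyed by repoId/path and CALLS/IMPORTS relationships. The label,
# property names, and the 1/depth score are assumptions inferred from the
# ImpactNode model.
from neo4j import Driver

def impact_analysis(driver: Driver, repo_id: str, file_path: str,
                    depth: int = 2, limit: int = 50) -> list[dict]:
    # depth is validated upstream (1 <= depth <= 5); Cypher forbids parameters
    # in variable-length bounds, so it is interpolated as a literal.
    query = f"""
        MATCH (target:File {{repoId: $repo_id, path: $file_path}})
        MATCH (dep:File)-[rels:CALLS|IMPORTS*1..{depth}]->(target)
        RETURN DISTINCT
            dep.path AS path, dep.lang AS lang, dep.repoId AS repoId,
            type(rels[-1]) AS relationship, size(rels) AS depth,
            1.0 / size(rels) AS score, 'file' AS type
        ORDER BY depth
        LIMIT $limit
    """
    with driver.session() as session:
        result = session.run(query, repo_id=repo_id,
                             file_path=file_path, limit=limit)
        return [dict(record) for record in result]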