# Repository ingestion models
class IngestRepoRequest(BaseModel):
    """Repository ingestion request.

    Exactly one of ``repo_url`` (remote repository to clone) or
    ``local_path`` (existing checkout on disk) must be supplied; the
    endpoint validates this and returns 400 otherwise.
    """
    repo_url: Optional[str] = None
    local_path: Optional[str] = None
    branch: Optional[str] = "main"
    # NOTE: pydantic deep-copies field defaults per instance, so these
    # mutable list defaults are safe (unlike plain function defaults).
    include_globs: list[str] = ["**/*.py", "**/*.ts", "**/*.tsx"]
    exclude_globs: list[str] = ["**/node_modules/**", "**/.git/**", "**/__pycache__/**"]

class IngestRepoResponse(BaseModel):
    """Repository ingestion response."""
    task_id: str
    # FIX: the allowed states were only documented in a comment while the
    # `Literal` import sat unused; enforce the closed set at the model level.
    status: Literal["queued", "running", "done", "error"]
    message: Optional[str] = None
    files_processed: Optional[int] = None

# Related files models
class NodeSummary(BaseModel):
    """Summary of a code node returned by the related-files endpoint."""
    type: str  # "file" or "symbol"
    ref: str  # ref:// handle for MCP integration
    path: Optional[str] = None
    lang: Optional[str] = None
    score: float
    summary: str
class RelatedResponse(BaseModel):
    """Response for related files endpoint."""
    nodes: list[NodeSummary]
    query: str
    repo_id: str

# Context pack models
class ContextItem(BaseModel):
    """A single item in the context pack."""
    kind: str  # file, symbol, guideline
    title: str
    summary: str
    ref: str
    extra: Optional[dict] = None

class ContextPack(BaseModel):
    """Response for context pack endpoint."""
    items: list[ContextItem]
    budget_used: int
    budget_limit: int
    stage: str
    repo_id: str


# Repository ingestion endpoint
@router.post("/ingest/repo", response_model=IngestRepoResponse)
async def ingest_repo(request: IngestRepoRequest):
    """
    Ingest a repository into the knowledge graph.

    Scans files matching the request's include/exclude globs and creates
    Repo/File nodes in Neo4j. Exactly one of repo_url / local_path must be
    provided (400 otherwise).
    """
    try:
        if not request.repo_url and not request.local_path:
            raise HTTPException(
                status_code=400,
                detail="Either repo_url or local_path must be provided"
            )

        # Human-readable, collision-resistant task id.
        task_id = f"ing-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{uuid.uuid4().hex[:8]}"

        repo_path = None
        repo_id = None
        cleanup_needed = False  # True only when we cloned into a temp dir

        if request.local_path:
            repo_path = request.local_path
            repo_id = git_utils.get_repo_id_from_path(repo_path)
        else:
            logger.info(f"Cloning repository: {request.repo_url}")
            clone_result = git_utils.clone_repo(
                request.repo_url,
                branch=request.branch
            )

            if not clone_result.get("success"):
                return IngestRepoResponse(
                    task_id=task_id,
                    status="error",
                    message=clone_result.get("error", "Failed to clone repository")
                )

            repo_path = clone_result["path"]
            repo_id = git_utils.get_repo_id_from_url(request.repo_url)
            cleanup_needed = True

        logger.info(f"Processing repository: {repo_id} at {repo_path}")

        code_ingestor = get_code_ingestor(graph_service)

        # BUG FIX: cleanup previously ran only on the success path, leaking the
        # temporary clone on the "no files" early return, on ingest failure,
        # and on any exception. try/finally guarantees removal.
        try:
            files = code_ingestor.scan_files(
                repo_path=repo_path,
                include_globs=request.include_globs,
                exclude_globs=request.exclude_globs
            )

            if not files:
                message = "No files found matching the specified patterns"
                logger.warning(message)
                return IngestRepoResponse(
                    task_id=task_id,
                    status="done",
                    message=message,
                    files_processed=0
                )

            result = code_ingestor.ingest_files(
                repo_id=repo_id,
                files=files
            )
        finally:
            if cleanup_needed:
                git_utils.cleanup_temp_repo(repo_path)

        if result.get("success"):
            return IngestRepoResponse(
                task_id=task_id,
                status="done",
                message=f"Successfully ingested {result['files_processed']} files",
                files_processed=result["files_processed"]
            )
        return IngestRepoResponse(
            task_id=task_id,
            status="error",
            message=result.get("error", "Failed to ingest files")
        )

    except HTTPException:
        # BUG FIX: the generic handler below used to swallow the 400 raised
        # above and re-surface it as a 500; propagate HTTP errors untouched.
        raise
    except Exception as e:
        logger.error(f"Ingest failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))

# Related files endpoint
@router.get("/graph/related", response_model=RelatedResponse)
async def get_related(
    query: str = Query(..., description="Search query"),
    repoId: str = Query(..., description="Repository ID"),
    limit: int = Query(30, ge=1, le=100, description="Maximum number of results")
):
    """
    Find related files using fulltext search and keyword matching.
    Returns file summaries with ref:// handles for MCP integration.
    """
    try:
        # Over-fetch so the ranker has candidates to reorder and prune.
        search_results = graph_service.fulltext_search(
            query_text=query,
            repo_id=repoId,
            limit=limit * 2
        )

        if not search_results:
            logger.info(f"No results found for query: {query}")
            return RelatedResponse(nodes=[], query=query, repo_id=repoId)

        ranked_files = ranker.rank_files(
            files=search_results,
            query=query,
            limit=limit
        )

        nodes = [
            NodeSummary(
                type="file",
                ref=ranker.generate_ref_handle(path=file["path"]),
                path=file["path"],
                lang=file["lang"],
                score=file["score"],
                summary=ranker.generate_file_summary(
                    path=file["path"],
                    lang=file["lang"]
                )
            )
            for file in ranked_files
        ]

        logger.info(f"Found {len(nodes)} related files for query: {query}")

        return RelatedResponse(nodes=nodes, query=query, repo_id=repoId)

    except Exception as e:
        logger.error(f"Related query failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))

# Context pack endpoint
@router.get("/context/pack", response_model=ContextPack)
async def get_context_pack(
    repoId: str = Query(..., description="Repository ID"),
    stage: str = Query("plan", description="Stage (plan/review/implement)"),
    budget: int = Query(1500, ge=100, le=10000, description="Token budget"),
    keywords: Optional[str] = Query(None, description="Comma-separated keywords"),
    focus: Optional[str] = Query(None, description="Comma-separated focus paths")
):
    """
    Build a context pack within a token budget.

    Searches for relevant files, ranks them, and packages them with
    summaries and ref:// handles via the pack builder.
    """
    try:
        # Parse the comma-separated query parameters.
        keyword_list = [k.strip() for k in keywords.split(',')] if keywords else []
        focus_paths = [f.strip() for f in focus.split(',')] if focus else []

        # '*' acts as a match-everything fallback when no keywords are given.
        search_query = ' '.join(keyword_list) if keyword_list else '*'

        search_results = graph_service.fulltext_search(
            query_text=search_query,
            repo_id=repoId,
            limit=50
        )

        if not search_results:
            logger.info(f"No files found for context pack in repo: {repoId}")
            return ContextPack(
                items=[],
                budget_used=0,
                budget_limit=budget,
                stage=stage,
                repo_id=repoId
            )

        ranked_files = ranker.rank_files(
            files=search_results,
            query=search_query,
            limit=50
        )

        # Convert to the node-dict format the pack builder expects.
        nodes = [
            {
                "type": "file",
                "path": file["path"],
                "lang": file["lang"],
                "score": file["score"],
                "summary": ranker.generate_file_summary(
                    path=file["path"],
                    lang=file["lang"]
                ),
                "ref": ranker.generate_ref_handle(path=file["path"]),
            }
            for file in ranked_files
        ]

        context_pack = pack_builder.build_context_pack(
            nodes=nodes,
            budget=budget,
            stage=stage,
            repo_id=repoId,
            keywords=keyword_list,
            focus_paths=focus_paths
        )

        logger.info(f"Built context pack with {len(context_pack['items'])} items")

        return ContextPack(**context_pack)

    except Exception as e:
        logger.error(f"Context pack generation failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
class CodeIngestor:
    """Code file scanner and ingestor for repositories.

    Walks a checkout on disk, selects files by glob patterns, detects the
    language from the extension, and pushes File/Repo nodes into Neo4j via
    the injected service.
    """

    # Language detection based on file extension.
    LANG_MAP = {
        '.py': 'python',
        '.ts': 'typescript',
        '.tsx': 'typescript',
        '.js': 'javascript',
        '.jsx': 'javascript',
        '.java': 'java',
        '.go': 'go',
        '.rs': 'rust',
        '.cpp': 'cpp',
        '.c': 'c',
        '.h': 'c',
        '.hpp': 'cpp',
        '.cs': 'csharp',
        '.rb': 'ruby',
        '.php': 'php',
        '.swift': 'swift',
        '.kt': 'kotlin',
        '.scala': 'scala',
    }

    def __init__(self, neo4j_service):
        """Initialize code ingestor with the Neo4j service used for writes."""
        self.neo4j_service = neo4j_service

    def scan_files(
        self,
        repo_path: str,
        include_globs: List[str],
        exclude_globs: List[str]
    ) -> List[Dict[str, Any]]:
        """Scan the repository and return info dicts for matching files.

        Excluded directories are pruned from the walk so their subtrees are
        never visited. Files failing to read/hash are skipped with a warning.
        """
        files = []
        repo_path = os.path.abspath(repo_path)

        for root, dirs, filenames in os.walk(repo_path):
            # Prune excluded directories in place so os.walk skips them.
            dirs[:] = [
                d for d in dirs
                if not self._should_exclude(os.path.join(root, d), repo_path, exclude_globs)
            ]

            for filename in filenames:
                file_path = os.path.join(root, filename)
                rel_path = os.path.relpath(file_path, repo_path)

                if self._should_include(rel_path, include_globs) and \
                        not self._should_exclude(file_path, repo_path, exclude_globs):
                    try:
                        files.append(self._get_file_info(file_path, rel_path))
                    except Exception as e:
                        logger.warning(f"Failed to process {rel_path}: {e}")

        logger.info(f"Scanned {len(files)} files in {repo_path}")
        return files

    def _should_include(self, rel_path: str, include_globs: List[str]) -> bool:
        """Check whether a relative path matches any include pattern."""
        rel = rel_path.replace(os.sep, '/')
        for pattern in include_globs:
            if fnmatch.fnmatch(rel, pattern):
                return True
            # BUG FIX: fnmatch requires the literal '/' in '**/...' patterns,
            # so '**/*.py' never matched top-level files like 'main.py'.
            # Treat a leading '**/' as optional (gitignore-style semantics).
            if pattern.startswith('**/') and fnmatch.fnmatch(rel, pattern[3:]):
                return True
        return False

    def _should_exclude(self, file_path: str, repo_path: str, exclude_globs: List[str]) -> bool:
        """Check whether a file or directory matches any exclude pattern."""
        rel = os.path.relpath(file_path, repo_path).replace(os.sep, '/')
        segments = rel.split('/')
        for pattern in exclude_globs:
            if fnmatch.fnmatch(rel, pattern):
                return True
            # BUG FIX: the old pattern.strip('*') mangling produced strings
            # like '/node_modules/' that never matched relative paths, so
            # exclusions silently failed. Interpret '**/name/**' as "exclude
            # any path containing a 'name' segment".
            core = pattern.strip('*/')
            if core and '/' not in core and core in segments:
                return True
        return False

    def _get_file_info(self, file_path: str, rel_path: str) -> Dict[str, Any]:
        """Return language, size, content (small files only) and short SHA."""
        ext = Path(file_path).suffix.lower()
        lang = self.LANG_MAP.get(ext, 'unknown')

        size = os.path.getsize(file_path)

        # Read content only for small files (kept for fulltext search).
        content = None
        if size < 100_000:  # < 100KB
            try:
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read()
            except Exception as e:
                logger.warning(f"Could not read {rel_path}: {e}")

        # Truncated SHA-256 is enough for change detection here.
        sha = None
        try:
            with open(file_path, 'rb') as f:
                sha = hashlib.sha256(f.read()).hexdigest()[:16]
        except Exception as e:
            logger.warning(f"Could not hash {rel_path}: {e}")

        return {
            "path": rel_path,
            "lang": lang,
            "size": size,
            "content": content,
            "sha": sha
        }

    def ingest_files(
        self,
        repo_id: str,
        files: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Ingest scanned files into Neo4j; returns a success/count dict."""
        # Local import: this module did not previously need datetime.
        from datetime import datetime, timezone

        try:
            # BUG FIX: "datetime()" was passed as a literal string parameter —
            # Cypher functions are not evaluated inside parameters, so the
            # repo's `created` property was the text "datetime()". Store a
            # real ISO-8601 UTC timestamp instead.
            self.neo4j_service.create_repo(repo_id, {
                "created": datetime.now(timezone.utc).isoformat(),
                "file_count": len(files)
            })

            success_count = 0
            for file_info in files:
                result = self.neo4j_service.create_file(
                    repo_id=repo_id,
                    path=file_info["path"],
                    lang=file_info["lang"],
                    size=file_info["size"],
                    content=file_info.get("content"),
                    sha=file_info.get("sha")
                )
                if result.get("success"):
                    success_count += 1

            logger.info(f"Ingested {success_count}/{len(files)} files for repo {repo_id}")

            return {
                "success": True,
                "files_processed": success_count,
                "total_files": len(files)
            }
        except Exception as e:
            logger.error(f"Failed to ingest files: {e}")
            return {
                "success": False,
                "error": str(e)
            }


# Global instance (created lazily so the Neo4j service can be injected).
code_ingestor = None


def get_code_ingestor(neo4j_service):
    """Get or create the singleton code ingestor instance."""
    global code_ingestor
    if code_ingestor is None:
        code_ingestor = CodeIngestor(neo4j_service)
    return code_ingestor
class GitUtils:
    """Git operations helper (clone, repo-id derivation, temp cleanup)."""

    @staticmethod
    def clone_repo(repo_url: str, target_dir: Optional[str] = None, branch: str = "main") -> Dict[str, Any]:
        """Shallow-clone a git repository.

        When ``target_dir`` is None a temp directory is created; on failure
        that directory is removed again so failed clones don't leak disk.
        Returns {"success": bool, "path"?: str, "message"?/"error"?: str}.
        """
        created_tmp = False
        try:
            if target_dir is None:
                target_dir = tempfile.mkdtemp(prefix="repo_")
                created_tmp = True

            cmd = ["git", "clone", "--depth", "1", "-b", branch, repo_url, target_dir]
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=300  # guard against hung network clones
            )

            if result.returncode == 0:
                return {
                    "success": True,
                    "path": target_dir,
                    "message": f"Cloned {repo_url} to {target_dir}"
                }

            # BUG FIX: the temp dir created above was leaked on clone failure.
            if created_tmp:
                shutil.rmtree(target_dir, ignore_errors=True)
            return {
                "success": False,
                "error": result.stderr
            }
        except Exception as e:
            logger.error(f"Failed to clone repository: {e}")
            # BUG FIX: also reclaim the temp dir on timeout / missing git / etc.
            if created_tmp and target_dir is not None:
                shutil.rmtree(target_dir, ignore_errors=True)
            return {
                "success": False,
                "error": str(e)
            }

    @staticmethod
    def get_repo_id_from_path(repo_path: str) -> str:
        """Derive a repository ID from a local path (its basename)."""
        return os.path.basename(os.path.abspath(repo_path))

    @staticmethod
    def get_repo_id_from_url(repo_url: str) -> str:
        """Derive a repository ID from a clone URL (last segment, sans .git)."""
        repo_name = repo_url.rstrip('/').split('/')[-1]
        if repo_name.endswith('.git'):
            repo_name = repo_name[:-4]
        return repo_name

    @staticmethod
    def cleanup_temp_repo(repo_path: str):
        """Remove a temporary clone; only acts inside the system temp dir."""
        try:
            # Safety check: never rmtree a path outside the temp directory.
            if repo_path.startswith(tempfile.gettempdir()):
                shutil.rmtree(repo_path)
                logger.info(f"Cleaned up temporary repo: {repo_path}")
        except Exception as e:
            logger.warning(f"Failed to cleanup temp repo: {e}")


# Global instance
git_utils = GitUtils()
def create_repo(self, repo_id: str, metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Create or update a repository node (synchronous for compatibility).

    MERGE is idempotent: re-ingesting the same repo updates its metadata
    rather than duplicating the node. Returns {"success": bool, "error"?}.
    """
    if not self._connected:
        return {"success": False, "error": "Not connected to Neo4j"}

    try:
        with self.driver.session(database=settings.neo4j_database) as session:
            query = """
            MERGE (r:Repo {id: $repo_id})
            SET r += $metadata
            RETURN r
            """
            session.run(query, {
                "repo_id": repo_id,
                "metadata": metadata or {}
            })
            return {"success": True}
    except Exception as e:
        logger.error(f"Failed to create repo: {e}")
        return {"success": False, "error": str(e)}

def create_file(
    self,
    repo_id: str,
    path: str,
    lang: str,
    size: int,
    content: Optional[str] = None,
    sha: Optional[str] = None
) -> Dict[str, Any]:
    """Create/update a File node and link it to its Repo (synchronous).

    Keyed on (repoId, path) so re-ingestion updates in place.
    Returns {"success": bool, "error"?}.
    """
    if not self._connected:
        return {"success": False, "error": "Not connected to Neo4j"}

    try:
        with self.driver.session(database=settings.neo4j_database) as session:
            query = """
            MATCH (r:Repo {id: $repo_id})
            MERGE (f:File {repoId: $repo_id, path: $path})
            SET f.lang = $lang,
                f.size = $size,
                f.content = $content,
                f.sha = $sha,
                f.updated = datetime()
            MERGE (f)-[:IN_REPO]->(r)
            RETURN f
            """
            session.run(query, {
                "repo_id": repo_id,
                "path": path,
                "lang": lang,
                "size": size,
                "content": content,
                "sha": sha
            })
            return {"success": True}
    except Exception as e:
        logger.error(f"Failed to create file: {e}")
        return {"success": False, "error": str(e)}

def fulltext_search(
    self,
    query_text: str,
    repo_id: Optional[str] = None,
    limit: int = 30
) -> List[Dict[str, Any]]:
    """Fulltext-ish search on File nodes (synchronous).

    Simple CONTAINS matching until a real fulltext index is set up.
    Returns a list of {path, lang, size, repoId, score} dicts; empty list
    on no connection or error.
    """
    if not self._connected:
        return []

    try:
        with self.driver.session(database=settings.neo4j_database) as session:
            # BUG FIX: the previous predicate used `$query_text IN f.content`,
            # which is Cypher *list* membership and fails on a string
            # property; the IS NOT NULL guard also came after the use.
            # Use a null-guarded, case-insensitive CONTAINS instead.
            query = """
            MATCH (f:File)
            WHERE ($repo_id IS NULL OR f.repoId = $repo_id)
              AND (toLower(f.path) CONTAINS toLower($query_text)
                   OR toLower(f.lang) CONTAINS toLower($query_text)
                   OR (f.content IS NOT NULL
                       AND toLower(f.content) CONTAINS toLower($query_text)))
            RETURN f.path as path,
                   f.lang as lang,
                   f.size as size,
                   f.repoId as repoId,
                   1.0 as score
            LIMIT $limit
            """

            result = session.run(query, {
                "query_text": query_text,
                "repo_id": repo_id,
                "limit": limit
            })

            return [dict(record) for record in result]
    except Exception as e:
        logger.error(f"Fulltext search failed: {e}")
        return []
class PackBuilder:
    """Context pack builder: packages ranked nodes within a token budget."""

    @staticmethod
    def build_context_pack(
        nodes: List[Dict[str, Any]],
        budget: int,
        stage: str,
        repo_id: str,
        keywords: Optional[List[str]] = None,
        focus_paths: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Build a context pack from nodes within budget.

        Args:
            nodes: List of node dictionaries with path, lang, score, etc.
            budget: Token budget (estimated as ~4 chars per token)
            stage: Stage name (plan/review/etc)
            repo_id: Repository ID
            keywords: Optional keywords for filtering (currently unused here;
                they shape the upstream search query)
            focus_paths: Optional list of path substrings to prioritize

        Returns:
            Dict with items, budget_used, budget_limit, stage, repo_id
        """
        items = []
        budget_used = 0
        chars_per_token = 4  # rough heuristic: ~4 characters per token

        # Highest-scoring nodes first.
        sorted_nodes = sorted(
            nodes,
            key=lambda x: x.get("score", 0),
            reverse=True
        )

        # PERF FIX: the previous implementation rebuilt the non-focus list
        # with `n not in focus_nodes`, an O(n^2) scan over dicts. A single
        # partition pass is O(n) and preserves relative order in each half.
        if focus_paths:
            focus_nodes: List[Dict[str, Any]] = []
            other_nodes: List[Dict[str, Any]] = []
            for node in sorted_nodes:
                if any(fp in node.get("path", "") for fp in focus_paths):
                    focus_nodes.append(node)
                else:
                    other_nodes.append(node)
            sorted_nodes = focus_nodes + other_nodes

        for node in sorted_nodes:
            item = {
                "kind": node.get("type", "file"),
                "title": PackBuilder._extract_title(node.get("path", "")),
                "summary": node.get("summary", ""),
                "ref": node.get("ref", ""),
                "extra": {
                    "lang": node.get("lang"),
                    "score": node.get("score", 0)
                }
            }

            # Estimate size: title + summary + ref + fixed structural overhead.
            item_size = len(item["title"]) + len(item["summary"]) + len(item["ref"]) + 50
            estimated_tokens = item_size // chars_per_token

            # Stop at the first item that would exceed the budget.
            if budget_used + estimated_tokens > budget:
                logger.debug(f"Budget limit reached: {budget_used}/{budget} tokens")
                break

            items.append(item)
            budget_used += estimated_tokens

        logger.info(f"Built context pack with {len(items)} items, {budget_used}/{budget} tokens")

        return {
            "items": items,
            "budget_used": budget_used,
            "budget_limit": budget,
            "stage": stage,
            "repo_id": repo_id
        }

    @staticmethod
    def _extract_title(path: str) -> str:
        """Extract a short title from a path (its last two segments)."""
        parts = path.split('/')
        if len(parts) >= 2:
            return '/'.join(parts[-2:])
        return path


# Global instance
pack_builder = PackBuilder()
class Ranker:
    """Search result ranker: keyword/path heuristics for file relevance."""

    @staticmethod
    def rank_files(
        files: List[Dict[str, Any]],
        query: str,
        limit: int = 30
    ) -> List[Dict[str, Any]]:
        """Rank files by relevance to the query using keyword matching.

        Scoring heuristics (multiplicative on the base score):
        exact substring in path x2; +30% per query term found as a path
        token; language substring match x1.5; src/lib/core/app paths x1.2;
        test/spec paths halved unless the query itself mentions tests.
        """
        query_lower = query.lower()
        query_terms = set(re.findall(r'\w+', query_lower))

        scored_files = []
        for file in files:
            path = file.get("path", "").lower()
            lang = file.get("lang", "").lower()
            base_score = file.get("score", 1.0)

            score = base_score

            # Exact substring match in path.
            if query_lower in path:
                score *= 2.0

            # Per-term matching against path tokens.
            path_terms = set(re.findall(r'\w+', path))
            matching_terms = query_terms & path_terms
            if matching_terms:
                score *= (1.0 + len(matching_terms) * 0.3)

            # Language match.
            if query_lower in lang:
                score *= 1.5

            # Prefer files in conventional source directories.
            if any(prefix in path for prefix in ['src/', 'lib/', 'core/', 'app/']):
                score *= 1.2

            # Penalize test files unless the user is looking for tests.
            if 'test' not in query_lower and ('test' in path or 'spec' in path):
                score *= 0.5

            scored_files.append({
                **file,
                "score": score
            })

        # Highest score first, truncated to the requested limit.
        scored_files.sort(key=lambda x: x["score"], reverse=True)
        return scored_files[:limit]

    @staticmethod
    def generate_file_summary(path: str, lang: str) -> str:
        """Generate a rule-based one-line summary for a file."""
        parts = path.split('/')

        if len(parts) > 1:
            parent_dir = parts[-2]
            filename = parts[-1]
            # BUG FIX: `filename` was computed but the literal string
            # "(unknown)" was emitted in its place.
            return f"{lang.capitalize()} file {filename} in {parent_dir}/ directory"
        return f"{lang.capitalize()} file {path}"

    @staticmethod
    def generate_ref_handle(path: str, start_line: int = 1, end_line: int = 1000) -> str:
        """Generate a ref:// handle for a file (with a line range)."""
        return f"ref://file/{path}#L{start_line}-L{end_line}"


# Global instance
ranker = Ranker()