Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 25 additions & 27 deletions services/pack_builder.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Context pack builder for generating context bundles within token budgets
"""

from typing import List, Dict, Any, Optional
from loguru import logger

Expand All @@ -22,7 +23,7 @@ def build_context_pack(
focus_paths: Optional[List[str]] = None,
file_limit: int = DEFAULT_FILE_LIMIT,
symbol_limit: int = DEFAULT_SYMBOL_LIMIT,
enable_deduplication: bool = True
enable_deduplication: bool = True,
) -> Dict[str, Any]:
"""
Build a context pack from nodes within budget with deduplication and category limits.
Expand All @@ -47,22 +48,16 @@ def build_context_pack(
logger.debug(f"After deduplication: {len(nodes)} unique nodes")

# Step 2: Sort nodes by score
sorted_nodes = sorted(
nodes,
key=lambda x: x.get("score", 0),
reverse=True
)
sorted_nodes = sorted(nodes, key=lambda x: x.get("score", 0), reverse=True)

# Step 3: Prioritize focus paths if provided
if focus_paths:
focus_nodes = [
n for n in sorted_nodes
n
for n in sorted_nodes
if any(fp in n.get("path", "") for fp in focus_paths)
]
other_nodes = [
n for n in sorted_nodes
if n not in focus_nodes
]
other_nodes = [n for n in sorted_nodes if n not in focus_nodes]
sorted_nodes = focus_nodes + other_nodes

# Step 4: Apply category limits and budget constraints
Expand All @@ -80,7 +75,9 @@ def build_context_pack(
logger.debug(f"File limit reached ({file_limit}), skipping file nodes")
continue
elif node_type == "symbol" and symbol_count >= symbol_limit:
logger.debug(f"Symbol limit reached ({symbol_limit}), skipping symbol nodes")
logger.debug(
f"Symbol limit reached ({symbol_limit}), skipping symbol nodes"
)
continue
elif node_type not in ["file", "symbol", "guideline"]:
# Unknown type, count as file
Expand All @@ -93,14 +90,13 @@ def build_context_pack(
"title": PackBuilder._extract_title(node.get("path", "")),
"summary": node.get("summary", ""),
"ref": node.get("ref", ""),
"extra": {
"lang": node.get("lang"),
"score": node.get("score", 0)
}
"extra": {"lang": node.get("lang"), "score": node.get("score", 0)},
}

# Estimate size (title + summary + ref + overhead)
item_size = len(item["title"]) + len(item["summary"]) + len(item["ref"]) + 50
item_size = (
len(item["title"]) + len(item["summary"]) + len(item["ref"]) + 50
)
estimated_tokens = item_size // chars_per_token

# Check if adding this item would exceed budget
Expand Down Expand Up @@ -129,24 +125,24 @@ def build_context_pack(
"budget_limit": budget,
"stage": stage,
"repo_id": repo_id,
"category_counts": {
"file": file_count,
"symbol": symbol_count
}
"category_counts": {"file": file_count, "symbol": symbol_count},
}

@staticmethod
def _deduplicate_nodes(nodes: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Remove duplicate nodes based on ref handle.
If multiple nodes have the same ref, keep the one with highest score.
Nodes without a ref are preserved with a unique identifier.
"""
seen_refs = {}
nodes_without_ref = []

for node in nodes:
ref = node.get("ref")
if not ref:
# No ref, keep it (shouldn't happen normally)
# No ref, keep it in a separate list
nodes_without_ref.append(node)
continue

# Check if we've seen this ref before
Expand All @@ -159,21 +155,23 @@ def _deduplicate_nodes(nodes: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
else:
seen_refs[ref] = node

# Return deduplicated nodes
deduplicated = list(seen_refs.values())
# Combine deduplicated nodes with nodes without refs
deduplicated = list(seen_refs.values()) + nodes_without_ref
removed_count = len(nodes) - len(deduplicated)

if removed_count > 0:
logger.debug(f"Removed {removed_count} duplicate nodes")
if nodes_without_ref:
logger.debug(f"Preserved {len(nodes_without_ref)} nodes without ref")

return deduplicated

@staticmethod
def _extract_title(path: str) -> str:
"""Extract title from path (last 2 segments)"""
parts = path.split('/')
parts = path.split("/")
if len(parts) >= 2:
return '/'.join(parts[-2:])
return "/".join(parts[-2:])
return path


Expand Down