Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions modules/bundle-mapper/module-package.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
name: bundle-mapper
version: "0.1.0"
commands: []
pip_dependencies: []
module_dependencies: []
core_compatibility: ">=0.28.0,<1.0.0"
tier: community
schema_extensions:
project_bundle: {}
project_metadata:
bundle_mapper.mapping_rules:
type: "list | None"
description: "Persistent mapping rules from user confirmations"
bundle_mapper.history:
type: "dict | None"
description: "Auto-populated historical mappings (item_key -> bundle_id counts)"
publisher:
name: nold-ai
url: https://github.com/nold-ai/specfact-cli-modules
integrity:
checksum_algorithm: sha256
dependencies: []
7 changes: 7 additions & 0 deletions modules/bundle-mapper/src/bundle_mapper/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""Bundle mapper module: confidence-based spec-to-bundle assignment with interactive review."""

from bundle_mapper.mapper.engine import BundleMapper
from bundle_mapper.models.bundle_mapping import BundleMapping


__all__ = ["BundleMapper", "BundleMapping"]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Command hooks for backlog refine/import --auto-bundle (used when module is loaded)."""
7 changes: 7 additions & 0 deletions modules/bundle-mapper/src/bundle_mapper/mapper/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""Bundle mapper engine and history."""

from bundle_mapper.mapper.engine import BundleMapper
from bundle_mapper.mapper.history import save_user_confirmed_mapping


__all__ = ["BundleMapper", "save_user_confirmed_mapping"]
204 changes: 204 additions & 0 deletions modules/bundle-mapper/src/bundle_mapper/mapper/engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
"""
BundleMapper engine: confidence-based mapping from backlog items to bundles.
"""

from __future__ import annotations

import re
from pathlib import Path
from typing import Any

from beartype import beartype
from icontract import ensure, require

from bundle_mapper.mapper.history import (
item_key,
item_keys_similar,
load_bundle_mapping_config,
)
from bundle_mapper.models.bundle_mapping import BundleMapping


try:
from specfact_cli.models.backlog_item import BacklogItem
except ImportError:
BacklogItem = Any # type: ignore[misc, assignment]

# Multipliers applied to each signal's score in BundleMapper.compute_mapping.
# The three weights add up to 1.0, which keeps the combined confidence in [0, 1].
WEIGHT_EXPLICIT = 0.8  # explicit "<prefix><bundle_id>" tag on the item
WEIGHT_HISTORICAL = 0.15  # previously confirmed mappings for similar items
WEIGHT_CONTENT = 0.05  # keyword overlap between item text and bundle keywords
# Confirmation count at which the historical score saturates:
# score = min(1.0, count / HISTORY_CAP).
HISTORY_CAP = 10.0


def _tokenize(text: str) -> set[str]:
"""Lowercase, split by non-alphanumeric."""
return set(re.findall(r"[a-z0-9]+", text.lower()))


def _jaccard(a: set[str], b: set[str]) -> float:
"""Jaccard similarity between two sets."""
if not a and not b:
return 1.0
if not a or not b:
return 0.0
return len(a & b) / len(a | b)


@beartype
class BundleMapper:
    """
    Computes mapping from backlog items to OpenSpec bundle ids using three signals:
    explicit labels (bundle:xyz), historical patterns, content similarity.

    The signals are combined into one confidence value for the chosen
    (primary) bundle. Only evidence that points at the primary bundle raises
    its confidence; evidence for other bundles is reported as alternatives.
    """

    def __init__(
        self,
        available_bundle_ids: list[str] | None = None,
        config_path: Path | None = None,
        bundle_spec_keywords: dict[str, set[str]] | None = None,
    ) -> None:
        """
        Args:
            available_bundle_ids: Valid bundle ids (for explicit label validation).
                When empty or omitted, any labelled bundle id is accepted.
            config_path: Path to .specfact config for rules/history.
            bundle_spec_keywords: Optional map bundle_id -> set of keywords from specs (for content similarity).
        """
        self._available_bundle_ids = set(available_bundle_ids or [])
        self._config_path = config_path
        self._config: dict[str, Any] = {}
        self._bundle_keywords = bundle_spec_keywords or {}

    def _load_config(self) -> dict[str, Any]:
        """Return the bundle-mapping config, loading it on first use and caching it."""
        if not self._config:
            self._config = load_bundle_mapping_config(self._config_path)
        return self._config

    @beartype
    def _score_explicit_mapping(self, item: BacklogItem) -> tuple[str | None, float]:
        """Return (bundle_id, 1.0) for the first valid explicit bundle tag, or (None, 0.0).

        A tag is explicit when it starts with the configured
        ``explicit_label_prefix`` (default ``"bundle:"``). The id after the
        prefix must be non-empty and, if a set of available bundle ids was
        given, be a member of it.
        """
        prefix = self._load_config().get("explicit_label_prefix", "bundle:")
        for raw_tag in item.tags:
            tag = (raw_tag or "").strip()
            if not tag.startswith(prefix):
                continue
            bundle_id = tag[len(prefix) :].strip()
            if bundle_id and (not self._available_bundle_ids or bundle_id in self._available_bundle_ids):
                return (bundle_id, 1.0)
        return (None, 0.0)

    @beartype
    def _score_historical_mapping(self, item: BacklogItem) -> tuple[str | None, float]:
        """Return (bundle_id, score) from history, or (None, 0.0).

        Scans every history entry whose key is similar to this item's key and
        picks the bundle with the single highest confirmation count. The
        score is that count scaled by HISTORY_CAP and clamped to 1.0.
        """
        key = item_key(item)
        # A config with "history: null" must behave like an empty history.
        history = self._load_config().get("history") or {}
        best_bundle: str | None = None
        best_count = 0
        for hist_key, entry in history.items():
            if not item_keys_similar(key, hist_key):
                continue
            # Tolerate entries that are null or have no counts yet.
            counts = (entry or {}).get("counts") or {}
            for bid, cnt in counts.items():
                if cnt > best_count:
                    best_count = cnt
                    best_bundle = bid
        if best_bundle is None:
            return (None, 0.0)
        score = min(1.0, best_count / HISTORY_CAP)
        return (best_bundle, score)

    @beartype
    def _score_content_similarity(self, item: BacklogItem) -> list[tuple[str, float]]:
        """Return list of (bundle_id, score) by keyword overlap with item title/body.

        The score is the Jaccard similarity between the item's tokens and the
        bundle's keyword set. Bundles with zero overlap are omitted and the
        result is sorted by descending score.
        """
        text = f"{item.title} {item.body_markdown or ''}"
        tokens = _tokenize(text)
        if not tokens:
            return []
        results: list[tuple[str, float]] = []
        for bundle_id, keywords in self._bundle_keywords.items():
            sim = _jaccard(tokens, keywords)
            if sim > 0:
                results.append((bundle_id, sim))
        return sorted(results, key=lambda x: -x[1])

    @beartype
    def _explain_score(self, bundle_id: str, score: float, method: str) -> str:
        """Human-readable one-line explanation.

        ``method`` is one of ``"explicit_label"``, ``"historical"`` or
        ``"content_similarity"``; any other value yields a line without a
        signal name.
        """
        labels = {
            "explicit_label": "Explicit label",
            "historical": "Historical pattern",
            "content_similarity": "Content similarity",
        }
        label = labels.get(method)
        if label is None:
            return f"{bundle_id} (confidence {score:.2f})"
        # The arrow was previously stored as mis-decoded bytes; it is U+2192.
        return f"{label} → {bundle_id} (confidence {score:.2f})"

    @beartype
    def _build_explanation(
        self,
        primary_bundle_id: str | None,
        confidence: float,
        candidates: list[tuple[str, float]],
        reasons: list[str],
    ) -> str:
        """Build full explanation string.

        Joins the confidence, the per-signal reasons and up to five
        alternatives with ". ". ``primary_bundle_id`` is accepted for
        interface stability but is not part of the text.
        """
        parts = [f"Confidence: {confidence:.2f}"]
        if reasons:
            parts.append("; ".join(reasons))
        if candidates:
            parts.append("Alternatives: " + ", ".join(f"{b}({s:.2f})" for b, s in candidates[:5]))
        return ". ".join(parts)

    @beartype
    @require(lambda item: item is not None, "Item must not be None")
    @ensure(
        lambda result: 0.0 <= result.confidence <= 1.0,
        "Confidence in [0, 1]",
    )
    def compute_mapping(self, item: BacklogItem) -> BundleMapping:
        """
        Compute mapping for one backlog item using weighted signals:
        0.8 * explicit + 0.15 * historical + 0.05 * content.

        The primary bundle is taken from the first signal that yields one, in
        the order explicit label, history, content similarity. A lower-priority
        signal adds to the confidence only when it points at that same bundle;
        otherwise its bundle is listed among the alternatives.

        Returns:
            BundleMapping with the primary bundle id (or None when no signal
            fired), the confidence, up to ten alternative candidates sorted by
            descending weighted score, and a textual explanation.
        """
        reasons: list[str] = []
        explicit_bundle, explicit_score = self._score_explicit_mapping(item)
        hist_bundle, hist_score = self._score_historical_mapping(item)
        content_list = self._score_content_similarity(item)

        primary_bundle_id: str | None = None
        weighted = 0.0
        # Weighted evidence for bundles other than the primary one.
        alternatives: dict[str, float] = {}

        if explicit_bundle and explicit_score > 0:
            primary_bundle_id = explicit_bundle
            weighted += WEIGHT_EXPLICIT * explicit_score
            reasons.append(self._explain_score(explicit_bundle, explicit_score, "explicit_label"))

        if hist_bundle and hist_score > 0:
            contrib = WEIGHT_HISTORICAL * hist_score
            if primary_bundle_id is None:
                primary_bundle_id = hist_bundle
                weighted += contrib
                reasons.append(self._explain_score(hist_bundle, hist_score, "historical"))
            elif hist_bundle == primary_bundle_id:
                weighted += contrib
            else:
                # History disagrees with the explicit label: keep it visible
                # as an alternative instead of dropping it silently.
                alternatives[hist_bundle] = contrib

        if content_list:
            if primary_bundle_id is None:
                best_bundle, best_score = content_list[0]
                primary_bundle_id = best_bundle
                weighted += WEIGHT_CONTENT * best_score
                reasons.append(self._explain_score(best_bundle, best_score, "content_similarity"))
            else:
                # Only similarity to the primary bundle may raise its
                # confidence; similarity to another bundle is not evidence
                # for this one.
                primary_similarity = next(
                    (sc for bid, sc in content_list if bid == primary_bundle_id),
                    0.0,
                )
                weighted += WEIGHT_CONTENT * primary_similarity

        confidence = min(1.0, weighted)
        for bid, sc in content_list:
            if bid != primary_bundle_id:
                alternatives[bid] = alternatives.get(bid, 0.0) + sc * WEIGHT_CONTENT
        # Stable sort: ties keep the order in which evidence was gathered.
        candidates: list[tuple[str, float]] = sorted(alternatives.items(), key=lambda pair: -pair[1])
        explanation = self._build_explanation(primary_bundle_id, confidence, candidates, reasons)
        return BundleMapping(
            primary_bundle_id=primary_bundle_id,
            confidence=confidence,
            candidates=candidates[:10],
            explained_reasoning=explanation,
        )
138 changes: 138 additions & 0 deletions modules/bundle-mapper/src/bundle_mapper/mapper/history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""
Mapping history persistence: save and load user-confirmed mappings from config.
"""

from __future__ import annotations

import re
from pathlib import Path
from typing import Any, Protocol, runtime_checkable

import yaml
from beartype import beartype
from icontract import ensure, require
from pydantic import BaseModel, Field


# Fallback values returned by load_bundle_mapping_config when the
# backlog.bundle_mapping section does not provide the corresponding key.
DEFAULT_LABEL_PREFIX = "bundle:"  # default for "explicit_label_prefix"
# NOTE(review): presumably the confidence at or above which a mapping is
# applied without asking — the consumer is not in this file; confirm.
DEFAULT_AUTO_ASSIGN_THRESHOLD = 0.8  # default for "auto_assign_threshold"
# NOTE(review): presumably the confidence at or above which the user is asked
# to confirm a suggested mapping — confirm against the caller.
DEFAULT_CONFIRM_THRESHOLD = 0.5  # default for "confirm_threshold"


@runtime_checkable
class _ItemLike(Protocol):
"""Minimal interface for backlog item used by history."""

id: str
assignees: list[str]
area: str | None
tags: list[str]


class MappingRule(BaseModel):
    """One mapping rule that routes matching items to a bundle.

    ``pattern`` selects what is compared: ``tag=~<regex>`` searches the regex
    in each tag, ``assignee=<name>`` requires the name among the assignees,
    and ``area=<name>`` requires the item's area to equal the name.
    """

    pattern: str = Field(..., description="Pattern: tag=~regex, assignee=exact, area=exact")
    bundle_id: str = Field(..., description="Target bundle id")
    action: str = Field(default="assign", description="Action: assign")
    confidence: float = Field(default=1.0, ge=0.0, le=1.0, description="Rule confidence")

    @beartype
    def matches(self, item: _ItemLike) -> bool:
        """Tell whether ``item`` satisfies this rule's pattern.

        An unknown pattern kind or a regex that fails to compile never
        matches; no exception is raised for either.
        """
        rule = self.pattern

        if rule.startswith("tag=~"):
            expression = rule[len("tag=~") :].strip()
            try:
                compiled = re.compile(expression)
            except re.error:
                # A broken regex is treated as "no match" rather than an error.
                return False
            for tag in item.tags:
                if compiled.search(tag):
                    return True
            return False

        if rule.startswith("assignee="):
            wanted_assignee = rule[len("assignee=") :].strip()
            return wanted_assignee in item.assignees

        if rule.startswith("area="):
            wanted_area = rule[len("area=") :].strip()
            return item.area == wanted_area

        return False


def item_key(item: _ItemLike) -> str:
    """Derive the history lookup key for ``item`` from its area, first assignee and tags.

    Each value is stripped of surrounding whitespace. Empty tags are dropped
    and the remaining ones are sorted, so the key does not depend on tag order.
    Missing area or assignee yields an empty component.
    """
    area = item.area or ""
    area = area.strip()

    if item.assignees:
        assignee = item.assignees[0].strip()
    else:
        assignee = ""

    cleaned_tags = [tag.strip() for tag in item.tags if tag]
    cleaned_tags.sort()
    tags_str = "|".join(cleaned_tags)

    return f"area={area}|assignee={assignee}|tags={tags_str}"


def item_keys_similar(key_a: str, key_b: str) -> bool:
    """Return True if the keys agree on at least 2 of 3 non-empty components.

    The components are area, assignee and the full tag list of a key in the
    ``area=<a>|assignee=<b>|tags=<t1>|<t2>...`` layout built by ``item_key``.
    A component that is empty on either side never counts as a match, which
    avoids pairing unrelated items that merely lack the same fields.
    """

    def parts(k: str) -> tuple[str, str, str]:
        # Parse the canonical layout as a whole. Tags are themselves joined
        # with "|", so splitting the key on "|" would keep only the first tag
        # and let a tag containing "=" overwrite another field.
        canonical = re.fullmatch(r"area=(.*?)\|assignee=(.*?)\|tags=(.*)", k, flags=re.DOTALL)
        if canonical is not None:
            area, assignee, tags = (group.strip() for group in canonical.groups())
            return (area, assignee, tags)
        # Fallback for keys not written in the canonical layout (e.g. fields
        # in another order or missing): best-effort name=value segments.
        d: dict[str, str] = {}
        for seg in k.split("|"):
            if "=" in seg:
                name, val = seg.split("=", 1)
                d[name.strip()] = val.strip()
        return (d.get("area", ""), d.get("assignee", ""), d.get("tags", ""))

    matches = sum(1 for left, right in zip(parts(key_a), parts(key_b)) if left and left == right)
    return matches >= 2


@beartype
@require(
    lambda config_path: config_path is None or not config_path.is_dir(),
    "config_path must be a file path, not a directory",
)
@ensure(lambda result: result is None, "Returns None")
def save_user_confirmed_mapping(
    item: _ItemLike,
    bundle_id: str,
    config_path: Path | None = None,
) -> None:
    """
    Persist a user-confirmed mapping: increment history count and save to config.

    Creates item_key from item metadata, increments the count for bundle_id
    under backlog.bundle_mapping.history, and rewrites the whole config file.
    Other content already present in the file is kept.

    Args:
        item: Backlog item whose area/assignee/tags form the history key.
        bundle_id: Bundle the user confirmed for this item.
        config_path: YAML config file to update; defaults to
            ~/.specfact/config.yaml. Parent directories are created if missing.
    """
    if config_path is None:
        config_path = Path.home() / ".specfact" / "config.yaml"

    def section(parent: dict[str, Any], name: str) -> dict[str, Any]:
        # setdefault alone returns None for keys written as "name: null" (or
        # left empty) in YAML; replace those with a fresh mapping instead.
        child = parent.get(name)
        if child is None:
            child = {}
            parent[name] = child
        return child

    key = item_key(item)
    data: dict[str, Any] = {}
    if config_path.exists():
        with open(config_path, encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}
    backlog = section(data, "backlog")
    bm = section(backlog, "bundle_mapping")
    history = section(bm, "history")
    entry = section(history, key)
    counts = section(entry, "counts")
    counts[bundle_id] = (counts.get(bundle_id) or 0) + 1
    config_path.parent.mkdir(parents=True, exist_ok=True)
    with open(config_path, "w", encoding="utf-8") as f:
        yaml.safe_dump(data, f, default_flow_style=False, sort_keys=False)


@beartype
def load_bundle_mapping_config(config_path: Path | None = None) -> dict[str, Any]:
    """Load the backlog.bundle_mapping section from config.

    Args:
        config_path: YAML config file to read; defaults to
            ~/.specfact/config.yaml. A missing file yields all defaults.

    Returns:
        Dict with the keys "rules", "history", "explicit_label_prefix",
        "auto_assign_threshold" and "confirm_threshold". A key that is absent
        or null in the file falls back to its default; thresholds are
        converted to float.
    """
    if config_path is None:
        config_path = Path.home() / ".specfact" / "config.yaml"
    data: dict[str, Any] = {}
    if config_path.exists():
        with open(config_path, encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}
    bm = (data.get("backlog") or {}).get("bundle_mapping") or {}

    def setting(name: str, default: Any) -> Any:
        # Compare against None rather than truthiness so that a legitimate
        # falsy value (e.g. a threshold of 0) is not replaced by the default,
        # while "name: null" in YAML still is.
        value = bm.get(name)
        return default if value is None else value

    return {
        "rules": setting("rules", []),
        "history": setting("history", {}),
        "explicit_label_prefix": setting("explicit_label_prefix", DEFAULT_LABEL_PREFIX),
        "auto_assign_threshold": float(setting("auto_assign_threshold", DEFAULT_AUTO_ASSIGN_THRESHOLD)),
        "confirm_threshold": float(setting("confirm_threshold", DEFAULT_CONFIRM_THRESHOLD)),
    }
6 changes: 6 additions & 0 deletions modules/bundle-mapper/src/bundle_mapper/models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Bundle mapper models."""

from bundle_mapper.models.bundle_mapping import BundleMapping


__all__ = ["BundleMapping"]
Loading
Loading