diff --git a/README.md b/README.md index 9150d9b..70b184b 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,22 @@ Roblox Sentinel, part of the Roblox Safety Toolkit, is a Python library designed By prioritizing recall over precision, Sentinel serves as a high-recall candidate generator for more thorough investigation. This approach is particularly effective for applications where rare patterns are critical to identify. Rather than treating each message in isolation, Sentinel analyzes patterns across messages to identify concerning behavior. +## What’s New: Aggregation options and Explainability + +Sentinel now includes multiple aggregation strategies and built‑in explainability to help you tune for your use case and understand why a score was assigned. + +- Aggregators (in `sentinel.score_formulae`): + - `skewness(scores, min_size_of_scores=10)`: default, pattern‑oriented and robust to message count + - `top_k_mean(scores, k=3)`: focuses on the strongest signals + - `percentile_score(scores, q=90.0)`: robust to outliers via a percentile over positives + - `softmax_weighted_mean(scores, temperature=1.0)`: smoothly emphasizes higher scores + - `max_score(scores)`: simplest, picks the highest positive score + +- Explainability (in results): + - Each call to `calculate_rare_class_affinity` returns a `RareClassAffinityResult` with: + - `aggregation_name`, `aggregation_stats`: which aggregator was used and key params + - `explanations`: per‑text details including top‑K positive/negative similarities, contrastive components, and neighbor snippets (when available) + ## Terminology In Sentinel's codebase: @@ -65,6 +81,16 @@ print(f"Overall rare class affinity score: {overall_score:.4f}") for message, score in result.observation_scores.items(): risk_level = "High" if score > 0.5 else "Medium" if score > 0.1 else "Low" print(f"'{message}' - Score: {score:.4f} - Risk: {risk_level}") + +# Inspect explainability +print("Aggregator:", result.aggregation_name) +print("Aggregation stats:", result.aggregation_stats) +for message, ex in result.explanations.items(): + print("--", message) + print(" topk_positive:", ex["topk_positive"]) # scaled similarities + print(" topk_negative:", ex["topk_negative"]) # scaled similarities + print(" contrastive:", ex["contrastive"]) # positive_term, negative_term, log_ratio_unclipped + print(" neighbors (sample):", ex["neighbors"][:2] if ex["neighbors"] else None) ``` ## Creating a New Index @@ -109,6 +135,32 @@ saved_config = index.save( aws_access_key_id="YOUR_ACCESS_KEY_ID", # Optional if using environment credentials aws_secret_access_key="YOUR_SECRET_ACCESS_KEY" # Optional if using environment credentials ) + +## Choosing an aggregation strategy + +Different deployments optimize for different trade‑offs. You can swap in any aggregator using the `aggregation_function` argument: + +```python +from sentinel.score_formulae import top_k_mean, percentile_score, softmax_weighted_mean, max_score + +texts = ["msg a", "msg b", "msg c"] + +# Focus on the strongest few signals +res1 = index.calculate_rare_class_affinity(texts, aggregation_function=lambda arr: top_k_mean(arr, k=3)) + +# Robust to outliers +res2 = index.calculate_rare_class_affinity(texts, aggregation_function=lambda arr: percentile_score(arr, q=90)) + +# Smoothly emphasize higher scores +res3 = index.calculate_rare_class_affinity(texts, aggregation_function=lambda arr: softmax_weighted_mean(arr, temperature=0.5)) + +# Simplest, picks the maximum +res4 = index.calculate_rare_class_affinity(texts, aggregation_function=max_score) +``` + +Notes: +- All aggregators operate over per‑observation scores where non‑confident observations are already clipped to 0. +- The default `skewness` remains a good choice when user activity volume varies widely. ``` ## How It Works diff --git a/src/sentinel/__init__.py b/src/sentinel/__init__.py index 2973bea..dca2c35 100644 --- a/src/sentinel/__init__.py +++ b/src/sentinel/__init__.py @@ -19,6 +19,23 @@ """ from sentinel.sentinel_local_index import SentinelLocalIndex -from sentinel.score_formulae import calculate_contrastive_score +from sentinel.score_formulae import ( + calculate_contrastive_score, + skewness, + mean_of_positives, + top_k_mean, + percentile_score, + softmax_weighted_mean, + max_score, +) -__all__ = ["SentinelLocalIndex", "calculate_contrastive_score"] +__all__ = [ + "SentinelLocalIndex", + "calculate_contrastive_score", + "skewness", + "mean_of_positives", + "top_k_mean", + "percentile_score", + "softmax_weighted_mean", + "max_score", +] diff --git a/src/sentinel/score_formulae.py b/src/sentinel/score_formulae.py index bb59bb5..90e709f 100644 --- a/src/sentinel/score_formulae.py +++ b/src/sentinel/score_formulae.py @@ -12,7 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Score calculation functions for Sentinel index.""" +"""Score calculation functions for Sentinel index. + +This module contains per-observation scoring utilities (contrastive scoring) +and aggregation functions to combine multiple observation scores into a single +affinity number. In addition to the default skewness, a set of robust +alternatives are provided to fit different deployment preferences (recall vs precision, +stability vs sensitivity, etc.). +""" import numpy as np from typing import List, Callable @@ -70,6 +77,117 @@ def skewness(scores: np.array, min_size_of_scores: int = 10) -> float: return (mean - median) / std +def top_k_mean(scores: np.array, k: int = 3) -> float: + """Mean of the top-k positive scores. + + Focuses on the strongest signals while ignoring noise and negatives. + + Args: + scores: Array of observation scores. + k: Number of highest positive scores to average. + + Returns: + Mean of the top-k positive scores (0.0 if no positive scores). + """ + if scores.size == 0: + return 0.0 + positives = scores[scores > 0] + if positives.size == 0: + return 0.0 + k = min(k, positives.size) + # Use partition for efficiency, then mean of the largest k + idx = np.argpartition(positives, -k)[-k:] + return float(np.mean(positives[idx])) + + +def percentile_score(scores: np.array, q: float = 90.0) -> float: + """Return the q-th percentile among positive scores (robust to outliers). + + Args: + scores: Array of observation scores. + q: Percentile in [0, 100]. + + Returns: + q-th percentile of positive scores (0.0 if no positive scores). + """ + if scores.size == 0: + return 0.0 + positives = scores[scores > 0] + if positives.size == 0: + return 0.0 + return float(np.percentile(positives, q)) + + +def softmax_weighted_mean(scores: np.array, temperature: float = 1.0) -> float: + """Softmax-weighted mean over positive scores. + + Emphasizes higher scores while keeping some contribution from smaller ones. + + Args: + scores: Array of observation scores. + temperature: Softmax temperature (>0). Lower values emphasize peaks more. + + Returns: + Softmax-weighted average of positive scores (0.0 if no positive scores). + """ + if scores.size == 0: + return 0.0 + positives = scores[scores > 0] + if positives.size == 0: + return 0.0 + t = max(1e-6, float(temperature)) + x = positives / t + # Numerical stability + x = x - np.max(x) + w = np.exp(x) + w = w / np.sum(w) + return float(np.sum(w * positives)) + + +def max_score(scores: np.array) -> float: + """Maximum positive score (simple, sensitive, and easy to interpret).""" + if scores.size == 0: + return 0.0 + positives = scores[scores > 0] + if positives.size == 0: + return 0.0 + return float(np.max(positives)) + + +def contrastive_components( + similarities_topk_pos: List[float], + similarities_topk_neg: List[float], + aggregation_fn: Callable[[np.array], float] = np.mean, +): + """Return contrastive components and final log-ratio for a single observation. + + Computes the positive and negative terms used by the contrastive score and + the unclipped log ratio. Useful for explainability. + + Returns: + (positives_term, negatives_term, log_ratio) + """ + if len(similarities_topk_pos) <= 0 or len(similarities_topk_neg) <= 0: + raise ValueError( + "The lists of similarities must have at least one element each." + ) + + similarities_topk_pos = np.array(similarities_topk_pos) + similarities_topk_neg = np.array(similarities_topk_neg) + + positives_term = aggregation_fn(np.exp(similarities_topk_pos)) + negatives_term = aggregation_fn(np.exp(similarities_topk_neg)) + + # Avoid divide-by-zero (shouldn’t happen with exp, but be safe) + if negatives_term == 0: + log_ratio = np.inf + else: + ratio = positives_term / negatives_term + log_ratio = np.log(ratio) + + return float(positives_term), float(negatives_term), float(log_ratio) + + def calculate_contrastive_score( similarities_topk_pos: List[float], similarities_topk_neg: List[float], @@ -94,19 +212,10 @@ def calculate_contrastive_score( Returns: A contrastive score where values > 0 indicate closer similarity to rare class content """ - if len(similarities_topk_pos) <= 0 or len(similarities_topk_neg) <= 0: - raise ValueError( - "The lists of similarities must have at least one element each." - ) - - similarities_topk_pos = np.array(similarities_topk_pos) - similarities_topk_neg = np.array(similarities_topk_neg) - - positives_term = aggregation_fn(np.exp(similarities_topk_pos)) - negatives_term = aggregation_fn(np.exp(similarities_topk_neg)) - - contrastive_score = positives_term / negatives_term - - if contrastive_score <= 1.0: - return 0 # Clip to zero to avoid negative scores, since we accumulate this score for all chat lines of a user. - return np.log(contrastive_score) + positives_term, negatives_term, log_ratio = contrastive_components( + similarities_topk_pos, similarities_topk_neg, aggregation_fn + ) + # Clip to zero to avoid negative scores, since we accumulate this score for all chat lines of a user. + if log_ratio <= 0.0: + return 0.0 + return float(log_ratio) diff --git a/src/sentinel/score_types.py b/src/sentinel/score_types.py index 9cfe4bb..68fba06 100644 --- a/src/sentinel/score_types.py +++ b/src/sentinel/score_types.py @@ -15,28 +15,36 @@ """Data types for rare class detection and scoring.""" from dataclasses import dataclass -from typing import Dict +from typing import Dict, Optional, Any @dataclass class RareClassAffinityResult: - """Result of calculating affinity to a rare class of text. - - This class contains both: - 1. The overall rare_class_affinity_score for a collection of texts, which is used to prioritize - cases for further investigation in a realtime context - 2. The individual observation_scores for each text, which can be used to identify which specific - observations contributed most to the overall pattern - - As a high-recall candidate generator, this result helps identify potential instances of rare - classes that warrant closer examination, prioritizing not missing true positives even at the - cost of some false positives. - - Attributes: - rare_class_affinity_score: The aggregated score indicating overall affinity to the rare class, - typically calculated using skewness to identify patterns - observation_scores: Dictionary mapping each input text to its individual similarity score - """ - - rare_class_affinity_score: float - observation_scores: Dict[str, float] + """Result of calculating affinity to a rare class of text. + + This class contains both: + 1. The overall rare_class_affinity_score for a collection of texts, which is used to prioritize + cases for further investigation in a realtime context. + 2. The individual observation_scores for each text, which can be used to identify which specific + observations contributed most to the overall pattern. + + As a high-recall candidate generator, this result helps identify potential instances of rare + classes that warrant closer examination, prioritizing not missing true positives even at the + cost of some false positives. + + Attributes: + rare_class_affinity_score: The aggregated score indicating overall affinity to the rare class, + typically calculated using skewness to identify patterns. + observation_scores: Mapping of input text to its individual similarity score. + aggregation_name: Optional name of the aggregation function used. + aggregation_stats: Optional dictionary with aggregation-relevant statistics + (e.g. top_k, percentile, temperature, num_positives). + explanations: Optional per-text explainability records describing which neighbors and + components contributed to each score. + """ + + rare_class_affinity_score: float + observation_scores: Dict[str, float] + aggregation_name: Optional[str] = None + aggregation_stats: Optional[Dict[str, Any]] = None + explanations: Optional[Dict[str, Any]] = None diff --git a/src/sentinel/sentinel_local_index.py b/src/sentinel/sentinel_local_index.py index 8730bfb..42a1660 100644 --- a/src/sentinel/sentinel_local_index.py +++ b/src/sentinel/sentinel_local_index.py @@ -26,7 +26,7 @@ from sentence_transformers import SentenceTransformer from sentence_transformers.util import semantic_search -from sentinel.score_formulae import calculate_contrastive_score, skewness +from sentinel.score_formulae import calculate_contrastive_score, skewness, contrastive_components from sentinel.io.saved_index_config import SavedIndexConfig from sentinel.io.index_io import save_index, load_index, create_s3_transport_params from sentinel.embeddings.sbert import get_sentence_transformer_and_scaling_fn @@ -253,17 +253,15 @@ def calculate_rare_class_affinity( self, text_samples: List[str], top_k: int = 5, - similarity_formula: Callable[ - [List[float], List[float]], float - ] = calculate_contrastive_score, + similarity_formula: Callable[[List[float], List[float]], float] = calculate_contrastive_score, # Function to aggregate individual scores into an overall affinity score aggregation_function: Callable[[np.array], float] = skewness, # Margin to ignore when text is only slightly more similar to positive than negative. min_score_to_consider: float = 0.1, # Use when simulating by sampling texts from the same data indexed. - prevent_exact_match: bool = False, - encoding_additional_kwargs: Mapping[str, Any] = {}, - show_progress_bar: bool = False, + prevent_exact_match: bool = False, + encoding_additional_kwargs: Mapping[str, Any] = {}, + show_progress_bar: bool = False, ) -> RareClassAffinityResult: """Calculate rare class affinity for the given text samples in realtime. @@ -320,7 +318,13 @@ def calculate_rare_class_affinity( top_k=top_k + additional_neighbors, ) + # Explainability defaults (always on for transparency) + explain = True + include_neighbors = True + neighbors_limit = 5 + observation_scores = {} + explanations = {} if explain else None for i, q in enumerate(text_samples): LOG.debug("Query: %s", q) @@ -340,6 +344,7 @@ def calculate_rare_class_affinity( similarities_topk_positive = [] similarities_topk_negative = [] max_h = top_k # Number of examples to consider + neighbor_records = [] if include_neighbors else None # Process each match in order of similarity (highest first) for h, (score, corpus_id, sign) in enumerate(matches): @@ -381,6 +386,22 @@ def calculate_rare_class_affinity( f"[{sign}] {neighbor} (Score: {score:.4f}, Scaled Score: {scaled_score:.4f})" ) + if include_neighbors and len(neighbor_records) < neighbors_limit: + # Keep a compact neighbor record for explainability + try: + corpus_id_int = int(corpus_id) + except Exception: + corpus_id_int = int(corpus_id) if isinstance(corpus_id, (int, np.integer)) else 0 + neighbor_records.append( + { + "sign": "+" if sign == "+" else "-", + "raw_score": float(score), + "scaled_score": float(scaled_score), + "neighbor": neighbor, + "corpus_id": corpus_id_int, + } + ) + # Ensure we have at least one similarity value for each category # If we didn't find any of a particular category in the top matches, # use the first match from the original search @@ -409,6 +430,25 @@ def calculate_rare_class_affinity( else: observation_scores[q] = score + # Per-text explainability + if explain: + pos_term, neg_term, log_ratio = contrastive_components( + similarities_topk_pos=similarities_topk_positive, + similarities_topk_neg=similarities_topk_negative, + ) + explanations[q] = { + "topk_positive": [float(x) for x in similarities_topk_positive], + "topk_negative": [float(x) for x in similarities_topk_negative], + "contrastive": { + "positive_term": pos_term, + "negative_term": neg_term, + "log_ratio_unclipped": log_ratio, + }, + "neighbors": neighbor_records[:neighbors_limit] + if include_neighbors and neighbor_records is not None + else None, + } + # Calculate the overall rare class affinity score by aggregating individual scores # If there are no scores, default to 0.0 if not observation_scores: @@ -418,7 +458,21 @@ def calculate_rare_class_affinity( np.array(list(observation_scores.values())) ) + # Aggregation metadata for explainability + agg_name = getattr(aggregation_function, "__name__", str(aggregation_function)) + agg_stats = { + "num_texts": len(text_samples), + "num_positive_scores": int( + np.sum(np.array(list(observation_scores.values())) > 0) + ), + "top_k_per_observation": top_k, + "min_score_to_consider": float(min_score_to_consider), + } + return RareClassAffinityResult( rare_class_affinity_score=rare_class_score, observation_scores=observation_scores, + aggregation_name=agg_name, + aggregation_stats=agg_stats, + explanations=explanations if explain else None, ) diff --git a/tests/conftest.py b/tests/conftest.py index c578a2f..7ee8517 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -15,6 +15,8 @@ """Shared test fixtures and configurations for Sentinel tests.""" import os +import sys +import pathlib import pytest import numpy as np import torch @@ -22,6 +24,13 @@ from unittest.mock import MagicMock +# Ensure the package under src/ is importable without installation +_REPO_ROOT = pathlib.Path(__file__).resolve().parents[1] +_SRC_PATH = _REPO_ROOT / "src" +if str(_SRC_PATH) not in sys.path: + sys.path.insert(0, str(_SRC_PATH)) + + # Set up logging for tests @pytest.fixture(autouse=True) def setup_logging(): diff --git a/tests/test_score_formulae.py b/tests/test_score_formulae.py index 0115bc0..214124a 100644 --- a/tests/test_score_formulae.py +++ b/tests/test_score_formulae.py @@ -21,6 +21,10 @@ mean_of_positives, calculate_contrastive_score, skewness, + top_k_mean, + percentile_score, + softmax_weighted_mean, + max_score, ) @@ -125,3 +129,24 @@ def test_skewness(): empty_scores = np.array([]) result = skewness(empty_scores) assert np.isclose(result, 0.0), "Should return 0.0 for empty array" + + +def test_additional_aggregators(): + scores = np.array([0.0, 0.2, 0.5, 1.0, 0.7, -0.1, 0.3]) + + # top_k_mean + val = top_k_mean(scores, k=2) + assert np.isclose(val, np.mean([1.0, 0.7])) + + # percentile_score + val = percentile_score(scores, q=50) + # positives are [0.2, 0.5, 1.0, 0.7, 0.3]; median = 0.5 + assert np.isclose(val, 0.5) + + # softmax_weighted_mean (temperature=1) + val = softmax_weighted_mean(scores, temperature=1.0) + assert val > 0.5 and val <= 1.0 + + # max_score + val = max_score(scores) + assert np.isclose(val, 1.0) diff --git a/tests/test_sriracha_local_index.py b/tests/test_sriracha_local_index.py index a34f769..7e95f7d 100644 --- a/tests/test_sriracha_local_index.py +++ b/tests/test_sriracha_local_index.py @@ -168,6 +168,16 @@ def test_calculate_rare_class_affinity(self, simple_index): ) assert all(score == 0.0 for score in result.observation_scores.values()) + # Explainability fields present + assert result.aggregation_name is not None + assert isinstance(result.aggregation_stats, dict) + assert result.explanations is not None + # Each input has an explanation + for t in mixed_text: + assert t in result.explanations + ex = result.explanations[t] + assert "topk_positive" in ex and "topk_negative" in ex and "contrastive" in ex + # Integration test combining various components @pytest.mark.integration