Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,22 @@ Roblox Sentinel, part of the Roblox Safety Toolkit, is a Python library designed

By prioritizing recall over precision, Sentinel serves as a high-recall candidate generator for more thorough investigation. This approach is particularly effective for applications where rare patterns are critical to identify. Rather than treating each message in isolation, Sentinel analyzes patterns across messages to identify concerning behavior.

## What’s New: Aggregation options and Explainability
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be in its own file, CHANGELOG.md, and should follow https://keepachangelog.com/en/1.1.0/.


Sentinel now includes multiple aggregation strategies and built‑in explainability to help you tune for your use case and understand why a score was assigned.

- Aggregators (in `sentinel.score_formulae`):
- `skewness(scores, min_size_of_scores=10)`: default, pattern‑oriented and robust to message count
- `top_k_mean(scores, k=3)`: focuses on the strongest signals
- `percentile_score(scores, q=90.0)`: robust to outliers via a percentile over positives
- `softmax_weighted_mean(scores, temperature=1.0)`: smoothly emphasizes higher scores
- `max_score(scores)`: simplest, picks the highest positive score

- Explainability (in results):
- Each call to `calculate_rare_class_affinity` returns a `RareClassAffinityResult` with:
- `aggregation_name`, `aggregation_stats`: which aggregator was used and key params
- `explanations`: per‑text details including top‑K positive/negative similarities, contrastive components, and neighbor snippets (when available)

## Terminology

In Sentinel's codebase:
Expand Down Expand Up @@ -65,6 +81,16 @@ print(f"Overall rare class affinity score: {overall_score:.4f}")
for message, score in result.observation_scores.items():
risk_level = "High" if score > 0.5 else "Medium" if score > 0.1 else "Low"
print(f"'{message}' - Score: {score:.4f} - Risk: {risk_level}")

# Inspect explainability
print("Aggregator:", result.aggregation_name)
print("Aggregation stats:", result.aggregation_stats)
for message, ex in result.explanations.items():
print("--", message)
print(" topk_positive:", ex["topk_positive"]) # scaled similarities
print(" topk_negative:", ex["topk_negative"]) # scaled similarities
print(" contrastive:", ex["contrastive"]) # positive_term, negative_term, log_ratio_unclipped
print(" neighbors (sample):", ex["neighbors"][:2] if ex["neighbors"] else None)
```

## Creating a New Index
Expand Down Expand Up @@ -109,6 +135,32 @@ saved_config = index.save(
aws_access_key_id="YOUR_ACCESS_KEY_ID", # Optional if using environment credentials
aws_secret_access_key="YOUR_SECRET_ACCESS_KEY" # Optional if using environment credentials
)

## Choosing an aggregation strategy

Different deployments optimize for different trade‑offs. You can swap in any aggregator using the `aggregation_function` argument:

```python
from sentinel.score_formulae import top_k_mean, percentile_score, softmax_weighted_mean, max_score

texts = ["msg a", "msg b", "msg c"]

# Focus on the strongest few signals
res1 = index.calculate_rare_class_affinity(texts, aggregation_function=lambda arr: top_k_mean(arr, k=3))

# Robust to outliers
res2 = index.calculate_rare_class_affinity(texts, aggregation_function=lambda arr: percentile_score(arr, q=90))

# Smoothly emphasize higher scores
res3 = index.calculate_rare_class_affinity(texts, aggregation_function=lambda arr: softmax_weighted_mean(arr, temperature=0.5))

# Simplest, picks the maximum
res4 = index.calculate_rare_class_affinity(texts, aggregation_function=max_score)
```

Notes:
- All aggregators operate over per‑observation scores where non‑confident observations are already clipped to 0.
- The default `skewness` remains a good choice when user activity volume varies widely.
```

## How It Works
Expand Down
21 changes: 19 additions & 2 deletions src/sentinel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,23 @@
"""

from sentinel.sentinel_local_index import SentinelLocalIndex
from sentinel.score_formulae import calculate_contrastive_score
from sentinel.score_formulae import (
calculate_contrastive_score,
skewness,
mean_of_positives,
top_k_mean,
percentile_score,
softmax_weighted_mean,
max_score,
)

__all__ = ["SentinelLocalIndex", "calculate_contrastive_score"]
__all__ = [
"SentinelLocalIndex",
"calculate_contrastive_score",
"skewness",
"mean_of_positives",
"top_k_mean",
"percentile_score",
"softmax_weighted_mean",
"max_score",
]
143 changes: 126 additions & 17 deletions src/sentinel/score_formulae.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Score calculation functions for Sentinel index."""
"""Score calculation functions for Sentinel index.

This module contains per-observation scoring utilities (contrastive scoring)
and aggregation functions to combine multiple observation scores into a single
affinity number. In addition to the default skewness, a set of robust
alternatives are provided to fit different deployment preferences (recall vs precision,
stability vs sensitivity, etc.).
"""

import numpy as np
from typing import List, Callable
Expand Down Expand Up @@ -70,6 +77,117 @@ def skewness(scores: np.array, min_size_of_scores: int = 10) -> float:
return (mean - median) / std


def top_k_mean(scores: np.array, k: int = 3) -> float:
"""Mean of the top-k positive scores.

Focuses on the strongest signals while ignoring noise and negatives.

Args:
scores: Array of observation scores.
k: Number of highest positive scores to average.

Returns:
Mean of the top-k positive scores (0.0 if no positive scores).
"""
if scores.size == 0:
return 0.0
positives = scores[scores > 0]
if positives.size == 0:
return 0.0
k = min(k, positives.size)
# Use partition for efficiency, then mean of the largest k
idx = np.argpartition(positives, -k)[-k:]
return float(np.mean(positives[idx]))


def percentile_score(scores: np.array, q: float = 90.0) -> float:
"""Return the q-th percentile among positive scores (robust to outliers).

Args:
scores: Array of observation scores.
q: Percentile in [0, 100].

Returns:
q-th percentile of positive scores (0.0 if no positive scores).
"""
if scores.size == 0:
return 0.0
positives = scores[scores > 0]
if positives.size == 0:
return 0.0
return float(np.percentile(positives, q))


def softmax_weighted_mean(scores: np.array, temperature: float = 1.0) -> float:
"""Softmax-weighted mean over positive scores.

Emphasizes higher scores while keeping some contribution from smaller ones.

Args:
scores: Array of observation scores.
temperature: Softmax temperature (>0). Lower values emphasize peaks more.

Returns:
Softmax-weighted average of positive scores (0.0 if no positive scores).
"""
if scores.size == 0:
return 0.0
positives = scores[scores > 0]
if positives.size == 0:
return 0.0
t = max(1e-6, float(temperature))
x = positives / t
# Numerical stability
x = x - np.max(x)
w = np.exp(x)
w = w / np.sum(w)
return float(np.sum(w * positives))


def max_score(scores: np.array) -> float:
"""Maximum positive score (simple, sensitive, and easy to interpret)."""
if scores.size == 0:
return 0.0
positives = scores[scores > 0]
if positives.size == 0:
return 0.0
return float(np.max(positives))


def contrastive_components(
similarities_topk_pos: List[float],
similarities_topk_neg: List[float],
aggregation_fn: Callable[[np.array], float] = np.mean,
):
"""Return contrastive components and final log-ratio for a single observation.

Computes the positive and negative terms used by the contrastive score and
the unclipped log ratio. Useful for explainability.

Returns:
(positives_term, negatives_term, log_ratio)
"""
if len(similarities_topk_pos) <= 0 or len(similarities_topk_neg) <= 0:
raise ValueError(
"The lists of similarities must have at least one element each."
)

similarities_topk_pos = np.array(similarities_topk_pos)
similarities_topk_neg = np.array(similarities_topk_neg)

positives_term = aggregation_fn(np.exp(similarities_topk_pos))
negatives_term = aggregation_fn(np.exp(similarities_topk_neg))

# Avoid divide-by-zero (shouldn’t happen with exp, but be safe)
if negatives_term == 0:
log_ratio = np.inf
else:
ratio = positives_term / negatives_term
log_ratio = np.log(ratio)

return float(positives_term), float(negatives_term), float(log_ratio)


def calculate_contrastive_score(
similarities_topk_pos: List[float],
similarities_topk_neg: List[float],
Expand All @@ -94,19 +212,10 @@ def calculate_contrastive_score(
Returns:
A contrastive score where values > 0 indicate closer similarity to rare class content
"""
if len(similarities_topk_pos) <= 0 or len(similarities_topk_neg) <= 0:
raise ValueError(
"The lists of similarities must have at least one element each."
)

similarities_topk_pos = np.array(similarities_topk_pos)
similarities_topk_neg = np.array(similarities_topk_neg)

positives_term = aggregation_fn(np.exp(similarities_topk_pos))
negatives_term = aggregation_fn(np.exp(similarities_topk_neg))

contrastive_score = positives_term / negatives_term

if contrastive_score <= 1.0:
return 0 # Clip to zero to avoid negative scores, since we accumulate this score for all chat lines of a user.
return np.log(contrastive_score)
positives_term, negatives_term, log_ratio = contrastive_components(
similarities_topk_pos, similarities_topk_neg, aggregation_fn
)
# Clip to zero to avoid negative scores, since we accumulate this score for all chat lines of a user.
if log_ratio <= 0.0:
return 0.0
return float(log_ratio)
50 changes: 29 additions & 21 deletions src/sentinel/score_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,36 @@
"""Data types for rare class detection and scoring."""

from dataclasses import dataclass
from typing import Dict
from typing import Dict, Optional, Any


@dataclass
class RareClassAffinityResult:
"""Result of calculating affinity to a rare class of text.

This class contains both:
1. The overall rare_class_affinity_score for a collection of texts, which is used to prioritize
cases for further investigation in a realtime context
2. The individual observation_scores for each text, which can be used to identify which specific
observations contributed most to the overall pattern

As a high-recall candidate generator, this result helps identify potential instances of rare
classes that warrant closer examination, prioritizing not missing true positives even at the
cost of some false positives.

Attributes:
rare_class_affinity_score: The aggregated score indicating overall affinity to the rare class,
typically calculated using skewness to identify patterns
observation_scores: Dictionary mapping each input text to its individual similarity score
"""

rare_class_affinity_score: float
observation_scores: Dict[str, float]
"""Result of calculating affinity to a rare class of text.

This class contains both:
1. The overall rare_class_affinity_score for a collection of texts, which is used to prioritize
cases for further investigation in a realtime context.
2. The individual observation_scores for each text, which can be used to identify which specific
observations contributed most to the overall pattern.

As a high-recall candidate generator, this result helps identify potential instances of rare
classes that warrant closer examination, prioritizing not missing true positives even at the
cost of some false positives.

Attributes:
rare_class_affinity_score: The aggregated score indicating overall affinity to the rare class,
typically calculated using skewness to identify patterns.
observation_scores: Mapping of input text to its individual similarity score.
aggregation_name: Optional name of the aggregation function used.
aggregation_stats: Optional dictionary with aggregation-relevant statistics
(e.g. top_k, percentile, temperature, num_positives).
explanations: Optional per-text explainability records describing which neighbors and
components contributed to each score.
"""

rare_class_affinity_score: float
observation_scores: Dict[str, float]
aggregation_name: Optional[str] = None
aggregation_stats: Optional[Dict[str, Any]] = None
explanations: Optional[Dict[str, Any]] = None
Loading