Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 70 additions & 99 deletions app/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

# Fallback regex patterns for when Presidio is not available
EMAIL = re.compile(r"\b([A-Za-z0-9._%+-])[^@\s]*(@[A-Za-z0-9.-]+\.[A-Za-z]{2,})\b")
SSN = re.compile(r"\b(?!000|666|9\d{2})\d{3}-\d{2}-\d{4}\b")
PHONE = re.compile(r"\+?\d[\d\s\-\(\)]{7,}\d")
CARD = re.compile(r"\b(?:\d[ -]*?){13,19}\b")
PHI_MRN = re.compile(
Expand Down Expand Up @@ -465,6 +466,7 @@ def detect_regex_pii_findings(raw_text: str) -> List[Dict[str, Any]]:
findings: List[Dict[str, Any]] = []

_append_regex_findings(findings, EMAIL, "PII:email_address", raw_text, 0.8)
_append_regex_findings(findings, SSN, "PII:us_ssn", raw_text, 0.9)
_append_regex_findings(findings, PHONE, "PII:phone_number", raw_text, 0.8)

for match in CARD.finditer(raw_text):
Expand Down Expand Up @@ -840,44 +842,14 @@ def _evaluate_policy(

# PRECEDENCE LEVEL 2: Tool-specific access rules (highest priority for non-dangerous tools)
if tool in tool_access and tool_access[tool].get("direction") == direction:
# Run PII detection on raw text
findings = []
if USE_PRESIDIO and ANALYZER is not None:
results = ANALYZER.analyze(
text=raw_text, entities=list(ANONYMIZE_OPERATORS.keys()), language="en"
)
for r in results:
if not is_false_positive(r.entity_type, "", raw_text):
findings.append(
{
"type": f"PII:{r.entity_type.lower()}",
"start": r.start,
"end": r.end,
"score": r.score,
"text": raw_text[r.start : r.end],
}
)

# Apply tool-specific transformations based on findings
if findings:
transformed_text, tool_reasons = apply_tool_access_text(
tool, findings, raw_text
)
return {
"decision": "transform",
"raw_text_out": transformed_text,
"reasons": tool_reasons,
"policy_id": "tool-access",
"ts": now,
}
else:
# No PII found, pass through
return {
"decision": "allow",
"raw_text_out": raw_text,
"policy_id": "tool-access",
"ts": now,
}
# Keep the static YAML path aligned with the payload-driven evaluator so
# regex fallback and tool-specific allow/tokenize behavior stay consistent.
return _apply_tool_specific_policy_dynamic(
tool=tool,
raw_text=raw_text,
now=now,
tool_policy=tool_access[tool],
)

# PRECEDENCE LEVEL 3: Global defaults for this direction
default_action = defaults.get(direction, {}).get("action", "redact")
Expand Down Expand Up @@ -941,21 +913,22 @@ def evaluate_with_payload_policy(
) -> Dict:
"""
Evaluate policy using payload-provided configuration
Falls back to static YAML if no policy_config provided
Falls back to the loaded static YAML policy if no policy_config is provided.
"""

if not policy_config:
# Fallback to current YAML-based logic
return evaluate(tool, scope, raw_text, now, direction)
resolved_policy_config = (
deepcopy(policy_config) if policy_config else deepcopy(get_policy())
)
resolved_policy_config["tool"] = tool
resolved_policy_config["scope"] = scope or ""

# Use payload-provided policy configuration
return _evaluate_dynamic_policy(
tool,
scope,
raw_text,
now,
direction,
policy_config,
resolved_policy_config,
tool_config,
user_id,
budget_context,
Expand Down Expand Up @@ -1125,57 +1098,43 @@ def _apply_tool_specific_policy_dynamic(
}
)
else:
findings.extend(detect_regex_pii_findings(raw_text))

import re

ssn_patterns = [
r"\b\d{3}-\d{2}-\d{4}\b", # XXX-XX-XXXX with dashes
r"\b(?!000|666|9\d{2})\d{3}[-]?(?!00)\d{2}[-]?(?!0000)\d{4}\b", # With optional dashes
r"\b(?!000|666|9\d{2})\d{9}\b", # 9 digits without dashes (if context suggests SSN)
]

# Check if text contains SSN-related context
ssn_context = re.search(
r"\b(ssn|social\s*security|tax\s*id|social\s*security\s*number)\b",
raw_text,
re.IGNORECASE,
)
# Detect SSNs with context first so generic phone regexes cannot claim
# the same span before tokenization rules get a chance to run.
import re

ssn_patterns = [
r"\b\d{3}-\d{2}-\d{4}\b", # XXX-XX-XXXX with dashes
r"\b(?!000|666|9\d{2})\d{3}[-]?(?!00)\d{2}[-]?(?!0000)\d{4}\b", # With optional dashes
r"\b(?!000|666|9\d{2})\d{9}\b", # 9 digits without dashes (if context suggests SSN)
]

ssn_context = re.search(
r"\b(ssn|social\s*security|tax\s*id|social\s*security\s*number)\b",
raw_text,
re.IGNORECASE,
)

for pattern in ssn_patterns:
for match in re.finditer(pattern, raw_text):
# Check if this SSN overlaps with any existing finding
overlaps = False
for finding in findings:
if not (
match.end() <= finding["start"] or match.start() >= finding["end"]
):
overlaps = True
break

# Only add if no overlap and (has context or is in standard format)
if not overlaps and (ssn_context or "-" in match.group()):
# Check if it's already detected as US_SSN
already_detected = False
for finding in findings:
if (
finding["type"] == "PII:us_ssn"
and finding["start"] == match.start()
):
already_detected = True
break
for pattern in ssn_patterns:
for match in re.finditer(pattern, raw_text):
if not ssn_context and "-" not in match.group():
continue
if _has_overlap(match.start(), match.end(), findings):
continue
findings.append(
{
"type": "PII:us_ssn",
"start": match.start(),
"end": match.end(),
"score": 0.9 if ssn_context else 0.7,
"text": match.group(),
}
)
break

if not already_detected:
findings.append(
{
"type": "PII:us_ssn",
"start": match.start(),
"end": match.end(),
"score": 0.9 if ssn_context else 0.7,
"text": match.group(),
}
)
break # Only add first match per pattern
for finding in detect_regex_pii_findings(raw_text):
if _has_overlap(finding["start"], finding["end"], findings):
continue
findings.append(finding)

# Apply tool-specific transformations based on findings and allow_pii rules
if findings:
Expand Down Expand Up @@ -1322,26 +1281,33 @@ def apply_tool_access_text_dynamic(

if action == "pass_through":
# Keep original text
reasons.extend(_tool_reason_codes("allowed", pii_type))
continue
elif action == "tokenize":
# Replace with token
token = tokenize(original_text)
transformed = transformed[:start] + token + transformed[end:]
reasons.append(f"tokenized:{pii_type}")
reasons.extend(_tool_reason_codes("tokenized", pii_type))
elif action == "redact":
# Replace with placeholder
placeholder = f"[{pii_type.upper()}]"
transformed = transformed[:start] + placeholder + transformed[end:]
reasons.append(f"redacted:{pii_type}")
reasons.extend(_tool_reason_codes("redacted", pii_type))
elif action == "deny":
# This should be handled at a higher level, but just redact here
placeholder = f"[{pii_type.upper()}]"
transformed = transformed[:start] + placeholder + transformed[end:]
reasons.append(f"redacted:{pii_type}")
reasons.extend(_tool_reason_codes("redacted", pii_type))

return transformed, reasons


def _tool_reason_codes(action: str, pii_type: str) -> List[str]:
"""Emit both legacy and namespaced reason codes for API compatibility."""

return [f"{action}:{pii_type}", f"pii.{action}:{pii_type}"]


def _apply_default_action_dynamic(
action: str,
raw_text: str,
Expand Down Expand Up @@ -1723,11 +1689,16 @@ def _add_budget_info_to_result(
result["reasons"] = []

if budget_status.reason == "budget_ok":
result["reasons"].append("budget_check_passed")
reason = "budget_check_passed"
elif budget_status.reason == "budget_warning":
result["reasons"].append("budget_warning")
reason = "budget_warning"
elif budget_status.reason == "budget_exceeded":
result["reasons"].append("budget_exceeded")
reason = "budget_exceeded"
else:
reason = None

if reason and reason not in result["reasons"]:
result["reasons"].append(reason)

return result

Expand Down
2 changes: 2 additions & 0 deletions tests/api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2024 GovernsAI. All rights reserved.
Loading
Loading