diff --git a/app/policies.py b/app/policies.py index 514b0b2..9f9e5cc 100644 --- a/app/policies.py +++ b/app/policies.py @@ -20,6 +20,7 @@ # Fallback regex patterns for when Presidio is not available EMAIL = re.compile(r"\b([A-Za-z0-9._%+-])[^@\s]*(@[A-Za-z0-9.-]+\.[A-Za-z]{2,})\b") +SSN = re.compile(r"\b(?!000|666|9\d{2})\d{3}-\d{2}-\d{4}\b") PHONE = re.compile(r"\+?\d[\d\s\-\(\)]{7,}\d") CARD = re.compile(r"\b(?:\d[ -]*?){13,19}\b") PHI_MRN = re.compile( @@ -465,6 +466,7 @@ def detect_regex_pii_findings(raw_text: str) -> List[Dict[str, Any]]: findings: List[Dict[str, Any]] = [] _append_regex_findings(findings, EMAIL, "PII:email_address", raw_text, 0.8) + _append_regex_findings(findings, SSN, "PII:us_ssn", raw_text, 0.9) _append_regex_findings(findings, PHONE, "PII:phone_number", raw_text, 0.8) for match in CARD.finditer(raw_text): @@ -840,44 +842,14 @@ def _evaluate_policy( # PRECEDENCE LEVEL 2: Tool-specific access rules (highest priority for non-dangerous tools) if tool in tool_access and tool_access[tool].get("direction") == direction: - # Run PII detection on raw text - findings = [] - if USE_PRESIDIO and ANALYZER is not None: - results = ANALYZER.analyze( - text=raw_text, entities=list(ANONYMIZE_OPERATORS.keys()), language="en" - ) - for r in results: - if not is_false_positive(r.entity_type, "", raw_text): - findings.append( - { - "type": f"PII:{r.entity_type.lower()}", - "start": r.start, - "end": r.end, - "score": r.score, - "text": raw_text[r.start : r.end], - } - ) - - # Apply tool-specific transformations based on findings - if findings: - transformed_text, tool_reasons = apply_tool_access_text( - tool, findings, raw_text - ) - return { - "decision": "transform", - "raw_text_out": transformed_text, - "reasons": tool_reasons, - "policy_id": "tool-access", - "ts": now, - } - else: - # No PII found, pass through - return { - "decision": "allow", - "raw_text_out": raw_text, - "policy_id": "tool-access", - "ts": now, - } + # Keep the static YAML path aligned with the payload-driven evaluator so + # regex fallback and tool-specific allow/tokenize behavior stay consistent. + return _apply_tool_specific_policy_dynamic( + tool=tool, + raw_text=raw_text, + now=now, + tool_policy=tool_access[tool], + ) # PRECEDENCE LEVEL 3: Global defaults for this direction default_action = defaults.get(direction, {}).get("action", "redact") @@ -941,21 +913,22 @@ def evaluate_with_payload_policy( ) -> Dict: """ Evaluate policy using payload-provided configuration - Falls back to static YAML if no policy_config provided + Falls back to the loaded static YAML policy if no policy_config is provided. """ - if not policy_config: - # Fallback to current YAML-based logic - return evaluate(tool, scope, raw_text, now, direction) + resolved_policy_config = ( + deepcopy(policy_config) if policy_config else deepcopy(get_policy()) + ) + resolved_policy_config["tool"] = tool + resolved_policy_config["scope"] = scope or "" - # Use payload-provided policy configuration return _evaluate_dynamic_policy( tool, scope, raw_text, now, direction, - policy_config, + resolved_policy_config, tool_config, user_id, budget_context, @@ -1125,57 +1098,43 @@ def _apply_tool_specific_policy_dynamic( } ) else: - findings.extend(detect_regex_pii_findings(raw_text)) - - import re - - ssn_patterns = [ - r"\b\d{3}-\d{2}-\d{4}\b", # XXX-XX-XXXX with dashes - r"\b(?!000|666|9\d{2})\d{3}[-]?(?!00)\d{2}[-]?(?!0000)\d{4}\b", # With optional dashes - r"\b(?!000|666|9\d{2})\d{9}\b", # 9 digits without dashes (if context suggests SSN) - ] - - # Check if text contains SSN-related context - ssn_context = re.search( - r"\b(ssn|social\s*security|tax\s*id|social\s*security\s*number)\b", - raw_text, - re.IGNORECASE, - ) + # Detect SSNs with context first so generic phone regexes cannot claim + # the same span before tokenization rules get a chance to run. + import re + + ssn_patterns = [ + r"\b\d{3}-\d{2}-\d{4}\b", # XXX-XX-XXXX with dashes + r"\b(?!000|666|9\d{2})\d{3}[-]?(?!00)\d{2}[-]?(?!0000)\d{4}\b", # With optional dashes + r"\b(?!000|666|9\d{2})\d{9}\b", # 9 digits without dashes (if context suggests SSN) + ] + + ssn_context = re.search( + r"\b(ssn|social\s*security|tax\s*id|social\s*security\s*number)\b", + raw_text, + re.IGNORECASE, + ) - for pattern in ssn_patterns: - for match in re.finditer(pattern, raw_text): - # Check if this SSN overlaps with any existing finding - overlaps = False - for finding in findings: - if not ( - match.end() <= finding["start"] or match.start() >= finding["end"] - ): - overlaps = True - break - - # Only add if no overlap and (has context or is in standard format) - if not overlaps and (ssn_context or "-" in match.group()): - # Check if it's already detected as US_SSN - already_detected = False - for finding in findings: - if ( - finding["type"] == "PII:us_ssn" - and finding["start"] == match.start() - ): - already_detected = True - break + for pattern in ssn_patterns: + for match in re.finditer(pattern, raw_text): + if not ssn_context and "-" not in match.group(): + continue + if _has_overlap(match.start(), match.end(), findings): + continue + findings.append( + { + "type": "PII:us_ssn", + "start": match.start(), + "end": match.end(), + "score": 0.9 if ssn_context else 0.7, + "text": match.group(), + } + ) + break - if not already_detected: - findings.append( - { - "type": "PII:us_ssn", - "start": match.start(), - "end": match.end(), - "score": 0.9 if ssn_context else 0.7, - "text": match.group(), - } - ) - break # Only add first match per pattern + for finding in detect_regex_pii_findings(raw_text): + if _has_overlap(finding["start"], finding["end"], findings): + continue + findings.append(finding) # Apply tool-specific transformations based on findings and allow_pii rules if findings: @@ -1322,26 +1281,33 @@ def apply_tool_access_text_dynamic( if action == "pass_through": # Keep original text + reasons.extend(_tool_reason_codes("allowed", pii_type)) continue elif action == "tokenize": # Replace with token token = tokenize(original_text) transformed = transformed[:start] + token + transformed[end:] - reasons.append(f"tokenized:{pii_type}") + reasons.extend(_tool_reason_codes("tokenized", pii_type)) elif action == "redact": # Replace with placeholder placeholder = f"[{pii_type.upper()}]" transformed = transformed[:start] + placeholder + transformed[end:] - reasons.append(f"redacted:{pii_type}") + reasons.extend(_tool_reason_codes("redacted", pii_type)) elif action == "deny": # This should be handled at a higher level, but just redact here placeholder = f"[{pii_type.upper()}]" transformed = transformed[:start] + placeholder + transformed[end:] - reasons.append(f"redacted:{pii_type}") + reasons.extend(_tool_reason_codes("redacted", pii_type)) return transformed, reasons +def _tool_reason_codes(action: str, pii_type: str) -> List[str]: + """Emit both legacy and namespaced reason codes for API compatibility.""" + + return [f"{action}:{pii_type}", f"pii.{action}:{pii_type}"] + + def _apply_default_action_dynamic( action: str, raw_text: str, @@ -1723,11 +1689,16 @@ def _add_budget_info_to_result( result["reasons"] = [] if budget_status.reason == "budget_ok": - result["reasons"].append("budget_check_passed") + reason = "budget_check_passed" elif budget_status.reason == "budget_warning": - result["reasons"].append("budget_warning") + reason = "budget_warning" elif budget_status.reason == "budget_exceeded": - result["reasons"].append("budget_exceeded") + reason = "budget_exceeded" + else: + reason = None + + if reason and reason not in result["reasons"]: + result["reasons"].append(reason) return result diff --git a/tests/api/__init__.py b/tests/api/__init__.py new file mode 100644 index 0000000..e90c597 --- /dev/null +++ b/tests/api/__init__.py @@ -0,0 +1,2 @@ +# SPDX-License-Identifier: MIT +# Copyright (c) 2024 GovernsAI. All rights reserved. diff --git a/tests/api/test_precheck_policy_regression.py b/tests/api/test_precheck_policy_regression.py new file mode 100644 index 0000000..f9e54af --- /dev/null +++ b/tests/api/test_precheck_policy_regression.py @@ -0,0 +1,288 @@ +# SPDX-License-Identifier: MIT +# Copyright (c) 2024 GovernsAI. All rights reserved. +"""Regression coverage for precheck policy behavior at the API layer.""" + +from copy import deepcopy +from pathlib import Path + +import pytest +import yaml + +PRECHECK_URL = "/api/v1/precheck" +POLICY_PATH = Path(__file__).resolve().parents[2] / "policy.tool_access.yaml" +POLICY = yaml.safe_load(POLICY_PATH.read_text()) + + +def _tool_reason_codes(action, pii_type): + return {f"{action}:{pii_type}", f"pii.{action}:{pii_type}"} + + +TOOL_CASES = { + "verify_identity": { + "raw_text": "Verify jane@example.com against SSN: 123-45-6789.", + "decision": "transform", + "policy_id": "tool-access", + "contains": ["jane@example.com", "pii_"], + "not_contains": ["123-45-6789"], + "reasons": _tool_reason_codes("allowed", "PII:email_address") + | _tool_reason_codes("tokenized", "PII:us_ssn"), + }, + "send_marketing_email": { + "raw_text": "Send the launch note to jane@example.com.", + "decision": "transform", + "policy_id": "tool-access", + "contains": ["jane@example.com"], + "not_contains": [], + "reasons": _tool_reason_codes("allowed", "PII:email_address"), + }, + "data_export": { + "raw_text": "Export jane@example.com to the reporting system.", + "decision": "transform", + "policy_id": "strict-fallback", + "contains": [""], + "not_contains": ["jane@example.com"], + "reasons": {"pii.redacted:email_address"}, + }, + "audit_log": { + "raw_text": "Audit record for jane@example.com.", + "decision": "transform", + "policy_id": "strict-fallback", + "contains": [""], + "not_contains": ["jane@example.com"], + "reasons": {"pii.redacted:email_address"}, + }, +} + + +@pytest.fixture(autouse=True) +def _force_regex_fallback(monkeypatch): + monkeypatch.setattr("app.policies.USE_PRESIDIO", False) + monkeypatch.setattr("app.policies.ANALYZER", None) + + +def _precheck( + test_client, + active_api_key, + tool, + raw_text, + *, + dynamic_policy=False, + tool_config=None, + budget_context=None, + user_id=None, +): + payload = {"tool": tool, "raw_text": raw_text} + if dynamic_policy: + payload["policy_config"] = deepcopy(POLICY) + if tool_config is not None: + payload["tool_config"] = tool_config + if budget_context is not None: + payload["budget_context"] = budget_context + if user_id is not None: + payload["user_id"] = user_id + + return test_client.post( + PRECHECK_URL, + headers={"X-Governs-Key": active_api_key.key}, + json=payload, + ) + + +def _assert_reason_set(body, expected_reasons): + # Order is not audit-significant here; we only care that both contracts exist. + assert set(body.get("reasons") or []) == expected_reasons + + +def test_tool_cases_cover_every_declared_policy_tool(): + assert set(TOOL_CASES) == set(POLICY["tool_access"]) + + +@pytest.mark.parametrize("tool_name", sorted(TOOL_CASES)) +def test_precheck_static_yaml_matches_expected_policy_per_declared_tool( + tool_name, test_client, active_api_key +): + case = TOOL_CASES[tool_name] + + response = _precheck( + test_client=test_client, + active_api_key=active_api_key, + tool=tool_name, + raw_text=case["raw_text"], + ) + + assert response.status_code == 200 + body = response.json() + assert body["decision"] == case["decision"] + assert body["policy_id"] == case["policy_id"] + + for needle in case["contains"]: + assert needle in body["raw_text_out"] + + for needle in case["not_contains"]: + assert needle not in body["raw_text_out"] + + _assert_reason_set(body, case["reasons"]) + + +@pytest.mark.parametrize("tool_name", sorted(TOOL_CASES)) +def test_declared_tools_match_between_static_and_dynamic_policy_paths( + tool_name, test_client, active_api_key +): + case = TOOL_CASES[tool_name] + + static_response = _precheck( + test_client=test_client, + active_api_key=active_api_key, + tool=tool_name, + raw_text=case["raw_text"], + ) + dynamic_response = _precheck( + test_client=test_client, + active_api_key=active_api_key, + tool=tool_name, + raw_text=case["raw_text"], + dynamic_policy=True, + ) + + assert static_response.status_code == 200 + assert dynamic_response.status_code == 200 + static_body = static_response.json() + dynamic_body = dynamic_response.json() + + assert static_body["decision"] == dynamic_body["decision"] == case["decision"] + assert static_body["policy_id"] == dynamic_body["policy_id"] == case["policy_id"] + assert static_body["raw_text_out"] == dynamic_body["raw_text_out"] + _assert_reason_set(static_body, case["reasons"]) + _assert_reason_set(dynamic_body, case["reasons"]) + + +@pytest.mark.parametrize("dynamic_policy", [False, True]) +def test_clean_text_allows_known_ingress_tool( + dynamic_policy, test_client, active_api_key +): + raw_text = "Plain operational status update with no PII." + + response = _precheck( + test_client=test_client, + active_api_key=active_api_key, + tool="verify_identity", + raw_text=raw_text, + dynamic_policy=dynamic_policy, + ) + + assert response.status_code == 200 + body = response.json() + assert body["decision"] == "allow" + assert body["policy_id"] == "tool-access" + assert body["raw_text_out"] == raw_text + + +@pytest.mark.parametrize("dynamic_policy", [False, True]) +def test_unknown_tool_uses_documented_default_redaction_reasons( + dynamic_policy, test_client, active_api_key +): + response = _precheck( + test_client=test_client, + active_api_key=active_api_key, + tool="unknown.tool", + raw_text="Unknown tool sent jane@example.com to a third party.", + dynamic_policy=dynamic_policy, + ) + + assert response.status_code == 200 + body = response.json() + assert body["decision"] == "transform" + assert body["policy_id"] == "strict-fallback" + assert body["reasons"] == ["pii.redacted:email_address"] + assert "" in body["raw_text_out"] + assert "jane@example.com" not in body["raw_text_out"] + + +def test_static_yaml_budget_enforcement_matches_dynamic_payload( + test_client, active_api_key +): + raw_text = "Budget gate this purchase request." + tool_config = { + "tool_name": "verify_identity", + "direction": "ingress", + "metadata": {"purchase_amount": 25.0}, + } + budget_context = { + "monthly_limit": 10.0, + "current_spend": 9.0, + "llm_spend": 9.0, + "purchase_spend": 0.0, + "remaining_budget": 1.0, + "budget_type": "user", + } + + static_response = _precheck( + test_client=test_client, + active_api_key=active_api_key, + tool="verify_identity", + raw_text=raw_text, + tool_config=tool_config, + budget_context=budget_context, + user_id="user-budget-1", + ) + dynamic_response = _precheck( + test_client=test_client, + active_api_key=active_api_key, + tool="verify_identity", + raw_text=raw_text, + dynamic_policy=True, + tool_config=tool_config, + budget_context=budget_context, + user_id="user-budget-1", + ) + + assert static_response.status_code == 200 + assert dynamic_response.status_code == 200 + + for body in (static_response.json(), dynamic_response.json()): + assert body["decision"] == "deny" + assert body["policy_id"] == "budget-check" + assert body["reasons"] == ["budget_exceeded"] + assert body["raw_text_out"] == raw_text + + +@pytest.mark.parametrize( + "raw_text", + [ + "Verify jane@example.com against SSN: 123-45-6789.", + "Verify jane@example.com against SSN 123456789.", + ], +) +def test_verify_identity_tokenizes_ssn_shapes_on_both_paths( + raw_text, test_client, active_api_key +): + static_response = _precheck( + test_client=test_client, + active_api_key=active_api_key, + tool="verify_identity", + raw_text=raw_text, + ) + dynamic_response = _precheck( + test_client=test_client, + active_api_key=active_api_key, + tool="verify_identity", + raw_text=raw_text, + dynamic_policy=True, + ) + + assert static_response.status_code == 200 + assert dynamic_response.status_code == 200 + static_body = static_response.json() + dynamic_body = dynamic_response.json() + expected_reasons = _tool_reason_codes( + "allowed", "PII:email_address" + ) | _tool_reason_codes("tokenized", "PII:us_ssn") + + assert static_body["raw_text_out"] == dynamic_body["raw_text_out"] + for body in (static_body, dynamic_body): + assert body["decision"] == "transform" + assert "jane@example.com" in body["raw_text_out"] + assert "pii_" in body["raw_text_out"] + assert "123-45-6789" not in body["raw_text_out"] + assert "123456789" not in body["raw_text_out"] + _assert_reason_set(body, expected_reasons)