From a26b1779e197fd5d22f3fb68b07cd9866fda54b6 Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Mon, 13 Apr 2026 00:32:36 -0400 Subject: [PATCH] enforce abstract reasoning posture in bundle validation --- scripts/evaluate_control_matrix_gate.py | 119 +++++++++++++++++++++++- scripts/validate_bundle.py | 42 ++++++++- 2 files changed, 159 insertions(+), 2 deletions(-) diff --git a/scripts/evaluate_control_matrix_gate.py b/scripts/evaluate_control_matrix_gate.py index 6b40393..5c01944 100644 --- a/scripts/evaluate_control_matrix_gate.py +++ b/scripts/evaluate_control_matrix_gate.py @@ -26,11 +26,32 @@ def _bundle_name(bundle: dict[str, Any]) -> str: return f"{md.get('name', 'UNKNOWN')}@{md.get('version', 'UNKNOWN')}" +def _pick_override(overrides: dict[str, Any], key: str, default: Any) -> Any: + if key in overrides: + return overrides[key] + return default + + +def _stringish(value: Any, default: str = "") -> str: + if value is None or value == "": + return default + return str(value) + + +def _boolish_str(value: Any, default: str = "false") -> str: + if value is None: + return default + if isinstance(value, bool): + return "true" if value else "false" + return str(value).strip().lower() + + def derive_gate_context(bundle: dict[str, Any]) -> dict[str, str]: spec = bundle.get("spec") or {} policy = spec.get("policy") or {} control_matrix = policy.get("controlMatrix") or {} overrides = control_matrix.get("context") or {} + abstract_reasoning = policy.get("abstractReasoning") or {} lane = str(policy.get("lane") or overrides.get("environment_tier") or "staging") env = {"dev": "dev", "staging": "staging", "prod": "prod"}.get(lane, "dev") @@ -54,6 +75,42 @@ def derive_gate_context(bundle: dict[str, Any]) -> dict[str, str]: tenant_scope = "global" if bool(policy.get("globalDeployment", False)) else "single_tenant" + reasoning_class = _stringish( + _pick_override(overrides, "reasoning_class", abstract_reasoning.get("reasoningClass")), + "REACTIVE", + ) + verification_mode = _stringish( + _pick_override(overrides, "verification_mode", abstract_reasoning.get("verificationMode")), + "NONE", + ) + llm_only_forbidden = _boolish_str( + _pick_override(overrides, "llm_only_forbidden", abstract_reasoning.get("llmOnlyForbidden", False)) + ) + requires_counterexample_search = _boolish_str( + _pick_override( + overrides, + "requires_counterexample_search", + abstract_reasoning.get("requiresCounterexampleSearch", False), + ) + ) + requires_program_candidate = _boolish_str( + _pick_override( + overrides, + "requires_program_candidate", + abstract_reasoning.get("requiresProgramCandidate", False), + ) + ) + requires_backtracking_capability = _boolish_str( + _pick_override( + overrides, + "requires_backtracking_capability", + abstract_reasoning.get("requiresBacktrackingCapability", False), + ) + ) + program_candidate_ref_present = _boolish_str(bool(abstract_reasoning.get("programCandidateRef"))) + counterexample_refs_present = _boolish_str(bool(abstract_reasoning.get("counterexampleRefs"))) + backtracking_capable = _boolish_str(abstract_reasoning.get("backtrackingCapable", False)) + context = { "phase": str(overrides.get("phase") or phase), "authority": str(overrides.get("authority") or authority), @@ -61,6 +118,15 @@ def derive_gate_context(bundle: dict[str, Any]) -> dict[str, str]: "approval_mode": str(overrides.get("approval_mode") or approval_mode), "tenant_scope": str(overrides.get("tenant_scope") or tenant_scope), "enforcement_point": str(overrides.get("enforcement_point") or "policy_engine"), + "reasoning_class": reasoning_class, + "verification_mode": verification_mode, + "llm_only_forbidden": llm_only_forbidden, + "requires_counterexample_search": requires_counterexample_search, + "requires_program_candidate": requires_program_candidate, + "requires_backtracking_capability": requires_backtracking_capability, + "program_candidate_ref_present": program_candidate_ref_present, + "counterexample_refs_present": counterexample_refs_present, + "backtracking_capable": backtracking_capable, } return context @@ -75,6 +141,47 @@ def evaluate_bundle_gate( raise ControlGateError(f"policy bundle missing: {policy_bundle_path}") context = derive_gate_context(bundle) + + if context["reasoning_class"] in {"ABSTRACT", "PROGRAM_INDUCTION"}: + bundle_sha256 = hashlib.sha256(policy_bundle_path.read_bytes()).hexdigest() + base_artifact = { + "kind": "ControlGateArtifact", + "bundle": _bundle_name(bundle), + "bundlePath": str(bundle_path.resolve()), + "evaluatedAt": dt.datetime.now(dt.timezone.utc).isoformat(), + "enforcementPoint": context["enforcement_point"], + "policyBundlePath": str(policy_bundle_path), + "policyBundleSha256": bundle_sha256, + "gateContext": context, + "matchedRowIds": [], + "blockingRowIds": [], + "candidateRowIds": [], + } + if context["llm_only_forbidden"] == "true" and context["verification_mode"] == "NONE": + return { + **base_artifact, + "result": "deny", + "reason": "abstract lane forbids llm-only evaluation", + } + if context["requires_program_candidate"] == "true" and context["program_candidate_ref_present"] != "true": + return { + **base_artifact, + "result": "deny", + "reason": "abstract lane requires program candidate evidence", + } + if context["requires_counterexample_search"] == "true" and context["counterexample_refs_present"] != "true": + return { + **base_artifact, + "result": "deny", + "reason": "abstract lane requires counterexample search evidence", + } + if context["requires_backtracking_capability"] == "true" and context["backtracking_capable"] != "true": + return { + **base_artifact, + "result": "deny", + "reason": "abstract lane requires declared backtracking capability", + } + rows = _load_json(policy_bundle_path) relevant_rows = [ row for row in rows if row.get("enforcement_point") == context["enforcement_point"] @@ -85,6 +192,16 @@ def matches(row: dict[str, Any]) -> bool: for key in ("phase", "authority", "environment_tier", "approval_mode", "tenant_scope"): if allow_if.get(key) != context[key]: return False + for key in ( + "reasoning_class", + "verification_mode", + "llm_only_forbidden", + "requires_counterexample_search", + "requires_program_candidate", + "requires_backtracking_capability", + ): + if key in allow_if and _stringish(allow_if.get(key)).lower() != _stringish(context[key]).lower(): + return False return True exact_rows = [row for row in relevant_rows if matches(row)] @@ -162,4 +279,4 @@ def main() -> int: if __name__ == "__main__": - raise SystemExit(main()) + raise SystemExit(main()) \ No newline at end of file diff --git a/scripts/validate_bundle.py b/scripts/validate_bundle.py index c33a26c..e1d5a74 100644 --- a/scripts/validate_bundle.py +++ b/scripts/validate_bundle.py @@ -53,6 +53,38 @@ def main() -> int: if not isinstance(mrs, int) or mrs < 5 or mrs > 3600: die("spec.policy.maxRunSeconds must be an int in [5, 3600]", 2) + abstract_reasoning = pol.get("abstractReasoning") or {} + if abstract_reasoning: + reasoning_class = abstract_reasoning.get("reasoningClass", "REACTIVE") + verification_mode = abstract_reasoning.get("verificationMode", "NONE") + llm_only_forbidden = bool(abstract_reasoning.get("llmOnlyForbidden", False)) + requires_counterexample_search = bool(abstract_reasoning.get("requiresCounterexampleSearch", False)) + requires_program_candidate = bool(abstract_reasoning.get("requiresProgramCandidate", False)) + requires_backtracking_capability = bool(abstract_reasoning.get("requiresBacktrackingCapability", False)) + + allowed_reasoning_classes = {"REACTIVE", "RETRIEVAL", "ABSTRACT", "CAUSAL", "PROGRAM_INDUCTION"} + allowed_verification_modes = { + "NONE", + "POLICY_ONLY", + "COUNTEREXAMPLE_SEARCH", + "PROGRAM_EXECUTION", + "CAUSAL_CHECK", + "HUMAN_REVIEW", + "COMPOSITE", + } + if reasoning_class not in allowed_reasoning_classes: + die(f"spec.policy.abstractReasoning.reasoningClass must be one of {sorted(allowed_reasoning_classes)}", 2) + if verification_mode not in allowed_verification_modes: + die(f"spec.policy.abstractReasoning.verificationMode must be one of {sorted(allowed_verification_modes)}", 2) + if reasoning_class in {"ABSTRACT", "PROGRAM_INDUCTION"} and llm_only_forbidden and verification_mode == "NONE": + die("abstractReasoning forbids llm-only evaluation when reasoningClass is ABSTRACT or PROGRAM_INDUCTION", 2) + if requires_program_candidate and not abstract_reasoning.get("programCandidateRef"): + die("abstractReasoning requires programCandidateRef", 2) + if requires_counterexample_search and not (abstract_reasoning.get("counterexampleRefs") or []): + die("abstractReasoning requires counterexampleRefs", 2) + if requires_backtracking_capability and not abstract_reasoning.get("backtrackingCapable", False): + die("abstractReasoning requires backtrackingCapable=true", 2) + vm = spec["vm"] backend_intent = vm.get("backendIntent") allowed = {"qemu", "microvm", "lima-process", "fleet"} @@ -93,6 +125,14 @@ def main() -> int: "artifactPath": str(gate_artifact_path), "matchedRowIds": gate_artifact["matchedRowIds"], }, + "abstractGate": { + "reasoningClass": gate_artifact["gateContext"].get("reasoning_class"), + "verificationMode": gate_artifact["gateContext"].get("verification_mode"), + "llmOnlyForbidden": gate_artifact["gateContext"].get("llm_only_forbidden"), + "requiresCounterexampleSearch": gate_artifact["gateContext"].get("requires_counterexample_search"), + "requiresProgramCandidate": gate_artifact["gateContext"].get("requires_program_candidate"), + "requiresBacktrackingCapability": gate_artifact["gateContext"].get("requires_backtracking_capability"), + }, } report_path = os.path.join(out_dir, "validation-artifact.json") with open(report_path, "w", encoding="utf-8") as f: @@ -102,4 +142,4 @@ def main() -> int: if __name__ == "__main__": - raise SystemExit(main()) + raise SystemExit(main()) \ No newline at end of file