Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 118 additions & 1 deletion scripts/evaluate_control_matrix_gate.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,32 @@ def _bundle_name(bundle: dict[str, Any]) -> str:
return f"{md.get('name', 'UNKNOWN')}@{md.get('version', 'UNKNOWN')}"


def _pick_override(overrides: dict[str, Any], key: str, default: Any) -> Any:
if key in overrides:
return overrides[key]
return default


def _stringish(value: Any, default: str = "") -> str:
if value is None or value == "":
return default
return str(value)


def _boolish_str(value: Any, default: str = "false") -> str:
if value is None:
return default
if isinstance(value, bool):
return "true" if value else "false"
return str(value).strip().lower()


def derive_gate_context(bundle: dict[str, Any]) -> dict[str, str]:
spec = bundle.get("spec") or {}
policy = spec.get("policy") or {}
control_matrix = policy.get("controlMatrix") or {}
overrides = control_matrix.get("context") or {}
abstract_reasoning = policy.get("abstractReasoning") or {}

lane = str(policy.get("lane") or overrides.get("environment_tier") or "staging")
env = {"dev": "dev", "staging": "staging", "prod": "prod"}.get(lane, "dev")
Expand All @@ -54,13 +75,58 @@ def derive_gate_context(bundle: dict[str, Any]) -> dict[str, str]:

tenant_scope = "global" if bool(policy.get("globalDeployment", False)) else "single_tenant"

reasoning_class = _stringish(
_pick_override(overrides, "reasoning_class", abstract_reasoning.get("reasoningClass")),
"REACTIVE",
)
verification_mode = _stringish(
_pick_override(overrides, "verification_mode", abstract_reasoning.get("verificationMode")),
"NONE",
)
llm_only_forbidden = _boolish_str(
_pick_override(overrides, "llm_only_forbidden", abstract_reasoning.get("llmOnlyForbidden", False))
)
requires_counterexample_search = _boolish_str(
_pick_override(
overrides,
"requires_counterexample_search",
abstract_reasoning.get("requiresCounterexampleSearch", False),
)
)
requires_program_candidate = _boolish_str(
_pick_override(
overrides,
"requires_program_candidate",
abstract_reasoning.get("requiresProgramCandidate", False),
)
)
requires_backtracking_capability = _boolish_str(
_pick_override(
overrides,
"requires_backtracking_capability",
abstract_reasoning.get("requiresBacktrackingCapability", False),
)
)
program_candidate_ref_present = _boolish_str(bool(abstract_reasoning.get("programCandidateRef")))
counterexample_refs_present = _boolish_str(bool(abstract_reasoning.get("counterexampleRefs")))
backtracking_capable = _boolish_str(abstract_reasoning.get("backtrackingCapable", False))

context = {
"phase": str(overrides.get("phase") or phase),
"authority": str(overrides.get("authority") or authority),
"environment_tier": str(overrides.get("environment_tier") or env),
"approval_mode": str(overrides.get("approval_mode") or approval_mode),
"tenant_scope": str(overrides.get("tenant_scope") or tenant_scope),
"enforcement_point": str(overrides.get("enforcement_point") or "policy_engine"),
"reasoning_class": reasoning_class,
"verification_mode": verification_mode,
"llm_only_forbidden": llm_only_forbidden,
"requires_counterexample_search": requires_counterexample_search,
"requires_program_candidate": requires_program_candidate,
"requires_backtracking_capability": requires_backtracking_capability,
"program_candidate_ref_present": program_candidate_ref_present,
"counterexample_refs_present": counterexample_refs_present,
"backtracking_capable": backtracking_capable,
}
return context

Expand All @@ -75,6 +141,47 @@ def evaluate_bundle_gate(
raise ControlGateError(f"policy bundle missing: {policy_bundle_path}")

context = derive_gate_context(bundle)

if context["reasoning_class"] in {"ABSTRACT", "PROGRAM_INDUCTION"}:
bundle_sha256 = hashlib.sha256(policy_bundle_path.read_bytes()).hexdigest()
base_artifact = {
"kind": "ControlGateArtifact",
"bundle": _bundle_name(bundle),
"bundlePath": str(bundle_path.resolve()),
"evaluatedAt": dt.datetime.now(dt.timezone.utc).isoformat(),
"enforcementPoint": context["enforcement_point"],
"policyBundlePath": str(policy_bundle_path),
"policyBundleSha256": bundle_sha256,
"gateContext": context,
"matchedRowIds": [],
"blockingRowIds": [],
"candidateRowIds": [],
}
if context["llm_only_forbidden"] == "true" and context["verification_mode"] == "NONE":
return {
**base_artifact,
"result": "deny",
"reason": "abstract lane forbids llm-only evaluation",
}
if context["requires_program_candidate"] == "true" and context["program_candidate_ref_present"] != "true":
return {
**base_artifact,
"result": "deny",
"reason": "abstract lane requires program candidate evidence",
}
if context["requires_counterexample_search"] == "true" and context["counterexample_refs_present"] != "true":
return {
**base_artifact,
"result": "deny",
"reason": "abstract lane requires counterexample search evidence",
}
if context["requires_backtracking_capability"] == "true" and context["backtracking_capable"] != "true":
return {
**base_artifact,
"result": "deny",
"reason": "abstract lane requires declared backtracking capability",
}

rows = _load_json(policy_bundle_path)
relevant_rows = [
row for row in rows if row.get("enforcement_point") == context["enforcement_point"]
Expand All @@ -85,6 +192,16 @@ def matches(row: dict[str, Any]) -> bool:
for key in ("phase", "authority", "environment_tier", "approval_mode", "tenant_scope"):
if allow_if.get(key) != context[key]:
return False
for key in (
"reasoning_class",
"verification_mode",
"llm_only_forbidden",
"requires_counterexample_search",
"requires_program_candidate",
"requires_backtracking_capability",
):
if key in allow_if and _stringish(allow_if.get(key)).lower() != _stringish(context[key]).lower():
return False
return True

exact_rows = [row for row in relevant_rows if matches(row)]
Expand Down Expand Up @@ -162,4 +279,4 @@ def main() -> int:


if __name__ == "__main__":
raise SystemExit(main())
raise SystemExit(main())
42 changes: 41 additions & 1 deletion scripts/validate_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,38 @@ def main() -> int:
if not isinstance(mrs, int) or mrs < 5 or mrs > 3600:
die("spec.policy.maxRunSeconds must be an int in [5, 3600]", 2)

abstract_reasoning = pol.get("abstractReasoning") or {}
if abstract_reasoning:
reasoning_class = abstract_reasoning.get("reasoningClass", "REACTIVE")
verification_mode = abstract_reasoning.get("verificationMode", "NONE")
llm_only_forbidden = bool(abstract_reasoning.get("llmOnlyForbidden", False))
requires_counterexample_search = bool(abstract_reasoning.get("requiresCounterexampleSearch", False))
requires_program_candidate = bool(abstract_reasoning.get("requiresProgramCandidate", False))
requires_backtracking_capability = bool(abstract_reasoning.get("requiresBacktrackingCapability", False))

allowed_reasoning_classes = {"REACTIVE", "RETRIEVAL", "ABSTRACT", "CAUSAL", "PROGRAM_INDUCTION"}
allowed_verification_modes = {
"NONE",
"POLICY_ONLY",
"COUNTEREXAMPLE_SEARCH",
"PROGRAM_EXECUTION",
"CAUSAL_CHECK",
"HUMAN_REVIEW",
"COMPOSITE",
}
if reasoning_class not in allowed_reasoning_classes:
die(f"spec.policy.abstractReasoning.reasoningClass must be one of {sorted(allowed_reasoning_classes)}", 2)
if verification_mode not in allowed_verification_modes:
die(f"spec.policy.abstractReasoning.verificationMode must be one of {sorted(allowed_verification_modes)}", 2)
if reasoning_class in {"ABSTRACT", "PROGRAM_INDUCTION"} and llm_only_forbidden and verification_mode == "NONE":
die("abstractReasoning forbids llm-only evaluation when reasoningClass is ABSTRACT or PROGRAM_INDUCTION", 2)
if requires_program_candidate and not abstract_reasoning.get("programCandidateRef"):
die("abstractReasoning requires programCandidateRef", 2)
if requires_counterexample_search and not (abstract_reasoning.get("counterexampleRefs") or []):
die("abstractReasoning requires counterexampleRefs", 2)
if requires_backtracking_capability and not abstract_reasoning.get("backtrackingCapable", False):
die("abstractReasoning requires backtrackingCapable=true", 2)

vm = spec["vm"]
backend_intent = vm.get("backendIntent")
allowed = {"qemu", "microvm", "lima-process", "fleet"}
Expand Down Expand Up @@ -93,6 +125,14 @@ def main() -> int:
"artifactPath": str(gate_artifact_path),
"matchedRowIds": gate_artifact["matchedRowIds"],
},
"abstractGate": {
"reasoningClass": gate_artifact["gateContext"].get("reasoning_class"),
"verificationMode": gate_artifact["gateContext"].get("verification_mode"),
"llmOnlyForbidden": gate_artifact["gateContext"].get("llm_only_forbidden"),
"requiresCounterexampleSearch": gate_artifact["gateContext"].get("requires_counterexample_search"),
"requiresProgramCandidate": gate_artifact["gateContext"].get("requires_program_candidate"),
"requiresBacktrackingCapability": gate_artifact["gateContext"].get("requires_backtracking_capability"),
},
}
report_path = os.path.join(out_dir, "validation-artifact.json")
with open(report_path, "w", encoding="utf-8") as f:
Expand All @@ -102,4 +142,4 @@ def main() -> int:


if __name__ == "__main__":
raise SystemExit(main())
raise SystemExit(main())
Loading