From ce2ab78a06fe766d7691575965909dd78a500257 Mon Sep 17 00:00:00 2001 From: David Ahmann Date: Tue, 24 Feb 2026 07:02:19 -0500 Subject: [PATCH] tooling: add CI state classifier command (#18121) --- scripts/ci_state_classifier.py | 187 +++++++++++++++++++++++++++++++++ 1 file changed, 187 insertions(+) create mode 100755 scripts/ci_state_classifier.py diff --git a/scripts/ci_state_classifier.py b/scripts/ci_state_classifier.py new file mode 100755 index 00000000000..8c1c9a61af8 --- /dev/null +++ b/scripts/ci_state_classifier.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 +"""Classify GitHub PR checks into deterministic machine-readable states. + +Outputs one of: +- passed +- failed +- pending +- no_checks +- policy_blocked +""" + +from __future__ import annotations + +import argparse +import json +import re +import subprocess +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +FAIL_VALUES = {"FAILURE", "ERROR", "TIMED_OUT", "CANCELLED", "ACTION_REQUIRED"} +PENDING_VALUES = {"PENDING", "QUEUED", "IN_PROGRESS", "REQUESTED", "WAITING"} +POLICY_PATTERNS = [ + r"\bcla\b", + r"license/cla", + r"code[- ]?owners", + r"dco", + r"policy", + r"compliance", + r"signed[- ]off", +] + + +@dataclass +class Check: + name: str + conclusion: str + status: str + + +def _norm(value: Any) -> str: + if value is None: + return "" + return str(value).strip().upper() + + +def _to_checks(raw: list[dict[str, Any]]) -> list[Check]: + checks: list[Check] = [] + for item in raw: + name = ( + item.get("name") + or item.get("context") + or item.get("__typename") + or "unknown" + ) + conclusion = _norm(item.get("conclusion") or item.get("state")) + status = _norm(item.get("status") or item.get("state")) + checks.append(Check(name=str(name), conclusion=conclusion, status=status)) + return checks + + +def _is_policy(check: Check) -> bool: + name = check.name.lower() + return any(re.search(pattern, name) for pattern in POLICY_PATTERNS) + + +def classify(checks: list[Check]) -> str: + if not checks: + return "no_checks" + + failing = [c for c in checks if c.conclusion in FAIL_VALUES or c.status in FAIL_VALUES] + pending = [c for c in checks if c.status in PENDING_VALUES or c.conclusion in PENDING_VALUES] + + if failing: + if all(_is_policy(c) for c in failing): + return "policy_blocked" + return "failed" + + if pending: + if all(_is_policy(c) for c in pending): + return "policy_blocked" + return "pending" + + return "passed" + + +def _load_status_rollup(path: Path) -> list[dict[str, Any]]: + payload = json.loads(path.read_text()) + if isinstance(payload, dict) and "statusCheckRollup" in payload: + rollup = payload["statusCheckRollup"] + if isinstance(rollup, list): + return rollup + if isinstance(payload, list): + return payload + raise ValueError("Expected JSON list or object with statusCheckRollup list") + + +def _fetch_status_rollup(repo: str, pr: int) -> list[dict[str, Any]]: + proc = subprocess.run( + [ + "gh", + "pr", + "view", + "--repo", + repo, + str(pr), + "--json", + "statusCheckRollup,url", + ], + capture_output=True, + text=True, + check=True, + ) + payload = json.loads(proc.stdout) + return payload.get("statusCheckRollup", []) or [] + + +def _report(repo: str, pr: int | None, checks: list[Check]) -> dict[str, Any]: + return { + "repo": repo, + "pr": pr, + "classification": classify(checks), + "check_count": len(checks), + "checks": [ + { + "name": c.name, + "conclusion": c.conclusion, + "status": c.status, + "is_policy_check": _is_policy(c), + } + for c in checks + ], + } + + +def _self_test() -> None: + assert classify([]) == "no_checks" + assert ( + classify([Check("unit", "SUCCESS", "COMPLETED")]) == "passed" + ) + assert ( + classify([Check("build", "FAILURE", "COMPLETED")]) == "failed" + ) + assert ( + classify([Check("license/cla", "", "QUEUED")]) == "policy_blocked" + ) + assert ( + classify([Check("tests", "", "IN_PROGRESS")]) == "pending" + ) + print(json.dumps({"self_test": "ok"})) + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--repo", help="owner/repo") + parser.add_argument("--pr", type=int, help="PR number") + parser.add_argument("--input", help="Path to JSON payload for statusCheckRollup") + parser.add_argument("--self-test", action="store_true") + parser.add_argument("--pretty", action="store_true") + args = parser.parse_args() + + if args.self_test: + _self_test() + return 0 + + if args.input: + raw = _load_status_rollup(Path(args.input)) + checks = _to_checks(raw) + report = _report(args.repo or "fixture", args.pr, checks) + else: + if not args.repo or not args.pr: + parser.error("--repo and --pr are required unless --input or --self-test is used") + raw = _fetch_status_rollup(args.repo, args.pr) + checks = _to_checks(raw) + report = _report(args.repo, args.pr, checks) + + if args.pretty: + print(json.dumps(report, indent=2, sort_keys=True)) + else: + print(json.dumps(report, sort_keys=True)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())