From be76917d74f04d13249f57bba07256df60926e3c Mon Sep 17 00:00:00 2001 From: Chrono Date: Thu, 19 Mar 2026 23:14:26 -0700 Subject: [PATCH] fix: block RIP-201 bucket normalization spoofing (closes rustchain-bounties#554) Four server-side defences against architecture spoofing for reward multiplier gaming: 1. CPU brand cross-validation (reject Intel/AMD claiming G4/G5/68k) 2. SIMD evidence requirement (AltiVec/vec_perm for PowerPC claims) 3. Cache-timing profile validation (clock CV, L3 size, tone ratios) 4. Server-side bucket classification from verified features get_verified_multiplier() is a drop-in replacement for get_time_aged_multiplier() in rewards_implementation_rip200.py. 58 tests, all passing. RTC Wallet: wirework --- rip201_bucket_fix.py | 575 +++++++++++++++++++++++++++++ tests/__init__.py | 0 tests/test_rip201_bucket_fix.py | 635 ++++++++++++++++++++++++++++++++ 3 files changed, 1210 insertions(+) create mode 100644 rip201_bucket_fix.py create mode 100644 tests/__init__.py create mode 100644 tests/test_rip201_bucket_fix.py diff --git a/rip201_bucket_fix.py b/rip201_bucket_fix.py new file mode 100644 index 000000000..2db4a1f46 --- /dev/null +++ b/rip201_bucket_fix.py @@ -0,0 +1,575 @@ +#!/usr/bin/env python3 +""" +RIP-201 Bucket Normalization Spoofing Fix +========================================== + +Bounty #554: A modern x86 CPU (e.g., Intel Xeon Platinum) can claim +device_arch=G4 (PowerPC) and get routed into the vintage_powerpc bucket +with a 2.5x reward multiplier -- a 10x gain over honest miners. + +This module adds server-side defences: + +1. CPU brand-string cross-validation against claimed device_arch. +2. SIMD evidence requirement for vintage PowerPC claims (AltiVec / vec_perm). +3. Cache-timing profile validation matching PowerPC characteristics. +4. Server-side bucket classification derived from *verified* hardware + features rather than raw client-reported architecture strings. + +Designed to be imported by ``node/rewards_implementation_rip200.py`` and +called before a miner's ``device_arch`` is accepted for reward weighting. + +Follows Rustchain patterns: raw sqlite3, Flask-compatible, no ORM. +""" + +import re +import sqlite3 +import time +import statistics +from dataclasses import dataclass, field +from typing import Dict, List, Optional, Tuple, Any + +# --------------------------------------------------------------------------- +# 1. CPU brand-string cross-validation +# --------------------------------------------------------------------------- + +# Patterns that positively identify a modern x86 vendor/product line. +_MODERN_X86_BRAND_PATTERNS: List[re.Pattern] = [ + re.compile(p, re.IGNORECASE) for p in [ + r"Intel\(R\)\s*(Core|Xeon|Celeron|Pentium|Atom)", + r"Intel.*Core.*i[3579]", + r"Intel.*Xeon", + r"Genuine\s*Intel", + r"AMD\s*(Ryzen|EPYC|Athlon|Opteron|Phenom|FX|Threadripper)", + r"AMD.*EPYC", + r"AuthenticAMD", + # VIA / Centaur modern chips + r"VIA\s*(Nano|C7|Eden|Quadcore)", + r"CentaurHauls", + ] +] + +# Architectures that are *not* x86 -- the vintage/RISC buckets. +_NON_X86_ARCHS = frozenset({ + "g3", "g4", "g5", "power8", "sparc", "68k", "amiga_68k", + "apple_silicon", "arm64", "riscv", +}) + +# Architectures that specifically require PowerPC lineage. +_POWERPC_ARCHS = frozenset({"g3", "g4", "g5", "power8"}) + +# Expected brand substrings for PowerPC claims. +_POWERPC_BRAND_KEYWORDS = [ + "motorola", "freescale", "nxp", "ibm", "power macintosh", + "powerbook", "powermac", "power mac", "amigaone", "pegasos", + "sam440", "sam460", "ppc", "powerpc", +] + + +def _brand_looks_modern_x86(brand: str) -> bool: + """Return True if the brand string matches a known modern x86 CPU.""" + if not brand: + return False + return any(pat.search(brand) for pat in _MODERN_X86_BRAND_PATTERNS) + + +def _brand_looks_powerpc(brand: str) -> bool: + """Return True if the brand string plausibly belongs to a PowerPC system.""" + if not brand: + return False + lower = brand.lower() + return any(kw in lower for kw in _POWERPC_BRAND_KEYWORDS) + + +def validate_cpu_brand_vs_arch( + cpu_brand: str, + claimed_arch: str, +) -> Tuple[bool, str]: + """Cross-validate CPU brand string against claimed architecture. + + Returns (passed, reason). ``passed=False`` means the claim is + rejected outright -- e.g. an Intel Xeon claiming G4. + """ + if not claimed_arch: + return False, "missing_device_arch" + + arch_lower = claimed_arch.lower().strip() + + # Only gate non-x86 claims. Modern x86 miners claiming modern_x86 + # don't need brand gating -- they're honest. + if arch_lower not in _NON_X86_ARCHS: + return True, "arch_not_gated" + + # Hard reject: modern x86 brand + non-x86 arch claim. + if _brand_looks_modern_x86(cpu_brand): + return False, ( + f"brand_arch_conflict:brand_is_modern_x86," + f"claimed_arch={claimed_arch}" + ) + + # For PowerPC claims, additionally require a PowerPC-plausible brand. + # An empty/missing brand is not sufficient -- we need positive evidence. + if arch_lower in _POWERPC_ARCHS: + if not _brand_looks_powerpc(cpu_brand or ""): + return False, ( + f"brand_not_powerpc:brand={cpu_brand}," + f"claimed_arch={claimed_arch}" + ) + + return True, "brand_ok" + + +# --------------------------------------------------------------------------- +# 2. SIMD evidence requirement for vintage PowerPC +# --------------------------------------------------------------------------- + +def validate_simd_evidence( + claimed_arch: str, + simd_data: Dict[str, Any], +) -> Tuple[bool, str]: + """Require AltiVec / vec_perm evidence for G4/G5 PowerPC claims. + + G3 does *not* have AltiVec, so we skip that check for G3. + Power8/9 also has AltiVec (IBM VMX). + + Returns (passed, reason). + """ + arch_lower = (claimed_arch or "").lower().strip() + altivec_archs = {"g4", "g5", "power8"} + + if arch_lower not in altivec_archs: + return True, "simd_check_not_required" + + if not simd_data or not isinstance(simd_data, dict): + return False, f"missing_simd_data:claimed_arch={arch_lower}" + + # Accept nested { "data": { ... } } or flat dict. + data = simd_data.get("data", simd_data) + if not isinstance(data, dict): + data = simd_data + + has_altivec = bool(data.get("has_altivec", False)) + if not has_altivec: + return False, f"altivec_missing:claimed_arch={arch_lower}" + + # vec_perm is a strong AltiVec indicator (used by real G4 miners). + vec_perm = data.get("vec_perm_result") + altivec_ops = data.get("altivec_ops") + simd_type = (data.get("simd_type") or "").lower() + + # Must not simultaneously claim x86 SIMD features. + for x86_feat in ("has_sse2", "has_avx", "has_avx2", "has_avx512"): + if data.get(x86_feat, False): + return False, ( + f"x86_simd_with_altivec:feat={x86_feat}," + f"claimed_arch={arch_lower}" + ) + + # Require either vec_perm evidence, altivec_ops count, or simd_type + # explicitly set to "altivec". + if vec_perm is None and altivec_ops is None and simd_type != "altivec": + return False, ( + f"insufficient_altivec_evidence:claimed_arch={arch_lower}" + ) + + return True, "simd_evidence_ok" + + +# --------------------------------------------------------------------------- +# 3. Cache-timing profile validation for PowerPC +# --------------------------------------------------------------------------- + +# PowerPC G4 characteristics: +# - L1 32 KB, L2 256 KB-1 MB (on-chip or backside) +# - No L3 on G4 (7450/7447) +# - Higher cache-miss variance than modern x86 +# - Coefficient of variation (CV) typically 0.01 - 0.15 (vs < 0.008 on x86) +# +# We codify these as min/max bounds. Values outside the window indicate +# the miner is running on hardware inconsistent with G4/G5. + +@dataclass +class _CacheProfile: + """Expected cache-timing characteristics for an architecture.""" + cv_min: float + cv_max: float + tone_ratio_min: float # min mean tone_ratio (L(n+1)/L(n) latency ratio) + tone_ratio_max: float + max_cache_levels: int # e.g. G4 has L1+L2 only => 2 + require_no_large_l3: bool = False # G4 should NOT have a big L3 + +# Profiles keyed by normalized arch. +_CACHE_PROFILES: Dict[str, _CacheProfile] = { + "g4": _CacheProfile( + cv_min=0.008, cv_max=0.15, + tone_ratio_min=0.8, tone_ratio_max=8.0, + max_cache_levels=3, # L1 + L2 (+ small L3 on some) + require_no_large_l3=True, # No 4096 KB+ L3 + ), + "g5": _CacheProfile( + cv_min=0.005, cv_max=0.12, + tone_ratio_min=0.7, tone_ratio_max=10.0, + max_cache_levels=4, + require_no_large_l3=False, + ), + "g3": _CacheProfile( + cv_min=0.01, cv_max=0.20, + tone_ratio_min=0.5, tone_ratio_max=6.0, + max_cache_levels=2, + require_no_large_l3=True, + ), +} + + +def validate_cache_timing( + claimed_arch: str, + cache_data: Dict[str, Any], + clock_data: Dict[str, Any], +) -> Tuple[bool, str]: + """Validate cache-timing profile matches PowerPC characteristics. + + For non-PowerPC arches this is a no-op pass. + + Returns (passed, reason). + """ + arch_lower = (claimed_arch or "").lower().strip() + profile = _CACHE_PROFILES.get(arch_lower) + if profile is None: + return True, "cache_profile_check_not_required" + + # --- Clock CV check --- + if clock_data and isinstance(clock_data, dict): + cd = clock_data.get("data", clock_data) + if isinstance(cd, dict): + cv = cd.get("cv", 0) + if cv and cv < profile.cv_min: + return False, ( + f"clock_cv_too_low:cv={cv:.6f}," + f"min={profile.cv_min}," + f"claimed_arch={arch_lower}" + ) + if cv and cv > profile.cv_max: + return False, ( + f"clock_cv_too_high:cv={cv:.6f}," + f"max={profile.cv_max}," + f"claimed_arch={arch_lower}" + ) + + # --- Cache structure check --- + if cache_data and isinstance(cache_data, dict): + cd = cache_data.get("data", cache_data) + if isinstance(cd, dict): + latencies = cd.get("latencies", {}) + + # G4 should NOT have a large L3/L4 present. + if profile.require_no_large_l3: + for big_level in ("4096KB", "16384KB"): + if big_level in latencies: + entry = latencies[big_level] + if isinstance(entry, dict) and "error" not in entry: + return False, ( + f"unexpected_large_cache:{big_level}," + f"claimed_arch={arch_lower}" + ) + + # Tone ratio validation. + tone_ratios = cd.get("tone_ratios", []) + if tone_ratios and len(tone_ratios) > 0: + mean_tone = statistics.mean(tone_ratios) + if mean_tone < profile.tone_ratio_min: + return False, ( + f"tone_ratio_too_low:mean={mean_tone:.2f}," + f"min={profile.tone_ratio_min}," + f"claimed_arch={arch_lower}" + ) + if mean_tone > profile.tone_ratio_max: + return False, ( + f"tone_ratio_too_high:mean={mean_tone:.2f}," + f"max={profile.tone_ratio_max}," + f"claimed_arch={arch_lower}" + ) + + return True, "cache_timing_ok" + + +# --------------------------------------------------------------------------- +# 4. Server-side bucket classification from verified features +# --------------------------------------------------------------------------- + +@dataclass +class BucketClassification: + """Result of server-side bucket classification.""" + bucket: str # The verified reward bucket name + multiplier: float # The multiplier to apply + claimed_arch: str # What the miner originally claimed + verified_arch: str # What the server determined from evidence + downgraded: bool # True if the miner was moved to a lower bucket + rejection_reasons: List[str] = field(default_factory=list) + accepted: bool = True # False = attestation fully rejected + + +# Base multipliers (mirrors ANTIQUITY_MULTIPLIERS in rip_200_round_robin). +_BUCKET_MULTIPLIERS: Dict[str, float] = { + "vintage_powerpc_g4": 2.5, + "vintage_powerpc_g5": 2.0, + "vintage_powerpc_g3": 2.5, + "vintage_68k": 3.0, + "vintage_x86": 2.0, + "retro_x86": 1.5, + "power8": 1.8, + "sparc": 2.0, + "apple_silicon": 1.0, + "arm64": 1.0, + "modern_x86": 1.0, + "unknown": 1.0, +} + + +def _infer_arch_from_features( + simd_data: Dict[str, Any], + cache_data: Dict[str, Any], + cpu_brand: str, +) -> str: + """Infer the most likely architecture bucket from raw hardware evidence. + + This is the server-side replacement for trusting ``device_arch``. + """ + data = {} + if simd_data and isinstance(simd_data, dict): + data = simd_data.get("data", simd_data) + if not isinstance(data, dict): + data = simd_data + + has_altivec = bool(data.get("has_altivec", False)) + has_sse2 = bool(data.get("has_sse2", False)) + has_avx = bool(data.get("has_avx", False)) + has_neon = bool(data.get("has_neon", False)) + + # Modern x86 is the most common -- check first. + if has_sse2 or has_avx or _brand_looks_modern_x86(cpu_brand): + return "modern_x86" + + if has_neon: + brand_lower = (cpu_brand or "").lower() + if "apple" in brand_lower: + return "apple_silicon" + return "arm64" + + if has_altivec: + # Could be G4, G5, or Power8. + brand_lower = (cpu_brand or "").lower() + if "ibm" in brand_lower or "power8" in brand_lower or "power9" in brand_lower: + return "power8" + # Distinguish G4 vs G5 by cache structure if possible. + if cache_data and isinstance(cache_data, dict): + cd = cache_data.get("data", cache_data) + if isinstance(cd, dict): + latencies = cd.get("latencies", {}) + if "4096KB" in latencies: + return "vintage_powerpc_g5" + return "vintage_powerpc_g4" + + # No SIMD at all -- likely very old. + brand_lower = (cpu_brand or "").lower() + if any(kw in brand_lower for kw in ("motorola", "68k", "mc68")): + return "vintage_68k" + if any(kw in brand_lower for kw in ("powerpc", "ppc", "power macintosh")): + return "vintage_powerpc_g3" + + return "modern_x86" # conservative default + + +def classify_reward_bucket( + claimed_arch: str, + cpu_brand: str, + fingerprint: Dict[str, Any], +) -> BucketClassification: + """Server-side reward-bucket classification. + + Instead of trusting the client-supplied ``device_arch`` directly, this + function: + + 1. Runs brand-string cross-validation. + 2. Checks SIMD evidence. + 3. Checks cache-timing profile. + 4. Infers the *actual* architecture from verified features. + 5. Assigns the miner to the correct reward bucket and multiplier. + + If the miner's claim is inconsistent, they are downgraded to the + bucket their evidence supports (usually ``modern_x86`` at 1.0x). + """ + reasons: List[str] = [] + + # ---- Extract fingerprint sub-sections ---- + checks = fingerprint.get("checks", {}) if isinstance(fingerprint, dict) else {} + if not checks and isinstance(fingerprint, dict): + checks = {k: v for k, v in fingerprint.items() + if k in ("clock_drift", "cache_timing", "simd_identity", + "thermal_drift")} + simd_data = checks.get("simd_identity", {}) + cache_data = checks.get("cache_timing", {}) + clock_data = checks.get("clock_drift", {}) + + # ---- Step 1: Brand cross-validation ---- + brand_ok, brand_reason = validate_cpu_brand_vs_arch(cpu_brand, claimed_arch) + if not brand_ok: + reasons.append(brand_reason) + + # ---- Step 2: SIMD evidence ---- + simd_ok, simd_reason = validate_simd_evidence(claimed_arch, simd_data) + if not simd_ok: + reasons.append(simd_reason) + + # ---- Step 3: Cache-timing profile ---- + cache_ok, cache_reason = validate_cache_timing( + claimed_arch, cache_data, clock_data, + ) + if not cache_ok: + reasons.append(cache_reason) + + # ---- Step 4: Server-side feature inference ---- + verified_arch = _infer_arch_from_features(simd_data, cache_data, cpu_brand) + + # ---- Step 5: Determine bucket ---- + all_checks_passed = brand_ok and simd_ok and cache_ok + + if all_checks_passed: + # Trust the claim -- it is consistent with evidence. + bucket = _arch_to_bucket(claimed_arch) + multiplier = _BUCKET_MULTIPLIERS.get(bucket, 1.0) + return BucketClassification( + bucket=bucket, + multiplier=multiplier, + claimed_arch=claimed_arch, + verified_arch=verified_arch, + downgraded=False, + ) + + # Downgrade: use the server-inferred architecture instead. + bucket = verified_arch # already a bucket name + multiplier = _BUCKET_MULTIPLIERS.get(bucket, 1.0) + + return BucketClassification( + bucket=bucket, + multiplier=multiplier, + claimed_arch=claimed_arch, + verified_arch=verified_arch, + downgraded=True, + rejection_reasons=reasons, + accepted=True, # still accepted, but at lower multiplier + ) + + +def _arch_to_bucket(arch: str) -> str: + """Map a claimed device_arch to a canonical bucket name.""" + arch_lower = (arch or "").lower().strip() + mapping = { + "g4": "vintage_powerpc_g4", + "g5": "vintage_powerpc_g5", + "g3": "vintage_powerpc_g3", + "powerpc": "vintage_powerpc_g3", + "ppc": "vintage_powerpc_g3", + "power macintosh": "vintage_powerpc_g4", + "power8": "power8", + "power9": "power8", + "68k": "vintage_68k", + "m68k": "vintage_68k", + "amiga_68k": "vintage_68k", + "sparc": "sparc", + "apple_silicon": "apple_silicon", + "arm64": "arm64", + "aarch64": "arm64", + "modern_x86": "modern_x86", + "x86_64": "modern_x86", + "x86-64": "modern_x86", + "amd64": "modern_x86", + "retro_x86": "retro_x86", + "vintage_x86": "vintage_x86", + } + return mapping.get(arch_lower, "modern_x86") + + +# --------------------------------------------------------------------------- +# 5. Database helpers (sqlite3, following Rustchain patterns) +# --------------------------------------------------------------------------- + +def log_bucket_classification( + db: sqlite3.Connection, + miner_id: str, + classification: BucketClassification, + ts: Optional[int] = None, +) -> None: + """Write a bucket-classification audit row. + + Table ``rip201_bucket_audit`` is created if it does not exist. + """ + ts = ts or int(time.time()) + db.execute(""" + CREATE TABLE IF NOT EXISTS rip201_bucket_audit ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts INTEGER NOT NULL, + miner TEXT NOT NULL, + claimed_arch TEXT, + verified_arch TEXT, + bucket TEXT, + multiplier REAL, + downgraded INTEGER, + reasons TEXT + ) + """) + db.execute(""" + INSERT INTO rip201_bucket_audit + (ts, miner, claimed_arch, verified_arch, bucket, multiplier, downgraded, reasons) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, ( + ts, + miner_id, + classification.claimed_arch, + classification.verified_arch, + classification.bucket, + classification.multiplier, + int(classification.downgraded), + "|".join(classification.rejection_reasons), + )) + db.commit() + + +# --------------------------------------------------------------------------- +# 6. Integration helper -- drop-in for rewards_implementation_rip200.py +# --------------------------------------------------------------------------- + +def get_verified_multiplier( + miner_id: str, + claimed_arch: str, + cpu_brand: str, + fingerprint: Dict[str, Any], + chain_age_years: float, + db: Optional[sqlite3.Connection] = None, +) -> float: + """Return the time-aged multiplier using server-verified bucket. + + This is the function that ``rewards_implementation_rip200.py`` should + call in place of the current + ``get_time_aged_multiplier(device_arch, chain_age_years)`` to close + the RIP-201 spoofing vector. + """ + classification = classify_reward_bucket( + claimed_arch, cpu_brand, fingerprint, + ) + + if db is not None: + try: + log_bucket_classification(db, miner_id, classification) + except Exception: + pass # audit logging should never block reward calculation + + base = classification.multiplier + + # Apply time-aging decay (same formula as rip_200_round_robin). + if base <= 1.0: + return 1.0 + + DECAY_RATE = 0.06 # mirrors rip_200_round_robin_1cpu1vote.py + vintage_bonus = base - 1.0 + aged_bonus = max(0.0, vintage_bonus * (1 - DECAY_RATE * chain_age_years)) + + return 1.0 + aged_bonus diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_rip201_bucket_fix.py b/tests/test_rip201_bucket_fix.py new file mode 100644 index 000000000..bb0d84f2c --- /dev/null +++ b/tests/test_rip201_bucket_fix.py @@ -0,0 +1,635 @@ +#!/usr/bin/env python3 +""" +Tests for RIP-201 Bucket Normalization Spoofing Fix +===================================================== + +Proves the four defences required by bounty #554: + +1. CPU brand cross-validation -- Intel Xeon + G4 is REJECTED. +2. SIMD evidence -- missing AltiVec for PowerPC claims is REJECTED. +3. Cache-timing profile -- mismatch for PowerPC claims is REJECTED. +4. Server-side bucket classification -- modern x86 cannot spoof into + ANY vintage bucket. + +Uses only stdlib unittest; no client code is executed locally. +""" + +import sqlite3 +import unittest +import sys +import os + +# Ensure the parent directory is on the path so we can import the fix. +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from rip201_bucket_fix import ( + validate_cpu_brand_vs_arch, + validate_simd_evidence, + validate_cache_timing, + classify_reward_bucket, + get_verified_multiplier, + log_bucket_classification, + BucketClassification, + _brand_looks_modern_x86, + _brand_looks_powerpc, + _infer_arch_from_features, + _arch_to_bucket, +) + + +# ── Helpers ────────────────────────────────────────────────────── + +def _make_fingerprint( + has_altivec=False, + has_sse2=False, + has_avx=False, + has_avx2=False, + has_avx512=False, + has_neon=False, + simd_type="", + vec_perm_result=None, + altivec_ops=None, + cv=0.05, + tone_ratios=None, + latency_keys=None, + thermal_drift_pct=5.0, +): + """Build a fingerprint dict matching the Rustchain schema.""" + simd = { + "has_altivec": has_altivec, + "has_sse2": has_sse2, + "has_avx": has_avx, + "has_avx2": has_avx2, + "has_avx512": has_avx512, + "has_neon": has_neon, + "simd_type": simd_type, + } + if vec_perm_result is not None: + simd["vec_perm_result"] = vec_perm_result + if altivec_ops is not None: + simd["altivec_ops"] = altivec_ops + + latencies = {} + if latency_keys is None: + latency_keys = ["4KB", "32KB", "256KB", "1024KB"] + for k in latency_keys: + latencies[k] = {"random_ns": 2.0} + + return { + "checks": { + "simd_identity": {"passed": True, "data": simd}, + "clock_drift": {"passed": True, "data": {"cv": cv, "samples": 200}}, + "cache_timing": { + "passed": True, + "data": { + "latencies": latencies, + "tone_ratios": tone_ratios or [2.0, 2.5, 2.0], + }, + }, + "thermal_drift": { + "passed": True, + "data": {"thermal_drift_pct": thermal_drift_pct}, + }, + } + } + + +def _g4_fingerprint(): + """Fingerprint matching a genuine PowerPC G4.""" + return _make_fingerprint( + has_altivec=True, + simd_type="altivec", + vec_perm_result=0xDEAD, + altivec_ops=42, + cv=0.05, + tone_ratios=[2.0, 2.5, 2.0], + latency_keys=["4KB", "32KB", "256KB", "1024KB"], + thermal_drift_pct=5.0, + ) + + +def _modern_x86_fingerprint(): + """Fingerprint matching a modern Intel/AMD x86-64 system.""" + return _make_fingerprint( + has_sse2=True, + has_avx=True, + has_avx2=True, + simd_type="sse_avx", + cv=0.002, + tone_ratios=[1.5, 2.0, 2.5, 2.5], + latency_keys=["4KB", "32KB", "256KB", "1024KB", "4096KB"], + ) + + +# ================================================================= +# Test suite +# ================================================================= + +class TestBrandCrossValidation(unittest.TestCase): + """Defence 1: CPU brand string vs claimed arch.""" + + def test_intel_xeon_claiming_g4_rejected(self): + """Bounty requirement: Intel Xeon + G4 claim is REJECTED.""" + passed, reason = validate_cpu_brand_vs_arch( + "Intel(R) Xeon(R) Platinum 8380 CPU @ 2.30GHz", + "g4", + ) + self.assertFalse(passed) + self.assertIn("brand_is_modern_x86", reason) + + def test_amd_epyc_claiming_g4_rejected(self): + """Bounty requirement: AMD EPYC + G4 claim is REJECTED.""" + passed, reason = validate_cpu_brand_vs_arch( + "AMD EPYC 7763 64-Core Processor", + "g4", + ) + self.assertFalse(passed) + self.assertIn("brand_is_modern_x86", reason) + + def test_intel_core_i9_claiming_g5_rejected(self): + passed, reason = validate_cpu_brand_vs_arch( + "Intel(R) Core(TM) i9-13900K", + "g5", + ) + self.assertFalse(passed) + + def test_amd_ryzen_claiming_sparc_rejected(self): + passed, reason = validate_cpu_brand_vs_arch( + "AMD Ryzen 9 7950X 16-Core Processor", + "sparc", + ) + self.assertFalse(passed) + + def test_intel_xeon_claiming_68k_rejected(self): + passed, reason = validate_cpu_brand_vs_arch( + "Intel(R) Xeon(R) E5-2670 v3 @ 2.30GHz", + "68k", + ) + self.assertFalse(passed) + + def test_genuine_powerpc_g4_accepted(self): + """Bounty requirement: Real PowerPC G4 is ACCEPTED.""" + passed, reason = validate_cpu_brand_vs_arch( + "PowerPC G4 7447A @ 1.42GHz", + "g4", + ) + self.assertTrue(passed) + + def test_amigaone_g4_accepted(self): + passed, reason = validate_cpu_brand_vs_arch( + "AmigaOne G4 7447 @ 1GHz", + "g4", + ) + self.assertTrue(passed) + + def test_ibm_power8_accepted(self): + passed, reason = validate_cpu_brand_vs_arch( + "IBM POWER8 @ 3.5GHz", + "power8", + ) + self.assertTrue(passed) + + def test_modern_x86_claiming_modern_x86_accepted(self): + """Honest x86 miners are not blocked.""" + passed, reason = validate_cpu_brand_vs_arch( + "Intel(R) Core(TM) i7-12700K", + "modern_x86", + ) + self.assertTrue(passed) + + def test_missing_arch_rejected(self): + passed, reason = validate_cpu_brand_vs_arch("anything", "") + self.assertFalse(passed) + + def test_empty_brand_powerpc_claim_rejected(self): + """Empty brand + PowerPC claim: rejected (no positive evidence).""" + passed, reason = validate_cpu_brand_vs_arch("", "g4") + # Empty string is not modern_x86, so brand_looks_modern_x86 = False + # But also not powerpc brand, so should fail for powerpc archs. + self.assertFalse(passed) + self.assertIn("brand_not_powerpc", reason) + + def test_unknown_brand_non_powerpc_non_x86_passes(self): + """Unknown brand claiming arm64 should pass (no brand gate for ARM).""" + passed, reason = validate_cpu_brand_vs_arch( + "Qualcomm Snapdragon 8 Gen 3", "arm64", + ) + self.assertTrue(passed) + + +class TestSIMDEvidence(unittest.TestCase): + """Defence 2: SIMD evidence for PowerPC claims.""" + + def test_g4_with_altivec_accepted(self): + """Bounty requirement: Real G4 + valid AltiVec evidence ACCEPTED.""" + simd = { + "data": { + "has_altivec": True, + "simd_type": "altivec", + "vec_perm_result": 0xCAFE, + } + } + passed, reason = validate_simd_evidence("g4", simd) + self.assertTrue(passed) + + def test_g4_missing_altivec_rejected(self): + """Bounty requirement: Missing AltiVec for G4 is REJECTED.""" + simd = {"data": {"has_altivec": False, "simd_type": "none"}} + passed, reason = validate_simd_evidence("g4", simd) + self.assertFalse(passed) + self.assertIn("altivec_missing", reason) + + def test_g4_with_sse2_and_altivec_rejected(self): + """x86 SIMD features alongside AltiVec = spoofing.""" + simd = { + "data": { + "has_altivec": True, + "has_sse2": True, + "simd_type": "altivec", + "vec_perm_result": 0xBEEF, + } + } + passed, reason = validate_simd_evidence("g4", simd) + self.assertFalse(passed) + self.assertIn("x86_simd_with_altivec", reason) + + def test_g4_with_avx_and_altivec_rejected(self): + simd = { + "data": { + "has_altivec": True, + "has_avx": True, + "simd_type": "altivec", + "vec_perm_result": 1, + } + } + passed, reason = validate_simd_evidence("g4", simd) + self.assertFalse(passed) + + def test_g4_altivec_but_no_evidence_rejected(self): + """has_altivec=True but no vec_perm, no altivec_ops, no simd_type.""" + simd = {"data": {"has_altivec": True}} + passed, reason = validate_simd_evidence("g4", simd) + self.assertFalse(passed) + self.assertIn("insufficient_altivec_evidence", reason) + + def test_g4_altivec_with_ops_count_accepted(self): + """altivec_ops count alone is sufficient evidence.""" + simd = { + "data": { + "has_altivec": True, + "altivec_ops": 128, + } + } + passed, reason = validate_simd_evidence("g4", simd) + self.assertTrue(passed) + + def test_g5_missing_simd_data_rejected(self): + passed, reason = validate_simd_evidence("g5", {}) + self.assertFalse(passed) + + def test_g5_none_simd_data_rejected(self): + passed, reason = validate_simd_evidence("g5", None) + self.assertFalse(passed) + + def test_g3_no_altivec_needed(self): + """G3 does not have AltiVec -- SIMD check is skipped.""" + passed, reason = validate_simd_evidence("g3", {}) + self.assertTrue(passed) + + def test_modern_x86_simd_check_not_required(self): + passed, reason = validate_simd_evidence("modern_x86", {}) + self.assertTrue(passed) + + def test_power8_requires_altivec(self): + simd = {"data": {"has_altivec": False}} + passed, reason = validate_simd_evidence("power8", simd) + self.assertFalse(passed) + + +class TestCacheTimingValidation(unittest.TestCase): + """Defence 3: Cache-timing profile for PowerPC claims.""" + + def test_g4_valid_cache_profile_accepted(self): + cache = { + "data": { + "latencies": { + "4KB": {"random_ns": 1.0}, + "32KB": {"random_ns": 2.0}, + "256KB": {"random_ns": 5.0}, + "1024KB": {"random_ns": 10.0}, + }, + "tone_ratios": [2.0, 2.5, 2.0], + } + } + clock = {"data": {"cv": 0.05}} + passed, reason = validate_cache_timing("g4", cache, clock) + self.assertTrue(passed) + + def test_g4_clock_cv_too_low_rejected(self): + """Bounty requirement: cache timing mismatch REJECTED. + Modern x86 CV (~0.002) is way below G4 minimum (0.008).""" + cache = {"data": {"latencies": {"4KB": {"random_ns": 1.0}}, "tone_ratios": [2.0]}} + clock = {"data": {"cv": 0.002}} + passed, reason = validate_cache_timing("g4", cache, clock) + self.assertFalse(passed) + self.assertIn("clock_cv_too_low", reason) + + def test_g4_large_l3_rejected(self): + """G4 should NOT have a 4096 KB L3 cache.""" + cache = { + "data": { + "latencies": { + "4KB": {"random_ns": 1.0}, + "32KB": {"random_ns": 2.0}, + "256KB": {"random_ns": 5.0}, + "4096KB": {"random_ns": 20.0}, + }, + "tone_ratios": [2.0, 2.5], + } + } + clock = {"data": {"cv": 0.05}} + passed, reason = validate_cache_timing("g4", cache, clock) + self.assertFalse(passed) + self.assertIn("unexpected_large_cache", reason) + + def test_g4_tone_ratio_too_low_rejected(self): + cache = { + "data": { + "latencies": {"4KB": {"random_ns": 1.0}}, + "tone_ratios": [0.3, 0.4], + } + } + clock = {"data": {"cv": 0.05}} + passed, reason = validate_cache_timing("g4", cache, clock) + self.assertFalse(passed) + self.assertIn("tone_ratio_too_low", reason) + + def test_g5_allows_large_l3(self): + """G5 can have a large L3 -- should not be rejected.""" + cache = { + "data": { + "latencies": { + "4KB": {"random_ns": 1.0}, + "32KB": {"random_ns": 2.0}, + "4096KB": {"random_ns": 20.0}, + }, + "tone_ratios": [2.0, 2.5], + } + } + clock = {"data": {"cv": 0.08}} + passed, reason = validate_cache_timing("g5", cache, clock) + self.assertTrue(passed) + + def test_modern_x86_cache_check_skipped(self): + """Non-PowerPC arches are not subject to cache profile checks.""" + passed, reason = validate_cache_timing("modern_x86", {}, {}) + self.assertTrue(passed) + + def test_g3_cv_too_high_rejected(self): + clock = {"data": {"cv": 0.50}} + cache = {"data": {"latencies": {"4KB": {"random_ns": 1.0}}, "tone_ratios": [2.0]}} + passed, reason = validate_cache_timing("g3", cache, clock) + self.assertFalse(passed) + self.assertIn("clock_cv_too_high", reason) + + +class TestServerSideBucketClassification(unittest.TestCase): + """Defence 4: Server-side classification from verified features.""" + + def test_intel_xeon_spoofing_g4_downgraded_to_modern_x86(self): + """Bounty requirement: Modern x86 cannot spoof into ANY vintage bucket. + Intel Xeon claiming G4 gets downgraded to modern_x86 at 1.0x.""" + fp = _modern_x86_fingerprint() + result = classify_reward_bucket( + "g4", + "Intel(R) Xeon(R) Platinum 8380 CPU @ 2.30GHz", + fp, + ) + self.assertTrue(result.downgraded) + self.assertEqual(result.bucket, "modern_x86") + self.assertEqual(result.multiplier, 1.0) + self.assertGreater(len(result.rejection_reasons), 0) + + def test_amd_epyc_spoofing_g4_downgraded(self): + """Bounty requirement: AMD EPYC + G4 REJECTED / downgraded.""" + fp = _modern_x86_fingerprint() + result = classify_reward_bucket( + "g4", + "AMD EPYC 7763 64-Core Processor", + fp, + ) + self.assertTrue(result.downgraded) + self.assertEqual(result.bucket, "modern_x86") + self.assertEqual(result.multiplier, 1.0) + + def test_genuine_g4_full_multiplier(self): + """Bounty requirement: Real PowerPC G4 with valid evidence ACCEPTED.""" + fp = _g4_fingerprint() + result = classify_reward_bucket( + "g4", + "PowerPC G4 7447A @ 1.42GHz", + fp, + ) + self.assertFalse(result.downgraded) + self.assertEqual(result.bucket, "vintage_powerpc_g4") + self.assertEqual(result.multiplier, 2.5) + + def test_x86_spoofing_g5_downgraded(self): + fp = _modern_x86_fingerprint() + result = classify_reward_bucket( + "g5", + "Intel(R) Core(TM) i7-12700K", + fp, + ) + self.assertTrue(result.downgraded) + self.assertEqual(result.multiplier, 1.0) + + def test_x86_spoofing_68k_downgraded(self): + fp = _modern_x86_fingerprint() + result = classify_reward_bucket( + "68k", + "AMD Ryzen 9 7950X", + fp, + ) + self.assertTrue(result.downgraded) + self.assertEqual(result.multiplier, 1.0) + + def test_x86_spoofing_sparc_downgraded(self): + fp = _modern_x86_fingerprint() + result = classify_reward_bucket( + "sparc", + "Intel(R) Xeon(R) Gold 6248R", + fp, + ) + self.assertTrue(result.downgraded) + self.assertEqual(result.multiplier, 1.0) + + def test_x86_spoofing_power8_downgraded(self): + fp = _modern_x86_fingerprint() + result = classify_reward_bucket( + "power8", + "Intel(R) Xeon(R) E5-2690 v4", + fp, + ) + self.assertTrue(result.downgraded) + self.assertEqual(result.multiplier, 1.0) + + def test_honest_modern_x86_unchanged(self): + fp = _modern_x86_fingerprint() + result = classify_reward_bucket( + "modern_x86", + "Intel(R) Core(TM) i7-12700K", + fp, + ) + self.assertFalse(result.downgraded) + self.assertEqual(result.multiplier, 1.0) + + def test_inferred_arch_matches_evidence(self): + """Server-side inference should detect modern_x86 from SSE2/AVX.""" + fp = _modern_x86_fingerprint() + inferred = _infer_arch_from_features( + fp["checks"]["simd_identity"], + fp["checks"]["cache_timing"], + "Intel(R) Xeon(R) Platinum 8380", + ) + self.assertEqual(inferred, "modern_x86") + + def test_inferred_arch_detects_g4_from_altivec(self): + fp = _g4_fingerprint() + inferred = _infer_arch_from_features( + fp["checks"]["simd_identity"], + fp["checks"]["cache_timing"], + "PowerPC G4 7447A", + ) + self.assertIn("powerpc", inferred) + + +class TestGetVerifiedMultiplier(unittest.TestCase): + """Integration test for the drop-in multiplier function.""" + + def test_spoofed_g4_gets_1x(self): + """Intel Xeon spoofing G4 gets 1.0x multiplier after verification.""" + mult = get_verified_multiplier( + miner_id="miner_xeon_spoof", + claimed_arch="g4", + cpu_brand="Intel(R) Xeon(R) Platinum 8380 CPU @ 2.30GHz", + fingerprint=_modern_x86_fingerprint(), + chain_age_years=0.0, + ) + self.assertEqual(mult, 1.0) + + def test_genuine_g4_gets_2_5x_at_year_0(self): + mult = get_verified_multiplier( + miner_id="miner_real_g4", + claimed_arch="g4", + cpu_brand="PowerPC G4 7447A @ 1.42GHz", + fingerprint=_g4_fingerprint(), + chain_age_years=0.0, + ) + self.assertEqual(mult, 2.5) + + def test_genuine_g4_decays_over_time(self): + mult = get_verified_multiplier( + miner_id="miner_real_g4", + claimed_arch="g4", + cpu_brand="PowerPC G4 7447A @ 1.42GHz", + fingerprint=_g4_fingerprint(), + chain_age_years=5.0, + ) + # At year 5 with DECAY_RATE=0.06: bonus = 1.5 * (1 - 0.3) = 1.05 + # multiplier = 1.0 + 1.05 = 2.05 + self.assertAlmostEqual(mult, 2.05, places=2) + self.assertGreater(mult, 1.0) + self.assertLess(mult, 2.5) + + def test_with_audit_db(self): + """Ensure audit logging works with an in-memory database.""" + db = sqlite3.connect(":memory:") + mult = get_verified_multiplier( + miner_id="miner_audit_test", + claimed_arch="g4", + cpu_brand="Intel(R) Xeon(R) Platinum 8380", + fingerprint=_modern_x86_fingerprint(), + chain_age_years=0.0, + db=db, + ) + self.assertEqual(mult, 1.0) + row = db.execute( + "SELECT * FROM rip201_bucket_audit WHERE miner = ?", + ("miner_audit_test",), + ).fetchone() + self.assertIsNotNone(row) + db.close() + + +class TestBrandDetectionHelpers(unittest.TestCase): + """Unit tests for brand-detection helpers.""" + + def test_intel_xeon_is_modern_x86(self): + self.assertTrue(_brand_looks_modern_x86("Intel(R) Xeon(R) Platinum 8380")) + + def test_amd_epyc_is_modern_x86(self): + self.assertTrue(_brand_looks_modern_x86("AMD EPYC 7763")) + + def test_amd_ryzen_is_modern_x86(self): + self.assertTrue(_brand_looks_modern_x86("AMD Ryzen 9 7950X")) + + def test_powerpc_g4_is_not_modern_x86(self): + self.assertFalse(_brand_looks_modern_x86("PowerPC G4 7447A")) + + def test_motorola_is_powerpc(self): + self.assertTrue(_brand_looks_powerpc("Motorola PowerPC 7450")) + + def test_ibm_is_powerpc(self): + self.assertTrue(_brand_looks_powerpc("IBM POWER8")) + + def test_intel_is_not_powerpc(self): + self.assertFalse(_brand_looks_powerpc("Intel(R) Xeon(R) Platinum 8380")) + + +class TestArchToBucket(unittest.TestCase): + """Verify arch -> bucket mapping.""" + + def test_g4_maps_to_vintage_powerpc_g4(self): + self.assertEqual(_arch_to_bucket("g4"), "vintage_powerpc_g4") + + def test_unknown_maps_to_modern_x86(self): + self.assertEqual(_arch_to_bucket("whateverXYZ"), "modern_x86") + + def test_case_insensitive(self): + self.assertEqual(_arch_to_bucket("G4"), "vintage_powerpc_g4") + self.assertEqual(_arch_to_bucket("G5"), "vintage_powerpc_g5") + + +class TestEdgeCases(unittest.TestCase): + """Edge cases and regression guards.""" + + def test_empty_fingerprint_downgrades_powerpc_claim(self): + """Empty fingerprint + G4 claim should be downgraded.""" + result = classify_reward_bucket("g4", "PowerPC G4 7447A", {}) + # Missing SIMD data -> simd check fails -> downgrade. + self.assertTrue(result.downgraded) + + def test_none_fingerprint_downgrades(self): + result = classify_reward_bucket("g4", "PowerPC G4 7447A", {"checks": {}}) + self.assertTrue(result.downgraded) + + def test_via_nano_claiming_g4_rejected(self): + """VIA Nano is x86 -- should not get PowerPC bucket.""" + passed, _ = validate_cpu_brand_vs_arch("VIA Nano U3500", "g4") + self.assertFalse(passed) + + def test_multiple_rejection_reasons_accumulated(self): + """When both brand and SIMD fail, both reasons should appear.""" + fp = _modern_x86_fingerprint() + result = classify_reward_bucket( + "g4", + "Intel(R) Xeon(R) Platinum 8380", + fp, + ) + # Should have at least brand + SIMD reasons. + self.assertGreaterEqual(len(result.rejection_reasons), 2) + + +if __name__ == "__main__": + unittest.main()