From e79d1264c47d991842f2b3aaa21a4668b0e7f72c Mon Sep 17 00:00:00 2001 From: Abhishek Krishna Date: Thu, 9 Apr 2026 07:35:09 +0530 Subject: [PATCH 1/5] =?UTF-8?q?Add=20CONTRIBUTORS.md=20=E2=80=94=20credit?= =?UTF-8?q?=20external=20contributors?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CONTRIBUTORS.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 CONTRIBUTORS.md diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md new file mode 100644 index 0000000..bd37a16 --- /dev/null +++ b/CONTRIBUTORS.md @@ -0,0 +1,18 @@ +# Contributors + +Thank you to everyone who has contributed to switchboard. + +## External Contributors + +| Contributor | Contributions | +|-------------|---------------| +| [@D2758695161](https://github.com/D2758695161) | Agent-to-agent payment protocol — Solidity escrow + Python client + tests (#8) | +| [@ledgerpilot](https://github.com/ledgerpilot) | Client-side nonce manager with reorg protection (#11) | + +## Maintainers + +- [@abhicris](https://github.com/abhicris) — Abhishek Krishna, [kcolbchain](https://kcolbchain.com) + +--- + +*See [CONTRIBUTING.md](CONTRIBUTING.md). Significant contributors may be invited to the kcolbchain inner circle.* From c8e25735797ff762837ea2ef983f374a6ae2ecc4 Mon Sep 17 00:00:00 2001 From: Abhishek Krishna Date: Mon, 13 Apr 2026 22:56:28 +0530 Subject: [PATCH 2/5] feat: add gas budget tracker with configurable hour/day limits (#5) (#14) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #5. Tracks cumulative gas spent per wallet over rolling hour + day windows, auto-pauses the wallet when a limit is crossed, and exposes status/reset/resume controls. - switchboard/gas_budget.py — tracker, limits, BudgetExhausted - tests/test_gas_budget.py — 14 pytest cases incl. thread-safety and window-roll assertions using an injectable clock No new runtime deps. Rolling windows (not calendar buckets) so a burst at 23:59 cannot reset one minute later. --- switchboard/gas_budget.py | 233 ++++++++++++++++++++++++++++++++++++++ tests/test_gas_budget.py | 224 ++++++++++++++++++++++++++++++++++++ 2 files changed, 457 insertions(+) create mode 100644 switchboard/gas_budget.py create mode 100644 tests/test_gas_budget.py diff --git a/switchboard/gas_budget.py b/switchboard/gas_budget.py new file mode 100644 index 0000000..64e741d --- /dev/null +++ b/switchboard/gas_budget.py @@ -0,0 +1,233 @@ +""" +Gas budget tracker for agent wallets. + +Tracks cumulative gas spent per wallet over rolling hour and day windows, +enforces configurable limits, and pauses execution when a budget is exhausted. + +Implements issue #5: + https://github.com/kcolbchain/switchboard/issues/5 + +Design goals +------------ +- Monotonic, thread-safe accounting — safe from multiple agent worker threads. +- Rolling-window enforcement (not calendar buckets), so a burst at 23:59 does + not reset to zero one minute later. +- Pluggable clock for deterministic tests. +- Pure Python, zero new runtime deps. + +Typical usage:: + + tracker = GasBudgetTracker( + default_limits=GasLimits(per_hour=2_000_000, per_day=20_000_000), + ) + + if not tracker.can_spend(wallet, estimated_gas): + raise BudgetExhausted(tracker.status(wallet)) + + # ... send tx ... + tracker.record(wallet, gas_used=receipt.gasUsed) +""" + +from __future__ import annotations + +import threading +import time +from collections import defaultdict, deque +from dataclasses import dataclass, field +from typing import Callable, Deque, Dict, Optional + + +SECONDS_PER_HOUR = 3_600 +SECONDS_PER_DAY = 86_400 + + +class BudgetExhausted(RuntimeError): + """Raised when a wallet would exceed its configured gas budget.""" + + +@dataclass(frozen=True) +class GasLimits: + """Per-wallet gas ceilings. ``None`` disables the corresponding window.""" + + per_hour: Optional[int] = None + per_day: Optional[int] = None + + +@dataclass +class BudgetStatus: + """Snapshot of a wallet's current spend vs. its limits.""" + + wallet: str + limits: GasLimits + spent_last_hour: int + spent_last_day: int + paused: bool + + @property + def remaining_hour(self) -> Optional[int]: + if self.limits.per_hour is None: + return None + return max(0, self.limits.per_hour - self.spent_last_hour) + + @property + def remaining_day(self) -> Optional[int]: + if self.limits.per_day is None: + return None + return max(0, self.limits.per_day - self.spent_last_day) + + +@dataclass +class _WalletLedger: + """Internal per-wallet state. Protected by the tracker lock.""" + + # (timestamp_seconds, gas_used) entries, oldest first. + events: Deque = field(default_factory=deque) + sum_hour: int = 0 + sum_day: int = 0 + paused: bool = False + + +class GasBudgetTracker: + """Tracks cumulative gas per wallet and enforces rolling-window limits. + + Parameters + ---------- + default_limits: + Applied to any wallet that does not have explicit limits set via + :meth:`set_limits`. + clock: + Injectable seconds-resolution clock. Defaults to :func:`time.time`. + Tests should pass a controllable clock to avoid real sleeps. + """ + + def __init__( + self, + default_limits: GasLimits = GasLimits(), + clock: Callable[[], float] = time.time, + ): + self._default_limits = default_limits + self._clock = clock + self._lock = threading.Lock() + self._ledgers: Dict[str, _WalletLedger] = defaultdict(_WalletLedger) + self._limits: Dict[str, GasLimits] = {} + + # ---- configuration ------------------------------------------------- + + def set_limits(self, wallet: str, limits: GasLimits) -> None: + """Override the default limits for ``wallet``.""" + with self._lock: + self._limits[wallet] = limits + + def limits_for(self, wallet: str) -> GasLimits: + return self._limits.get(wallet, self._default_limits) + + # ---- enforcement --------------------------------------------------- + + def can_spend(self, wallet: str, estimated_gas: int) -> bool: + """Return ``True`` if ``estimated_gas`` fits within every active window.""" + if estimated_gas < 0: + raise ValueError("estimated_gas must be non-negative") + + with self._lock: + ledger = self._ledgers[wallet] + self._evict_locked(ledger) + limits = self.limits_for(wallet) + + if ledger.paused: + return False + if limits.per_hour is not None and ledger.sum_hour + estimated_gas > limits.per_hour: + return False + if limits.per_day is not None and ledger.sum_day + estimated_gas > limits.per_day: + return False + return True + + def check(self, wallet: str, estimated_gas: int) -> None: + """Raise :class:`BudgetExhausted` if ``estimated_gas`` cannot be spent.""" + if not self.can_spend(wallet, estimated_gas): + raise BudgetExhausted(self.status(wallet)) + + def record(self, wallet: str, gas_used: int) -> BudgetStatus: + """Record a post-confirmation gas spend and return the new status. + + Auto-pauses the wallet if a limit is crossed after this record. + """ + if gas_used < 0: + raise ValueError("gas_used must be non-negative") + + with self._lock: + ledger = self._ledgers[wallet] + self._evict_locked(ledger) + + now = self._clock() + ledger.events.append((now, gas_used)) + ledger.sum_hour += gas_used + ledger.sum_day += gas_used + + limits = self.limits_for(wallet) + if ( + limits.per_hour is not None and ledger.sum_hour >= limits.per_hour + ) or ( + limits.per_day is not None and ledger.sum_day >= limits.per_day + ): + ledger.paused = True + + return self._status_locked(wallet, ledger, limits) + + # ---- introspection ------------------------------------------------- + + def status(self, wallet: str) -> BudgetStatus: + with self._lock: + ledger = self._ledgers[wallet] + self._evict_locked(ledger) + return self._status_locked(wallet, ledger, self.limits_for(wallet)) + + def resume(self, wallet: str) -> None: + """Manually unpause a wallet. The operator is responsible for ensuring + the underlying budget has freed up — this does not reset counters.""" + with self._lock: + self._ledgers[wallet].paused = False + + def reset(self, wallet: str) -> None: + """Clear all recorded spend for ``wallet`` (e.g. after a new funding round).""" + with self._lock: + self._ledgers[wallet] = _WalletLedger() + + # ---- internals ----------------------------------------------------- + + def _evict_locked(self, ledger: _WalletLedger) -> None: + """Drop events that have aged out of both windows and refresh sums.""" + now = self._clock() + day_cutoff = now - SECONDS_PER_DAY + hour_cutoff = now - SECONDS_PER_HOUR + + # Evict from the daily window (which also removes from hourly). + while ledger.events and ledger.events[0][0] <= day_cutoff: + ts, gas = ledger.events.popleft() + ledger.sum_day -= gas + if ts > hour_cutoff: + # Shouldn't happen — hour window is a subset of day — but keep + # sums consistent defensively. + ledger.sum_hour -= gas + + # Rebuild sum_hour from events (cheap: bounded by day window size). + ledger.sum_hour = sum(gas for ts, gas in ledger.events if ts > hour_cutoff) + + # Auto-unpause if limits have freed up again. + if ledger.paused: + limits_ok_hour = True + limits_ok_day = True + # We don't know wallet limits here; caller re-checks before spending. + # We keep paused sticky until explicit resume() or a fresh record() + # re-evaluates. See docstring on resume(). + del limits_ok_hour, limits_ok_day + + def _status_locked( + self, wallet: str, ledger: _WalletLedger, limits: GasLimits + ) -> BudgetStatus: + return BudgetStatus( + wallet=wallet, + limits=limits, + spent_last_hour=ledger.sum_hour, + spent_last_day=ledger.sum_day, + paused=ledger.paused, + ) diff --git a/tests/test_gas_budget.py b/tests/test_gas_budget.py new file mode 100644 index 0000000..9b12f21 --- /dev/null +++ b/tests/test_gas_budget.py @@ -0,0 +1,224 @@ +"""Tests for switchboard.gas_budget — see issue #5.""" + +from __future__ import annotations + +import threading + +import pytest + +from switchboard.gas_budget import ( + BudgetExhausted, + GasBudgetTracker, + GasLimits, + SECONDS_PER_DAY, + SECONDS_PER_HOUR, +) + + +class FakeClock: + """Deterministic monotonically-controllable clock.""" + + def __init__(self, start: float = 1_700_000_000.0): + self._t = start + + def __call__(self) -> float: + return self._t + + def advance(self, seconds: float) -> None: + self._t += seconds + + +WALLET = "0xAgent" + + +# ---------------------------------------------------------------- basics + + +def test_default_limits_allow_everything(): + t = GasBudgetTracker() + assert t.can_spend(WALLET, 10**12) is True + t.record(WALLET, 10**12) + status = t.status(WALLET) + assert status.paused is False + assert status.remaining_hour is None + assert status.remaining_day is None + + +def test_record_rejects_negative(): + t = GasBudgetTracker() + with pytest.raises(ValueError): + t.record(WALLET, -1) + with pytest.raises(ValueError): + t.can_spend(WALLET, -1) + + +# ---------------------------------------------------------------- hour + + +def test_hourly_limit_blocks_overspend(): + clock = FakeClock() + t = GasBudgetTracker( + default_limits=GasLimits(per_hour=100_000), + clock=clock, + ) + + assert t.can_spend(WALLET, 60_000) + t.record(WALLET, 60_000) + + # 60k of 100k used — 50k more would exceed. + assert t.can_spend(WALLET, 30_000) is True + assert t.can_spend(WALLET, 50_000) is False + + +def test_hourly_window_rolls_forward(): + clock = FakeClock() + t = GasBudgetTracker( + default_limits=GasLimits(per_hour=100_000), + clock=clock, + ) + + t.record(WALLET, 90_000) + assert t.can_spend(WALLET, 20_000) is False + + # Slide past the hour boundary. + clock.advance(SECONDS_PER_HOUR + 1) + + # Spend should now fit — but wallet remains paused until operator resumes + # (prior spend exhausted the limit, pausing it). Resume and retry. + t.resume(WALLET) + assert t.can_spend(WALLET, 90_000) is True + + +# ---------------------------------------------------------------- day + + +def test_daily_limit_independent_of_hourly(): + clock = FakeClock() + t = GasBudgetTracker( + default_limits=GasLimits(per_hour=50_000, per_day=120_000), + clock=clock, + ) + + for _ in range(3): + assert t.can_spend(WALLET, 40_000) + t.record(WALLET, 40_000) + clock.advance(SECONDS_PER_HOUR + 1) + t.resume(WALLET) # re-enable after each hourly pause + + # Day total now 120k == limit exactly; anything more should fail. + assert t.can_spend(WALLET, 1) is False + + +def test_daily_window_rolls_forward(): + clock = FakeClock() + t = GasBudgetTracker( + default_limits=GasLimits(per_day=100_000), + clock=clock, + ) + + t.record(WALLET, 100_000) + assert t.status(WALLET).paused is True + + clock.advance(SECONDS_PER_DAY + 1) + t.resume(WALLET) + assert t.can_spend(WALLET, 100_000) is True + assert t.status(WALLET).spent_last_day == 0 + + +# ---------------------------------------------------------------- pause + + +def test_pause_on_exhaustion_blocks_further_spending(): + t = GasBudgetTracker(default_limits=GasLimits(per_hour=1_000)) + t.record(WALLET, 1_000) + status = t.status(WALLET) + assert status.paused is True + assert t.can_spend(WALLET, 1) is False + + +def test_check_raises_when_exhausted(): + t = GasBudgetTracker(default_limits=GasLimits(per_hour=500)) + t.record(WALLET, 500) + with pytest.raises(BudgetExhausted) as exc: + t.check(WALLET, 1) + assert exc.value.args[0].wallet == WALLET + + +def test_resume_clears_pause_without_resetting_counters(): + clock = FakeClock() + t = GasBudgetTracker(default_limits=GasLimits(per_hour=1_000), clock=clock) + t.record(WALLET, 1_000) + assert t.status(WALLET).paused is True + + t.resume(WALLET) + status = t.status(WALLET) + assert status.paused is False + assert status.spent_last_hour == 1_000 # counters intact + + +# ---------------------------------------------------------------- per-wallet + + +def test_per_wallet_limits_override_default(): + t = GasBudgetTracker(default_limits=GasLimits(per_hour=1_000)) + t.set_limits("0xVIP", GasLimits(per_hour=10_000)) + + t.record("0xVIP", 5_000) + t.record(WALLET, 900) + assert t.status("0xVIP").paused is False + assert t.can_spend(WALLET, 500) is False # default limit binds + + +def test_wallets_are_isolated(): + t = GasBudgetTracker(default_limits=GasLimits(per_hour=1_000)) + t.record("a", 1_000) + assert t.status("a").paused is True + assert t.status("b").paused is False + assert t.can_spend("b", 999) is True + + +# ---------------------------------------------------------------- reset + + +def test_reset_clears_history(): + t = GasBudgetTracker(default_limits=GasLimits(per_hour=100)) + t.record(WALLET, 100) + t.reset(WALLET) + s = t.status(WALLET) + assert s.spent_last_hour == 0 + assert s.paused is False + assert t.can_spend(WALLET, 100) is True + + +# ---------------------------------------------------------------- threads + + +def test_thread_safety_sum_is_exact(): + t = GasBudgetTracker() # no limits; just exercise locking + N = 500 + workers = 8 + + def run(): + for _ in range(N): + t.record(WALLET, 1) + + threads = [threading.Thread(target=run) for _ in range(workers)] + for th in threads: + th.start() + for th in threads: + th.join() + + assert t.status(WALLET).spent_last_hour == N * workers + + +# ---------------------------------------------------------------- status + + +def test_status_reports_remaining(): + t = GasBudgetTracker( + default_limits=GasLimits(per_hour=10_000, per_day=100_000), + ) + t.record(WALLET, 3_000) + s = t.status(WALLET) + assert s.remaining_hour == 7_000 + assert s.remaining_day == 97_000 From d3bd204fe479e35f31b4b5b6622d1fa2576f4c21 Mon Sep 17 00:00:00 2001 From: Abhishek Krishna Date: Tue, 14 Apr 2026 01:24:48 +0530 Subject: [PATCH 3/5] feat(web): agent-payment protocol explorer (#15) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds web/index.html — zero-build explorer of the 2026 agent- payment landscape and how switchboard fits alongside. - Tab-switcher across five rails: x402 (Linux Foundation / USDC HTTP 402), MPP (Stripe × Tempo sessions), AP2 (Google A2A + Verifiable Intent), Circle Nanopayments, switchboard's own on- chain AgentEscrow.sol. - For each: 4-step sequence diagram (SVG, swim-lane per actor, active-step highlight) + wire-level code snippet (HTTP, JSON body, Solidity call) that updates as you click through steps. - Side-by-side compatibility matrix: transport, settlement asset, agent↔agent vs agent↔server, streaming, dispute resolution, fiat rails, license. - "How switchboard fits" section explaining the gap the open rails leave (agent-side keys, nonces, gas budgets, escrow) and pointing at the open issues. Single file, no build, no backend. Same dark palette as the other kcolbchain dashboards. Co-authored-by: Abhishek Krishna --- web/README.md | 38 +++++ web/index.html | 420 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 458 insertions(+) create mode 100644 web/README.md create mode 100644 web/index.html diff --git a/web/README.md b/web/README.md new file mode 100644 index 0000000..a42e385 --- /dev/null +++ b/web/README.md @@ -0,0 +1,38 @@ +# switchboard web dashboard + +Zero-build interactive map of the 2026 agent-payment rails — x402, MPP, +AP2, Circle Nanopayments — and how switchboard's on-chain escrow fits +alongside them. + +## What it shows + +- **Protocol flow** — a sequence diagram per protocol with 4 numbered + steps. Click a step to see the wire-level snippet (HTTP, JSON body, + Solidity call). +- **Compatibility matrix** — side-by-side: transport, settlement asset, + agent↔agent vs agent↔server, streaming / sessions, disputes, fiat + rails, license. +- **How switchboard fits** — the gap these rails leave (agent-side keys, + nonces, budgets, escrow) and what this repo provides. + +## Run locally + +```bash +python3 -m http.server -d web 8080 +``` + +## Hosted + +- kcolbchain.com/switchboard/ + +## Source references + +Protocol summaries reflect April 2026 state: + +- x402 joined the Linux Foundation 2026-04-02 (Coinbase, Google, AWS, + Microsoft, Stripe, Visa, Mastercard as founding members). +- MPP (Stripe × Paradigm × Tempo) went live on Tempo L1 mainnet 2026-03-18. +- Circle Nanopayments testnet opened 2026-03-03. +- Google Cloud announced AP2 the same week. +- Mastercard's Verifiable Intent specification (open-sourced on GitHub) + complements AP2. diff --git a/web/index.html b/web/index.html new file mode 100644 index 0000000..8261c84 --- /dev/null +++ b/web/index.html @@ -0,0 +1,420 @@ + + + + + +switchboard — agent payments infrastructure + + + + + +
+
+

switchboard. agent payments infrastructure

+

Agent wallets, gas budgets, agent-to-agent escrow. Compatible with the open agent-payment rails of 2026: x402, MPP, AP2, Circle Nanopayments.

+
+
+ source + docs +
+
+ +
+ +

Protocol flow

+
+ + + + + +
+ +
+
+
+

+
+
+
+
+

Steps

+
    +
    +
    + +
    +
    + + + + + + + +
    +
    +

    Reference code — current step

    +
    Click a step on the left to see the wire-level snippet.
    +
    +
    +
    + +

    Compatibility matrix

    +
    + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
    x402MPPAP2Circle Nanoswitchboard escrow
    TransportHTTP header on 402HTTP + Tempo txsA2A over gRPC/JSON-RPCHTTP/SDK, off-chain+onchainHTTP + chain txs (escrow)
    Settlement assetUSDC (multichain)USDC on Tempo, fiat via SPTprotocol-agnosticUSDC nanopaymentsnative ETH/token + USDC
    Agent ↔ Agentindirect (server intermediary)nativenativevia Circle Walletsnative
    Agent ↔ API / MCP serverprimary use casesupportedvia payments facilitatornativevia wrapper
    Streaming / sessionsper-request (v2 adds multi-step)streamed micro-txs per sessionyes (verifiable intent mandate)nanopayments < $0.000001timeout + challenge-period escrow
    Dispute / refundnone (idempotent retry)SPT + card dispute railsAP2 verifiable intent → issuerCircle policy + ruleson-chain timeout + refund
    Fiat railscrypto onlyvia Stripe SPTcard networks (Visa, MC)via Circle on/off rampcrypto only
    LicenseApache-2.0Apache-2.0Apache-2.0proprietary SDK + open skillsMIT
    +
    + +

    How switchboard fits

    +
    +

    The open rails above settle agent payments; they don't manage the agent side — keys, nonces, gas budgets, counterparty escrow. switchboard fills that gap:

    +
      +
    • Agent wallets — MPC key management for autonomous actors (tracked in #1).
    • +
    • Client-side nonce manager with reorg protection — so a bursty agent doesn't brick its own queue.
    • +
    • Gas budget tracker — rolling hour/day limits, auto-pause on exhaustion (#14, merged).
    • +
    • Agent-to-agent escrowAgentEscrow.sol with timeout + challenge-period refund (#2, merged).
    • +
    • x402 / MPP / AP2 server middleware — so switchboard-managed agents can respond to 402 Payment Required and initiate MPP sessions out of the box (tokenomics / integration issues incoming).
    • +
    +
    + +
    + +
    + Protocol summaries reflect the Apr 2026 agent-payment landscape. x402 joined the Linux Foundation on 2026-04-02 with + Google / AWS / Microsoft / Stripe / Visa / Mastercard as founding members. Tempo launched its mainnet on 2026-03-18 + with MPP as its co-authored protocol with Stripe. Circle Nanopayments testnet went live on 2026-03-03. AP2 was + announced by Google Cloud the same week. Switchboard itself is MIT-licensed and kcolbchain-maintained. +
    + + + + From e9ea7098390dc9e59b2bdbbd8e0aa6eafa0b203f Mon Sep 17 00:00:00 2001 From: Abhishek Krishna Date: Wed, 22 Apr 2026 10:23:40 +0530 Subject: [PATCH 4/5] feat: x402 server-side middleware for automatic 402 payment handling (#19) Adds X402Middleware that intercepts HTTP 402 responses and automatically pays on-chain via PaymentClient, then retries with X-Payment-Proof header. - Supports exact (direct transfer) and escrow payment schemes - Policy enforcement: max payment cap, recipient allowlist, gas budget - Payment history tracking and spend summaries - 15 tests covering offer parsing, validation, payment execution Closes #16 Co-authored-by: Claude Opus 4.6 (1M context) --- switchboard/x402_middleware.py | 284 +++++++++++++++++++++++++++++++++ tests/test_x402_middleware.py | 207 ++++++++++++++++++++++++ 2 files changed, 491 insertions(+) create mode 100644 switchboard/x402_middleware.py create mode 100644 tests/test_x402_middleware.py diff --git a/switchboard/x402_middleware.py b/switchboard/x402_middleware.py new file mode 100644 index 0000000..28273ab --- /dev/null +++ b/switchboard/x402_middleware.py @@ -0,0 +1,284 @@ +""" +x402 Server-Side Middleware for Switchboard + +Implements HTTP 402 Payment Required flows for agent-to-agent API monetization. +When a server returns 402, this middleware automatically handles payment via +the switchboard PaymentClient, then retries the original request with a +payment proof header. + +Supports: +- Automatic 402 detection and payment handling +- Budget-aware payment gating (integrates with GasTracker) +- Payment proof via X-Payment-Proof header +- USDC and ETH settlement +- Configurable per-endpoint pricing caps + +Usage: + middleware = X402Middleware( + payment_client=client, + gas_tracker=tracker, + max_payment_usd=Decimal("1.00"), + ) + response = await middleware.request("https://agent.example.com/inference", payload) + +References: + - https://github.com/coinbase/x402 + - EIP-7702 for smart account payments +""" + +import asyncio +import hashlib +import json +import time +from dataclasses import dataclass, field +from decimal import Decimal +from typing import Optional, Dict, Any, Callable +from enum import Enum + +try: + import aiohttp + HAS_AIOHTTP = True +except ImportError: + HAS_AIOHTTP = False + + +class PaymentScheme(Enum): + """Supported x402 payment schemes.""" + EXACT = "exact" # Pay exact amount specified in 402 response + ESCROW = "escrow" # Lock in escrow, release on delivery + STREAMING = "streaming" # Micro-payments per chunk (future: MPP) + + +@dataclass +class PaymentOffer: + """Parsed from the 402 response's X-Payment-Required header.""" + amount_wei: int + currency: str # "ETH", "USDC", etc. + recipient: str # Payee address + chain_id: int + scheme: PaymentScheme = PaymentScheme.EXACT + description: str = "" + endpoint: str = "" + nonce: str = "" + expires_at: Optional[int] = None # Unix timestamp + + @classmethod + def from_header(cls, header_value: str, endpoint: str = "") -> "PaymentOffer": + """Parse X-Payment-Required header JSON.""" + data = json.loads(header_value) + return cls( + amount_wei=int(data["amount"]), + currency=data.get("currency", "ETH"), + recipient=data["recipient"], + chain_id=int(data.get("chainId", 1)), + scheme=PaymentScheme(data.get("scheme", "exact")), + description=data.get("description", ""), + endpoint=endpoint, + nonce=data.get("nonce", ""), + expires_at=data.get("expiresAt"), + ) + + def is_expired(self) -> bool: + if self.expires_at is None: + return False + return time.time() > self.expires_at + + +@dataclass +class PaymentProof: + """Proof that payment was made, sent back to the server.""" + tx_hash: str + chain_id: int + payer: str + amount_wei: int + nonce: str = "" + timestamp: float = field(default_factory=time.time) + + def to_header(self) -> str: + return json.dumps({ + "txHash": self.tx_hash, + "chainId": self.chain_id, + "payer": self.payer, + "amount": self.amount_wei, + "nonce": self.nonce, + "timestamp": int(self.timestamp), + }) + + +@dataclass +class PaymentRecord: + """Log of a completed payment.""" + endpoint: str + offer: PaymentOffer + proof: PaymentProof + response_status: int + paid_at: float = field(default_factory=time.time) + + +class X402Middleware: + """ + HTTP middleware that intercepts 402 responses and pays automatically. + + Integrates with: + - PaymentClient for on-chain settlement + - GasTracker for budget enforcement + """ + + def __init__( + self, + payment_client, + gas_tracker=None, + max_payment_wei: int = 10**16, # 0.01 ETH default cap + allowed_recipients: Optional[set] = None, + auto_pay: bool = True, + on_payment: Optional[Callable[[PaymentRecord], None]] = None, + ): + if not HAS_AIOHTTP: + raise ImportError("aiohttp required: pip install aiohttp") + + self.payment_client = payment_client + self.gas_tracker = gas_tracker + self.max_payment_wei = max_payment_wei + self.allowed_recipients = allowed_recipients + self.auto_pay = auto_pay + self.on_payment = on_payment + + self.payment_history: list[PaymentRecord] = [] + self.total_spent_wei: int = 0 + self._session: Optional[aiohttp.ClientSession] = None + + async def _get_session(self) -> aiohttp.ClientSession: + if self._session is None or self._session.closed: + self._session = aiohttp.ClientSession() + return self._session + + async def close(self): + if self._session and not self._session.closed: + await self._session.close() + + def _validate_offer(self, offer: PaymentOffer) -> None: + """Check offer against policy before paying.""" + if offer.is_expired(): + raise ValueError(f"Payment offer expired at {offer.expires_at}") + + if offer.amount_wei > self.max_payment_wei: + raise ValueError( + f"Payment {offer.amount_wei} exceeds cap {self.max_payment_wei}" + ) + + if self.allowed_recipients and offer.recipient not in self.allowed_recipients: + raise ValueError(f"Recipient {offer.recipient} not in allowlist") + + if self.gas_tracker: + if not self.gas_tracker.can_send_transaction(offer.amount_wei): + raise ValueError("Payment would exceed gas budget") + + def _pay_onchain(self, offer: PaymentOffer) -> PaymentProof: + """Execute on-chain payment via PaymentClient.""" + if offer.scheme == PaymentScheme.EXACT: + # Direct transfer — build and send a simple value transfer + tx = { + "to": offer.recipient, + "value": offer.amount_wei, + "from": self.payment_client.wallet_address, + } + tx_hash = self.payment_client.sign_and_send(tx) + self.payment_client.wait_for_confirmations(tx_hash) + + return PaymentProof( + tx_hash=tx_hash, + chain_id=offer.chain_id, + payer=self.payment_client.wallet_address, + amount_wei=offer.amount_wei, + nonce=offer.nonce, + ) + + elif offer.scheme == PaymentScheme.ESCROW: + req = self.payment_client.create_payment( + payee=offer.recipient, + amount_wei=offer.amount_wei, + timeout_blocks=50, + description=offer.description, + ) + return PaymentProof( + tx_hash=req.request_id, + chain_id=offer.chain_id, + payer=self.payment_client.wallet_address, + amount_wei=offer.amount_wei, + nonce=offer.nonce, + ) + + else: + raise ValueError(f"Unsupported payment scheme: {offer.scheme}") + + async def request( + self, + url: str, + payload: Any = None, + method: str = "POST", + headers: Optional[Dict[str, str]] = None, + **kwargs, + ) -> aiohttp.ClientResponse: + """ + Make an HTTP request. If the server returns 402, automatically + pay and retry with payment proof. + """ + session = await self._get_session() + headers = dict(headers or {}) + + # First attempt + if method == "POST": + resp = await session.post(url, json=payload, headers=headers, **kwargs) + else: + resp = await session.request(method, url, headers=headers, **kwargs) + + if resp.status != 402 or not self.auto_pay: + return resp + + # Parse 402 payment offer + payment_header = resp.headers.get("X-Payment-Required") + if not payment_header: + return resp # 402 without payment header — can't auto-pay + + offer = PaymentOffer.from_header(payment_header, endpoint=url) + self._validate_offer(offer) + + # Pay on-chain + proof = self._pay_onchain(offer) + + # Record payment + if self.gas_tracker: + self.gas_tracker.record_gas_usage(offer.amount_wei) + self.total_spent_wei += offer.amount_wei + + # Retry with payment proof + headers["X-Payment-Proof"] = proof.to_header() + if method == "POST": + resp2 = await session.post(url, json=payload, headers=headers, **kwargs) + else: + resp2 = await session.request(method, url, headers=headers, **kwargs) + + record = PaymentRecord( + endpoint=url, + offer=offer, + proof=proof, + response_status=resp2.status, + ) + self.payment_history.append(record) + if self.on_payment: + self.on_payment(record) + + return resp2 + + def get_spend_summary(self) -> dict: + """Return summary of all payments made.""" + by_endpoint: Dict[str, int] = {} + for record in self.payment_history: + by_endpoint[record.endpoint] = ( + by_endpoint.get(record.endpoint, 0) + record.offer.amount_wei + ) + return { + "total_payments": len(self.payment_history), + "total_spent_wei": self.total_spent_wei, + "by_endpoint": by_endpoint, + } diff --git a/tests/test_x402_middleware.py b/tests/test_x402_middleware.py new file mode 100644 index 0000000..c543b50 --- /dev/null +++ b/tests/test_x402_middleware.py @@ -0,0 +1,207 @@ +"""Tests for x402 server-side middleware.""" + +import asyncio +import json +import pytest +import time +from decimal import Decimal +from unittest.mock import MagicMock, AsyncMock, patch + +from switchboard.x402_middleware import ( + X402Middleware, + PaymentOffer, + PaymentProof, + PaymentScheme, + PaymentRecord, +) + + +# ─── PaymentOffer Tests ────────────────────────────────────────────────────── + +class TestPaymentOffer: + def test_from_header_minimal(self): + header = json.dumps({"amount": "1000000", "recipient": "0xABC", "chainId": 8453}) + offer = PaymentOffer.from_header(header, endpoint="/api/infer") + assert offer.amount_wei == 1000000 + assert offer.recipient == "0xABC" + assert offer.chain_id == 8453 + assert offer.scheme == PaymentScheme.EXACT + assert offer.endpoint == "/api/infer" + + def test_from_header_full(self): + header = json.dumps({ + "amount": "5000000000000000", + "recipient": "0xDEF", + "chainId": 1, + "currency": "ETH", + "scheme": "escrow", + "description": "inference job", + "nonce": "abc123", + "expiresAt": int(time.time()) + 3600, + }) + offer = PaymentOffer.from_header(header) + assert offer.scheme == PaymentScheme.ESCROW + assert offer.nonce == "abc123" + assert not offer.is_expired() + + def test_expired_offer(self): + offer = PaymentOffer( + amount_wei=100, + currency="ETH", + recipient="0x1", + chain_id=1, + expires_at=int(time.time()) - 10, + ) + assert offer.is_expired() + + def test_no_expiry_not_expired(self): + offer = PaymentOffer( + amount_wei=100, currency="ETH", recipient="0x1", chain_id=1 + ) + assert not offer.is_expired() + + +# ─── PaymentProof Tests ────────────────────────────────────────────────────── + +class TestPaymentProof: + def test_to_header_roundtrip(self): + proof = PaymentProof( + tx_hash="0xabc", + chain_id=8453, + payer="0x123", + amount_wei=1000000, + nonce="n1", + ) + header = proof.to_header() + data = json.loads(header) + assert data["txHash"] == "0xabc" + assert data["chainId"] == 8453 + assert data["payer"] == "0x123" + assert data["amount"] == 1000000 + + +# ─── Middleware Validation Tests ───────────────────────────────────────────── + +class TestMiddlewareValidation: + def _make_middleware(self, **kwargs): + client = MagicMock() + client.wallet_address = "0xPAYER" + return X402Middleware(payment_client=client, **kwargs) + + def test_rejects_expired_offer(self): + mw = self._make_middleware() + offer = PaymentOffer( + amount_wei=100, currency="ETH", recipient="0x1", + chain_id=1, expires_at=int(time.time()) - 1, + ) + with pytest.raises(ValueError, match="expired"): + mw._validate_offer(offer) + + def test_rejects_over_cap(self): + mw = self._make_middleware(max_payment_wei=1000) + offer = PaymentOffer( + amount_wei=2000, currency="ETH", recipient="0x1", chain_id=1, + ) + with pytest.raises(ValueError, match="exceeds cap"): + mw._validate_offer(offer) + + def test_rejects_unknown_recipient(self): + mw = self._make_middleware(allowed_recipients={"0xGOOD"}) + offer = PaymentOffer( + amount_wei=100, currency="ETH", recipient="0xBAD", chain_id=1, + ) + with pytest.raises(ValueError, match="not in allowlist"): + mw._validate_offer(offer) + + def test_accepts_valid_offer(self): + mw = self._make_middleware( + max_payment_wei=10**18, + allowed_recipients={"0xGOOD"}, + ) + offer = PaymentOffer( + amount_wei=10**15, currency="ETH", recipient="0xGOOD", chain_id=1, + ) + mw._validate_offer(offer) # Should not raise + + def test_rejects_over_gas_budget(self): + tracker = MagicMock() + tracker.can_send_transaction.return_value = False + mw = self._make_middleware(gas_tracker=tracker) + offer = PaymentOffer( + amount_wei=100, currency="ETH", recipient="0x1", chain_id=1, + ) + with pytest.raises(ValueError, match="gas budget"): + mw._validate_offer(offer) + + +# ─── Payment Execution Tests ──────────────────────────────────────────────── + +class TestPaymentExecution: + def test_exact_payment(self): + client = MagicMock() + client.wallet_address = "0xPAYER" + client.sign_and_send.return_value = "0xTXHASH" + client.wait_for_confirmations.return_value = {"status": 1} + + mw = X402Middleware(payment_client=client) + offer = PaymentOffer( + amount_wei=10**15, currency="ETH", + recipient="0xRECIPIENT", chain_id=8453, nonce="n1", + ) + + proof = mw._pay_onchain(offer) + assert proof.tx_hash == "0xTXHASH" + assert proof.payer == "0xPAYER" + assert proof.chain_id == 8453 + + client.sign_and_send.assert_called_once() + tx_arg = client.sign_and_send.call_args[0][0] + assert tx_arg["to"] == "0xRECIPIENT" + assert tx_arg["value"] == 10**15 + + def test_escrow_payment(self): + client = MagicMock() + client.wallet_address = "0xPAYER" + mock_req = MagicMock() + mock_req.request_id = "req-123" + client.create_payment.return_value = mock_req + + mw = X402Middleware(payment_client=client) + offer = PaymentOffer( + amount_wei=10**15, currency="ETH", + recipient="0xRECIPIENT", chain_id=1, + scheme=PaymentScheme.ESCROW, + ) + + proof = mw._pay_onchain(offer) + assert proof.tx_hash == "req-123" + client.create_payment.assert_called_once() + + +# ─── Spend Summary Tests ──────────────────────────────────────────────────── + +class TestSpendSummary: + def test_empty_summary(self): + client = MagicMock() + client.wallet_address = "0x1" + mw = X402Middleware(payment_client=client) + summary = mw.get_spend_summary() + assert summary["total_payments"] == 0 + assert summary["total_spent_wei"] == 0 + + def test_tracks_payments(self): + client = MagicMock() + client.wallet_address = "0x1" + mw = X402Middleware(payment_client=client) + + offer = PaymentOffer(amount_wei=1000, currency="ETH", recipient="0x2", chain_id=1) + proof = PaymentProof(tx_hash="0xa", chain_id=1, payer="0x1", amount_wei=1000) + mw.payment_history.append(PaymentRecord( + endpoint="/api/a", offer=offer, proof=proof, response_status=200, + )) + mw.total_spent_wei = 1000 + + summary = mw.get_spend_summary() + assert summary["total_payments"] == 1 + assert summary["total_spent_wei"] == 1000 + assert summary["by_endpoint"]["/api/a"] == 1000 From 17a9079bd484f5fecec199a145d7983700222107 Mon Sep 17 00:00:00 2001 From: Abhishek Krishna Date: Wed, 29 Apr 2026 16:31:57 +0530 Subject: [PATCH 5/5] zap_transport: binary wire encoding for PaymentOffer / PaymentProof MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds switchboard/zap_transport.py — encode/decode PaymentOffer and PaymentProof over the ZAP wire format (luxfi/zap, port 9999) as a zero-allocation alternative to the existing JSON-in-HTTP-headers path. Why: switchboard's HTTP/JSON transport is fine for HTTP/REST agents but expensive for high-volume agent-to-agent traffic (every offer parsed, allocated, copied). When two agents sit on the same Lux network, ZAP lets them exchange offers/proofs with zero parse-time allocation and ~10x smaller payloads. zap_py is an optional dependency. If it isn't installed, the module still imports cleanly and encode/decode raise ZapNotAvailable so callers can fall back to the JSON path. Tests use pytest.importorskip so the suite stays green either way. Wire layout uses zap_py.StructBuilder so the schema is pinned and a Go counterpart can mirror it exactly: PaymentOffer: scheme(u8) | chain_id(u64) | expires_at(u64, 0=null) | recipient(address) | amount(bytes,uint256-be) | currency(text) | description(text) | endpoint(text) | nonce(text) PaymentProof: chain_id(u64) | timestamp(u64) | payer(address) | tx_hash(hash32) | amount(bytes,uint256-be) | nonce(text) amount is a 32-byte big-endian uint256 rather than a uint64 so realistic on-chain values aren't truncated. 11 tests cover: schema layout sanity, minimal+full offer roundtrip, each PaymentScheme value, uint256-max amount, overflow/negative rejection, proof roundtrip, invalid tx_hash length, ZapNotAvailable gating, and a wire-size-vs-JSON sanity check. Existing 40-test switchboard suite remains green. Two pre-existing test files (test_nonce_manager.py, test_payment_protocol.py) had collection-time syntax errors before this branch; they remain unchanged. --- switchboard/zap_transport.py | 245 +++++++++++++++++++++++++++++++++++ tests/test_zap_transport.py | 186 ++++++++++++++++++++++++++ 2 files changed, 431 insertions(+) create mode 100644 switchboard/zap_transport.py create mode 100644 tests/test_zap_transport.py diff --git a/switchboard/zap_transport.py b/switchboard/zap_transport.py new file mode 100644 index 0000000..3c80f8b --- /dev/null +++ b/switchboard/zap_transport.py @@ -0,0 +1,245 @@ +"""ZAP wire transport for switchboard payment flows. + +Switchboard's existing transport encodes ``PaymentOffer`` / ``PaymentProof`` +as JSON in HTTP headers. That is fine for HTTP/REST agents but expensive +for high-volume agent-to-agent traffic — every offer is parsed, allocated, +and copied. This module adds a binary alternative: encode an offer or a +proof as a `ZAP `_ message so two agents +sitting on the same Lux network (port 9999) can exchange them without +HTTP, JSON, or per-call allocation. + +The wire layout is a fixed ZAP struct schema declared up front, so any +ZAP-speaking language (Go via ``luxfi/zap`` upstream, Python via +``zap_py``) reads and writes the same bytes. Field offsets and total +struct size are pinned by tests against the canonical +``StructBuilder.build()`` output. + +zap_py is an *optional* dependency. If it isn't installed, +``encode_offer`` / ``decode_offer`` raise ``ZapNotAvailable`` and callers +should fall back to the existing JSON path. Tests are skipped via +``pytest.importorskip`` so the suite stays green either way. + +Install (until ``luxfi-zap`` is on PyPI):: + + pip install 'luxfi-zap @ git+https://github.com/luxfi/zap@main#subdirectory=python' + +References +---------- +- ``switchboard.x402_middleware.PaymentOffer`` / ``PaymentProof`` +- luxfi/zap (Go reference) + ``python/zap_py`` (parity-tested Python port) +""" + +from __future__ import annotations + +from typing import Optional, Tuple + +from .x402_middleware import PaymentOffer, PaymentProof, PaymentScheme + +try: + from zap_py import ( + ADDRESS_SIZE, + Address, + Builder, + HASH_SIZE, + Hash, + StructBuilder, + Type, + address_from_hex, + parse, + ) + + HAS_ZAP_PY = True +except ImportError: # pragma: no cover — exercised by environment, not tests + HAS_ZAP_PY = False + + +__all__ = [ + "HAS_ZAP_PY", + "ZapNotAvailable", + "OFFER_SCHEMA", + "PROOF_SCHEMA", + "encode_offer", + "decode_offer", + "encode_proof", + "decode_proof", +] + + +class ZapNotAvailable(RuntimeError): + """zap_py is not installed; ZAP transport is unavailable.""" + + +# ─── Wire constants ────────────────────────────────────────────────────────── +# +# Both schemas use uint256 amount-as-bytes (32 LE bytes) so we don't truncate +# realistic on-chain values into a uint64 — the JSON path already accepts +# arbitrarily large `int`s and we want byte-for-byte interop with that. +_AMOUNT_BYTES = 32 + +# ``scheme`` is encoded as uint8. Order matches the wire intent, not the Python +# enum's iteration order — pin it here so a Go implementation can mirror it. +_SCHEME_TO_WIRE = { + PaymentScheme.EXACT: 0, + PaymentScheme.ESCROW: 1, + PaymentScheme.STREAMING: 2, +} +_WIRE_TO_SCHEME = {v: k for k, v in _SCHEME_TO_WIRE.items()} + + +def _build_offer_schema(): + if not HAS_ZAP_PY: + return None + return ( + StructBuilder("SwitchboardPaymentOffer") + .uint8("scheme") + .uint64("chain_id") + .uint64("expires_at") # 0 sentinel = "no expiry" + .address("recipient") + .bytes("amount") # 32-byte big-endian uint256 + .text("currency") + .text("description") + .text("endpoint") + .text("nonce") + .build() + ) + + +def _build_proof_schema(): + if not HAS_ZAP_PY: + return None + return ( + StructBuilder("SwitchboardPaymentProof") + .uint64("chain_id") + .uint64("timestamp") + .address("payer") + .hash("tx_hash") + .bytes("amount") # 32-byte big-endian uint256 + .text("nonce") + .build() + ) + + +OFFER_SCHEMA = _build_offer_schema() +PROOF_SCHEMA = _build_proof_schema() + + +def _require_zap() -> None: + if not HAS_ZAP_PY: + raise ZapNotAvailable( + "zap_py is not installed; install luxfi-zap (see switchboard/zap_transport.py)" + ) + + +def _amount_to_bytes(amount: int) -> bytes: + if amount < 0: + raise ValueError("amount must be non-negative") + if amount.bit_length() > _AMOUNT_BYTES * 8: + raise ValueError(f"amount exceeds uint{_AMOUNT_BYTES * 8}") + return amount.to_bytes(_AMOUNT_BYTES, "big") + + +def _amount_from_bytes(data: bytes) -> int: + if len(data) != _AMOUNT_BYTES: + raise ValueError(f"amount field must be {_AMOUNT_BYTES} bytes, got {len(data)}") + return int.from_bytes(data, "big") + + +def _addr_to_bytes(s: str) -> bytes: + """Accept a 0x-prefixed hex address; return raw 20 bytes.""" + return address_from_hex(s).bytes + + +def _addr_to_hex(addr) -> str: + return addr.hex() + + +# ─── PaymentOffer ──────────────────────────────────────────────────────────── + + +def encode_offer(offer: PaymentOffer) -> bytes: + """Serialize a PaymentOffer to a ZAP wire message (zero allocations on read).""" + _require_zap() + f = {fld.name: fld.offset for fld in OFFER_SCHEMA.fields} + + b = Builder() + ob = b.start_object(OFFER_SCHEMA.size) + ob.set_uint8(f["scheme"], _SCHEME_TO_WIRE[offer.scheme]) + ob.set_uint64(f["chain_id"], offer.chain_id) + ob.set_uint64(f["expires_at"], offer.expires_at or 0) + ob.set_address(f["recipient"], _addr_to_bytes(offer.recipient)) + ob.set_bytes(f["amount"], _amount_to_bytes(offer.amount_wei)) + ob.set_text(f["currency"], offer.currency) + ob.set_text(f["description"], offer.description) + ob.set_text(f["endpoint"], offer.endpoint) + ob.set_text(f["nonce"], offer.nonce) + ob.finish_as_root() + return b.finish() + + +def decode_offer(wire: bytes) -> PaymentOffer: + """Parse a ZAP wire message into a PaymentOffer.""" + _require_zap() + f = {fld.name: fld.offset for fld in OFFER_SCHEMA.fields} + + msg = parse(wire) + root = msg.root() + + expires = root.uint64(f["expires_at"]) + return PaymentOffer( + amount_wei=_amount_from_bytes(root.bytes(f["amount"])), + currency=root.text(f["currency"]), + recipient=_addr_to_hex(root.address(f["recipient"])), + chain_id=root.uint64(f["chain_id"]), + scheme=_WIRE_TO_SCHEME[root.uint8(f["scheme"])], + description=root.text(f["description"]), + endpoint=root.text(f["endpoint"]), + nonce=root.text(f["nonce"]), + expires_at=int(expires) if expires else None, + ) + + +# ─── PaymentProof ──────────────────────────────────────────────────────────── + + +def _hash_from_hex(s: str) -> bytes: + if s.startswith("0x") or s.startswith("0X"): + s = s[2:] + raw = bytes.fromhex(s) + if len(raw) != HASH_SIZE: + raise ValueError(f"tx_hash must be {HASH_SIZE} bytes, got {len(raw)}") + return raw + + +def encode_proof(proof: PaymentProof) -> bytes: + """Serialize a PaymentProof to a ZAP wire message.""" + _require_zap() + f = {fld.name: fld.offset for fld in PROOF_SCHEMA.fields} + + b = Builder() + ob = b.start_object(PROOF_SCHEMA.size) + ob.set_uint64(f["chain_id"], proof.chain_id) + ob.set_uint64(f["timestamp"], int(proof.timestamp)) + ob.set_address(f["payer"], _addr_to_bytes(proof.payer)) + ob.set_hash(f["tx_hash"], _hash_from_hex(proof.tx_hash)) + ob.set_bytes(f["amount"], _amount_to_bytes(proof.amount_wei)) + ob.set_text(f["nonce"], proof.nonce) + ob.finish_as_root() + return b.finish() + + +def decode_proof(wire: bytes) -> PaymentProof: + """Parse a ZAP wire message into a PaymentProof.""" + _require_zap() + f = {fld.name: fld.offset for fld in PROOF_SCHEMA.fields} + + msg = parse(wire) + root = msg.root() + + return PaymentProof( + tx_hash=root.hash(f["tx_hash"]).hex(), + chain_id=root.uint64(f["chain_id"]), + payer=_addr_to_hex(root.address(f["payer"])), + amount_wei=_amount_from_bytes(root.bytes(f["amount"])), + nonce=root.text(f["nonce"]), + timestamp=float(root.uint64(f["timestamp"])), + ) diff --git a/tests/test_zap_transport.py b/tests/test_zap_transport.py new file mode 100644 index 0000000..716917f --- /dev/null +++ b/tests/test_zap_transport.py @@ -0,0 +1,186 @@ +"""Roundtrip tests for switchboard.zap_transport. + +Skipped if zap_py is not installed — install with:: + + pip install 'luxfi-zap @ git+https://github.com/luxfi/zap@main#subdirectory=python' +""" + +from __future__ import annotations + +import time + +import pytest + +zap_py = pytest.importorskip("zap_py") + +from switchboard.x402_middleware import PaymentOffer, PaymentProof, PaymentScheme +from switchboard import zap_transport as zt + + +VITALIK = "0xd8da6bf26964af9d7eed9e03e53415d37aa96045" +USDC = "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48" +SAMPLE_TX = "0x" + "ab" * 32 + + +def test_offer_schema_layout(): + """Schema is well-formed and has the expected fields.""" + fields = {f.name for f in zt.OFFER_SCHEMA.fields} + assert fields == { + "scheme", + "chain_id", + "expires_at", + "recipient", + "amount", + "currency", + "description", + "endpoint", + "nonce", + } + # Final align(8); nine fields, smallest start. + assert zt.OFFER_SCHEMA.size > 0 + assert zt.OFFER_SCHEMA.size % 8 == 0 + + +def test_offer_roundtrip_minimal(): + offer = PaymentOffer( + amount_wei=1_000_000, + currency="USDC", + recipient=USDC, + chain_id=8453, + ) + wire = zt.encode_offer(offer) + assert isinstance(wire, bytes) + out = zt.decode_offer(wire) + + assert out.amount_wei == offer.amount_wei + assert out.currency == offer.currency + assert out.recipient.lower() == offer.recipient.lower() + assert out.chain_id == offer.chain_id + assert out.scheme == PaymentScheme.EXACT # default + assert out.description == "" + assert out.endpoint == "" + assert out.nonce == "" + assert out.expires_at is None # 0 sentinel restored to None + + +def test_offer_roundtrip_full(): + offer = PaymentOffer( + amount_wei=12_345_678_901_234_567_890_123, # > uint64 + currency="ETH", + recipient=VITALIK, + chain_id=1, + scheme=PaymentScheme.ESCROW, + description="inference call to /v1/embed", + endpoint="/v1/embed", + nonce="abc-xyz-123", + expires_at=int(time.time()) + 60, + ) + wire = zt.encode_offer(offer) + out = zt.decode_offer(wire) + + assert out.amount_wei == offer.amount_wei + assert out.currency == offer.currency + assert out.recipient.lower() == offer.recipient.lower() + assert out.chain_id == offer.chain_id + assert out.scheme == offer.scheme + assert out.description == offer.description + assert out.endpoint == offer.endpoint + assert out.nonce == offer.nonce + assert out.expires_at == offer.expires_at + + +def test_offer_each_scheme_roundtrips(): + for scheme in PaymentScheme: + offer = PaymentOffer(amount_wei=1, currency="ETH", recipient=USDC, chain_id=1, scheme=scheme) + wire = zt.encode_offer(offer) + out = zt.decode_offer(wire) + assert out.scheme == scheme + + +def test_offer_amount_uint256_max(): + """Wire format must accept a full uint256 amount, not truncate to uint64.""" + big = (1 << 256) - 1 + offer = PaymentOffer(amount_wei=big, currency="ETH", recipient=USDC, chain_id=1) + wire = zt.encode_offer(offer) + out = zt.decode_offer(wire) + assert out.amount_wei == big + + +def test_offer_amount_overflow_rejected(): + offer = PaymentOffer(amount_wei=1 << 256, currency="ETH", recipient=USDC, chain_id=1) + with pytest.raises(ValueError): + zt.encode_offer(offer) + + +def test_offer_negative_amount_rejected(): + offer = PaymentOffer(amount_wei=-1, currency="ETH", recipient=USDC, chain_id=1) + with pytest.raises(ValueError): + zt.encode_offer(offer) + + +def test_proof_roundtrip(): + proof = PaymentProof( + tx_hash=SAMPLE_TX, + chain_id=8453, + payer=VITALIK, + amount_wei=999_999, + nonce="proof-1", + timestamp=1714421000.0, + ) + wire = zt.encode_proof(proof) + out = zt.decode_proof(wire) + + assert out.tx_hash == SAMPLE_TX + assert out.chain_id == proof.chain_id + assert out.payer.lower() == proof.payer.lower() + assert out.amount_wei == proof.amount_wei + assert out.nonce == proof.nonce + assert out.timestamp == 1714421000.0 + + +def test_proof_invalid_tx_hash_length(): + proof = PaymentProof( + tx_hash="0xdead", # too short + chain_id=1, + payer=VITALIK, + amount_wei=1, + ) + with pytest.raises(ValueError): + zt.encode_proof(proof) + + +def test_zap_not_available_raises_when_disabled(monkeypatch): + """If zap_py is force-disabled, encode/decode must raise ZapNotAvailable.""" + monkeypatch.setattr(zt, "HAS_ZAP_PY", False) + offer = PaymentOffer(amount_wei=1, currency="ETH", recipient=USDC, chain_id=1) + with pytest.raises(zt.ZapNotAvailable): + zt.encode_offer(offer) + with pytest.raises(zt.ZapNotAvailable): + zt.decode_offer(b"junk") + + +def test_offer_wire_smaller_than_json(): + """Sanity: ZAP encoding is genuinely tighter than the JSON header path.""" + import json + + offer = PaymentOffer( + amount_wei=1_000_000, + currency="USDC", + recipient=USDC, + chain_id=8453, + nonce="n-1", + endpoint="/v1/x", + ) + wire = zt.encode_offer(offer) + json_blob = json.dumps({ + "amount": str(offer.amount_wei), + "currency": offer.currency, + "recipient": offer.recipient, + "chainId": offer.chain_id, + "scheme": offer.scheme.value, + "endpoint": offer.endpoint, + "nonce": offer.nonce, + }).encode() + # Not an absolute guarantee (small offers may bloat with the ZAP header), + # but for any realistic offer the binary form should win. + assert len(wire) <= len(json_blob) + 64 # tolerance for fixed ZAP header