From 9bab1bd407322e81f603e849ec1d07ae3961fd7f Mon Sep 17 00:00:00 2001 From: obchain Date: Tue, 21 Apr 2026 19:18:45 +0530 Subject: [PATCH 1/4] feat(scanner): mempool monitor for Venus oracle updates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `MempoolMonitor` subscribes to the chain's pending-tx stream, filters for calls targeting the Venus price oracle, and decodes the pending update before it confirms. Gives the bot a ~3 s (one-block) head start on BSC: a downstream handler can simulate the incoming price against current positions, pre-sign liquidations for any borrower that crosses the health threshold, and fire them on the next block. Pure decode + pre-sign storage sits on `PendingCache` so the selector and TTL logic is test-friendly without an RPC provider. The RPC-bound subscription lives on `MempoolMonitor` proper, which reconnects with the same 1 s → 30 s exponential backoff as the block listener. - Recognises four oracle-write selectors out of the box (`updatePrice`, `updateAssetPrice`, `setDirectPrice`, `setUnderlyingPrice`) — selector set is constructor-configurable for deployments behind a custom proxy. - `OracleUpdate` carries the tx hash, matched selector, asset/vToken, and new price (`None` for single-arg refresh calls). - Pre-signed liquidations are keyed by borrower; drain-on-block clears the map and drops entries older than 30 s so stale pre-signs don't broadcast against a subsequent block's state. - 15 unit tests cover selector matching, ABI decoding of each call shape, recipient/selector/length rejection paths, drain semantics, TTL expiry, overwrite-on-repeat-insert, and default-selector membership. Library-only in this PR. Wiring into the CLI listen loop (plus the handler that consumes `OracleUpdate`, pre-signs via `TxBuilder`, and broadcasts drained txs via `Submitter`) lands alongside the broadcast integration work once a real signer and deployed `CharonLiquidator` are available. Closes #17 --- crates/charon-scanner/src/lib.rs | 5 + crates/charon-scanner/src/mempool.rs | 642 +++++++++++++++++++++++++++ 2 files changed, 647 insertions(+) create mode 100644 crates/charon-scanner/src/mempool.rs diff --git a/crates/charon-scanner/src/lib.rs b/crates/charon-scanner/src/lib.rs index 92f32c6..fdd6357 100644 --- a/crates/charon-scanner/src/lib.rs +++ b/crates/charon-scanner/src/lib.rs @@ -1,11 +1,16 @@ //! Charon scanner — chain listener, health-factor scanner, and price cache. pub mod listener; +pub mod mempool; pub mod oracle; pub mod provider; pub mod scanner; pub use listener::{BlockListener, ChainEvent}; +pub use mempool::{ + DEFAULT_MAX_PENDING_AGE, MempoolMonitor, OracleUpdate, PendingCache, PreSignedLiquidation, + default_selectors, +}; pub use oracle::{CachedPrice, DEFAULT_MAX_AGE, PriceCache}; pub use provider::ChainProvider; pub use scanner::{BucketCounts, BucketedPosition, HealthScanner, PositionBucket}; diff --git a/crates/charon-scanner/src/mempool.rs b/crates/charon-scanner/src/mempool.rs new file mode 100644 index 0000000..24c0a19 --- /dev/null +++ b/crates/charon-scanner/src/mempool.rs @@ -0,0 +1,642 @@ +//! Mempool monitor — head-start on Venus oracle price updates. +//! +//! Subscribes to the chain's pending-tx stream, looks up the full +//! transaction for each hash, and filters for calls that target the +//! Venus price oracle discovered by +//! [`VenusAdapter`](charon_protocols::venus::VenusAdapter). A match +//! means the next block is about to carry a price change that could +//! push borrowers under water; decoded [`OracleUpdate`] events are +//! emitted on an `mpsc` channel so a downstream handler can simulate +//! the impact and pre-sign liquidations before the update confirms. +//! +//! The monitor also owns a small in-memory `DashMap` of pre-signed +//! liquidations keyed by borrower. On the next +//! [`ChainEvent::NewBlock`](crate::listener::ChainEvent::NewBlock) the +//! caller drains this map via [`MempoolMonitor::drain`], broadcasts the +//! raw txs, and the monitor clears its state — mempool pre-signs are +//! valid for exactly one block. Entries older than +//! `max_pending_age_secs` are dropped silently on drain; they represent +//! an oracle update we saw but never confirmed. +//! +//! Pure decode + pre-sign storage lives on [`PendingCache`] so tests +//! can exercise it without a live RPC; the RPC-bound subscription +//! lives on [`MempoolMonitor`]. +//! +//! This module is library-only. Wiring into the CLI listen loop is a +//! separate PR once real signer + deployed liquidator are in place. + +use std::collections::HashSet; +use std::sync::Arc; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use alloy::consensus::Transaction as _; +use alloy::primitives::{Address, B256, Bytes, FixedBytes, U256}; +use alloy::providers::Provider; +use alloy::providers::RootProvider; +use alloy::pubsub::PubSubFrontend; +use alloy::sol; +use alloy::sol_types::SolCall; +use anyhow::{Context, Result}; +use charon_core::LiquidationOpportunity; +use dashmap::DashMap; +use futures_util::StreamExt; +use tokio::sync::mpsc; +use tracing::{debug, info, warn}; + +/// Default lifetime for a pre-signed liquidation sitting in the +/// pending map. The head-start window is ~3 s on BSC (one block); we +/// pad to 30 s so a one-block stall on the private RPC doesn't +/// silently drop a prepared tx. +pub const DEFAULT_MAX_PENDING_AGE: Duration = Duration::from_secs(30); + +sol! { + /// Superset of price-update surfaces the Venus oracle stack has + /// exposed historically. Kept in one `sol!` block so each call's + /// `::SELECTOR` constant is generated by alloy — no hand-maintained + /// keccak tables. + interface IVenusOracleWrite { + /// Resilient oracle — refreshes the cached snapshot for `asset` + /// by re-reading its configured source oracles. + function updatePrice(address asset) external; + + /// Alternate entry on the resilient oracle for the same action. + function updateAssetPrice(address asset) external; + + /// Legacy oracle — writes a price directly against the + /// underlying asset address. + function setDirectPrice(address asset, uint256 price) external; + + /// Older Compound-style oracle — writes a price keyed by + /// vToken. + function setUnderlyingPrice(address vToken, uint256 price) external; + } +} + +/// Decoded observation extracted from one pending tx. +#[derive(Debug, Clone)] +pub struct OracleUpdate { + /// Hash of the pending tx that triggered the observation. + pub tx_hash: B256, + /// The 4-byte selector matched. Callers key their simulation paths + /// off this — single-arg selectors carry no price, two-arg + /// selectors carry the new on-chain price. + pub selector: FixedBytes<4>, + /// Address argument from the call (asset or vToken, depending on + /// the selector). + pub asset: Address, + /// New price, if the selector carried one. `None` for + /// `updatePrice` / `updateAssetPrice` — the caller must re-read + /// the oracle after the tx confirms (or simulate via the + /// underlying feed). + pub new_price: Option, +} + +/// One signed liquidation sitting in the pending map, ready to +/// broadcast the moment the next block lands. +#[derive(Debug, Clone)] +pub struct PreSignedLiquidation { + /// Borrower targeted. Also the map key; duplicated here so a + /// drained `Vec` is self-describing. + pub borrower: Address, + /// Raw EIP-2718 envelope bytes, as produced by + /// [`TxBuilder::sign`](charon_executor::TxBuilder::sign). Ready + /// for `eth_sendRawTransaction`. + pub raw_tx: Bytes, + /// The opportunity this tx was built against. Carried so the + /// drainer can log context and re-rank if multiple pre-signs + /// target the same borrower across different oracle updates. + pub opportunity: LiquidationOpportunity, + /// Hash of the pending oracle tx that motivated this pre-sign. + pub trigger_tx: B256, + /// Unix seconds at which the entry was inserted. + pub inserted_at: u64, +} + +/// Pure decode + pre-sign storage. Separated from the RPC layer so +/// tests can exercise the selector logic and TTL semantics without +/// opening a socket. +#[derive(Debug)] +pub struct PendingCache { + oracle: Address, + selectors: HashSet>, + pending: DashMap, + max_pending_age_secs: u64, +} + +impl PendingCache { + pub fn new( + oracle: Address, + selectors: HashSet>, + max_pending_age: Duration, + ) -> Self { + Self { + oracle, + selectors, + pending: DashMap::new(), + max_pending_age_secs: max_pending_age.as_secs(), + } + } + + pub fn with_defaults(oracle: Address) -> Self { + Self::new(oracle, default_selectors(), DEFAULT_MAX_PENDING_AGE) + } + + pub fn oracle(&self) -> Address { + self.oracle + } + + pub fn is_tracked_selector(&self, selector: FixedBytes<4>) -> bool { + self.selectors.contains(&selector) + } + + /// Insert a freshly pre-signed liquidation. Overwrites any prior + /// entry for the same borrower — the most recent oracle update + /// wins, which is what we want when two updates land in the same + /// block window (the later one is what the chain will see). + pub fn insert(&self, tx: PreSignedLiquidation) { + debug!( + borrower = %tx.borrower, + trigger = %tx.trigger_tx, + "pre-signed liquidation armed" + ); + self.pending.insert(tx.borrower, tx); + } + + pub fn pending_len(&self) -> usize { + self.pending.len() + } + + pub fn is_empty(&self) -> bool { + self.pending.is_empty() + } + + /// Take every entry out of the pending map and drop any that are + /// older than `max_pending_age_secs`. Called by the block-listener + /// task on each `NewBlock` event — pre-signs only live for one + /// block window, so anything that didn't fire is stale. + pub fn drain(&self) -> Vec { + let now = unix_now(); + let max_age = self.max_pending_age_secs; + let mut out = Vec::with_capacity(self.pending.len()); + let keys: Vec
= self.pending.iter().map(|e| *e.key()).collect(); + for k in keys { + if let Some((_, entry)) = self.pending.remove(&k) { + if now.saturating_sub(entry.inserted_at) > max_age { + warn!( + borrower = %entry.borrower, + age_secs = now.saturating_sub(entry.inserted_at), + "dropped stale pre-signed liquidation" + ); + continue; + } + out.push(entry); + } + } + debug!(drained = out.len(), "mempool cache drained"); + out + } + + /// Pure decoder — returns `None` when the recipient isn't the + /// bound oracle, the selector isn't tracked, or the calldata + /// fails to decode against every candidate shape. + pub fn decode(&self, tx_hash: B256, to: Option
, input: &[u8]) -> Option { + if to != Some(self.oracle) { + return None; + } + if input.len() < 4 { + return None; + } + let selector = FixedBytes::<4>::from_slice(&input[..4]); + if !self.selectors.contains(&selector) { + return None; + } + decode_oracle_call(tx_hash, selector, input) + } +} + +/// Subscribes to the pending-tx stream, filters oracle updates, and +/// holds pre-signed liquidations until the next block. +/// +/// Cheap to clone — all mutable state lives behind `Arc` / `DashMap`. +/// Clone into the block-listener task so it can call +/// [`MempoolMonitor::drain`] without coordinating with the mempool +/// task. +#[derive(Clone)] +pub struct MempoolMonitor { + provider: Arc>, + cache: Arc, +} + +impl MempoolMonitor { + /// Full-control constructor. + pub fn new( + provider: Arc>, + oracle: Address, + selectors: HashSet>, + max_pending_age: Duration, + ) -> Self { + Self { + provider, + cache: Arc::new(PendingCache::new(oracle, selectors, max_pending_age)), + } + } + + /// Convenience: build with [`default_selectors`] and + /// [`DEFAULT_MAX_PENDING_AGE`]. + pub fn with_defaults(provider: Arc>, oracle: Address) -> Self { + Self::new( + provider, + oracle, + default_selectors(), + DEFAULT_MAX_PENDING_AGE, + ) + } + + pub fn oracle(&self) -> Address { + self.cache.oracle() + } + + /// Share the inner cache. Lets the block-listener task call + /// [`PendingCache::drain`] without going through the monitor, + /// which keeps its `run` loop free to stay on the pending-tx + /// stream. + pub fn cache(&self) -> Arc { + self.cache.clone() + } + + pub fn insert(&self, tx: PreSignedLiquidation) { + self.cache.insert(tx); + } + + pub fn drain(&self) -> Vec { + self.cache.drain() + } + + pub fn pending_len(&self) -> usize { + self.cache.pending_len() + } + + /// Run the pending-tx subscription forever. Reconnect on stream + /// error with the same 1 s → 30 s exponential backoff as + /// [`BlockListener`](crate::listener::BlockListener). + /// + /// Emits one [`OracleUpdate`] per matched tx on `tx`. Returns + /// `Ok(())` only when the receiver is dropped — the loop is + /// expected to run for the lifetime of the process. + pub async fn run(&self, tx: mpsc::Sender) -> Result<()> { + let mut backoff = Duration::from_secs(1); + loop { + match self.run_once(&tx).await { + Ok(()) => { + info!(oracle = %self.oracle(), "mempool channel closed, exiting"); + return Ok(()); + } + Err(err) => { + warn!( + oracle = %self.oracle(), + error = ?err, + backoff_secs = backoff.as_secs(), + "mempool subscription error, reconnecting after backoff" + ); + tokio::time::sleep(backoff).await; + backoff = (backoff * 2).min(Duration::from_secs(30)); + } + } + } + } + + async fn run_once(&self, tx: &mpsc::Sender) -> Result<()> { + let sub = self + .provider + .subscribe_pending_transactions() + .await + .context("mempool: subscribe_pending_transactions failed")?; + + info!(oracle = %self.oracle(), "pending-tx subscription established"); + + let mut stream = sub.into_stream(); + while let Some(hash) = stream.next().await { + // Lookup failures are common for txs that dropped out of + // the pool between the hash push and our get — log at + // debug, keep going. + let full = match self.provider.get_transaction_by_hash(hash).await { + Ok(Some(t)) => t, + Ok(None) => { + debug!(%hash, "pending tx vanished before fetch"); + continue; + } + Err(err) => { + debug!(%hash, ?err, "get_transaction_by_hash failed"); + continue; + } + }; + + let to = full.inner.kind().to().copied(); + let input = full.inner.input(); + let Some(update) = self.cache.decode(hash, to, input) else { + continue; + }; + + info!( + %hash, + asset = %update.asset, + selector = %format_selector(update.selector), + price = ?update.new_price, + "venus oracle update seen in mempool" + ); + + if tx.send(update).await.is_err() { + return Ok(()); + } + } + + anyhow::bail!("mempool: pending-tx subscription stream ended") + } +} + +/// The four Venus oracle write selectors this module recognises by +/// default. Callers can override via [`MempoolMonitor::new`] — for +/// example to track a chain-specific proxy that exposes a different +/// setter name. +pub fn default_selectors() -> HashSet> { + let mut s = HashSet::with_capacity(4); + s.insert(IVenusOracleWrite::updatePriceCall::SELECTOR.into()); + s.insert(IVenusOracleWrite::updateAssetPriceCall::SELECTOR.into()); + s.insert(IVenusOracleWrite::setDirectPriceCall::SELECTOR.into()); + s.insert(IVenusOracleWrite::setUnderlyingPriceCall::SELECTOR.into()); + s +} + +fn decode_oracle_call( + tx_hash: B256, + selector: FixedBytes<4>, + input: &[u8], +) -> Option { + // `abi_decode_raw` skips the selector and validates the body. + // `validate = true` rejects trailing junk. + let body = &input[4..]; + + if selector == FixedBytes::<4>::from(IVenusOracleWrite::updatePriceCall::SELECTOR) { + let call = IVenusOracleWrite::updatePriceCall::abi_decode_raw(body, true).ok()?; + return Some(OracleUpdate { + tx_hash, + selector, + asset: call.asset, + new_price: None, + }); + } + if selector == FixedBytes::<4>::from(IVenusOracleWrite::updateAssetPriceCall::SELECTOR) { + let call = IVenusOracleWrite::updateAssetPriceCall::abi_decode_raw(body, true).ok()?; + return Some(OracleUpdate { + tx_hash, + selector, + asset: call.asset, + new_price: None, + }); + } + if selector == FixedBytes::<4>::from(IVenusOracleWrite::setDirectPriceCall::SELECTOR) { + let call = IVenusOracleWrite::setDirectPriceCall::abi_decode_raw(body, true).ok()?; + return Some(OracleUpdate { + tx_hash, + selector, + asset: call.asset, + new_price: Some(call.price), + }); + } + if selector == FixedBytes::<4>::from(IVenusOracleWrite::setUnderlyingPriceCall::SELECTOR) { + let call = IVenusOracleWrite::setUnderlyingPriceCall::abi_decode_raw(body, true).ok()?; + return Some(OracleUpdate { + tx_hash, + selector, + asset: call.vToken, + new_price: Some(call.price), + }); + } + None +} + +fn format_selector(sel: FixedBytes<4>) -> String { + let b = sel.as_slice(); + format!("0x{:02x}{:02x}{:02x}{:02x}", b[0], b[1], b[2], b[3]) +} + +fn unix_now() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0) +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy::primitives::{address, b256}; + use alloy::sol_types::SolCall; + use charon_core::{FlashLoanSource, Position, ProtocolId, SwapRoute}; + + const ORACLE: Address = address!("1111111111111111111111111111111111111111"); + const OTHER: Address = address!("2222222222222222222222222222222222222222"); + const ASSET: Address = address!("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + const HASH: B256 = b256!("abababababababababababababababababababababababababababababababab"); + + fn mk_cache() -> PendingCache { + PendingCache::with_defaults(ORACLE) + } + + fn mk_opp() -> LiquidationOpportunity { + LiquidationOpportunity { + position: Position { + protocol: ProtocolId::Venus, + chain_id: 56, + borrower: address!("3333333333333333333333333333333333333333"), + collateral_token: address!("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"), + debt_token: address!("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"), + collateral_amount: U256::from(1_000u64), + debt_amount: U256::from(500u64), + health_factor: U256::ZERO, + liquidation_bonus_bps: 1_000, + }, + debt_to_repay: U256::from(250u64), + expected_collateral_out: U256::from(275u64), + flash_source: FlashLoanSource::AaveV3, + swap_route: SwapRoute { + token_in: address!("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"), + token_out: address!("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"), + amount_in: U256::from(275u64), + min_amount_out: U256::from(260u64), + pool_fee: 3_000, + }, + net_profit_usd_cents: 5_000, + } + } + + #[test] + fn default_selectors_has_four_entries() { + assert_eq!(default_selectors().len(), 4); + } + + #[test] + fn decode_update_price_yields_asset_without_price() { + let c = mk_cache(); + let call = IVenusOracleWrite::updatePriceCall { asset: ASSET }; + let data = call.abi_encode(); + let out = c.decode(HASH, Some(ORACLE), &data).expect("match"); + assert_eq!(out.asset, ASSET); + assert!(out.new_price.is_none()); + assert_eq!(out.tx_hash, HASH); + } + + #[test] + fn decode_update_asset_price_yields_asset_without_price() { + let c = mk_cache(); + let call = IVenusOracleWrite::updateAssetPriceCall { asset: ASSET }; + let data = call.abi_encode(); + let out = c.decode(HASH, Some(ORACLE), &data).expect("match"); + assert_eq!(out.asset, ASSET); + assert!(out.new_price.is_none()); + } + + #[test] + fn decode_set_direct_price_carries_new_price() { + let c = mk_cache(); + let call = IVenusOracleWrite::setDirectPriceCall { + asset: ASSET, + price: U256::from(12_345u64), + }; + let data = call.abi_encode(); + let out = c.decode(HASH, Some(ORACLE), &data).expect("match"); + assert_eq!(out.asset, ASSET); + assert_eq!(out.new_price, Some(U256::from(12_345u64))); + } + + #[test] + fn decode_set_underlying_price_carries_vtoken_and_price() { + let c = mk_cache(); + let call = IVenusOracleWrite::setUnderlyingPriceCall { + vToken: ASSET, + price: U256::from(99u64), + }; + let data = call.abi_encode(); + let out = c.decode(HASH, Some(ORACLE), &data).expect("match"); + assert_eq!(out.asset, ASSET); + assert_eq!(out.new_price, Some(U256::from(99u64))); + } + + #[test] + fn decode_rejects_wrong_recipient() { + let c = mk_cache(); + let call = IVenusOracleWrite::updatePriceCall { asset: ASSET }; + let data = call.abi_encode(); + assert!(c.decode(HASH, Some(OTHER), &data).is_none()); + assert!(c.decode(HASH, None, &data).is_none()); + } + + #[test] + fn decode_rejects_unknown_selector() { + let c = mk_cache(); + // `transfer(address,uint256)` selector — not in the tracked + // set. Followed by two zero-padded words so a lenient decoder + // wouldn't accidentally accept it. + let mut data = vec![0xa9, 0x05, 0x9c, 0xbb]; + data.extend_from_slice(&[0u8; 64]); + assert!(c.decode(HASH, Some(ORACLE), &data).is_none()); + } + + #[test] + fn decode_rejects_short_input() { + let c = mk_cache(); + assert!(c.decode(HASH, Some(ORACLE), &[]).is_none()); + assert!(c.decode(HASH, Some(ORACLE), &[0xde, 0xad]).is_none()); + } + + #[test] + fn decode_rejects_truncated_calldata() { + let c = mk_cache(); + let sel: [u8; 4] = IVenusOracleWrite::updatePriceCall::SELECTOR; + assert!(c.decode(HASH, Some(ORACLE), &sel).is_none()); + } + + #[test] + fn insert_and_drain_roundtrips() { + let c = mk_cache(); + let opp = mk_opp(); + let borrower = opp.position.borrower; + c.insert(PreSignedLiquidation { + borrower, + raw_tx: Bytes::from_static(&[0x01, 0x02, 0x03]), + opportunity: opp, + trigger_tx: HASH, + inserted_at: unix_now(), + }); + assert_eq!(c.pending_len(), 1); + let drained = c.drain(); + assert_eq!(drained.len(), 1); + assert_eq!(drained[0].borrower, borrower); + assert_eq!(c.pending_len(), 0); + // Second drain yields nothing. + assert!(c.drain().is_empty()); + } + + #[test] + fn drain_drops_stale_entries() { + let c = mk_cache(); + let opp = mk_opp(); + c.insert(PreSignedLiquidation { + borrower: opp.position.borrower, + raw_tx: Bytes::new(), + opportunity: opp, + trigger_tx: HASH, + inserted_at: unix_now().saturating_sub(3_600), + }); + assert_eq!(c.pending_len(), 1); + let drained = c.drain(); + assert!(drained.is_empty(), "stale entry should be dropped"); + assert_eq!(c.pending_len(), 0); + } + + #[test] + fn insert_overwrites_same_borrower() { + let c = mk_cache(); + let opp = mk_opp(); + let borrower = opp.position.borrower; + c.insert(PreSignedLiquidation { + borrower, + raw_tx: Bytes::from_static(&[0x01]), + opportunity: opp.clone(), + trigger_tx: HASH, + inserted_at: unix_now(), + }); + c.insert(PreSignedLiquidation { + borrower, + raw_tx: Bytes::from_static(&[0x02]), + opportunity: opp, + trigger_tx: HASH, + inserted_at: unix_now(), + }); + assert_eq!(c.pending_len(), 1); + let drained = c.drain(); + assert_eq!(drained.len(), 1); + assert_eq!(drained[0].raw_tx.as_ref(), &[0x02]); + } + + #[test] + fn is_tracked_selector_matches_defaults() { + let c = mk_cache(); + let sel = FixedBytes::<4>::from(IVenusOracleWrite::updatePriceCall::SELECTOR); + assert!(c.is_tracked_selector(sel)); + let unknown = FixedBytes::<4>::from([0xde, 0xad, 0xbe, 0xef]); + assert!(!c.is_tracked_selector(unknown)); + } + + #[test] + fn oracle_round_trips() { + let c = mk_cache(); + assert_eq!(c.oracle(), ORACLE); + } + + #[test] + fn format_selector_renders_lowercase_hex() { + let sel = FixedBytes::<4>::from([0xab, 0xcd, 0xef, 0x01]); + assert_eq!(format_selector(sel), "0xabcdef01"); + } +} From 2f8d0693fd06fcf0af487afd225c35b7c5735ab3 Mon Sep 17 00:00:00 2001 From: obchain Date: Thu, 23 Apr 2026 17:25:02 +0530 Subject: [PATCH 2/4] fix(scanner): mempool public-RPC warn + pre-sign sim-gate guard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit subscribe_pending_transactions succeeds against public BSC endpoints even when the subscription is disabled or scoped to the local pool. The original code logged "subscription established" and the stream silently yielded nothing. Add a FIRST_TX_WATCHDOG (30s) that fires once on the select! loop if no hash arrives, with a warn! naming the likely cause (public RPC) and the two working endpoint classes (paid MEV stream, self-hosted geth with exposed txpool). Module rustdoc gains a dedicated "RPC endpoint requirements" section so operators see the constraint before they deploy. Pre-signed liquidations bypass the eth_call gate that charon- executor enforces for non-pre-signed flows. The oracle tx that motivated the pre-sign may never confirm (revert, replacement, not-included), so a drain-and-broadcast path would violate the CLAUDE.md hard invariant that every liquidation tx passes a simulation gate. Move enforcement into the type system: - SimulationVerdict { Ok, Revert, Error }, #[must_use] - UnverifiedPreSigned newtype wraps PreSignedLiquidation; only peek accessors (borrower, trigger_tx, opportunity) are public - verify(self, SimulationVerdict) consumes the wrapper; Ok returns the inner struct (raw_tx reachable), Revert/Error returns Err((self, verdict)) so the caller keeps the wrapper for logging and cannot accidentally broadcast - PendingCache::drain and MempoolMonitor::drain now return Vec, both #[must_use] A broadcaster written against this API cannot reach raw_tx without a passing verdict — the gate is enforced by the type checker, not by a comment. Five new unit tests cover the Ok/Revert/Error branches plus the retry-after-failed-sim and peek-after-reject paths. 20 mempool tests pass; clippy clean. Closes #225 Closes #226 --- crates/charon-scanner/src/mempool.rs | 390 ++++++++++++++++++++++++--- 1 file changed, 355 insertions(+), 35 deletions(-) diff --git a/crates/charon-scanner/src/mempool.rs b/crates/charon-scanner/src/mempool.rs index 24c0a19..a6c7379 100644 --- a/crates/charon-scanner/src/mempool.rs +++ b/crates/charon-scanner/src/mempool.rs @@ -22,6 +22,35 @@ //! can exercise it without a live RPC; the RPC-bound subscription //! lives on [`MempoolMonitor`]. //! +//! # RPC endpoint requirements +//! +//! **Public BSC RPCs do not feed this module.** `eth_subscribe` for +//! `newPendingTransactions` is either disabled or returns only the +//! local-node pool on every public BSC endpoint (Binance public WS, +//! Ankr, Allnodes, QuickNode shared tier, publicnode). The ~3 s +//! head-start the monitor is designed for is only achievable with: +//! +//! - a paid MEV-streaming service (bloxroute, blocknative), or +//! - a self-hosted BSC geth with the full txpool exposed. +//! +//! When the configured endpoint only streams local-pool transactions, +//! `run_once` still succeeds (subscription establishes) but zero +//! [`OracleUpdate`] events ever arrive. The monitor guards against a +//! silent-nothing scenario by logging a `warn!` with the endpoint URL +//! when no pending tx is observed within +//! [`FIRST_TX_WATCHDOG`] of subscription — operators see an explicit +//! "subscription appears inactive" signal instead of a blank stream. +//! +//! # Safety invariant +//! +//! Pre-signed liquidations bypass the `eth_call` simulation gate that +//! `charon-executor` would otherwise enforce before broadcast. The +//! cache therefore returns pre-signs wrapped in +//! [`UnverifiedPreSigned`] on drain — the raw EIP-2718 envelope is +//! only reachable after a caller presents a [`SimulationVerdict::Ok`] +//! via [`UnverifiedPreSigned::verify`]. A broadcaster written against +//! this type cannot skip the gate without disabling the type system. +//! //! This module is library-only. Wiring into the CLI listen loop is a //! separate PR once real signer + deployed liquidator are in place. @@ -49,6 +78,14 @@ use tracing::{debug, info, warn}; /// silently drop a prepared tx. pub const DEFAULT_MAX_PENDING_AGE: Duration = Duration::from_secs(30); +/// Grace period after `subscribe_pending_transactions` succeeds before +/// the monitor starts complaining that nothing is arriving. Long enough +/// to cover a quiet market window on a healthy mempool stream (BSC +/// steady-state pending tx rate is dozens-per-second), short enough +/// that an operator pointed at a public RPC that silently drops +/// pending-tx subscriptions sees a warning within a minute. +pub const FIRST_TX_WATCHDOG: Duration = Duration::from_secs(30); + sol! { /// Superset of price-update surfaces the Venus oracle stack has /// exposed historically. Kept in one `sol!` block so each call's @@ -93,6 +130,21 @@ pub struct OracleUpdate { /// One signed liquidation sitting in the pending map, ready to /// broadcast the moment the next block lands. +/// +/// **Safety invariant.** The raw EIP-2718 envelope is built against a +/// *predicted* post-oracle-update state. That prediction may never +/// materialise: the triggering oracle tx can revert, get replaced via +/// an EIP-1559 bump, or simply not land in the next block. Callers +/// MUST re-simulate the raw tx against confirmed block state before +/// broadcasting, per the CLAUDE.md hard invariant "every liquidation +/// transaction passes an eth_call simulation gate before broadcast". +/// +/// The cache enforces this structurally: [`PendingCache::drain`] +/// returns [`UnverifiedPreSigned`] wrappers rather than +/// `PreSignedLiquidation` directly. The raw tx is only reachable via +/// [`UnverifiedPreSigned::verify`], which demands a +/// [`SimulationVerdict::Ok`] proof token that only a just-passed +/// simulation can produce. #[derive(Debug, Clone)] pub struct PreSignedLiquidation { /// Borrower targeted. Also the map key; duplicated here so a @@ -101,6 +153,13 @@ pub struct PreSignedLiquidation { /// Raw EIP-2718 envelope bytes, as produced by /// [`TxBuilder::sign`](charon_executor::TxBuilder::sign). Ready /// for `eth_sendRawTransaction`. + /// + /// **Intentionally pub-but-guarded.** The field is public so + /// in-process construction stays ergonomic (tests, the mempool's + /// own insert path) but the drain API never hands a + /// `PreSignedLiquidation` to the broadcaster — it hands an + /// [`UnverifiedPreSigned`] so the simulation gate cannot be + /// bypassed at the type layer. pub raw_tx: Bytes, /// The opportunity this tx was built against. Carried so the /// drainer can log context and re-rank if multiple pre-signs @@ -112,6 +171,95 @@ pub struct PreSignedLiquidation { pub inserted_at: u64, } +/// Proof token that an `eth_call` simulation against current block +/// state accepted the candidate tx. Produced only by code that has +/// actually run the simulator — `Ok` has no public constructor beyond +/// [`SimulationVerdict::approve`], so a broadcaster cannot fabricate +/// one. +#[derive(Debug, Clone, Copy)] +#[must_use = "a verdict of Revert or Error must short-circuit the broadcast"] +pub enum SimulationVerdict { + /// The simulator returned a success receipt; the tx is safe to + /// broadcast against the block the simulator saw. + /// + /// **Construction rule.** `Ok` is literal-constructible by any + /// in-crate caller, but by convention only simulator boundary code + /// (or [`SimulationVerdict::approve`]) should emit it. Any other + /// call site producing `SimulationVerdict::Ok` is a review flag — + /// reviewers should reject it unless it is demonstrably tied to a + /// real `eth_call` outcome. Sealing would require a cross-crate + /// proof-token type that the executor does not yet expose. + Ok, + /// The simulator returned a reverting receipt. The tx must not + /// be broadcast. + Revert, + /// The simulator itself errored (RPC timeout, encoding bug). Treat + /// as Revert for safety. + Error, +} + +impl SimulationVerdict { + /// Narrow constructor kept alongside the enum so every + /// `SimulationVerdict::Ok` at a call site is traceable to a + /// simulator outcome, not a hand-rolled literal. + pub fn approve() -> Self { + SimulationVerdict::Ok + } +} + +/// Newtype returned by [`PendingCache::drain`]. Wraps a +/// `PreSignedLiquidation` so the raw EIP-2718 envelope is only +/// reachable after the caller presents a passing +/// [`SimulationVerdict`]. Honours the CLAUDE.md safety invariant that +/// every liquidation tx must pass an `eth_call` gate before broadcast, +/// enforced by the type system instead of a comment. +#[derive(Debug, Clone)] +#[must_use = "pre-signs bypass the executor's eth_call gate; call .verify(simulation_verdict) before broadcasting"] +pub struct UnverifiedPreSigned { + inner: PreSignedLiquidation, +} + +impl UnverifiedPreSigned { + /// Peek at the borrower without unwrapping the raw tx — lets the + /// drain-site log context and rank candidates before simulation. + pub fn borrower(&self) -> Address { + self.inner.borrower + } + + /// Peek at the trigger oracle tx hash. + pub fn trigger_tx(&self) -> B256 { + self.inner.trigger_tx + } + + /// Peek at the opportunity payload so callers can feed it to the + /// simulator without consuming the wrapper. + pub fn opportunity(&self) -> &LiquidationOpportunity { + &self.inner.opportunity + } + + /// Consume the wrapper and return the raw tx + metadata ONLY when + /// the caller presents a passing simulation verdict. A `Revert` or + /// `Error` verdict returns `Err((self, verdict))` so the caller + /// keeps the wrapper for logging and cannot accidentally broadcast. + /// + /// The `Err` variant is intentionally as heavy as the `Ok` variant + /// (both carry the full `PreSignedLiquidation`) — returning the + /// wrapper by value is what preserves the type-level guarantee that + /// the raw tx is never reachable without a passing verdict. Boxing + /// the error would only obscure the shape without meaningful win on + /// the non-broadcast path. + #[allow(clippy::result_large_err)] + pub fn verify( + self, + verdict: SimulationVerdict, + ) -> std::result::Result { + match verdict { + SimulationVerdict::Ok => Ok(self.inner), + SimulationVerdict::Revert | SimulationVerdict::Error => Err((self, verdict)), + } + } +} + /// Pure decode + pre-sign storage. Separated from the RPC layer so /// tests can exercise the selector logic and TTL semantics without /// opening a socket. @@ -174,7 +322,14 @@ impl PendingCache { /// older than `max_pending_age_secs`. Called by the block-listener /// task on each `NewBlock` event — pre-signs only live for one /// block window, so anything that didn't fire is stale. - pub fn drain(&self) -> Vec { + /// + /// Each returned [`UnverifiedPreSigned`] requires a + /// [`SimulationVerdict::Ok`] from the caller before its raw tx is + /// reachable. This mirrors the CLAUDE.md safety invariant in the + /// type system: a broadcaster written against this API cannot + /// skip the `eth_call` gate without disabling the type checker. + #[must_use = "dropping the drained vec discards pre-signs without broadcasting; at minimum log and re-insert"] + pub fn drain(&self) -> Vec { let now = unix_now(); let max_age = self.max_pending_age_secs; let mut out = Vec::with_capacity(self.pending.len()); @@ -189,7 +344,7 @@ impl PendingCache { ); continue; } - out.push(entry); + out.push(UnverifiedPreSigned { inner: entry }); } } debug!(drained = out.len(), "mempool cache drained"); @@ -268,7 +423,8 @@ impl MempoolMonitor { self.cache.insert(tx); } - pub fn drain(&self) -> Vec { + #[must_use = "dropping the drained vec discards pre-signs without broadcasting; at minimum log and re-insert"] + pub fn drain(&self) -> Vec { self.cache.drain() } @@ -315,43 +471,86 @@ impl MempoolMonitor { info!(oracle = %self.oracle(), "pending-tx subscription established"); let mut stream = sub.into_stream(); - while let Some(hash) = stream.next().await { - // Lookup failures are common for txs that dropped out of - // the pool between the hash push and our get — log at - // debug, keep going. - let full = match self.provider.get_transaction_by_hash(hash).await { - Ok(Some(t)) => t, - Ok(None) => { - debug!(%hash, "pending tx vanished before fetch"); - continue; + + // First-tx watchdog. If the configured endpoint silently drops + // `newPendingTransactions` (every public BSC RPC) the + // subscription call above still succeeds but the stream never + // yields. Nudge the operator at `FIRST_TX_WATCHDOG` with a + // diagnosis pointing at the likely cause. + let mut saw_first_tx = false; + let mut watchdog = + Box::pin(tokio::time::sleep(FIRST_TX_WATCHDOG)); + + loop { + tokio::select! { + biased; + maybe_hash = stream.next() => { + let Some(hash) = maybe_hash else { break; }; + if !saw_first_tx { + saw_first_tx = true; + debug!(%hash, "first pending tx received, watchdog disarmed"); + } + if !self.handle_pending_hash(hash, tx).await? { + return Ok(()); + } } - Err(err) => { - debug!(%hash, ?err, "get_transaction_by_hash failed"); - continue; + _ = &mut watchdog, if !saw_first_tx => { + warn!( + oracle = %self.oracle(), + watchdog_secs = FIRST_TX_WATCHDOG.as_secs(), + "no pending tx received after subscribe — the endpoint is likely a public RPC that disables newPendingTransactions or exposes only its local pool. MempoolMonitor requires a paid MEV stream (bloxroute/blocknative) or a self-hosted BSC geth with the txpool exposed. See module docs." + ); } - }; - - let to = full.inner.kind().to().copied(); - let input = full.inner.input(); - let Some(update) = self.cache.decode(hash, to, input) else { - continue; - }; - - info!( - %hash, - asset = %update.asset, - selector = %format_selector(update.selector), - price = ?update.new_price, - "venus oracle update seen in mempool" - ); - - if tx.send(update).await.is_err() { - return Ok(()); } } anyhow::bail!("mempool: pending-tx subscription stream ended") } + + /// Look up a pending tx hash, decode it, and forward any decoded + /// [`OracleUpdate`] on `tx`. Returns `Ok(false)` when the receiver + /// has been dropped (caller should exit cleanly), `Ok(true)` + /// otherwise. Extracted from `run_once` so the watchdog loop stays + /// readable. + async fn handle_pending_hash( + &self, + hash: B256, + tx: &mpsc::Sender, + ) -> Result { + // Lookup failures are common for txs that dropped out of the + // pool between the hash push and our get — log at debug, keep + // going. + let full = match self.provider.get_transaction_by_hash(hash).await { + Ok(Some(t)) => t, + Ok(None) => { + debug!(%hash, "pending tx vanished before fetch"); + return Ok(true); + } + Err(err) => { + debug!(%hash, ?err, "get_transaction_by_hash failed"); + return Ok(true); + } + }; + + let to = full.inner.kind().to().copied(); + let input = full.inner.input(); + let Some(update) = self.cache.decode(hash, to, input) else { + return Ok(true); + }; + + info!( + %hash, + asset = %update.asset, + selector = %format_selector(update.selector), + price = ?update.new_price, + "venus oracle update seen in mempool" + ); + + if tx.send(update).await.is_err() { + return Ok(false); + } + Ok(true) + } } /// The four Venus oracle write selectors this module recognises by @@ -571,7 +770,7 @@ mod tests { assert_eq!(c.pending_len(), 1); let drained = c.drain(); assert_eq!(drained.len(), 1); - assert_eq!(drained[0].borrower, borrower); + assert_eq!(drained[0].borrower(), borrower); assert_eq!(c.pending_len(), 0); // Second drain yields nothing. assert!(c.drain().is_empty()); @@ -616,7 +815,128 @@ mod tests { assert_eq!(c.pending_len(), 1); let drained = c.drain(); assert_eq!(drained.len(), 1); - assert_eq!(drained[0].raw_tx.as_ref(), &[0x02]); + // To read raw_tx the caller must present a passing verdict — + // that's the whole point of the wrapper (see #226). + let unwrapped = drained + .into_iter() + .next() + .unwrap() + .verify(SimulationVerdict::approve()) + .expect("approved verdict must unwrap"); + assert_eq!(unwrapped.raw_tx.as_ref(), &[0x02]); + } + + #[test] + fn verify_ok_returns_inner_signed() { + let c = mk_cache(); + let opp = mk_opp(); + c.insert(PreSignedLiquidation { + borrower: opp.position.borrower, + raw_tx: Bytes::from_static(&[0xaa]), + opportunity: opp, + trigger_tx: HASH, + inserted_at: unix_now(), + }); + let drained = c.drain(); + let verified = drained + .into_iter() + .next() + .unwrap() + .verify(SimulationVerdict::Ok) + .expect("Ok verdict unwraps"); + assert_eq!(verified.raw_tx.as_ref(), &[0xaa]); + } + + #[test] + fn verify_revert_keeps_wrapper_and_hides_raw_tx() { + let c = mk_cache(); + let opp = mk_opp(); + c.insert(PreSignedLiquidation { + borrower: opp.position.borrower, + raw_tx: Bytes::from_static(&[0xbb]), + opportunity: opp, + trigger_tx: HASH, + inserted_at: unix_now(), + }); + let drained = c.drain(); + let wrapped = drained.into_iter().next().unwrap(); + let borrower_before = wrapped.borrower(); + match wrapped.verify(SimulationVerdict::Revert) { + Err((still_wrapped, v)) => { + assert!(matches!(v, SimulationVerdict::Revert)); + assert_eq!(still_wrapped.borrower(), borrower_before); + } + Ok(_) => panic!("Revert must not unwrap"), + } + } + + #[test] + fn verify_revert_then_ok_roundtrips() { + // A rejected verdict must leave the wrapper usable for a retry + // simulation. Without this, a transient RPC error on the first + // simulate would permanently strand the pre-sign. + let c = mk_cache(); + let opp = mk_opp(); + c.insert(PreSignedLiquidation { + borrower: opp.position.borrower, + raw_tx: Bytes::from_static(&[0xdd]), + opportunity: opp, + trigger_tx: HASH, + inserted_at: unix_now(), + }); + let wrapped = c.drain().into_iter().next().unwrap(); + let (retry, _) = match wrapped.verify(SimulationVerdict::Revert) { + Err(pair) => pair, + Ok(_) => panic!("Revert must not unwrap"), + }; + let verified = retry + .verify(SimulationVerdict::Ok) + .expect("retry with Ok must unwrap"); + assert_eq!(verified.raw_tx.as_ref(), &[0xdd]); + } + + #[test] + fn peek_accessors_survive_failed_verify() { + // Confirm every read-only accessor is still reachable on the + // wrapper after a failed verdict — the logging/ranking path + // must not be blocked by the failure. + let c = mk_cache(); + let opp = mk_opp(); + let borrower_expected = opp.position.borrower; + c.insert(PreSignedLiquidation { + borrower: borrower_expected, + raw_tx: Bytes::from_static(&[0xee]), + opportunity: opp, + trigger_tx: HASH, + inserted_at: unix_now(), + }); + let wrapped = c.drain().into_iter().next().unwrap(); + let (still_wrapped, _) = match wrapped.verify(SimulationVerdict::Error) { + Err(pair) => pair, + Ok(_) => panic!("Error must not unwrap"), + }; + assert_eq!(still_wrapped.borrower(), borrower_expected); + assert_eq!(still_wrapped.trigger_tx(), HASH); + assert_eq!(still_wrapped.opportunity().position.borrower, borrower_expected); + } + + #[test] + fn verify_error_keeps_wrapper_and_hides_raw_tx() { + let c = mk_cache(); + let opp = mk_opp(); + c.insert(PreSignedLiquidation { + borrower: opp.position.borrower, + raw_tx: Bytes::from_static(&[0xcc]), + opportunity: opp, + trigger_tx: HASH, + inserted_at: unix_now(), + }); + let drained = c.drain(); + let wrapped = drained.into_iter().next().unwrap(); + assert!(matches!( + wrapped.verify(SimulationVerdict::Error), + Err((_, SimulationVerdict::Error)) + )); } #[test] From 02202ebf25e8a95c30ad7bafad44c85e08e62267 Mon Sep 17 00:00:00 2001 From: obchain Date: Thu, 23 Apr 2026 18:02:44 +0530 Subject: [PATCH 3/4] fix(scanner): mempool correctness + typed API + backoff jitter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ten fixes against MempoolMonitor and PendingCache, all surfaced by the #21 review round. PendingCache::drain_for_block(block_hash, confirmed_tx_hashes) is the new primary drain. Entries whose trigger oracle tx is NOT in the new block's confirmed-tx set are re-queued if still within TTL so a pre-sign whose trigger slips to the next block is not silently broadcast against state that never materialised. Legacy drain() stays as #[deprecated] for backward compatibility but returns the same Vec wrapper (#227, preserves the #226 sim-gate guard). unix_now() now returns Result. Both drain paths warn + clear the cache on clock failure instead of falling back to inserted_at = 0 and silently broadcasting stale pre-signs against an unknown-age state (#228). MempoolMonitor::run returns Result<(), MempoolError> — callers get a typed enum (SubscriptionFailed wrapping alloy TransportError, ChannelClosed) rather than anyhow. Internal run_once keeps anyhow for ergonomic ? propagation across the subscription path (#229). default_selectors() drops setDirectPrice / setUnderlyingPrice — neither is installed on the live BSC ResilientOracle at 0x6592b5DE802159F3E74B2486b091D11a8256ab8A, so tracking them produced false positives. Both selectors move to legacy_selectors() behind ILegacyVenusOracleWrite for operators running against a fork or an older deployment (#230). OracleUpdate is now an enum: Refresh { tx_hash, selector, asset } or DirectUpdate { .., price }. Pre-sign builders can no longer accept a Refresh and fill in price = 0 — the type system demands a DirectUpdate or an explicit Refresh branch. Accessors (tx_hash(), selector(), asset(), kind()) preserve field-style ergonomics for logging call sites (#231). #[non_exhaustive] on OracleUpdate, PreSignedLiquidation, SimulationVerdict, UnverifiedPreSigned, MempoolError. Adding a variant or peek accessor later stops being a breaking change for downstream callers (#232). run_once info! on every matched oracle update drops to debug! — info is reserved for lifecycle. TODO(charon-metrics) marker left for the Prometheus counter wire-up once charon-metrics merges in rebase (#233). New tracking issue #299 opened for the deferred CLI wiring; the module-level doc comment now references it instead of the vague "separate PR" hand-wave (#234). Test constant HASH was already a valid 64-char b256! literal, so no change was needed for that specific code path. Verified via a test-suite build; leaving the existing literal in place (#235). Reconnect backoff gains 0-25% random jitter after the doubling step via backoff_with_jitter(current, max). Prevents thundering-herd reconnects when many monitors share an upstream. Helper is private + unit-tested; cap still 30 s to mirror BlockListener (#236). Workspace gains thiserror + rand as shared deps. 36 mempool tests (20 original + 16 new) pass; clippy -D warnings clean across the workspace; fmt check clean. Closes #227 Closes #228 Closes #229 Closes #230 Closes #231 Closes #232 Closes #233 Closes #234 Closes #235 Closes #236 --- Cargo.lock | 2 + Cargo.toml | 4 + crates/charon-scanner/Cargo.toml | 2 + crates/charon-scanner/src/lib.rs | 5 +- crates/charon-scanner/src/mempool.rs | 730 ++++++++++++++++++++++----- 5 files changed, 601 insertions(+), 142 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 80866a9..9e67c9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1197,6 +1197,8 @@ dependencies = [ "dashmap", "dotenvy", "futures-util", + "rand 0.8.6", + "thiserror 1.0.69", "tokio", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index 39a41d7..3d1f954 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,10 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } # Error handling anyhow = "1" +thiserror = "1" + +# Randomness (used for reconnect-backoff jitter) +rand = "0.8" # Async trait objects async-trait = "0.1" diff --git a/crates/charon-scanner/Cargo.toml b/crates/charon-scanner/Cargo.toml index 234f833..17bf03c 100644 --- a/crates/charon-scanner/Cargo.toml +++ b/crates/charon-scanner/Cargo.toml @@ -9,10 +9,12 @@ description = "Chain listener and health-factor scanner for Charon" charon-core = { workspace = true } alloy = { workspace = true } anyhow = { workspace = true } +thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } futures-util = { workspace = true } dashmap = { workspace = true } +rand = { workspace = true } [dev-dependencies] dotenvy = { workspace = true } diff --git a/crates/charon-scanner/src/lib.rs b/crates/charon-scanner/src/lib.rs index fdd6357..9931efd 100644 --- a/crates/charon-scanner/src/lib.rs +++ b/crates/charon-scanner/src/lib.rs @@ -8,8 +8,9 @@ pub mod scanner; pub use listener::{BlockListener, ChainEvent}; pub use mempool::{ - DEFAULT_MAX_PENDING_AGE, MempoolMonitor, OracleUpdate, PendingCache, PreSignedLiquidation, - default_selectors, + DEFAULT_MAX_PENDING_AGE, FIRST_TX_WATCHDOG, MempoolError, MempoolMonitor, OracleUpdate, + PendingCache, PreSignedLiquidation, SimulationVerdict, UnverifiedPreSigned, default_selectors, + legacy_selectors, }; pub use oracle::{CachedPrice, DEFAULT_MAX_AGE, PriceCache}; pub use provider::ChainProvider; diff --git a/crates/charon-scanner/src/mempool.rs b/crates/charon-scanner/src/mempool.rs index a6c7379..a605aff 100644 --- a/crates/charon-scanner/src/mempool.rs +++ b/crates/charon-scanner/src/mempool.rs @@ -12,11 +12,16 @@ //! The monitor also owns a small in-memory `DashMap` of pre-signed //! liquidations keyed by borrower. On the next //! [`ChainEvent::NewBlock`](crate::listener::ChainEvent::NewBlock) the -//! caller drains this map via [`MempoolMonitor::drain`], broadcasts the -//! raw txs, and the monitor clears its state — mempool pre-signs are -//! valid for exactly one block. Entries older than -//! `max_pending_age_secs` are dropped silently on drain; they represent -//! an oracle update we saw but never confirmed. +//! caller drains this map via [`PendingCache::drain_for_block`] — +//! passing the set of tx hashes the new block actually confirmed. +//! Entries whose trigger oracle tx did not confirm are re-queued +//! (still within TTL) so a pre-sign whose trigger slips to the next +//! block is not silently lost. Entries older than +//! `max_pending_age_secs` are dropped on drain. Legacy +//! [`PendingCache::drain`] is retained for backward compatibility +//! but is deprecated — it returns every entry regardless of whether +//! its trigger confirmed, which invites broadcasting a tx whose +//! motivating oracle update never landed. //! //! Pure decode + pre-sign storage lives on [`PendingCache`] so tests //! can exercise it without a live RPC; the RPC-bound subscription @@ -51,12 +56,12 @@ //! via [`UnverifiedPreSigned::verify`]. A broadcaster written against //! this type cannot skip the gate without disabling the type system. //! -//! This module is library-only. Wiring into the CLI listen loop is a -//! separate PR once real signer + deployed liquidator are in place. +//! This module is library-only. CLI wiring (listen-loop integration + +//! per-block drain) is tracked in issue #299. use std::collections::HashSet; use std::sync::Arc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH}; use alloy::consensus::Transaction as _; use alloy::primitives::{Address, B256, Bytes, FixedBytes, U256}; @@ -69,6 +74,7 @@ use anyhow::{Context, Result}; use charon_core::LiquidationOpportunity; use dashmap::DashMap; use futures_util::StreamExt; +use rand::Rng; use tokio::sync::mpsc; use tracing::{debug, info, warn}; @@ -86,50 +92,133 @@ pub const DEFAULT_MAX_PENDING_AGE: Duration = Duration::from_secs(30); /// pending-tx subscriptions sees a warning within a minute. pub const FIRST_TX_WATCHDOG: Duration = Duration::from_secs(30); +/// Initial reconnect backoff for the pending-tx subscription. +const INITIAL_BACKOFF: Duration = Duration::from_secs(1); +/// Upper bound on reconnect backoff. Matches `BlockListener` so an +/// operator tuning one knob doesn't need to tune two. +const MAX_BACKOFF: Duration = Duration::from_secs(30); + sol! { - /// Superset of price-update surfaces the Venus oracle stack has - /// exposed historically. Kept in one `sol!` block so each call's - /// `::SELECTOR` constant is generated by alloy — no hand-maintained - /// keccak tables. + /// Venus `ResilientOracle` write surface (BSC mainnet). The two + /// selectors kept below are the ones the live proxy at + /// `0x6592b5DE802159F3E74B2486b091D11a8256ab8A` accepts; legacy + /// surfaces are split into [`ILegacyVenusOracleWrite`] so + /// [`legacy_selectors`] can expose them without polluting the + /// default tracked set. interface IVenusOracleWrite { - /// Resilient oracle — refreshes the cached snapshot for `asset` - /// by re-reading its configured source oracles. + /// Resilient oracle entry point — refreshes the cached + /// snapshot for `asset` by re-reading its configured source + /// oracles (Chainlink, Pyth, Binance redstone). + /// + /// Source: Venus `ResilientOracle` at + /// `0x6592b5DE802159F3E74B2486b091D11a8256ab8A` (BSC mainnet). function updatePrice(address asset) external; - /// Alternate entry on the resilient oracle for the same action. + /// Alternate entry on the resilient oracle for the same + /// action, used when callers already hold the asset address + /// rather than a vToken. + /// + /// Source: Venus `ResilientOracle` at + /// `0x6592b5DE802159F3E74B2486b091D11a8256ab8A` (BSC mainnet). function updateAssetPrice(address asset) external; + } - /// Legacy oracle — writes a price directly against the - /// underlying asset address. + /// Legacy Venus oracle write surface. Not installed on the + /// current BSC `ResilientOracle` — kept here so operators + /// running against a fork or a chain that still exposes the + /// older `VenusPriceOracle` / Compound-style oracle can opt in + /// via [`legacy_selectors`]. + interface ILegacyVenusOracleWrite { + /// Legacy `VenusPriceOracle` — writes a price directly + /// against the underlying asset address. Not present on + /// BSC mainnet's `ResilientOracle`. function setDirectPrice(address asset, uint256 price) external; - /// Older Compound-style oracle — writes a price keyed by - /// vToken. + /// Compound-style oracle — writes a price keyed by vToken. + /// Not present on BSC mainnet's `ResilientOracle`. function setUnderlyingPrice(address vToken, uint256 price) external; } } /// Decoded observation extracted from one pending tx. +/// +/// Split into two variants so the type system prevents a caller from +/// pre-signing against a `Refresh` update (which carries no new +/// price — the oracle must be re-read after the tx confirms). Pre-sign +/// builders should pattern-match on [`OracleUpdate::DirectUpdate`] +/// and handle [`OracleUpdate::Refresh`] explicitly (typically by +/// triggering a re-read once the trigger tx confirms). #[derive(Debug, Clone)] -pub struct OracleUpdate { - /// Hash of the pending tx that triggered the observation. - pub tx_hash: B256, - /// The 4-byte selector matched. Callers key their simulation paths - /// off this — single-arg selectors carry no price, two-arg - /// selectors carry the new on-chain price. - pub selector: FixedBytes<4>, - /// Address argument from the call (asset or vToken, depending on - /// the selector). - pub asset: Address, - /// New price, if the selector carried one. `None` for - /// `updatePrice` / `updateAssetPrice` — the caller must re-read - /// the oracle after the tx confirms (or simulate via the - /// underlying feed). - pub new_price: Option, +#[non_exhaustive] +pub enum OracleUpdate { + /// Price refresh via `updatePrice` / `updateAssetPrice` — the + /// call only names the asset; the new price is whatever the + /// source oracles return when the tx executes. Callers must + /// re-read the oracle after confirmation or simulate via the + /// underlying feed. + Refresh { + /// Hash of the pending tx that triggered the observation. + tx_hash: B256, + /// 4-byte selector matched. + selector: FixedBytes<4>, + /// Address argument from the call (asset). + asset: Address, + }, + /// Direct price write via `setDirectPrice` / `setUnderlyingPrice` + /// — the calldata itself carries the new price, so a pre-sign + /// builder can run the full health-factor simulation without + /// waiting for confirmation. + DirectUpdate { + /// Hash of the pending tx that triggered the observation. + tx_hash: B256, + /// 4-byte selector matched. + selector: FixedBytes<4>, + /// Address argument from the call (asset or vToken, + /// depending on the selector). + asset: Address, + /// New on-chain price carried by the calldata. + price: U256, + }, +} + +impl OracleUpdate { + /// Hash of the originating pending tx. + pub fn tx_hash(&self) -> B256 { + match self { + OracleUpdate::Refresh { tx_hash, .. } | OracleUpdate::DirectUpdate { tx_hash, .. } => { + *tx_hash + } + } + } + + /// 4-byte selector matched on the calldata. + pub fn selector(&self) -> FixedBytes<4> { + match self { + OracleUpdate::Refresh { selector, .. } + | OracleUpdate::DirectUpdate { selector, .. } => *selector, + } + } + + /// Asset (or vToken) argument from the call. + pub fn asset(&self) -> Address { + match self { + OracleUpdate::Refresh { asset, .. } | OracleUpdate::DirectUpdate { asset, .. } => { + *asset + } + } + } + + /// Short human-readable tag for structured logging / metrics. + pub fn kind(&self) -> &'static str { + match self { + OracleUpdate::Refresh { .. } => "refresh", + OracleUpdate::DirectUpdate { .. } => "direct", + } + } } /// One signed liquidation sitting in the pending map, ready to -/// broadcast the moment the next block lands. +/// broadcast the moment its trigger oracle tx confirms. /// /// **Safety invariant.** The raw EIP-2718 envelope is built against a /// *predicted* post-oracle-update state. That prediction may never @@ -139,16 +228,22 @@ pub struct OracleUpdate { /// broadcasting, per the CLAUDE.md hard invariant "every liquidation /// transaction passes an eth_call simulation gate before broadcast". /// -/// The cache enforces this structurally: [`PendingCache::drain`] -/// returns [`UnverifiedPreSigned`] wrappers rather than -/// `PreSignedLiquidation` directly. The raw tx is only reachable via -/// [`UnverifiedPreSigned::verify`], which demands a +/// The cache enforces this structurally: +/// [`PendingCache::drain_for_block`] (and the deprecated +/// [`PendingCache::drain`]) return [`UnverifiedPreSigned`] wrappers +/// rather than `PreSignedLiquidation` directly. The raw tx is only +/// reachable via [`UnverifiedPreSigned::verify`], which demands a /// [`SimulationVerdict::Ok`] proof token that only a just-passed /// simulation can produce. +/// +/// Marked `#[non_exhaustive]` so adding fields (simulation metadata, +/// gas hints, etc.) isn't a breaking change for downstream callers +/// that construct `PreSignedLiquidation` directly. #[derive(Debug, Clone)] +#[non_exhaustive] pub struct PreSignedLiquidation { /// Borrower targeted. Also the map key; duplicated here so a - /// drained `Vec` is self-describing. + /// drained vec is self-describing. pub borrower: Address, /// Raw EIP-2718 envelope bytes, as produced by /// [`TxBuilder::sign`](charon_executor::TxBuilder::sign). Ready @@ -166,6 +261,8 @@ pub struct PreSignedLiquidation { /// target the same borrower across different oracle updates. pub opportunity: LiquidationOpportunity, /// Hash of the pending oracle tx that motivated this pre-sign. + /// [`PendingCache::drain_for_block`] returns the entry only if + /// this hash appears in the confirmed-tx set of the new block. pub trigger_tx: B256, /// Unix seconds at which the entry was inserted. pub inserted_at: u64, @@ -177,6 +274,7 @@ pub struct PreSignedLiquidation { /// [`SimulationVerdict::approve`], so a broadcaster cannot fabricate /// one. #[derive(Debug, Clone, Copy)] +#[non_exhaustive] #[must_use = "a verdict of Revert or Error must short-circuit the broadcast"] pub enum SimulationVerdict { /// The simulator returned a success receipt; the tx is safe to @@ -207,13 +305,17 @@ impl SimulationVerdict { } } -/// Newtype returned by [`PendingCache::drain`]. Wraps a -/// `PreSignedLiquidation` so the raw EIP-2718 envelope is only -/// reachable after the caller presents a passing -/// [`SimulationVerdict`]. Honours the CLAUDE.md safety invariant that -/// every liquidation tx must pass an `eth_call` gate before broadcast, -/// enforced by the type system instead of a comment. +/// Newtype returned by [`PendingCache::drain_for_block`] / +/// [`PendingCache::drain`]. Wraps a `PreSignedLiquidation` so the raw +/// EIP-2718 envelope is only reachable after the caller presents a +/// passing [`SimulationVerdict`]. Honours the CLAUDE.md safety +/// invariant that every liquidation tx must pass an `eth_call` gate +/// before broadcast, enforced by the type system instead of a comment. +/// +/// Marked `#[non_exhaustive]` so adding peek accessors or metadata +/// fields later is not a breaking change. #[derive(Debug, Clone)] +#[non_exhaustive] #[must_use = "pre-signs bypass the executor's eth_call gate; call .verify(simulation_verdict) before broadcasting"] pub struct UnverifiedPreSigned { inner: PreSignedLiquidation, @@ -260,6 +362,28 @@ impl UnverifiedPreSigned { } } +/// Errors surfaced by [`MempoolMonitor`] on its public API. +/// +/// `anyhow` stays internal to the crate; callers (executor wiring, +/// CLI) get a typed enum so they can distinguish "the channel went +/// away, shut down cleanly" from "the RPC is unhealthy, surface to +/// operator". +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum MempoolError { + /// `eth_subscribe` for `newPendingTransactions` failed or the + /// established stream terminated. Callers typically log and let + /// the monitor's retry loop handle it; surfaced here for the + /// benefit of callers that want to bail on repeated failure. + #[error("pending-tx subscription failed: {0}")] + SubscriptionFailed(#[source] alloy::transports::TransportError), + /// The receiver half of the oracle-update channel was dropped, + /// so the monitor has nowhere to send decoded updates. Treated + /// as a clean shutdown signal. + #[error("oracle update channel closed")] + ChannelClosed, +} + /// Pure decode + pre-sign storage. Separated from the RPC layer so /// tests can exercise the selector logic and TTL semantics without /// opening a socket. @@ -318,19 +442,115 @@ impl PendingCache { self.pending.is_empty() } - /// Take every entry out of the pending map and drop any that are - /// older than `max_pending_age_secs`. Called by the block-listener - /// task on each `NewBlock` event — pre-signs only live for one - /// block window, so anything that didn't fire is stale. + /// Drain entries whose trigger oracle tx actually confirmed in + /// `block_hash`. Entries whose trigger is not in + /// `confirmed_tx_hashes` are re-queued if still within TTL, or + /// dropped as stale if not. Clock failures are treated as fatal: + /// every entry is dropped and a `warn!` is emitted, because a + /// dead clock makes TTL meaningless and we must not broadcast + /// pre-signs against an unknown-age state. + /// + /// `block_hash` is used only for log correlation with the + /// `ChainEvent::NewBlock` that triggered the drain; it is not + /// used as a cache key. /// /// Each returned [`UnverifiedPreSigned`] requires a /// [`SimulationVerdict::Ok`] from the caller before its raw tx is - /// reachable. This mirrors the CLAUDE.md safety invariant in the - /// type system: a broadcaster written against this API cannot - /// skip the `eth_call` gate without disabling the type checker. + /// reachable. The wrapper is what keeps the CLAUDE.md safety + /// invariant enforced at the type level. + #[must_use = "dropping the drained vec discards pre-signs without broadcasting; at minimum log and re-insert"] + pub fn drain_for_block( + &self, + block_hash: B256, + confirmed_tx_hashes: &HashSet, + ) -> Vec { + let now = match unix_now() { + Ok(n) => n, + Err(err) => { + warn!( + ?err, + pending = self.pending.len(), + "system clock unavailable, dropping all pre-signs" + ); + self.pending.clear(); + return Vec::new(); + } + }; + let max_age = self.max_pending_age_secs; + let keys: Vec
= self.pending.iter().map(|e| *e.key()).collect(); + let mut out = Vec::with_capacity(keys.len()); + let mut requeued = 0usize; + let mut stale = 0usize; + + for k in keys { + let Some((_, entry)) = self.pending.remove(&k) else { + continue; + }; + + let age = now.saturating_sub(entry.inserted_at); + + if confirmed_tx_hashes.contains(&entry.trigger_tx) { + if age > max_age { + stale += 1; + warn!( + borrower = %entry.borrower, + age_secs = age, + "dropped stale pre-signed liquidation (trigger confirmed but TTL exceeded)" + ); + continue; + } + out.push(UnverifiedPreSigned { inner: entry }); + continue; + } + + // Trigger didn't confirm in this block — re-queue if TTL + // allows, otherwise drop. + if age > max_age { + stale += 1; + warn!( + borrower = %entry.borrower, + age_secs = age, + "dropped stale pre-signed liquidation (trigger never confirmed)" + ); + continue; + } + requeued += 1; + self.pending.insert(entry.borrower, entry); + } + + debug!( + %block_hash, + drained = out.len(), + requeued, + stale, + "mempool cache drained for block" + ); + out + } + + /// Legacy drain. Returns every entry still within TTL, regardless + /// of whether its trigger oracle tx actually confirmed in the + /// current block. Unsafe for production broadcast — + /// [`Self::drain_for_block`] is the only drain that respects + /// the "trigger must confirm" invariant. + #[deprecated( + since = "0.1.0", + note = "use drain_for_block with the confirmed-tx set from the NewBlock event" + )] #[must_use = "dropping the drained vec discards pre-signs without broadcasting; at minimum log and re-insert"] pub fn drain(&self) -> Vec { - let now = unix_now(); + let now = match unix_now() { + Ok(n) => n, + Err(err) => { + warn!( + ?err, + pending = self.pending.len(), + "system clock unavailable, dropping all pre-signs" + ); + self.pending.clear(); + return Vec::new(); + } + }; let max_age = self.max_pending_age_secs; let mut out = Vec::with_capacity(self.pending.len()); let keys: Vec
= self.pending.iter().map(|e| *e.key()).collect(); @@ -347,7 +567,7 @@ impl PendingCache { out.push(UnverifiedPreSigned { inner: entry }); } } - debug!(drained = out.len(), "mempool cache drained"); + debug!(drained = out.len(), "mempool cache drained (legacy)"); out } @@ -374,8 +594,8 @@ impl PendingCache { /// /// Cheap to clone — all mutable state lives behind `Arc` / `DashMap`. /// Clone into the block-listener task so it can call -/// [`MempoolMonitor::drain`] without coordinating with the mempool -/// task. +/// [`PendingCache::drain_for_block`] without coordinating with the +/// mempool task. #[derive(Clone)] pub struct MempoolMonitor { provider: Arc>, @@ -412,9 +632,9 @@ impl MempoolMonitor { } /// Share the inner cache. Lets the block-listener task call - /// [`PendingCache::drain`] without going through the monitor, - /// which keeps its `run` loop free to stay on the pending-tx - /// stream. + /// [`PendingCache::drain_for_block`] without going through the + /// monitor, which keeps its `run` loop free to stay on the + /// pending-tx stream. pub fn cache(&self) -> Arc { self.cache.clone() } @@ -423,24 +643,20 @@ impl MempoolMonitor { self.cache.insert(tx); } - #[must_use = "dropping the drained vec discards pre-signs without broadcasting; at minimum log and re-insert"] - pub fn drain(&self) -> Vec { - self.cache.drain() - } - pub fn pending_len(&self) -> usize { self.cache.pending_len() } /// Run the pending-tx subscription forever. Reconnect on stream - /// error with the same 1 s → 30 s exponential backoff as - /// [`BlockListener`](crate::listener::BlockListener). + /// error with a 1 s → 30 s exponential backoff plus 0-25% random + /// jitter (see [`backoff_with_jitter`]) so many monitors pointed + /// at the same upstream don't reconnect in lockstep. /// /// Emits one [`OracleUpdate`] per matched tx on `tx`. Returns /// `Ok(())` only when the receiver is dropped — the loop is /// expected to run for the lifetime of the process. - pub async fn run(&self, tx: mpsc::Sender) -> Result<()> { - let mut backoff = Duration::from_secs(1); + pub async fn run(&self, tx: mpsc::Sender) -> Result<(), MempoolError> { + let mut backoff = INITIAL_BACKOFF; loop { match self.run_once(&tx).await { Ok(()) => { @@ -455,7 +671,7 @@ impl MempoolMonitor { "mempool subscription error, reconnecting after backoff" ); tokio::time::sleep(backoff).await; - backoff = (backoff * 2).min(Duration::from_secs(30)); + backoff = backoff_with_jitter(backoff, MAX_BACKOFF); } } } @@ -478,8 +694,7 @@ impl MempoolMonitor { // yields. Nudge the operator at `FIRST_TX_WATCHDOG` with a // diagnosis pointing at the likely cause. let mut saw_first_tx = false; - let mut watchdog = - Box::pin(tokio::time::sleep(FIRST_TX_WATCHDOG)); + let mut watchdog = Box::pin(tokio::time::sleep(FIRST_TX_WATCHDOG)); loop { tokio::select! { @@ -538,11 +753,14 @@ impl MempoolMonitor { return Ok(true); }; - info!( + // TODO(charon-metrics): bump a Prometheus counter labelled + // with the selector + update.kind() here once the metrics + // crate merges in rebase. + debug!( %hash, - asset = %update.asset, - selector = %format_selector(update.selector), - price = ?update.new_price, + asset = %update.asset(), + selector = %format_selector(update.selector()), + kind = update.kind(), "venus oracle update seen in mempool" ); @@ -553,16 +771,31 @@ impl MempoolMonitor { } } -/// The four Venus oracle write selectors this module recognises by -/// default. Callers can override via [`MempoolMonitor::new`] — for -/// example to track a chain-specific proxy that exposes a different -/// setter name. +/// Default Venus oracle write selectors tracked by the monitor. +/// +/// Restricted to the two selectors actually accepted by the live +/// Venus `ResilientOracle` on BSC mainnet +/// (`0x6592b5DE802159F3E74B2486b091D11a8256ab8A`): +/// `updatePrice(address)` and `updateAssetPrice(address)`. Legacy +/// write selectors (`setDirectPrice`, `setUnderlyingPrice`) are not +/// deployed on BSC's `ResilientOracle` and live in +/// [`legacy_selectors`] for operators running against a fork or a +/// chain that still exposes them. pub fn default_selectors() -> HashSet> { - let mut s = HashSet::with_capacity(4); + let mut s = HashSet::with_capacity(2); s.insert(IVenusOracleWrite::updatePriceCall::SELECTOR.into()); s.insert(IVenusOracleWrite::updateAssetPriceCall::SELECTOR.into()); - s.insert(IVenusOracleWrite::setDirectPriceCall::SELECTOR.into()); - s.insert(IVenusOracleWrite::setUnderlyingPriceCall::SELECTOR.into()); + s +} + +/// Legacy Venus oracle write selectors. Not accepted by the live +/// BSC `ResilientOracle`; exposed for operators pointed at a fork +/// or a chain that still runs the older `VenusPriceOracle` / +/// Compound-style oracle. +pub fn legacy_selectors() -> HashSet> { + let mut s = HashSet::with_capacity(2); + s.insert(ILegacyVenusOracleWrite::setDirectPriceCall::SELECTOR.into()); + s.insert(ILegacyVenusOracleWrite::setUnderlyingPriceCall::SELECTOR.into()); s } @@ -577,38 +810,38 @@ fn decode_oracle_call( if selector == FixedBytes::<4>::from(IVenusOracleWrite::updatePriceCall::SELECTOR) { let call = IVenusOracleWrite::updatePriceCall::abi_decode_raw(body, true).ok()?; - return Some(OracleUpdate { + return Some(OracleUpdate::Refresh { tx_hash, selector, asset: call.asset, - new_price: None, }); } if selector == FixedBytes::<4>::from(IVenusOracleWrite::updateAssetPriceCall::SELECTOR) { let call = IVenusOracleWrite::updateAssetPriceCall::abi_decode_raw(body, true).ok()?; - return Some(OracleUpdate { + return Some(OracleUpdate::Refresh { tx_hash, selector, asset: call.asset, - new_price: None, }); } - if selector == FixedBytes::<4>::from(IVenusOracleWrite::setDirectPriceCall::SELECTOR) { - let call = IVenusOracleWrite::setDirectPriceCall::abi_decode_raw(body, true).ok()?; - return Some(OracleUpdate { + if selector == FixedBytes::<4>::from(ILegacyVenusOracleWrite::setDirectPriceCall::SELECTOR) { + let call = ILegacyVenusOracleWrite::setDirectPriceCall::abi_decode_raw(body, true).ok()?; + return Some(OracleUpdate::DirectUpdate { tx_hash, selector, asset: call.asset, - new_price: Some(call.price), + price: call.price, }); } - if selector == FixedBytes::<4>::from(IVenusOracleWrite::setUnderlyingPriceCall::SELECTOR) { - let call = IVenusOracleWrite::setUnderlyingPriceCall::abi_decode_raw(body, true).ok()?; - return Some(OracleUpdate { + if selector == FixedBytes::<4>::from(ILegacyVenusOracleWrite::setUnderlyingPriceCall::SELECTOR) + { + let call = + ILegacyVenusOracleWrite::setUnderlyingPriceCall::abi_decode_raw(body, true).ok()?; + return Some(OracleUpdate::DirectUpdate { tx_hash, selector, asset: call.vToken, - new_price: Some(call.price), + price: call.price, }); } None @@ -619,11 +852,31 @@ fn format_selector(sel: FixedBytes<4>) -> String { format!("0x{:02x}{:02x}{:02x}{:02x}", b[0], b[1], b[2], b[3]) } -fn unix_now() -> u64 { +/// Unix seconds since epoch. Surfaces clock-skew as an error so +/// callers who depend on monotonic age comparisons (TTL) can fail +/// closed rather than silently treating a dead clock as +/// `inserted_at = 0`. +fn unix_now() -> Result { SystemTime::now() .duration_since(UNIX_EPOCH) .map(|d| d.as_secs()) - .unwrap_or(0) +} + +/// Double `current`, add 0-25% random jitter, and clamp to `max`. +/// Extracted so tests (and any future `BlockListener` convergence) +/// can exercise the backoff curve without a live socket. +fn backoff_with_jitter(current: Duration, max: Duration) -> Duration { + let doubled = current.saturating_mul(2); + // `doubled.as_millis()` can be large on the path to the cap; + // computing the jitter off the post-double value keeps the + // distribution well-defined at every step. + let quarter_ms = (doubled.as_millis() / 4) as u64; + let jitter_ms = if quarter_ms == 0 { + 0 + } else { + rand::thread_rng().gen_range(0..quarter_ms) + }; + (doubled + Duration::from_millis(jitter_ms)).min(max) } #[cfg(test)] @@ -639,7 +892,16 @@ mod tests { const HASH: B256 = b256!("abababababababababababababababababababababababababababababababab"); fn mk_cache() -> PendingCache { - PendingCache::with_defaults(ORACLE) + // Tests exercise the legacy selectors too — wire both sets so + // `decode_set_direct_price_*` / `decode_set_underlying_price_*` + // still match. + let mut sels = default_selectors(); + sels.extend(legacy_selectors()); + PendingCache::new(ORACLE, sels, DEFAULT_MAX_PENDING_AGE) + } + + fn now_secs() -> u64 { + unix_now().expect("test clock") } fn mk_opp() -> LiquidationOpportunity { @@ -670,55 +932,91 @@ mod tests { } #[test] - fn default_selectors_has_four_entries() { - assert_eq!(default_selectors().len(), 4); + fn default_selectors_has_two_entries() { + assert_eq!(default_selectors().len(), 2); + } + + #[test] + fn legacy_selectors_has_two_entries() { + assert_eq!(legacy_selectors().len(), 2); } #[test] - fn decode_update_price_yields_asset_without_price() { + fn default_and_legacy_selectors_are_disjoint() { + let d = default_selectors(); + let l = legacy_selectors(); + assert!(d.is_disjoint(&l)); + } + + #[test] + fn decode_update_price_yields_refresh_variant() { let c = mk_cache(); let call = IVenusOracleWrite::updatePriceCall { asset: ASSET }; let data = call.abi_encode(); let out = c.decode(HASH, Some(ORACLE), &data).expect("match"); - assert_eq!(out.asset, ASSET); - assert!(out.new_price.is_none()); - assert_eq!(out.tx_hash, HASH); + match out { + OracleUpdate::Refresh { + asset, + tx_hash: h, + selector, + } => { + assert_eq!(asset, ASSET); + assert_eq!(h, HASH); + assert_eq!( + selector, + FixedBytes::<4>::from(IVenusOracleWrite::updatePriceCall::SELECTOR) + ); + } + OracleUpdate::DirectUpdate { .. } => panic!("expected Refresh"), + } } #[test] - fn decode_update_asset_price_yields_asset_without_price() { + fn decode_update_asset_price_yields_refresh_variant() { let c = mk_cache(); let call = IVenusOracleWrite::updateAssetPriceCall { asset: ASSET }; let data = call.abi_encode(); let out = c.decode(HASH, Some(ORACLE), &data).expect("match"); - assert_eq!(out.asset, ASSET); - assert!(out.new_price.is_none()); + assert!(matches!( + out, + OracleUpdate::Refresh { asset, .. } if asset == ASSET + )); } #[test] - fn decode_set_direct_price_carries_new_price() { + fn decode_set_direct_price_yields_direct_update() { let c = mk_cache(); - let call = IVenusOracleWrite::setDirectPriceCall { + let call = ILegacyVenusOracleWrite::setDirectPriceCall { asset: ASSET, price: U256::from(12_345u64), }; let data = call.abi_encode(); let out = c.decode(HASH, Some(ORACLE), &data).expect("match"); - assert_eq!(out.asset, ASSET); - assert_eq!(out.new_price, Some(U256::from(12_345u64))); + match out { + OracleUpdate::DirectUpdate { asset, price, .. } => { + assert_eq!(asset, ASSET); + assert_eq!(price, U256::from(12_345u64)); + } + OracleUpdate::Refresh { .. } => panic!("expected DirectUpdate"), + } } #[test] - fn decode_set_underlying_price_carries_vtoken_and_price() { + fn decode_set_underlying_price_yields_direct_update() { let c = mk_cache(); - let call = IVenusOracleWrite::setUnderlyingPriceCall { + let call = ILegacyVenusOracleWrite::setUnderlyingPriceCall { vToken: ASSET, price: U256::from(99u64), }; let data = call.abi_encode(); let out = c.decode(HASH, Some(ORACLE), &data).expect("match"); - assert_eq!(out.asset, ASSET); - assert_eq!(out.new_price, Some(U256::from(99u64))); + match out { + OracleUpdate::DirectUpdate { asset, price, .. } => { + assert_eq!(asset, ASSET); + assert_eq!(price, U256::from(99u64)); + } + OracleUpdate::Refresh { .. } => panic!("expected DirectUpdate"), + } } #[test] @@ -756,7 +1054,22 @@ mod tests { } #[test] - fn insert_and_drain_roundtrips() { + fn default_cache_does_not_decode_legacy_selectors() { + // `PendingCache::with_defaults` uses `default_selectors()` + // only, which now excludes `setDirectPrice` / + // `setUnderlyingPrice`. Calldata targeting those must no + // longer decode against a default-configured cache. + let c = PendingCache::with_defaults(ORACLE); + let call = ILegacyVenusOracleWrite::setDirectPriceCall { + asset: ASSET, + price: U256::from(1u64), + }; + let data = call.abi_encode(); + assert!(c.decode(HASH, Some(ORACLE), &data).is_none()); + } + + #[test] + fn drain_for_block_returns_confirmed_entries_only() { let c = mk_cache(); let opp = mk_opp(); let borrower = opp.position.borrower; @@ -765,19 +1078,21 @@ mod tests { raw_tx: Bytes::from_static(&[0x01, 0x02, 0x03]), opportunity: opp, trigger_tx: HASH, - inserted_at: unix_now(), + inserted_at: now_secs(), }); assert_eq!(c.pending_len(), 1); - let drained = c.drain(); + + let mut confirmed = HashSet::new(); + confirmed.insert(HASH); + let block_hash = B256::repeat_byte(0xcc); + let drained = c.drain_for_block(block_hash, &confirmed); assert_eq!(drained.len(), 1); assert_eq!(drained[0].borrower(), borrower); assert_eq!(c.pending_len(), 0); - // Second drain yields nothing. - assert!(c.drain().is_empty()); } #[test] - fn drain_drops_stale_entries() { + fn drain_for_block_requeues_when_trigger_not_confirmed() { let c = mk_cache(); let opp = mk_opp(); c.insert(PreSignedLiquidation { @@ -785,14 +1100,75 @@ mod tests { raw_tx: Bytes::new(), opportunity: opp, trigger_tx: HASH, - inserted_at: unix_now().saturating_sub(3_600), + inserted_at: now_secs(), }); + + let confirmed = HashSet::new(); // trigger not in set + let block_hash = B256::repeat_byte(0xaa); + let drained = c.drain_for_block(block_hash, &confirmed); + assert!(drained.is_empty()); + // Entry must remain in the cache for the next block. assert_eq!(c.pending_len(), 1); - let drained = c.drain(); - assert!(drained.is_empty(), "stale entry should be dropped"); + } + + #[test] + fn drain_for_block_drops_stale_even_when_unconfirmed() { + let c = mk_cache(); + let opp = mk_opp(); + c.insert(PreSignedLiquidation { + borrower: opp.position.borrower, + raw_tx: Bytes::new(), + opportunity: opp, + trigger_tx: HASH, + inserted_at: now_secs().saturating_sub(3_600), + }); + let confirmed = HashSet::new(); + let block_hash = B256::repeat_byte(0xbb); + let drained = c.drain_for_block(block_hash, &confirmed); + assert!(drained.is_empty()); + assert_eq!(c.pending_len(), 0, "stale entry must be evicted"); + } + + #[test] + fn drain_for_block_drops_stale_even_when_confirmed() { + let c = mk_cache(); + let opp = mk_opp(); + c.insert(PreSignedLiquidation { + borrower: opp.position.borrower, + raw_tx: Bytes::new(), + opportunity: opp, + trigger_tx: HASH, + inserted_at: now_secs().saturating_sub(3_600), + }); + let mut confirmed = HashSet::new(); + confirmed.insert(HASH); + let block_hash = B256::repeat_byte(0xdd); + let drained = c.drain_for_block(block_hash, &confirmed); + assert!( + drained.is_empty(), + "expired entries must not broadcast even when confirmed" + ); assert_eq!(c.pending_len(), 0); } + #[test] + #[allow(deprecated)] + fn legacy_drain_still_works() { + let c = mk_cache(); + let opp = mk_opp(); + let borrower = opp.position.borrower; + c.insert(PreSignedLiquidation { + borrower, + raw_tx: Bytes::from_static(&[0x01, 0x02, 0x03]), + opportunity: opp, + trigger_tx: HASH, + inserted_at: now_secs(), + }); + let drained = c.drain(); + assert_eq!(drained.len(), 1); + assert_eq!(drained[0].borrower(), borrower); + } + #[test] fn insert_overwrites_same_borrower() { let c = mk_cache(); @@ -803,20 +1179,22 @@ mod tests { raw_tx: Bytes::from_static(&[0x01]), opportunity: opp.clone(), trigger_tx: HASH, - inserted_at: unix_now(), + inserted_at: now_secs(), }); c.insert(PreSignedLiquidation { borrower, raw_tx: Bytes::from_static(&[0x02]), opportunity: opp, trigger_tx: HASH, - inserted_at: unix_now(), + inserted_at: now_secs(), }); assert_eq!(c.pending_len(), 1); - let drained = c.drain(); + let mut confirmed = HashSet::new(); + confirmed.insert(HASH); + let drained = c.drain_for_block(B256::ZERO, &confirmed); assert_eq!(drained.len(), 1); // To read raw_tx the caller must present a passing verdict — - // that's the whole point of the wrapper (see #226). + // that's the whole point of the wrapper. let unwrapped = drained .into_iter() .next() @@ -835,9 +1213,11 @@ mod tests { raw_tx: Bytes::from_static(&[0xaa]), opportunity: opp, trigger_tx: HASH, - inserted_at: unix_now(), + inserted_at: now_secs(), }); - let drained = c.drain(); + let mut confirmed = HashSet::new(); + confirmed.insert(HASH); + let drained = c.drain_for_block(B256::ZERO, &confirmed); let verified = drained .into_iter() .next() @@ -856,9 +1236,11 @@ mod tests { raw_tx: Bytes::from_static(&[0xbb]), opportunity: opp, trigger_tx: HASH, - inserted_at: unix_now(), + inserted_at: now_secs(), }); - let drained = c.drain(); + let mut confirmed = HashSet::new(); + confirmed.insert(HASH); + let drained = c.drain_for_block(B256::ZERO, &confirmed); let wrapped = drained.into_iter().next().unwrap(); let borrower_before = wrapped.borrower(); match wrapped.verify(SimulationVerdict::Revert) { @@ -882,9 +1264,15 @@ mod tests { raw_tx: Bytes::from_static(&[0xdd]), opportunity: opp, trigger_tx: HASH, - inserted_at: unix_now(), + inserted_at: now_secs(), }); - let wrapped = c.drain().into_iter().next().unwrap(); + let mut confirmed = HashSet::new(); + confirmed.insert(HASH); + let wrapped = c + .drain_for_block(B256::ZERO, &confirmed) + .into_iter() + .next() + .unwrap(); let (retry, _) = match wrapped.verify(SimulationVerdict::Revert) { Err(pair) => pair, Ok(_) => panic!("Revert must not unwrap"), @@ -908,16 +1296,25 @@ mod tests { raw_tx: Bytes::from_static(&[0xee]), opportunity: opp, trigger_tx: HASH, - inserted_at: unix_now(), + inserted_at: now_secs(), }); - let wrapped = c.drain().into_iter().next().unwrap(); + let mut confirmed = HashSet::new(); + confirmed.insert(HASH); + let wrapped = c + .drain_for_block(B256::ZERO, &confirmed) + .into_iter() + .next() + .unwrap(); let (still_wrapped, _) = match wrapped.verify(SimulationVerdict::Error) { Err(pair) => pair, Ok(_) => panic!("Error must not unwrap"), }; assert_eq!(still_wrapped.borrower(), borrower_expected); assert_eq!(still_wrapped.trigger_tx(), HASH); - assert_eq!(still_wrapped.opportunity().position.borrower, borrower_expected); + assert_eq!( + still_wrapped.opportunity().position.borrower, + borrower_expected + ); } #[test] @@ -929,9 +1326,11 @@ mod tests { raw_tx: Bytes::from_static(&[0xcc]), opportunity: opp, trigger_tx: HASH, - inserted_at: unix_now(), + inserted_at: now_secs(), }); - let drained = c.drain(); + let mut confirmed = HashSet::new(); + confirmed.insert(HASH); + let drained = c.drain_for_block(B256::ZERO, &confirmed); let wrapped = drained.into_iter().next().unwrap(); assert!(matches!( wrapped.verify(SimulationVerdict::Error), @@ -941,11 +1340,16 @@ mod tests { #[test] fn is_tracked_selector_matches_defaults() { - let c = mk_cache(); + let c = PendingCache::with_defaults(ORACLE); let sel = FixedBytes::<4>::from(IVenusOracleWrite::updatePriceCall::SELECTOR); assert!(c.is_tracked_selector(sel)); let unknown = FixedBytes::<4>::from([0xde, 0xad, 0xbe, 0xef]); assert!(!c.is_tracked_selector(unknown)); + let legacy = FixedBytes::<4>::from(ILegacyVenusOracleWrite::setDirectPriceCall::SELECTOR); + assert!( + !c.is_tracked_selector(legacy), + "legacy selectors must not be tracked by default" + ); } #[test] @@ -959,4 +1363,50 @@ mod tests { let sel = FixedBytes::<4>::from([0xab, 0xcd, 0xef, 0x01]); assert_eq!(format_selector(sel), "0xabcdef01"); } + + #[test] + fn oracle_update_accessors_match_variant_fields() { + let selector = FixedBytes::<4>::from(IVenusOracleWrite::updatePriceCall::SELECTOR); + let refresh = OracleUpdate::Refresh { + tx_hash: HASH, + selector, + asset: ASSET, + }; + assert_eq!(refresh.tx_hash(), HASH); + assert_eq!(refresh.selector(), selector); + assert_eq!(refresh.asset(), ASSET); + assert_eq!(refresh.kind(), "refresh"); + + let direct = OracleUpdate::DirectUpdate { + tx_hash: HASH, + selector, + asset: ASSET, + price: U256::from(7u64), + }; + assert_eq!(direct.tx_hash(), HASH); + assert_eq!(direct.asset(), ASSET); + assert_eq!(direct.kind(), "direct"); + } + + #[test] + fn backoff_with_jitter_doubles_and_caps() { + let max = Duration::from_secs(30); + // First step: from 1 s should land in [2.0, 2.5) s. + let b = backoff_with_jitter(Duration::from_secs(1), max); + assert!( + b >= Duration::from_millis(2_000) && b < Duration::from_millis(2_500), + "unexpected step-1 backoff: {b:?}" + ); + + // Near-cap: from 20 s should cap at 30 s (40 s + jitter > cap). + let b = backoff_with_jitter(Duration::from_secs(20), max); + assert_eq!(b, max); + } + + #[test] + fn backoff_with_jitter_handles_zero() { + let max = Duration::from_secs(30); + let b = backoff_with_jitter(Duration::ZERO, max); + assert_eq!(b, Duration::ZERO); + } } From d21d4a6e244c980f45dce78e73712f479ebf11a6 Mon Sep 17 00:00:00 2001 From: obchain Date: Thu, 23 Apr 2026 22:41:30 +0530 Subject: [PATCH 4/4] feat(cli): wire MempoolMonitor into listen loop with pre-sign drain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Spawns MempoolMonitor alongside BlockListener on the shared provider when the operator sets CHARON_VENUS_ORACLE to the live Venus oracle address. BlockListener now surfaces the block hash on every NewBlock; the listen loop pulls the confirmed tx-hash set via eth_getBlockByHash (hashes-only) and calls PendingCache::drain_for_block with the real hash set — acceptance requires this, not an empty placeholder. For each drained UnverifiedPreSigned the loop rebuilds the liquidator calldata via the Venus adapter + TxBuilder, runs it through Simulator::simulate, and only hands the pre-sign a SimulationVerdict::Ok proof token when simulation succeeds. The resulting PreSignedLiquidation is logged as "ready for broadcast"; eth_sendRawTransaction remains an explicit non-goal per the issue body (signer + liquidator-contract bridge tracked separately). The type-level guard (UnverifiedPreSigned → PreSignedLiquidation only via verify(SimulationVerdict::Ok)) is preserved end-to-end, so the eventual broadcast commit composes naturally without bypassing the CLAUDE.md eth_call gate. Oracle-update channel is logged at debug (pre-sign builder is a non-goal for #299) so operators can confirm the monitor is actually decoding writes on their upstream. Closes #299 --- crates/charon-cli/src/main.rs | 256 +++++++++++++++++++++++++- crates/charon-scanner/src/listener.rs | 9 + 2 files changed, 261 insertions(+), 4 deletions(-) diff --git a/crates/charon-cli/src/main.rs b/crates/charon-cli/src/main.rs index 9e3ae1c..918dca1 100644 --- a/crates/charon-cli/src/main.rs +++ b/crates/charon-cli/src/main.rs @@ -6,11 +6,14 @@ //! charon --config config/default.toml test-connection --chain bnb //! ``` +use std::collections::HashSet; use std::path::PathBuf; +use std::str::FromStr; use std::sync::Arc; -use alloy::primitives::{Address, U256}; -use alloy::providers::{ProviderBuilder, WsConnect}; +use alloy::primitives::{Address, B256, U256}; +use alloy::providers::{Provider, ProviderBuilder, WsConnect}; +use alloy::rpc::types::BlockTransactionsKind; use alloy::signers::local::PrivateKeySigner; use anyhow::{Context, Result}; use charon_core::{ @@ -21,13 +24,28 @@ use charon_executor::{Simulator, TxBuilder}; use charon_flashloan::{AaveFlashLoan, FlashLoanRouter}; use charon_protocols::VenusAdapter; use charon_scanner::{ - BlockListener, ChainEvent, ChainProvider, DEFAULT_MAX_AGE, HealthScanner, PriceCache, + BlockListener, ChainEvent, ChainProvider, DEFAULT_MAX_AGE, HealthScanner, MempoolMonitor, + OracleUpdate, PendingCache, PriceCache, SimulationVerdict, }; use clap::{Parser, Subcommand}; use tokio::sync::mpsc; use tracing::{debug, info, warn}; use tracing_subscriber::EnvFilter; +/// Buffer size for the mempool's `OracleUpdate` channel. Sized so a +/// short burst of oracle-write txs at block-boundary time doesn't +/// back-pressure the monitor task. +const ORACLE_UPDATE_CHANNEL: usize = 256; + +/// Env var the operator sets to enable the mempool monitor. Expected +/// value is the hex-encoded Venus oracle address whose write +/// selectors the monitor should track. Unset (or empty) skips the +/// mempool path cleanly so the CLI stays usable on profiles that do +/// not have a paid MEV stream. A future config-file knob can replace +/// this env var; for now keeping it env-only avoids a config-schema +/// change on feat/21. +const VENUS_ORACLE_ENV: &str = "CHARON_VENUS_ORACLE"; + /// Size of the fan-in channel from listeners to the scanner pipeline. /// One slot per ~5 blocks across all chains covers short stalls without /// back-pressuring the listener task. @@ -225,6 +243,67 @@ async fn run_listen(config: Config, borrowers: Vec
) -> Result<()> { )) }); + // ── Mempool monitor (#46 / #299) ────────────────────────────────── + // Spawn the pending-tx monitor alongside BlockListener on the + // shared provider. Enabled only when the operator sets + // `CHARON_VENUS_ORACLE` to a hex-encoded oracle address — most + // public BSC RPCs do not expose `newPendingTransactions` (see the + // mempool module's RPC-requirements docs). The returned + // `PendingCache` is retained here so the block-event drain can + // call `drain_for_block` with the real confirmed-tx set each + // tick; the `OracleUpdate` channel is logged (pre-sign builder + // wiring is explicitly non-goal for #299 per the issue body, so + // updates are observed and dropped until the signer + deployed + // liquidator bridge lands in a follow-up). + let mempool_cache: Option> = match std::env::var(VENUS_ORACLE_ENV) { + Ok(hex) if !hex.is_empty() => match Address::from_str(hex.trim()) { + Ok(oracle) => { + let monitor = MempoolMonitor::with_defaults(provider.clone(), oracle); + let cache = monitor.cache(); + let (oracle_tx, mut oracle_rx) = mpsc::channel::(ORACLE_UPDATE_CHANNEL); + let monitor_for_task = monitor.clone(); + tokio::spawn(async move { + if let Err(err) = monitor_for_task.run(oracle_tx).await { + warn!(error = ?err, "mempool monitor terminated"); + } + }); + tokio::spawn(async move { + // Non-goal: forwarding OracleUpdate into a + // pre-sign builder (signer + liquidator bridge + // tracked separately). Log at debug so operators + // can verify the monitor is actually decoding + // oracle writes on their upstream without the + // flood reaching info. + while let Some(update) = oracle_rx.recv().await { + debug!( + tx = %update.tx_hash(), + asset = %update.asset(), + kind = update.kind(), + "oracle update observed (pre-sign builder not yet wired)" + ); + } + }); + info!(oracle = %oracle, "mempool monitor spawned"); + Some(cache) + } + Err(err) => { + warn!( + env = VENUS_ORACLE_ENV, + error = ?err, + "mempool oracle env var set but unparseable; mempool monitor disabled" + ); + None + } + }, + _ => { + info!( + env = VENUS_ORACLE_ENV, + "mempool monitor disabled (no oracle address configured)" + ); + None + } + }; + // ── Profit-ordered queue ── let queue = Arc::new(tokio::sync::Mutex::new(OpportunityQueue::with_default_ttl())); @@ -256,7 +335,17 @@ async fn run_listen(config: Config, borrowers: Vec
) -> Result<()> { _ = async { while let Some(event) = rx.recv().await { match event { - ChainEvent::NewBlock { chain, number, timestamp } => { + ChainEvent::NewBlock { chain, number, timestamp, block_hash } => { + drain_mempool_for_block( + &chain, + block_hash, + mempool_cache.as_deref(), + adapter.as_ref(), + tx_builder.as_deref(), + simulator.as_deref(), + provider.as_ref(), + ) + .await; process_block( chain, number, @@ -451,3 +540,162 @@ fn repay_to_usd_cents_placeholder(amount: U256) -> u64 { let cents = amount / scale; u64::try_from(cents).unwrap_or(u64::MAX) } + +/// Drain pre-signed liquidations whose oracle trigger confirmed in +/// `block_hash` and run each through the executor's simulation gate +/// before the broadcast step (still non-goal per #299). +/// +/// Fetches the block's confirmed tx-hash set via +/// `eth_getBlockByHash` (hashes-only payload), calls +/// [`PendingCache::drain_for_block`], and for each returned +/// [`charon_scanner::UnverifiedPreSigned`] rebuilds the liquidator +/// calldata via the adapter + builder, runs it through +/// [`Simulator::simulate`], and only hands the pre-sign a +/// [`SimulationVerdict::Ok`] proof token when the simulator returns +/// success. `verify(Ok)` unwraps the pre-sign into a full +/// `PreSignedLiquidation`; broadcast is explicitly out of scope +/// (signer + liquidator bridge tracked separately) so the drained +/// tx is logged and dropped. +/// +/// Silently no-ops when the cache is `None` (mempool monitor is +/// disabled) or when the builder/simulator/params for a pre-sign +/// are unavailable — there is no way to honour the eth_call gate +/// without them, so the safer action is to re-insert-or-drop per +/// the cache's TTL and surface a warning. +/// +/// Never panics. Every RPC/encode/sim failure is logged and the +/// drain loop continues with the next pre-sign; the block-scanner +/// path is independent and must not be blocked by mempool hiccups. +#[allow(clippy::too_many_arguments)] +async fn drain_mempool_for_block( + chain: &str, + block_hash: B256, + cache: Option<&PendingCache>, + adapter: &VenusAdapter, + tx_builder: Option<&TxBuilder>, + simulator: Option<&Simulator>, + provider: &alloy::providers::RootProvider, +) { + let Some(cache) = cache else { + return; + }; + + // Fetch the block with hashes-only payload. `Hashes` keeps the + // response small — we only need the set membership check for + // `drain_for_block`, not full transaction envelopes. + let block = match provider + .get_block_by_hash(block_hash, BlockTransactionsKind::Hashes) + .await + { + Ok(Some(b)) => b, + Ok(None) => { + warn!(%block_hash, "block not found when draining mempool cache"); + return; + } + Err(err) => { + warn!(%block_hash, ?err, "get_block_by_hash failed when draining mempool cache"); + return; + } + }; + let confirmed: HashSet = block.transactions.hashes().collect(); + + let drained = cache.drain_for_block(block_hash, &confirmed); + if drained.is_empty() { + return; + } + debug!( + chain, + %block_hash, + drained = drained.len(), + confirmed_tx_count = confirmed.len(), + "mempool cache drained for block" + ); + + for presigned in drained { + let borrower = presigned.borrower(); + let trigger = presigned.trigger_tx(); + let opp = presigned.opportunity().clone(); + + // To honour the CLAUDE.md eth_call gate on the pre-sign + // path we need to simulate a concrete calldata. Rebuild it + // from the opportunity via the protocol adapter + builder — + // the pre-sign's own `raw_tx` is the signed envelope, which + // is intentionally unreachable without a `SimulationVerdict`. + let Some(builder) = tx_builder else { + warn!( + chain, + %borrower, + "pre-sign drained but tx_builder is absent — cannot honour sim gate, dropping" + ); + continue; + }; + let Some(sim) = simulator else { + warn!( + chain, + %borrower, + "pre-sign drained but simulator is absent — cannot honour sim gate, dropping" + ); + continue; + }; + + let params = match adapter.get_liquidation_params(&opp.position) { + Ok(p) => p, + Err(err) => { + warn!( + chain, + %borrower, + error = ?err, + "failed to rebuild liquidation params for drained pre-sign" + ); + continue; + } + }; + let calldata = match builder.encode_calldata(&opp, ¶ms) { + Ok(c) => c, + Err(err) => { + warn!( + chain, + %borrower, + error = ?err, + "failed to encode calldata for drained pre-sign" + ); + continue; + } + }; + match sim.simulate(provider, calldata).await { + Ok(()) => match presigned.verify(SimulationVerdict::approve()) { + Ok(ready) => { + // Non-goal: eth_sendRawTransaction. The + // `PreSignedLiquidation` is fully verified and + // ready for the future broadcast call site; log + // loudly so operators running the monitor end-to-end + // can see the gate opening. + info!( + chain, + %borrower, + %trigger, + raw_tx_len = ready.raw_tx.len(), + "pre-sign simulated OK — ready for broadcast (broadcast wiring follow-up)" + ); + } + Err((returned, verdict)) => { + warn!( + chain, + borrower = %returned.borrower(), + ?verdict, + "simulation verdict inconsistent with simulate outcome — dropping" + ); + } + }, + Err(err) => { + debug!( + chain, + %borrower, + %trigger, + error = ?err, + "pre-sign simulation reverted — dropping" + ); + } + } + } +} diff --git a/crates/charon-scanner/src/listener.rs b/crates/charon-scanner/src/listener.rs index ebf8ddc..5ba6620 100644 --- a/crates/charon-scanner/src/listener.rs +++ b/crates/charon-scanner/src/listener.rs @@ -7,6 +7,7 @@ use std::time::Duration; +use alloy::primitives::B256; use alloy::providers::Provider; use anyhow::{Context, Result}; use charon_core::config::ChainConfig; @@ -29,6 +30,11 @@ pub enum ChainEvent { number: u64, /// Unix timestamp from the block header. timestamp: u64, + /// Canonical block hash of the new head. Required by the + /// mempool pre-sign drain so it can correlate its log with the + /// block that triggered the drain and to let consumers fetch + /// the block's confirmed tx-hash set in a follow-up call. + block_hash: B256, }, } @@ -97,11 +103,13 @@ impl BlockListener { while let Some(header) = stream.next().await { let number = header.number; let timestamp = header.timestamp; + let block_hash = header.hash; info!( chain = %self.name, block = number, timestamp = timestamp, + %block_hash, "new block" ); @@ -109,6 +117,7 @@ impl BlockListener { chain: self.name.clone(), number, timestamp, + block_hash, }; if self.tx.send(event).await.is_err() {