diff --git a/Cargo.lock b/Cargo.lock index 2c67907..710cb14 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1173,6 +1173,7 @@ dependencies = [ "async-trait", "charon-core", "dashmap", + "dotenvy", "futures-util", "metrics", "rand 0.8.6", diff --git a/config/default.toml b/config/default.toml index 38a202d..0809fce 100644 --- a/config/default.toml +++ b/config/default.toml @@ -42,3 +42,13 @@ pool = "0x6807dc923806fe8fd134338eabca509979a7e0cb" # ── Deployed liquidator contracts ───────────────────────────────────────── # Populated once CharonLiquidator.sol is deployed on BSC mainnet. Do not # add a zero-address placeholder — config validation rejects it. + +# ── Chainlink price feeds (per chain, per asset symbol) ─────────────────── +# Only feeds listed here are polled by the PriceCache. Add more as new +# Venus markets become relevant. Feed addresses from docs.chain.link. +[chainlink.bnb] +BNB = "0x0567F2323251f0Aab15c8dFb1967E4e8A7D42aeE" +BTCB = "0x264990fbd0A4796A3E3d8E37022BdAf1A5a4C1f0" # BTC / USD (canonical, docs.chain.link) +ETH = "0x9ef1B8c0E4F7dc8bF5719Ea496883DC6401d5b2e" +USDT = "0xB97Ad0E74fa7d920791E90258A6E2085088b4320" +USDC = "0x51597f405303C4377E36123cBc172b13269EA163" diff --git a/crates/charon-cli/src/main.rs b/crates/charon-cli/src/main.rs index 9942c8c..fdb18ce 100644 --- a/crates/charon-cli/src/main.rs +++ b/crates/charon-cli/src/main.rs @@ -17,7 +17,8 @@ use anyhow::{Context, Result}; use charon_core::{Config, LendingProtocol}; use charon_protocols::VenusAdapter; use charon_scanner::{ - BlockListener, ChainEvent, ChainProvider, HealthScanner, PositionBucket, ScanScheduler, + BlockListener, ChainEvent, ChainProvider, DEFAULT_MAX_AGE, HealthScanner, PositionBucket, + PriceCache, ScanScheduler, }; use clap::{Parser, Subcommand}; use tokio::sync::mpsc; @@ -131,7 +132,11 @@ async fn main() -> Result<()> { /// For every `NewBlock` event on a chain with a `[protocol.venus]` entry /// the Venus adapter fetches positions anchored at the observed block, /// pushes them through the bucketed [`HealthScanner`], and limits fetches -/// to buckets whose cadence fires this block via [`ScanScheduler`]. +/// to buckets whose cadence fires this block via [`ScanScheduler`]. A +/// per-chain [`PriceCache`] is also refreshed on each scan tick so +/// downstream profit-ranking has a fresh Chainlink view; consumers are +/// wired up in a follow-up task (positions are opportunities only, no +/// profit calc yet). /// Chains without a Venus protocol config still flow through the drain /// loop but trigger no protocol scans (v0.1 scope). /// @@ -144,55 +149,84 @@ async fn run_listen(config: &Config, borrowers: Vec
) -> Result<()> { anyhow::bail!("no chains configured — nothing to listen to"); } - // Venus adapter + bucketed scanner + cadence scheduler are currently - // single-chain (BNB) per config scope. Build them only if - // `[protocol.venus]` exists and its target chain is configured; - // otherwise run the listener pipeline without a scanner. - let venus_adapter: Option<(String, Arc, Arc, ScanScheduler)> = - match config.protocol.get("venus") { - Some(venus_cfg) => { - let chain_name = &venus_cfg.chain; - let chain_cfg = config.chain.get(chain_name).with_context(|| { - format!( - "protocol 'venus' references chain '{chain_name}' which is not in [chain.*]" - ) - })?; - let adapter_ws = ProviderBuilder::new() - .on_ws(WsConnect::new(&chain_cfg.ws_url)) - .await - .context("venus adapter: failed to connect over ws")?; - let adapter = Arc::new( - VenusAdapter::connect(Arc::new(adapter_ws), venus_cfg.comptroller).await?, - ); - let scanner = Arc::new(HealthScanner::new( - config.bot.liquidatable_threshold_bps, - config.bot.near_liq_threshold_bps, - )?); - let sched = ScanScheduler::new( - config.bot.hot_scan_blocks, - config.bot.warm_scan_blocks, - config.bot.cold_scan_blocks, - ); - info!( - chain = %chain_name, - borrower_count = borrowers.len(), - market_count = adapter.markets().await.len(), - liquidatable_bps = config.bot.liquidatable_threshold_bps, - near_liq_bps = config.bot.near_liq_threshold_bps, - hot_blocks = sched.hot, - warm_blocks = sched.warm, - cold_blocks = sched.cold, - "venus adapter + scanner ready" - ); - Some((chain_name.clone(), adapter, scanner, sched)) - } - None => { - info!( - "no [protocol.venus] configured — listener will drain events without scanning" - ); - None + // Venus adapter + bucketed scanner + cadence scheduler + Chainlink + // price cache are currently single-chain (BNB) per config scope. + // Build them only if `[protocol.venus]` exists and its target chain + // is configured; otherwise run the listener pipeline without a + // scanner. + let venus_adapter: Option<( + String, + Arc, + Arc, + ScanScheduler, + Arc, + )> = match config.protocol.get("venus") { + Some(venus_cfg) => { + let chain_name = &venus_cfg.chain; + let chain_cfg = config.chain.get(chain_name).with_context(|| { + format!( + "protocol 'venus' references chain '{chain_name}' which is not in [chain.*]" + ) + })?; + let adapter_ws = ProviderBuilder::new() + .on_ws(WsConnect::new(&chain_cfg.ws_url)) + .await + .context("venus adapter: failed to connect over ws")?; + let adapter_ws = Arc::new(adapter_ws); + let adapter = + Arc::new(VenusAdapter::connect(adapter_ws.clone(), venus_cfg.comptroller).await?); + let scanner = Arc::new(HealthScanner::new( + config.bot.liquidatable_threshold_bps, + config.bot.near_liq_threshold_bps, + )?); + let sched = ScanScheduler::new( + config.bot.hot_scan_blocks, + config.bot.warm_scan_blocks, + config.bot.cold_scan_blocks, + ); + + // Chainlink price cache — feeds are configured per chain under + // `[chainlink.]`. Empty map = no feeds configured, cache + // stays idle and downstream stages fall back to protocol oracle. + // Reuses the Venus adapter's WS provider to avoid a second + // upstream connection; lifetime is tied to the scan task via Arc. + let price_feeds = config.chainlink.get(chain_name).cloned().unwrap_or_default(); + let prices = Arc::new(PriceCache::new(adapter_ws, price_feeds, DEFAULT_MAX_AGE)); + // Best-effort warm-up — individual feed failures are logged + // inside `refresh_all` so startup never hard-fails on a + // transient Chainlink blip. + prices.refresh_all().await; + let fresh_feeds: Vec = prices.symbols().map(str::to_string).collect(); + for sym in &fresh_feeds { + if let Some(p) = prices.get(sym) { + info!( + symbol = %sym, + price = %p.price, + decimals = p.decimals, + "chainlink feed" + ); + } } - }; + + info!( + chain = %chain_name, + borrower_count = borrowers.len(), + market_count = adapter.markets().await.len(), + feed_count = fresh_feeds.len(), + liquidatable_bps = config.bot.liquidatable_threshold_bps, + near_liq_bps = config.bot.near_liq_threshold_bps, + hot_blocks = sched.hot, + warm_blocks = sched.warm, + cold_blocks = sched.cold, + "venus adapter + scanner + price cache ready" + ); + Some((chain_name.clone(), adapter, scanner, sched, prices)) + } + None => { + info!("no [protocol.venus] configured — listener will drain events without scanning"); + None + } + }; let (tx, mut rx) = mpsc::channel::(CHAIN_EVENT_CHANNEL); let mut listeners: tokio::task::JoinSet<(String, Result<()>)> = tokio::task::JoinSet::new(); @@ -232,7 +266,7 @@ async fn run_listen(config: &Config, borrowers: Vec
) -> Result<()> { // snapshot the final state of the missed range. continue; } - if let Some((venus_chain, adapter, scanner, sched)) = + if let Some((venus_chain, adapter, scanner, sched, prices)) = venus_adapter.as_ref() { if venus_chain != &chain { @@ -258,6 +292,12 @@ async fn run_listen(config: &Config, borrowers: Vec
) -> Result<()> { if scan_set.is_empty() { continue; } + // Refresh Chainlink prices on every real scan + // tick so downstream profit-ranking (wired in a + // follow-up task) reads sub-heartbeat feeds. + // Individual feed failures are logged inside + // `refresh_all` and do not abort the scan. + prices.refresh_all().await; let block_tag = BlockNumberOrTag::Number(number); match adapter.fetch_positions(&scan_set, block_tag).await { Ok(positions) => { diff --git a/crates/charon-core/src/config.rs b/crates/charon-core/src/config.rs index ff49574..ac1b6e0 100644 --- a/crates/charon-core/src/config.rs +++ b/crates/charon-core/src/config.rs @@ -56,6 +56,11 @@ pub struct Config { pub flashloan: HashMap, /// Deployed liquidator contracts keyed by chain name. pub liquidator: HashMap, + /// Chainlink feed addresses per chain, keyed by asset symbol + /// (e.g. `chainlink.bnb.BNB = "0x…"`). Missing key = no feed + /// configured, scanner falls back to protocol oracle. + #[serde(default)] + pub chainlink: HashMap>, } impl fmt::Debug for Config { diff --git a/crates/charon-scanner/Cargo.toml b/crates/charon-scanner/Cargo.toml index 89a5caa..229a569 100644 --- a/crates/charon-scanner/Cargo.toml +++ b/crates/charon-scanner/Cargo.toml @@ -17,5 +17,8 @@ dashmap = { workspace = true } rand = { workspace = true } metrics = { workspace = true } +[dev-dependencies] +dotenvy = { workspace = true } + [lints] workspace = true diff --git a/crates/charon-scanner/src/lib.rs b/crates/charon-scanner/src/lib.rs index 0575442..5fff6b9 100644 --- a/crates/charon-scanner/src/lib.rs +++ b/crates/charon-scanner/src/lib.rs @@ -1,11 +1,11 @@ -//! Charon scanner — chain listener and health-factor scanner. +//! Charon scanner — chain listener, health-factor scanner, and price cache. pub mod listener; +pub mod oracle; pub mod provider; pub mod scanner; pub use listener::{BlockListener, ChainEvent}; +pub use oracle::{CachedPrice, DEFAULT_MAX_AGE, PriceCache}; pub use provider::{ChainProvider, ChainProviderT, MockChainProvider}; -pub use scanner::{ - BucketCounts, BucketedPosition, HealthScanner, PositionBucket, ScanScheduler, -}; +pub use scanner::{BucketCounts, BucketedPosition, HealthScanner, PositionBucket, ScanScheduler}; diff --git a/crates/charon-scanner/src/oracle.rs b/crates/charon-scanner/src/oracle.rs new file mode 100644 index 0000000..1d6c5de --- /dev/null +++ b/crates/charon-scanner/src/oracle.rs @@ -0,0 +1,302 @@ +//! Chainlink price cache. +//! +//! Polls `IAggregatorV3.latestRoundData()` for a configured set of +//! `(symbol → feed address)` entries and caches the result. Every read +//! carries: +//! - round-completeness check (`answeredInRound >= roundId`), +//! - `updatedAt > 0` uninitialized-feed check, +//! - per-feed staleness window (heartbeat-aware). +//! +//! Storage is a `DashMap`, same lock-free pattern +//! as the health scanner. + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use alloy::primitives::{Address, I256, U256}; +use alloy::providers::RootProvider; +use alloy::pubsub::PubSubFrontend; +use alloy::sol; +use anyhow::{Context, Result}; +use dashmap::DashMap; +use tracing::{debug, warn}; + +/// Default freshness window: 10 minutes. Used only for feeds that do not +/// have an explicit per-symbol override in the config. +pub const DEFAULT_MAX_AGE: Duration = Duration::from_secs(10 * 60); + +sol! { + /// Chainlink AggregatorV3 interface (reduced to the fields we use). + #[sol(rpc)] + interface IAggregatorV3 { + function decimals() external view returns (uint8); + + /// Returns the latest round data for this feed. + /// + /// `answer` is int256; callers must reject negative values. + /// `answeredInRound < roundId` means the round is still being + /// aggregated — the returned answer is a carry-over from an + /// older round and must not be trusted. + function latestRoundData() + external view returns ( + uint80 roundId, + int256 answer, + uint256 startedAt, + uint256 updatedAt, + uint80 answeredInRound + ); + } +} + +/// One cached price reading with enough metadata to judge freshness and +/// to normalize across oracle scale conventions. +#[derive(Debug, Clone)] +pub struct CachedPrice { + /// Raw Chainlink answer in the feed's native decimals. + pub price: U256, + /// Number of decimals the feed reports (typically 8 on BSC). + pub decimals: u8, + /// Chainlink `updatedAt` unix timestamp. + pub updated_at: u64, + /// Wall-clock unix timestamp at which we pulled the round. + pub fetched_at: u64, +} + +impl CachedPrice { + /// Return `price` re-scaled from its native decimals to + /// `target_decimals`. Integer arithmetic — no f64. + /// + /// Callers converting to Venus oracle scale pass `target_decimals` + /// equal to the underlying token's decimals + (36 - 18) etc. For + /// Aave-style 18-decimal consumers, pass 18. + pub fn scaled_to(&self, target_decimals: u8) -> U256 { + use std::cmp::Ordering; + match target_decimals.cmp(&self.decimals) { + Ordering::Equal => self.price, + Ordering::Greater => { + let diff = target_decimals - self.decimals; + self.price * U256::from(10u64).pow(U256::from(diff)) + } + Ordering::Less => { + let diff = self.decimals - target_decimals; + self.price / U256::from(10u64).pow(U256::from(diff)) + } + } + } +} + +/// Thin wrapper around a provider + a per-symbol feed map + per-feed +/// staleness overrides + a concurrent cache. +pub struct PriceCache { + provider: Arc>, + feeds: HashMap, + default_max_age: Duration, + per_symbol_max_age: HashMap, + cache: DashMap, +} + +impl PriceCache { + /// Build a cache with a default staleness window. Equivalent to + /// `with_per_symbol_max_age(provider, feeds, default_max_age, {})`. + pub fn new( + provider: Arc>, + feeds: HashMap, + default_max_age: Duration, + ) -> Self { + Self::with_per_symbol_max_age(provider, feeds, default_max_age, HashMap::new()) + } + + /// Build a cache with per-symbol staleness overrides (e.g. stablecoin + /// feeds accept 24h, volatile pairs 2min). + pub fn with_per_symbol_max_age( + provider: Arc>, + feeds: HashMap, + default_max_age: Duration, + per_symbol_max_age: HashMap, + ) -> Self { + Self { + provider, + feeds, + default_max_age, + per_symbol_max_age, + cache: DashMap::new(), + } + } + + pub fn symbols(&self) -> impl Iterator { + self.feeds.keys().map(String::as_str) + } + + fn max_age_for(&self, symbol: &str) -> Duration { + self.per_symbol_max_age + .get(symbol) + .copied() + .unwrap_or(self.default_max_age) + } + + /// Fetch one feed by symbol, validate round completeness and + /// freshness, insert into the cache, return the parsed reading. + pub async fn refresh(&self, symbol: &str) -> Result { + let feed = self + .feeds + .get(symbol) + .with_context(|| format!("no Chainlink feed configured for '{symbol}'"))?; + + let agg = IAggregatorV3::new(*feed, self.provider.clone()); + let decimals = agg + .decimals() + .call() + .await + .with_context(|| format!("feed '{symbol}' ({feed}): decimals() failed"))? + ._0; + let round = agg + .latestRoundData() + .call() + .await + .with_context(|| format!("feed '{symbol}' ({feed}): latestRoundData() failed"))?; + + // Reject negative answers: most aggregators emit negative on + // degraded state; coercing to U256 would produce a huge number. + let raw_answer = round.answer; + let price = if raw_answer < I256::ZERO { + anyhow::bail!("feed '{symbol}' returned negative answer: {raw_answer}"); + } else { + U256::try_from(raw_answer) + .with_context(|| format!("feed '{symbol}': answer {raw_answer} → U256"))? + }; + + // Round-completeness: answeredInRound < roundId ⇒ current round + // has not finished aggregating. The returned answer is the + // previous round's value; reject to avoid stale pricing passed + // off as fresh updatedAt. + if round.answeredInRound < round.roundId { + anyhow::bail!( + "feed '{symbol}': round not complete (answeredInRound={}, roundId={})", + round.answeredInRound, + round.roundId + ); + } + + let updated_at: u64 = round.updatedAt.try_into().with_context(|| { + format!( + "feed '{symbol}': updatedAt {:?} does not fit in u64", + round.updatedAt + ) + })?; + if updated_at == 0 { + anyhow::bail!("feed '{symbol}': updatedAt=0, aggregator is uninitialized"); + } + + let now = unix_now().context("system clock unavailable, cannot judge freshness")?; + let max_age = self.max_age_for(symbol); + let age = now.saturating_sub(updated_at); + if age > max_age.as_secs() { + anyhow::bail!( + "feed '{symbol}' is stale (updated {} s ago, max_age {} s)", + age, + max_age.as_secs() + ); + } + + let cached = CachedPrice { + price, + decimals, + updated_at, + fetched_at: now, + }; + debug!( + symbol, + price = %cached.price, + decimals, + age_secs = age, + "chainlink price refreshed" + ); + self.cache.insert(symbol.to_string(), cached.clone()); + Ok(cached) + } + + /// Refresh every configured feed. Individual failures are logged + /// and do not abort the batch. + pub async fn refresh_all(&self) { + for symbol in self.feeds.keys() { + if let Err(err) = self.refresh(symbol).await { + warn!(symbol = %symbol, ?err, "chainlink refresh failed"); + } + } + } + + /// Return the most recently cached price iff it is still fresh. + /// Stale entries, entries from an uninitialized feed, or an + /// unusable system clock all yield `None`. + pub fn get(&self, symbol: &str) -> Option { + let entry = self.cache.get(symbol)?; + if self.is_fresh(symbol, &entry) { + Some(entry.clone()) + } else { + None + } + } + + /// Freshness predicate. Returns `false` if the system clock fails; + /// better to treat stale as stale than to serve old prices to the + /// liquidation path. + pub fn is_fresh(&self, symbol: &str, cached: &CachedPrice) -> bool { + match unix_now() { + Ok(now) => { + let max_age = self.max_age_for(symbol); + cached.updated_at + max_age.as_secs() >= now + } + Err(_) => false, + } + } +} + +/// Unix seconds since epoch. Errors on clock skew / pre-epoch so callers +/// can treat the failure distinctly (e.g. treat all cache entries as +/// stale rather than silently serve any entry because `now = 0`). +fn unix_now() -> Result { + let d = SystemTime::now() + .duration_since(UNIX_EPOCH) + .context("system clock is before UNIX_EPOCH")?; + Ok(d.as_secs()) +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy::primitives::address; + + fn feeds_map() -> HashMap { + let mut m = HashMap::new(); + m.insert( + "BNB".to_string(), + address!("0567F2323251f0Aab15c8dFb1967E4e8A7D42aeE"), + ); + m + } + + #[test] + fn scaled_to_up_and_down() { + // 300.00000000 in 8 decimals. + let p = CachedPrice { + price: U256::from(300_00000000u64), + decimals: 8, + updated_at: 1, + fetched_at: 1, + }; + // 300 * 1e18 when scaled to 18 decimals. + assert_eq!( + p.scaled_to(18), + U256::from(300u64) * U256::from(10u64).pow(U256::from(18u64)) + ); + // 300 (no decimals) when scaled down. + assert_eq!(p.scaled_to(0), U256::from(300u64)); + } + + #[test] + fn feeds_map_is_iterable() { + let m = feeds_map(); + assert!(m.contains_key("BNB")); + } +} diff --git a/crates/charon-scanner/tests/chainlink_refresh.rs b/crates/charon-scanner/tests/chainlink_refresh.rs new file mode 100644 index 0000000..25b640e --- /dev/null +++ b/crates/charon-scanner/tests/chainlink_refresh.rs @@ -0,0 +1,68 @@ +//! Live Chainlink feed smoke test on BSC. +//! +//! Skipped without `BNB_WS_URL`. Verifies `PriceCache::refresh` speaks +//! to a real aggregator, rejects stale/negative readings, and caches +//! the result for subsequent `get` calls. + +use std::collections::HashMap; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; + +use alloy::primitives::{Address, U256}; +use alloy::providers::{ProviderBuilder, WsConnect}; +use charon_scanner::{DEFAULT_MAX_AGE, PriceCache}; + +const BNB_USD_FEED: &str = "0x0567F2323251f0Aab15c8dFb1967E4e8A7D42aeE"; + +#[tokio::test] +async fn refresh_pulls_live_bnb_usd_price() { + let _ = dotenvy::dotenv(); + let Ok(ws_url) = std::env::var("BNB_WS_URL") else { + eprintln!("skipping: BNB_WS_URL not set"); + return; + }; + + let provider = ProviderBuilder::new() + .on_ws(WsConnect::new(ws_url)) + .await + .expect("ws connect"); + + let mut feeds = HashMap::new(); + feeds.insert("BNB".to_string(), Address::from_str(BNB_USD_FEED).unwrap()); + + let cache = PriceCache::new(Arc::new(provider), feeds, DEFAULT_MAX_AGE); + + let price = cache.refresh("BNB").await.expect("refresh BNB"); + assert!(price.price > U256::ZERO, "BNB price should be positive"); + assert!(price.decimals >= 6, "Chainlink decimals are typically 8"); + assert!(cache.is_fresh("BNB", &price)); + + let cached = cache.get("BNB").expect("cached after refresh"); + assert_eq!(cached.price, price.price); +} + +#[tokio::test] +async fn stale_rejection_triggers_when_max_age_is_zero() { + let _ = dotenvy::dotenv(); + let Ok(ws_url) = std::env::var("BNB_WS_URL") else { + eprintln!("skipping: BNB_WS_URL not set"); + return; + }; + + let provider = ProviderBuilder::new() + .on_ws(WsConnect::new(ws_url)) + .await + .expect("ws connect"); + + let mut feeds = HashMap::new(); + feeds.insert("BNB".to_string(), Address::from_str(BNB_USD_FEED).unwrap()); + + // max_age = 0 forces every feed to look stale. + let cache = PriceCache::new(Arc::new(provider), feeds, Duration::from_secs(0)); + let err = cache + .refresh("BNB") + .await + .expect_err("should reject as stale"); + assert!(format!("{err:#}").contains("stale")); +}