diff --git a/Cargo.lock b/Cargo.lock index 51eaa0f..2c67907 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1132,6 +1132,7 @@ dependencies = [ "charon-scanner", "clap", "dotenvy", + "metrics", "tokio", "tracing", "tracing-subscriber", @@ -1171,6 +1172,7 @@ dependencies = [ "anyhow", "async-trait", "charon-core", + "dashmap", "futures-util", "metrics", "rand 0.8.6", diff --git a/Cargo.toml b/Cargo.toml index feb62d3..326d454 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,9 @@ async-trait = "0.1" # Stream adapters (StreamExt for subscription streams) futures-util = "0.3" +# Lock-free concurrent hashmap for scanner state +dashmap = "6" + # Backoff jitter rand = "0.8" diff --git a/config/default.toml b/config/default.toml index 80c9840..38a202d 100644 --- a/config/default.toml +++ b/config/default.toml @@ -12,6 +12,14 @@ min_profit_usd_1e6 = 5000000 max_gas_wei = "3000000000" # Polling cadence for protocols without push events (ms). scan_interval_ms = 1000 +# Health-factor band (bps of 1e18): liquidatable ≤ 10_000, near-liq window +# up to `near_liq_threshold_bps`. Omitted keys fall back to serde defaults: +# liquidatable_threshold_bps = 10000 (HF = 1.00) +# near_liq_threshold_bps = 10500 (HF = 1.05) +# Scan cadences per bucket (blocks): defaults hot=1 / warm=10 / cold=100. +# hot_scan_blocks = 1 +# warm_scan_blocks = 10 +# cold_scan_blocks = 100 # ── Chains ──────────────────────────────────────────────────────────────── [chain.bnb] diff --git a/crates/charon-cli/Cargo.toml b/crates/charon-cli/Cargo.toml index ff140bd..3cf6a82 100644 --- a/crates/charon-cli/Cargo.toml +++ b/crates/charon-cli/Cargo.toml @@ -20,6 +20,7 @@ anyhow = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } dotenvy = { workspace = true } +metrics = { workspace = true } [lints] workspace = true diff --git a/crates/charon-cli/src/main.rs b/crates/charon-cli/src/main.rs index 6d44e69..9942c8c 100644 --- a/crates/charon-cli/src/main.rs +++ b/crates/charon-cli/src/main.rs @@ -16,7 +16,9 @@ use alloy::providers::{ProviderBuilder, WsConnect}; use anyhow::{Context, Result}; use charon_core::{Config, LendingProtocol}; use charon_protocols::VenusAdapter; -use charon_scanner::{BlockListener, ChainEvent, ChainProvider}; +use charon_scanner::{ + BlockListener, ChainEvent, ChainProvider, HealthScanner, PositionBucket, ScanScheduler, +}; use clap::{Parser, Subcommand}; use tokio::sync::mpsc; use tracing::{info, warn}; @@ -126,49 +128,74 @@ async fn main() -> Result<()> { /// cleanly on SIGINT or SIGTERM so the Docker `stop` → SIGTERM → SIGKILL /// sequence never tears mid-operation. /// -/// For every `NewBlock` event on a chain with a `[protocol.venus]` entry, -/// the Venus adapter scans the supplied borrower list anchored at the -/// observed block. Chains without a Venus protocol config still flow -/// through the drain loop but trigger no protocol scans (v0.1 scope). +/// For every `NewBlock` event on a chain with a `[protocol.venus]` entry +/// the Venus adapter fetches positions anchored at the observed block, +/// pushes them through the bucketed [`HealthScanner`], and limits fetches +/// to buckets whose cadence fires this block via [`ScanScheduler`]. +/// Chains without a Venus protocol config still flow through the drain +/// loop but trigger no protocol scans (v0.1 scope). +/// +/// Backfill blocks (synthesised during WebSocket reconnect) are logged +/// but not scanned — the state they would produce is superseded by the +/// next real head and a fresh scan is cheaper than retroactive bucket +/// transitions. async fn run_listen(config: &Config, borrowers: Vec
) -> Result<()> { if config.chain.is_empty() { anyhow::bail!("no chains configured — nothing to listen to"); } - // Venus adapter is currently single-chain (BNB) per config scope. - // Build it only if `[protocol.venus]` exists and its target chain is - // configured; otherwise run the listener pipeline without a scanner. - let venus_adapter: Option<(String, Arc)> = match config.protocol.get("venus") { - Some(venus_cfg) => { - let chain_name = &venus_cfg.chain; - let chain_cfg = config.chain.get(chain_name).with_context(|| { - format!( - "protocol 'venus' references chain '{chain_name}' which is not in [chain.*]" - ) - })?; - let adapter_ws = ProviderBuilder::new() - .on_ws(WsConnect::new(&chain_cfg.ws_url)) - .await - .context("venus adapter: failed to connect over ws")?; - let adapter = - Arc::new(VenusAdapter::connect(Arc::new(adapter_ws), venus_cfg.comptroller).await?); - info!( - chain = %chain_name, - borrower_count = borrowers.len(), - market_count = adapter.markets().await.len(), - "venus adapter ready" - ); - Some((chain_name.clone(), adapter)) - } - None => { - info!("no [protocol.venus] configured — listener will drain events without scanning"); - None - } - }; + // Venus adapter + bucketed scanner + cadence scheduler are currently + // single-chain (BNB) per config scope. Build them only if + // `[protocol.venus]` exists and its target chain is configured; + // otherwise run the listener pipeline without a scanner. + let venus_adapter: Option<(String, Arc, Arc, ScanScheduler)> = + match config.protocol.get("venus") { + Some(venus_cfg) => { + let chain_name = &venus_cfg.chain; + let chain_cfg = config.chain.get(chain_name).with_context(|| { + format!( + "protocol 'venus' references chain '{chain_name}' which is not in [chain.*]" + ) + })?; + let adapter_ws = ProviderBuilder::new() + .on_ws(WsConnect::new(&chain_cfg.ws_url)) + .await + .context("venus adapter: failed to connect over ws")?; + let adapter = Arc::new( + VenusAdapter::connect(Arc::new(adapter_ws), venus_cfg.comptroller).await?, + ); + let scanner = Arc::new(HealthScanner::new( + config.bot.liquidatable_threshold_bps, + config.bot.near_liq_threshold_bps, + )?); + let sched = ScanScheduler::new( + config.bot.hot_scan_blocks, + config.bot.warm_scan_blocks, + config.bot.cold_scan_blocks, + ); + info!( + chain = %chain_name, + borrower_count = borrowers.len(), + market_count = adapter.markets().await.len(), + liquidatable_bps = config.bot.liquidatable_threshold_bps, + near_liq_bps = config.bot.near_liq_threshold_bps, + hot_blocks = sched.hot, + warm_blocks = sched.warm, + cold_blocks = sched.cold, + "venus adapter + scanner ready" + ); + Some((chain_name.clone(), adapter, scanner, sched)) + } + None => { + info!( + "no [protocol.venus] configured — listener will drain events without scanning" + ); + None + } + }; let (tx, mut rx) = mpsc::channel::(CHAIN_EVENT_CHANNEL); - let mut listeners: tokio::task::JoinSet<(String, Result<()>)> = - tokio::task::JoinSet::new(); + let mut listeners: tokio::task::JoinSet<(String, Result<()>)> = tokio::task::JoinSet::new(); // `ChainConfig: Clone` — we only borrow `config`, so each listener task // gets its own owned copy. @@ -183,6 +210,11 @@ async fn run_listen(config: &Config, borrowers: Vec
) -> Result<()> { info!("listen: draining chain events (Ctrl-C or SIGTERM to stop)"); + // The first real (non-backfill) block on the Venus chain seeds the + // scanner with the operator-supplied borrower list. Subsequent scans + // pull from the scheduler-selected bucket membership so we don't + // burn RPC re-fetching COLD positions every block. + let mut seeded = false; tokio::select! { _ = async { while let Some(event) = rx.recv().await { @@ -195,32 +227,67 @@ async fn run_listen(config: &Config, borrowers: Vec
) -> Result<()> { backfill, "cli drained event" ); - // Route to Venus scan only when this event is for - // the chain the Venus adapter was configured on. - if let Some((venus_chain, adapter)) = venus_adapter.as_ref() { - if venus_chain == &chain { - let start = std::time::Instant::now(); - let block_tag = BlockNumberOrTag::Number(number); - match adapter.fetch_positions(&borrowers, block_tag).await { - Ok(positions) => { - info!( - chain = %chain, - block = number, - timestamp, - backfill, - tracked = borrowers.len(), - returned = positions.len(), - scan_ms = start.elapsed().as_millis() as u64, - "venus scan" - ); + if backfill { + // Skip backfill — the next real head will + // snapshot the final state of the missed range. + continue; + } + if let Some((venus_chain, adapter, scanner, sched)) = + venus_adapter.as_ref() + { + if venus_chain != &chain { + continue; + } + let start = std::time::Instant::now(); + let scan_set: Vec
= if !seeded { + seeded = true; + borrowers.clone() + } else { + let mut v = Vec::new(); + for b in [ + PositionBucket::Liquidatable, + PositionBucket::NearLiquidation, + PositionBucket::Healthy, + ] { + if sched.should_scan(b, number) { + v.extend(scanner.borrowers_in_bucket(b)); } - Err(err) => warn!( + } + v + }; + if scan_set.is_empty() { + continue; + } + let block_tag = BlockNumberOrTag::Number(number); + match adapter.fetch_positions(&scan_set, block_tag).await { + Ok(positions) => { + let returned = positions.len(); + scanner.upsert(positions.clone()); + scanner.prune(&positions); + let counts = scanner.bucket_counts(); + metrics::histogram!( + "charon_scanner_scan_duration_seconds" + ) + .record(start.elapsed().as_secs_f64()); + info!( chain = %chain, block = number, - error = ?err, - "venus scan failed" - ), + timestamp, + tracked = scan_set.len(), + returned, + healthy = counts.healthy, + near_liq = counts.near_liquidation, + liquidatable = counts.liquidatable, + scan_ms = start.elapsed().as_millis() as u64, + "venus scan" + ); } + Err(err) => warn!( + chain = %chain, + block = number, + error = ?err, + "venus scan failed" + ), } } } @@ -248,9 +315,7 @@ async fn run_listen(config: &Config, borrowers: Vec
) -> Result<()> { /// Drain a `JoinSet` of listener tasks and surface panics / errors per chain. /// Returns when every listener has exited so the caller can shut down. -async fn supervise( - listeners: &mut tokio::task::JoinSet<(String, Result<()>)>, -) { +async fn supervise(listeners: &mut tokio::task::JoinSet<(String, Result<()>)>) { while let Some(joined) = listeners.join_next().await { match joined { Ok((name, Ok(()))) => { diff --git a/crates/charon-core/src/config.rs b/crates/charon-core/src/config.rs index c92877a..ff49574 100644 --- a/crates/charon-core/src/config.rs +++ b/crates/charon-core/src/config.rs @@ -96,6 +96,41 @@ pub struct BotConfig { pub max_gas_wei: U256, /// Polling interval for protocols that don't push events. pub scan_interval_ms: u64, + /// Health factor at or below which a position becomes liquidatable, + /// in basis points of 1e18 (10_000 = 1.0). Integer bps over f64 so + /// the boundary has no ULP-level drift (1.05 as f64 truncates to + /// 1_049_999_999_999_999_872 in 1e18 scale and silently leaks + /// positions out of the NearLiquidation bucket). + #[serde(default = "default_liquidatable_threshold_bps")] + pub liquidatable_threshold_bps: u32, + /// Upper bound of the near-liquidation watch band, same bps space. + #[serde(default = "default_near_liq_threshold_bps")] + pub near_liq_threshold_bps: u32, + /// HOT (Liquidatable) bucket scan cadence, in blocks. Default 1. + #[serde(default = "default_hot_scan_blocks")] + pub hot_scan_blocks: u64, + /// WARM (NearLiquidation) bucket scan cadence. Default every 10 blocks. + #[serde(default = "default_warm_scan_blocks")] + pub warm_scan_blocks: u64, + /// COLD (Healthy) bucket scan cadence. Default every 100 blocks. + #[serde(default = "default_cold_scan_blocks")] + pub cold_scan_blocks: u64, +} + +fn default_liquidatable_threshold_bps() -> u32 { + 10_000 // 1.0000 +} +fn default_near_liq_threshold_bps() -> u32 { + 10_500 // 1.0500 +} +fn default_hot_scan_blocks() -> u64 { + 1 +} +fn default_warm_scan_blocks() -> u64 { + 10 +} +fn default_cold_scan_blocks() -> u64 { + 100 } /// RPC endpoints for a single chain. **The URLs typically embed API keys; @@ -171,11 +206,26 @@ impl Config { Ok(config) } - /// Cross-reference chain keys, reject sentinel zero addresses. + /// Cross-reference chain keys, reject sentinel zero addresses, and + /// sanity-check scanner bucket thresholds + cadence. fn validate(&self) -> Result<()> { if self.chain.is_empty() { return Err(ConfigError::Validation("no [chain.*] entries".into())); } + if self.bot.near_liq_threshold_bps <= self.bot.liquidatable_threshold_bps { + return Err(ConfigError::Validation(format!( + "near_liq_threshold_bps ({}) must be > liquidatable_threshold_bps ({})", + self.bot.near_liq_threshold_bps, self.bot.liquidatable_threshold_bps + ))); + } + if self.bot.hot_scan_blocks == 0 + || self.bot.warm_scan_blocks == 0 + || self.bot.cold_scan_blocks == 0 + { + return Err(ConfigError::Validation( + "hot/warm/cold_scan_blocks must all be > 0".into(), + )); + } for (name, p) in &self.protocol { if !self.chain.contains_key(&p.chain) { return Err(ConfigError::Validation(format!( diff --git a/crates/charon-scanner/Cargo.toml b/crates/charon-scanner/Cargo.toml index 7389732..89a5caa 100644 --- a/crates/charon-scanner/Cargo.toml +++ b/crates/charon-scanner/Cargo.toml @@ -13,6 +13,7 @@ async-trait = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } futures-util = { workspace = true } +dashmap = { workspace = true } rand = { workspace = true } metrics = { workspace = true } diff --git a/crates/charon-scanner/src/lib.rs b/crates/charon-scanner/src/lib.rs index 052e807..0575442 100644 --- a/crates/charon-scanner/src/lib.rs +++ b/crates/charon-scanner/src/lib.rs @@ -2,6 +2,10 @@ pub mod listener; pub mod provider; +pub mod scanner; pub use listener::{BlockListener, ChainEvent}; pub use provider::{ChainProvider, ChainProviderT, MockChainProvider}; +pub use scanner::{ + BucketCounts, BucketedPosition, HealthScanner, PositionBucket, ScanScheduler, +}; diff --git a/crates/charon-scanner/src/scanner.rs b/crates/charon-scanner/src/scanner.rs new file mode 100644 index 0000000..e7e8382 --- /dev/null +++ b/crates/charon-scanner/src/scanner.rs @@ -0,0 +1,357 @@ +//! Health-factor scanner — 3-bucket classifier + per-bucket scheduler. +//! +//! Protocol adapters supply positions; the scanner classifies each into +//! one of three buckets based on its `health_factor`: +//! +//! * **Liquidatable** (HOT) — `hf < liquidatable_threshold` (1.0 by default). +//! * **NearLiquidation** (WARM) — `liquidatable ≤ hf < near_liq`. +//! * **Healthy** (COLD) — everything else. +//! +//! The [`ScanScheduler`] answers "do I re-scan this bucket on this block?" +//! from the configured `{hot,warm,cold}_scan_blocks` cadence, so the scanner +//! does not burn RPC on a COLD bucket every block. +//! +//! Storage is a single [`DashMap`] — lock-free, shard-partitioned. The map +//! supports `prune()` so borrowers that fully repay are removed and do +//! not linger as stale Liquidatable entries forever. + +use std::collections::HashSet; + +use alloy::primitives::{Address, U256}; +use charon_core::Position; +use dashmap::DashMap; +use tracing::warn; + +/// Which classification bucket a borrower's position currently falls into. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum PositionBucket { + /// Safely over-collateralized; nothing to do (COLD). + Healthy, + /// Close to the liquidation boundary (WARM). + NearLiquidation, + /// Currently liquidatable (HOT). + Liquidatable, +} + +impl PositionBucket { + fn label(self) -> &'static str { + match self { + PositionBucket::Healthy => "healthy", + PositionBucket::NearLiquidation => "near_liquidation", + PositionBucket::Liquidatable => "liquidatable", + } + } +} + +#[derive(Debug, Clone)] +pub struct BucketedPosition { + pub position: Position, + pub bucket: PositionBucket, +} + +#[derive(Debug, Clone, Default)] +pub struct BucketCounts { + pub healthy: usize, + pub near_liquidation: usize, + pub liquidatable: usize, +} + +impl BucketCounts { + pub fn total(&self) -> usize { + self.healthy + self.near_liquidation + self.liquidatable + } +} + +/// Per-bucket scan cadence driver. +/// +/// `should_scan(bucket, block)` returns true when the given block number +/// falls on the bucket's cadence. HOT cadence is usually 1 (every block). +#[derive(Debug, Clone, Copy)] +pub struct ScanScheduler { + pub hot: u64, + pub warm: u64, + pub cold: u64, +} + +impl ScanScheduler { + pub fn new(hot: u64, warm: u64, cold: u64) -> Self { + Self { + hot: hot.max(1), + warm: warm.max(1), + cold: cold.max(1), + } + } + pub fn should_scan(&self, bucket: PositionBucket, block: u64) -> bool { + let period = match bucket { + PositionBucket::Liquidatable => self.hot, + PositionBucket::NearLiquidation => self.warm, + PositionBucket::Healthy => self.cold, + }; + block % period == 0 + } +} + +/// 3-bucket health-factor scanner. +pub struct HealthScanner { + liquidatable_threshold: U256, + near_liq_threshold: U256, + positions: DashMap, +} + +impl HealthScanner { + /// Build a scanner from basis-point thresholds of 1e18 (10_000 = 1.0). + pub fn new(liquidatable_bps: u32, near_liq_bps: u32) -> anyhow::Result { + if liquidatable_bps > near_liq_bps { + anyhow::bail!( + "liquidatable_threshold_bps ({liquidatable_bps}) must be ≤ near_liq_threshold_bps ({near_liq_bps})" + ); + } + Ok(Self { + liquidatable_threshold: bps_to_1e18(liquidatable_bps), + near_liq_threshold: bps_to_1e18(near_liq_bps), + positions: DashMap::new(), + }) + } + + /// Warn on startup if the supplied positions all carry the known + /// binary-HF sentinel values (0 / 2e18) emitted by adapters that have + /// not yet implemented a real ratio — otherwise the NearLiquidation + /// bucket is silently dead code. + pub fn warn_if_binary_sentinel(&self, sample: &[Position]) { + if sample.is_empty() { + return; + } + let scale = U256::from(10u64).pow(U256::from(18u64)); + let binary = sample + .iter() + .all(|p| p.health_factor == U256::ZERO || p.health_factor == scale * U256::from(2u64)); + if binary { + warn!( + "every observed health_factor is 0 or 2e18 — adapter appears to emit a binary sentinel; \ + NearLiquidation bucket will never populate until real HF is computed" + ); + } + } + + pub fn classify(&self, health_factor: U256) -> PositionBucket { + if health_factor < self.liquidatable_threshold { + PositionBucket::Liquidatable + } else if health_factor < self.near_liq_threshold { + PositionBucket::NearLiquidation + } else { + PositionBucket::Healthy + } + } + + /// Upsert a batch of freshly-fetched positions. Detects per-borrower + /// bucket transitions and increments `charon_scanner_transitions_total`. + pub fn upsert(&self, positions: impl IntoIterator) { + for p in positions { + let new_bucket = self.classify(p.health_factor); + let prev_bucket = self + .positions + .get(&p.borrower) + .map(|e| e.value().bucket); + self.positions.insert( + p.borrower, + BucketedPosition { + position: p, + bucket: new_bucket, + }, + ); + if let Some(prev) = prev_bucket { + if prev != new_bucket { + metrics::counter!( + "charon_scanner_transitions_total", + "from" => prev.label(), + "to" => new_bucket.label() + ) + .increment(1); + } + } + } + self.publish_gauges(); + } + + /// Remove a single borrower (e.g. after full repayment detected). + pub fn remove(&self, borrower: &Address) { + self.positions.remove(borrower); + self.publish_gauges(); + } + + /// Drop every tracked borrower that is **not** in `current`. Called by + /// the scan loop after `upsert(fresh_positions)` so positions whose debt + /// has been repaid (and thus no longer appear in the adapter response) + /// stop being reported as Liquidatable. + pub fn prune(&self, current: &[Position]) { + let keep: HashSet
= current.iter().map(|p| p.borrower).collect(); + self.positions.retain(|addr, _| keep.contains(addr)); + self.publish_gauges(); + } + + pub fn bucket_counts(&self) -> BucketCounts { + let mut counts = BucketCounts::default(); + for entry in self.positions.iter() { + match entry.value().bucket { + PositionBucket::Healthy => counts.healthy += 1, + PositionBucket::NearLiquidation => counts.near_liquidation += 1, + PositionBucket::Liquidatable => counts.liquidatable += 1, + } + } + counts + } + + /// Update the `charon_scanner_borrowers_in_bucket{bucket}` gauges. + fn publish_gauges(&self) { + let counts = self.bucket_counts(); + metrics::gauge!("charon_scanner_borrowers_in_bucket", "bucket" => "healthy") + .set(counts.healthy as f64); + metrics::gauge!("charon_scanner_borrowers_in_bucket", "bucket" => "near_liquidation") + .set(counts.near_liquidation as f64); + metrics::gauge!("charon_scanner_borrowers_in_bucket", "bucket" => "liquidatable") + .set(counts.liquidatable as f64); + } + + pub fn liquidatable(&self) -> Vec { + self.positions + .iter() + .filter(|e| e.value().bucket == PositionBucket::Liquidatable) + .map(|e| e.value().position.clone()) + .collect() + } + + pub fn near_liquidation(&self) -> Vec { + self.positions + .iter() + .filter(|e| e.value().bucket == PositionBucket::NearLiquidation) + .map(|e| e.value().position.clone()) + .collect() + } + + /// Return the borrowers currently assigned to `bucket`. Used by the + /// scan scheduler to fetch only the subset that is due this block. + pub fn borrowers_in_bucket(&self, bucket: PositionBucket) -> Vec
{ + self.positions + .iter() + .filter(|e| e.value().bucket == bucket) + .map(|e| *e.key()) + .collect() + } +} + +/// Convert a basis-point value into a 1e18-fixed `U256`. 10_000 bps == 1.0e18. +/// Integer arithmetic only — no f64 at any point. +pub fn bps_to_1e18(bps: u32) -> U256 { + // 1 bps = 1e14 in 1e18 scale. + U256::from(bps) * U256::from(10u64).pow(U256::from(14u64)) +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy::primitives::address; + use charon_core::ProtocolId; + + fn one_e18() -> U256 { + U256::from(10u64).pow(U256::from(18u64)) + } + + fn mk_position(borrower_byte: u8, hf: U256) -> Position { + let mut bytes = [0u8; 20]; + bytes[19] = borrower_byte; + Position { + protocol: ProtocolId::Venus, + chain_id: 56, + borrower: Address::from(bytes), + collateral_token: address!("0000000000000000000000000000000000000001"), + debt_token: address!("0000000000000000000000000000000000000002"), + collateral_amount: U256::ZERO, + debt_amount: U256::ZERO, + health_factor: hf, + liquidation_bonus_bps: 1000, + } + } + + #[test] + fn bps_to_1e18_exact_boundary() { + // 10_500 bps must be exactly 1.05e18 — catches the old f64 ULP bug. + assert_eq!( + bps_to_1e18(10_500), + U256::from(1_050_000_000_000_000_000u128) + ); + assert_eq!( + bps_to_1e18(10_000), + U256::from(1_000_000_000_000_000_000u128) + ); + } + + #[test] + fn classify_partitions_positions_into_three_buckets() { + let s = HealthScanner::new(10_000, 10_500).unwrap(); + let e18 = one_e18(); + + assert_eq!( + s.classify(e18 / U256::from(2u64)), + PositionBucket::Liquidatable + ); + assert_eq!(s.classify(e18), PositionBucket::NearLiquidation); + let p104 = U256::from(1_040_000_000_000_000_000u128); + assert_eq!(s.classify(p104), PositionBucket::NearLiquidation); + let p105 = U256::from(1_050_000_000_000_000_000u128); + assert_eq!(s.classify(p105), PositionBucket::Healthy); + assert_eq!(s.classify(e18 * U256::from(2u64)), PositionBucket::Healthy); + } + + #[test] + fn upsert_updates_buckets_and_counts() { + let s = HealthScanner::new(10_000, 10_500).unwrap(); + let e18 = one_e18(); + s.upsert([ + mk_position(1, U256::from(0u64)), + mk_position(2, U256::from(1_020_000_000_000_000_000u128)), + mk_position(3, e18 * U256::from(3u64)), + ]); + let c = s.bucket_counts(); + assert_eq!(c.liquidatable, 1); + assert_eq!(c.near_liquidation, 1); + assert_eq!(c.healthy, 1); + } + + #[test] + fn prune_drops_repaid_borrowers() { + let s = HealthScanner::new(10_000, 10_500).unwrap(); + s.upsert([ + mk_position(1, U256::from(0u64)), + mk_position(2, U256::from(0u64)), + ]); + assert_eq!(s.bucket_counts().liquidatable, 2); + // Only borrower 2 still has a position after repayment. + s.prune(&[mk_position(2, U256::from(0u64))]); + assert_eq!(s.bucket_counts().liquidatable, 1); + } + + #[test] + fn remove_drops_single_borrower() { + let s = HealthScanner::new(10_000, 10_500).unwrap(); + s.upsert([mk_position(1, U256::from(0u64))]); + let mut bytes = [0u8; 20]; + bytes[19] = 1; + s.remove(&Address::from(bytes)); + assert_eq!(s.bucket_counts().total(), 0); + } + + #[test] + fn scheduler_gates_per_bucket_cadence() { + let sched = ScanScheduler::new(1, 10, 100); + assert!(sched.should_scan(PositionBucket::Liquidatable, 17)); + assert!(sched.should_scan(PositionBucket::NearLiquidation, 20)); + assert!(!sched.should_scan(PositionBucket::NearLiquidation, 21)); + assert!(sched.should_scan(PositionBucket::Healthy, 100)); + assert!(!sched.should_scan(PositionBucket::Healthy, 101)); + } + + #[test] + fn rejects_inverted_thresholds() { + assert!(HealthScanner::new(10_500, 10_000).is_err()); + } +}