diff --git a/Cargo.lock b/Cargo.lock index 2cef147..9b6b394 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1155,6 +1155,9 @@ dependencies = [ "anyhow", "async-trait", "charon-core", + "futures-util", + "metrics", + "rand 0.8.6", "tokio", "tracing", ] @@ -2291,6 +2294,16 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "metrics" +version = "0.24.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d5312e9ba3771cfa961b585728215e3d972c950a3eed9252aa093d6301277e8" +dependencies = [ + "ahash", + "portable-atomic", +] + [[package]] name = "mio" version = "1.2.0" @@ -2588,6 +2601,12 @@ version = "0.3.33" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" +[[package]] +name = "portable-atomic" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" + [[package]] name = "potential_utf" version = "0.1.5" diff --git a/Cargo.toml b/Cargo.toml index 899ea0c..c4b3981 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,15 @@ thiserror = "2" # Async trait objects async-trait = "0.1" +# Stream adapters (StreamExt for subscription streams) +futures-util = "0.3" + +# Backoff jitter +rand = "0.8" + +# Prometheus-style metrics recorder facade +metrics = "0.24" + # CLI clap = { version = "4", features = ["derive", "env"] } diff --git a/crates/charon-cli/src/main.rs b/crates/charon-cli/src/main.rs index 1371596..143ed5a 100644 --- a/crates/charon-cli/src/main.rs +++ b/crates/charon-cli/src/main.rs @@ -10,11 +10,17 @@ use std::path::PathBuf; use anyhow::{Context, Result}; use charon_core::Config; -use charon_scanner::ChainProvider; +use charon_scanner::{BlockListener, ChainEvent, ChainProvider}; use clap::{Parser, Subcommand}; -use tracing::info; +use tokio::sync::mpsc; +use tracing::{info, warn}; use tracing_subscriber::EnvFilter; +/// Size of the fan-in channel from listeners to the scanner pipeline. +/// One slot per ~5 blocks across all chains covers short stalls without +/// back-pressuring the listener task. +const CHAIN_EVENT_CHANNEL: usize = 1024; + /// Charon — multi-chain flash-loan liquidation bot. #[derive(Parser, Debug)] #[command(version, about, long_about = None)] @@ -34,8 +40,10 @@ struct Cli { #[derive(Subcommand, Debug)] enum Command { - /// Listen to chain events and track positions. - /// (Scanner wiring lands across multiple M1 issues — currently a stub.) + /// Spawn one block listener per configured chain and drain chain events. + /// + /// Downstream pipeline (scanner → profit calc → executor) consumes + /// the same channel once those layers land. Listen, /// Connect to a configured chain and print its latest block number. @@ -98,23 +106,90 @@ async fn main() -> Result<()> { Ok(()) } -/// Long-running listener entry point. Exits cleanly on SIGINT or SIGTERM so -/// the Docker `stop` → SIGTERM → SIGKILL sequence never tears mid-operation. -async fn run_listen(_config: &Config) -> Result<()> { - info!("listen: not wired up yet — scanner arrives in Day 2"); +/// Long-running listener entry point. Spawns one `BlockListener` per +/// configured chain, drains the shared `ChainEvent` channel, and exits +/// cleanly on SIGINT or SIGTERM so the Docker `stop` → SIGTERM → SIGKILL +/// sequence never tears mid-operation. +async fn run_listen(config: &Config) -> Result<()> { + if config.chain.is_empty() { + anyhow::bail!("no chains configured — nothing to listen to"); + } + + let (tx, mut rx) = mpsc::channel::(CHAIN_EVENT_CHANNEL); + let mut listeners: tokio::task::JoinSet<(String, Result<()>)> = + tokio::task::JoinSet::new(); + + // `ChainConfig: Clone` — we only borrow `config`, so each listener task + // gets its own owned copy. + for (name, chain_cfg) in &config.chain { + let name = name.clone(); + let chain_cfg = chain_cfg.clone(); + let listener = BlockListener::new(name.clone(), chain_cfg, tx.clone()); + listeners.spawn(async move { (name, listener.run().await) }); + } + // Drop our sender so the channel closes when every listener exits. + drop(tx); + + info!("listen: draining chain events (Ctrl-C or SIGTERM to stop)"); tokio::select! { + _ = async { + while let Some(event) = rx.recv().await { + match event { + ChainEvent::NewBlock { chain, number, timestamp, backfill } => { + tracing::debug!( + chain = %chain, + block = number, + timestamp = timestamp, + backfill, + "cli drained event" + ); + } + _ => {} + } + } + } => { + info!("all listeners exited"); + } + _ = supervise(&mut listeners) => { + info!("all listener tasks terminated"); + } _ = tokio::signal::ctrl_c() => { info!("received SIGINT, shutting down"); + listeners.shutdown().await; } _ = wait_sigterm() => { info!("received SIGTERM, shutting down"); + listeners.shutdown().await; } } Ok(()) } +/// Drain a `JoinSet` of listener tasks and surface panics / errors per chain. +/// Returns when every listener has exited so the caller can shut down. +async fn supervise( + listeners: &mut tokio::task::JoinSet<(String, Result<()>)>, +) { + while let Some(joined) = listeners.join_next().await { + match joined { + Ok((name, Ok(()))) => { + info!(chain = %name, "listener exited cleanly"); + } + Ok((name, Err(err))) => { + warn!(chain = %name, error = ?err, "listener terminated with error"); + } + Err(err) if err.is_panic() => { + warn!(error = ?err, "listener panicked"); + } + Err(err) => { + warn!(error = ?err, "listener join error"); + } + } + } +} + #[cfg(unix)] async fn wait_sigterm() { use tokio::signal::unix::{SignalKind, signal}; diff --git a/crates/charon-scanner/Cargo.toml b/crates/charon-scanner/Cargo.toml index 0a333de..7389732 100644 --- a/crates/charon-scanner/Cargo.toml +++ b/crates/charon-scanner/Cargo.toml @@ -12,6 +12,9 @@ anyhow = { workspace = true } async-trait = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +futures-util = { workspace = true } +rand = { workspace = true } +metrics = { workspace = true } [lints] workspace = true diff --git a/crates/charon-scanner/src/lib.rs b/crates/charon-scanner/src/lib.rs index 401a66b..052e807 100644 --- a/crates/charon-scanner/src/lib.rs +++ b/crates/charon-scanner/src/lib.rs @@ -1,5 +1,7 @@ //! Charon scanner — chain listener and health-factor scanner. +pub mod listener; pub mod provider; +pub use listener::{BlockListener, ChainEvent}; pub use provider::{ChainProvider, ChainProviderT, MockChainProvider}; diff --git a/crates/charon-scanner/src/listener.rs b/crates/charon-scanner/src/listener.rs new file mode 100644 index 0000000..cbced3a --- /dev/null +++ b/crates/charon-scanner/src/listener.rs @@ -0,0 +1,199 @@ +//! Block listener — the bot's heartbeat. +//! +//! Subscribes to `newHeads` over WebSocket and forwards each block into the +//! scanning pipeline via an `mpsc` channel. Wrapped in an outer loop so a +//! dropped WebSocket triggers a reconnect with jittered exponential backoff +//! instead of killing the bot. Reconnects backfill any blocks produced +//! during the disconnect window so the scanner never silently skips heads. + +use std::time::Duration; + +use alloy::providers::Provider; +use anyhow::{Context, Result}; +use charon_core::config::ChainConfig; +use futures_util::StreamExt; +use rand::Rng; +use tokio::sync::mpsc; +use tracing::{debug, info, warn}; + +use crate::provider::ChainProvider; + +/// Maximum reconnect backoff. BSC blocks every ~3 s, so 30 s is ~10 missed +/// blocks — the ceiling for how many we are willing to backfill at once. +const MAX_BACKOFF: Duration = Duration::from_secs(30); + +/// Upstream chain events produced by the listener. +/// +/// Held in a dedicated enum so the pipeline can grow new event kinds +/// (`ProtocolLog`, `OraclePriceUpdate`, …) without changing channel types. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub enum ChainEvent { + /// A new head block arrived on the chain. + NewBlock { + /// Short chain name (matches `[chain.]` key in config). + chain: String, + number: u64, + /// Unix timestamp from the block header. + timestamp: u64, + /// `true` if the block was synthesised via reconnect backfill. + backfill: bool, + }, +} + +/// Listens to a single chain's `newHeads` stream and forwards events. +/// +/// The listener owns a fresh `ChainProvider` for each connection attempt, +/// so a WebSocket drop recovers cleanly by reconnecting from scratch on +/// the next loop iteration. +pub struct BlockListener { + name: String, + config: ChainConfig, + tx: mpsc::Sender, + last_seen: Option, +} + +impl BlockListener { + /// Build a listener for a single chain. + pub fn new( + name: impl Into, + config: ChainConfig, + tx: mpsc::Sender, + ) -> Self { + Self { + name: name.into(), + config, + tx, + last_seen: None, + } + } + + /// Run the listener forever. Reconnects with jittered exponential backoff + /// on any connection or subscription error. Returns `Ok(())` only if the + /// receiving side of the channel is dropped. + pub async fn run(mut self) -> Result<()> { + let mut backoff = Duration::from_secs(1); + loop { + metrics::counter!("charon_listener_connects_total", "chain" => self.name.clone()) + .increment(1); + match self.run_once().await { + Ok(()) => { + info!(chain = %self.name, "listener channel closed, exiting"); + return Ok(()); + } + Err(err) => { + metrics::counter!( + "charon_listener_disconnects_total", + "chain" => self.name.clone() + ) + .increment(1); + let jitter_ms = rand::thread_rng() + .gen_range(0..=(backoff.as_millis() as u64).saturating_div(4)); + let wait = backoff + Duration::from_millis(jitter_ms); + warn!( + chain = %self.name, + error = ?err, + wait_ms = wait.as_millis() as u64, + "listener error, reconnecting after jittered backoff" + ); + tokio::time::sleep(wait).await; + backoff = (backoff * 2).min(MAX_BACKOFF); + } + } + } + } + + /// One connect → subscribe → drain cycle. Returns `Ok(())` if the + /// stream ends cleanly (receiver dropped); returns `Err` on any + /// connection or subscription failure so `run` can retry. + async fn run_once(&mut self) -> Result<()> { + let provider = ChainProvider::connect(&self.name, &self.config).await?; + + // Backfill blocks produced during any prior disconnect window. + if let Some(last) = self.last_seen { + let head = provider + .provider() + .get_block_number() + .await + .with_context(|| format!("chain '{}': get_block_number failed", self.name))?; + if head > last + 1 { + let gap = head - (last + 1); + warn!( + chain = %self.name, + from = last + 1, + to = head - 1, + gap, + "reconnect gap detected — backfilling" + ); + for number in (last + 1)..head { + let header = provider + .provider() + .get_block_by_number(number.into(), false.into()) + .await + .with_context(|| { + format!("chain '{}': get_block_by_number({number}) failed", self.name) + })?; + let ts = header.map(|b| b.header.timestamp).unwrap_or_default(); + self.publish(number, ts, true); + } + } + } + + let sub = provider + .provider() + .subscribe_blocks() + .await + .with_context(|| format!("chain '{}': subscribe_blocks failed", self.name))?; + info!(chain = %self.name, "block subscription established"); + + let mut stream = sub.into_stream(); + while let Some(header) = stream.next().await { + self.publish(header.number, header.timestamp, false); + } + + anyhow::bail!("chain '{}': subscription stream ended", self.name) + } + + /// Emit a `ChainEvent::NewBlock` into the channel. Non-blocking so a + /// stalled consumer cannot stall the WebSocket drain loop; full channel + /// drops the event with a warning (back-pressure visible to ops). + fn publish(&mut self, number: u64, timestamp: u64, backfill: bool) { + metrics::counter!("charon_blocks_received_total", "chain" => self.name.clone()) + .increment(1); + debug!( + chain = %self.name, + block = number, + timestamp, + backfill, + "new block" + ); + self.last_seen = Some(match self.last_seen { + Some(prev) => prev.max(number), + None => number, + }); + let event = ChainEvent::NewBlock { + chain: self.name.clone(), + number, + timestamp, + backfill, + }; + match self.tx.try_send(event) { + Ok(()) => {} + Err(mpsc::error::TrySendError::Full(_)) => { + metrics::counter!( + "charon_listener_dropped_events_total", + "chain" => self.name.clone() + ) + .increment(1); + warn!( + chain = %self.name, + block = number, + "channel full — block dropped" + ); + } + Err(mpsc::error::TrySendError::Closed(_)) => { + debug!(chain = %self.name, "receiver closed, stop publishing"); + } + } + } +}