From 2087a21c75d31b0ee44f484b63a97be863b4460c Mon Sep 17 00:00:00 2001 From: obchain Date: Mon, 20 Apr 2026 17:03:30 +0530 Subject: [PATCH 1/2] feat(scanner): add BlockListener with mpsc fan-in + reconnect (closes #7) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The bot's heartbeat. One `BlockListener` per configured chain subscribes to `newHeads` over WebSocket and forwards each block into a shared mpsc channel that the scanning pipeline will consume. - New `crates/charon-scanner/src/listener.rs`: - `ChainEvent::NewBlock { chain, number, timestamp }` — enum shape so future event kinds (protocol logs, oracle updates) land without changing channel types - `BlockListener::run` loops `run_once`, reconnecting with exponential backoff (1s → 30s cap) on any subscription/transport error - Per-block structured log: `chain=<name> block=<number> timestamp=<unix-ts>` - Clean shutdown when the receiver drops - CLI `listen` subcommand now: - Spawns a listener per `[chain.<name>]` in config - Drains the shared channel at `debug!` level - Exits on Ctrl-C or when all listeners terminate - Adds `futures-util` as a workspace dep for `SubscriptionStream::next()` Verified against BSC mainnet: `cargo run -- listen` streamed 304 blocks in 2m16s with zero warnings — subscription stable, reconnect logic dormant (no drops seen). 
--- Cargo.lock | 1 + Cargo.toml | 3 + crates/charon-cli/src/main.rs | 67 ++++++++++++-- crates/charon-scanner/Cargo.toml | 1 + crates/charon-scanner/src/lib.rs | 2 + crates/charon-scanner/src/listener.rs | 124 ++++++++++++++++++++++++++ 6 files changed, 191 insertions(+), 7 deletions(-) create mode 100644 crates/charon-scanner/src/listener.rs diff --git a/Cargo.lock b/Cargo.lock index 5f48dbd..d5646e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1153,6 +1153,7 @@ dependencies = [ "alloy", "anyhow", "charon-core", + "futures-util", "tokio", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index 3c91168..906efc8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,9 @@ anyhow = "1" # Async trait objects async-trait = "0.1" +# Stream adapters (StreamExt for subscription streams) +futures-util = "0.3" + # CLI clap = { version = "4", features = ["derive"] } diff --git a/crates/charon-cli/src/main.rs b/crates/charon-cli/src/main.rs index bc5e77a..032e6ed 100644 --- a/crates/charon-cli/src/main.rs +++ b/crates/charon-cli/src/main.rs @@ -9,11 +9,17 @@ use std::path::PathBuf; use anyhow::{Context, Result}; use charon_core::Config; -use charon_scanner::ChainProvider; +use charon_scanner::{BlockListener, ChainEvent, ChainProvider}; use clap::{Parser, Subcommand}; -use tracing::info; +use tokio::sync::mpsc; +use tracing::{info, warn}; use tracing_subscriber::EnvFilter; +/// Size of the fan-in channel from listeners to the scanner pipeline. +/// One slot per ~5 blocks across all chains covers short stalls without +/// back-pressuring the listener task. +const CHAIN_EVENT_CHANNEL: usize = 1024; + /// Charon — multi-chain flash-loan liquidation bot. #[derive(Parser, Debug)] #[command(version, about, long_about = None)] @@ -28,8 +34,10 @@ struct Cli { #[derive(Subcommand, Debug)] enum Command { - /// Listen to chain events and track positions. - /// (Scanner wiring lands across multiple M1 issues — currently a stub.) 
+ /// Spawn one block listener per configured chain and print new blocks. + /// + /// Downstream pipeline (scanner → profit calc → executor) consumes + /// the same channel once those layers land. Listen, /// Connect to a configured chain and print its latest block number. @@ -68,9 +76,7 @@ async fn main() -> Result<()> { ); match cli.command { - Command::Listen => { - info!("listen: not wired up yet — scanner arrives across M1 issues"); - } + Command::Listen => run_listen(config).await?, Command::TestConnection { chain } => { let chain_cfg = config .chain @@ -84,3 +90,50 @@ async fn main() -> Result<()> { Ok(()) } + +/// Spawn one `BlockListener` per configured chain, drain the shared +/// `ChainEvent` channel, and exit on Ctrl-C. +async fn run_listen(config: Config) -> Result<()> { + if config.chain.is_empty() { + anyhow::bail!("no chains configured — nothing to listen to"); + } + + let (tx, mut rx) = mpsc::channel::(CHAIN_EVENT_CHANNEL); + + for (name, chain_cfg) in config.chain { + let listener = BlockListener::new(name.clone(), chain_cfg, tx.clone()); + tokio::spawn(async move { + if let Err(err) = listener.run().await { + warn!(chain = %name, error = ?err, "listener terminated"); + } + }); + } + // Drop our sender so the channel closes when every listener exits. + drop(tx); + + info!("listen: draining chain events (Ctrl-C to stop)"); + + tokio::select! 
{ + _ = async { + while let Some(event) = rx.recv().await { + match event { + ChainEvent::NewBlock { chain, number, timestamp } => { + tracing::debug!( + chain = %chain, + block = number, + timestamp = timestamp, + "cli drained event" + ); + } + } + } + } => { + info!("all listeners exited"); + } + _ = tokio::signal::ctrl_c() => { + info!("ctrl-c received, shutting down"); + } + } + + Ok(()) +} diff --git a/crates/charon-scanner/Cargo.toml b/crates/charon-scanner/Cargo.toml index f17080b..bc4d67c 100644 --- a/crates/charon-scanner/Cargo.toml +++ b/crates/charon-scanner/Cargo.toml @@ -11,3 +11,4 @@ alloy = { workspace = true } anyhow = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +futures-util = { workspace = true } diff --git a/crates/charon-scanner/src/lib.rs b/crates/charon-scanner/src/lib.rs index e4e9352..8ef41da 100644 --- a/crates/charon-scanner/src/lib.rs +++ b/crates/charon-scanner/src/lib.rs @@ -1,5 +1,7 @@ //! Charon scanner — chain listener and health-factor scanner. +pub mod listener; pub mod provider; +pub use listener::{BlockListener, ChainEvent}; pub use provider::ChainProvider; diff --git a/crates/charon-scanner/src/listener.rs b/crates/charon-scanner/src/listener.rs new file mode 100644 index 0000000..ebf8ddc --- /dev/null +++ b/crates/charon-scanner/src/listener.rs @@ -0,0 +1,124 @@ +//! Block listener — the bot's heartbeat. +//! +//! Subscribes to `newHeads` over WebSocket and forwards each block into the +//! scanning pipeline via an `mpsc` channel. Wrapped in an outer loop so a +//! dropped WebSocket triggers a reconnect with exponential backoff instead +//! of killing the bot. + +use std::time::Duration; + +use alloy::providers::Provider; +use anyhow::{Context, Result}; +use charon_core::config::ChainConfig; +use futures_util::StreamExt; +use tokio::sync::mpsc; +use tracing::{info, warn}; + +use crate::provider::ChainProvider; + +/// Upstream chain events produced by the listener. 
+/// +/// Held in a dedicated enum so the pipeline can grow new event kinds +/// (`ProtocolLog`, `OraclePriceUpdate`, …) without changing channel types. +#[derive(Debug, Clone)] +pub enum ChainEvent { + /// A new head block arrived on the chain. + NewBlock { + /// Short chain name (matches `[chain.]` key in config). + chain: String, + number: u64, + /// Unix timestamp from the block header. + timestamp: u64, + }, +} + +/// Listens to a single chain's `newHeads` stream and forwards events. +/// +/// The listener owns a fresh `ChainProvider` for each connection attempt, +/// so a WebSocket drop recovers cleanly by reconnecting from scratch on +/// the next loop iteration. +pub struct BlockListener { + name: String, + config: ChainConfig, + tx: mpsc::Sender, +} + +impl BlockListener { + /// Build a listener for a single chain. + pub fn new(name: impl Into, config: ChainConfig, tx: mpsc::Sender) -> Self { + Self { + name: name.into(), + config, + tx, + } + } + + /// Run the listener forever. Reconnects with exponential backoff on + /// any connection or subscription error. Returns `Ok(())` only if the + /// receiving side of the channel is dropped. + pub async fn run(self) -> Result<()> { + let mut backoff = Duration::from_secs(1); + loop { + match self.run_once().await { + Ok(()) => { + // Receiver dropped — no one is listening, exit quietly. + info!(chain = %self.name, "listener channel closed, exiting"); + return Ok(()); + } + Err(err) => { + warn!( + chain = %self.name, + error = ?err, + backoff_secs = backoff.as_secs(), + "listener error, reconnecting after backoff" + ); + tokio::time::sleep(backoff).await; + backoff = (backoff * 2).min(Duration::from_secs(30)); + } + } + } + } + + /// One connect → subscribe → drain cycle. Returns `Ok(())` if the + /// stream ends cleanly (receiver dropped); returns `Err` on any + /// connection or subscription failure so `run` can retry. 
+ async fn run_once(&self) -> Result<()> { + let provider = ChainProvider::connect(&self.name, &self.config).await?; + + let sub = provider + .provider() + .subscribe_blocks() + .await + .with_context(|| format!("chain '{}': subscribe_blocks failed", self.name))?; + + info!(chain = %self.name, "block subscription established"); + + let mut stream = sub.into_stream(); + while let Some(header) = stream.next().await { + let number = header.number; + let timestamp = header.timestamp; + + info!( + chain = %self.name, + block = number, + timestamp = timestamp, + "new block" + ); + + let event = ChainEvent::NewBlock { + chain: self.name.clone(), + number, + timestamp, + }; + + if self.tx.send(event).await.is_err() { + // Receiver dropped; propagate clean shutdown up to `run`. + return Ok(()); + } + } + + // Stream terminated without error — the underlying ws likely closed. + // Surface as an error so `run` reconnects instead of exiting. + anyhow::bail!("chain '{}': subscription stream ended", self.name) + } +} From a802e2ac11e0dea749a3a247c628075003180d0d Mon Sep 17 00:00:00 2001 From: obchain Date: Wed, 22 Apr 2026 20:27:12 +0530 Subject: [PATCH 2/2] feat(scanner): jitter backoff, non-blocking send, gap backfill, JoinSet supervision, metrics - Reconnect backoff now adds 0-25% random jitter before each sleep to avoid correlated retry storms against a single RPC endpoint when many listeners disconnect at the same instant. - BlockListener::publish uses try_send instead of a blocking await on the mpsc sender; a full channel drops the block with a warn log and a charon_listener_dropped_events_total counter increment, keeping the WS drain loop responsive so the transport never buffers past its server-side limit. - Track last_seen block per chain. 
On reconnect, fetch the current head and backfill every block between last_seen + 1 and head - 1 via get_block_by_number, emitting ChainEvent::NewBlock { backfill: true } so downstream consumers see the same heartbeat during disconnect windows. - CLI run_listen now spawns listeners into a tokio::task::JoinSet and a supervise() helper drains join results, logging per-chain task panics or errors and triggering shutdown when every listener exits. Ctrl-C also aborts outstanding listeners. - Per-block log downgraded from info to debug (BSC ~3 s = 28,800 info lines/day otherwise). Add charon_listener_connects_total, charon_listener_disconnects_total, charon_blocks_received_total, charon_listener_dropped_events_total counters for PR #50. - Workspace adds rand and metrics deps; ChainEvent is #[non_exhaustive] and carries a backfill flag. Closes #92 #93 #94 #95 #96 --- Cargo.lock | 18 ++++ Cargo.toml | 6 ++ crates/charon-cli/src/main.rs | 39 +++++-- crates/charon-scanner/Cargo.toml | 2 + crates/charon-scanner/src/listener.rs | 145 +++++++++++++++++++------- 5 files changed, 169 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d5646e9..dad759f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1154,6 +1154,8 @@ dependencies = [ "anyhow", "charon-core", "futures-util", + "metrics", + "rand 0.8.6", "tokio", "tracing", ] @@ -2291,6 +2293,16 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "metrics" +version = "0.24.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d5312e9ba3771cfa961b585728215e3d972c950a3eed9252aa093d6301277e8" +dependencies = [ + "ahash", + "portable-atomic", +] + [[package]] name = "mio" version = "1.2.0" @@ -2588,6 +2600,12 @@ version = "0.3.33" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" +[[package]] +name = "portable-atomic" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" + [[package]] name = "potential_utf" version = "0.1.5" diff --git a/Cargo.toml b/Cargo.toml index 906efc8..7411609 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,12 @@ async-trait = "0.1" # Stream adapters (StreamExt for subscription streams) futures-util = "0.3" +# Backoff jitter +rand = "0.8" + +# Prometheus-style metrics recorder facade +metrics = "0.24" + # CLI clap = { version = "4", features = ["derive"] } diff --git a/crates/charon-cli/src/main.rs b/crates/charon-cli/src/main.rs index 032e6ed..1cbaad3 100644 --- a/crates/charon-cli/src/main.rs +++ b/crates/charon-cli/src/main.rs @@ -99,14 +99,12 @@ async fn run_listen(config: Config) -> Result<()> { } let (tx, mut rx) = mpsc::channel::(CHAIN_EVENT_CHANNEL); + let mut listeners: tokio::task::JoinSet<(String, Result<()>)> = + tokio::task::JoinSet::new(); for (name, chain_cfg) in config.chain { let listener = BlockListener::new(name.clone(), chain_cfg, tx.clone()); - tokio::spawn(async move { - if let Err(err) = listener.run().await { - warn!(chain = %name, error = ?err, "listener terminated"); - } - }); + listeners.spawn(async move { (name, listener.run().await) }); } // Drop our sender so the channel closes when every listener exits. 
drop(tx); @@ -117,23 +115,52 @@ async fn run_listen(config: Config) -> Result<()> { _ = async { while let Some(event) = rx.recv().await { match event { - ChainEvent::NewBlock { chain, number, timestamp } => { + ChainEvent::NewBlock { chain, number, timestamp, backfill } => { tracing::debug!( chain = %chain, block = number, timestamp = timestamp, + backfill, "cli drained event" ); } + _ => {} } } } => { info!("all listeners exited"); } + _ = supervise(&mut listeners) => { + info!("all listener tasks terminated"); + } _ = tokio::signal::ctrl_c() => { info!("ctrl-c received, shutting down"); + listeners.shutdown().await; } } Ok(()) } + +/// Drain a `JoinSet` of listener tasks and surface panics / errors per chain. +/// Returns when every listener has exited so the caller can shut down. +async fn supervise( + listeners: &mut tokio::task::JoinSet<(String, Result<()>)>, +) { + while let Some(joined) = listeners.join_next().await { + match joined { + Ok((name, Ok(()))) => { + info!(chain = %name, "listener exited cleanly"); + } + Ok((name, Err(err))) => { + warn!(chain = %name, error = ?err, "listener terminated with error"); + } + Err(err) if err.is_panic() => { + warn!(error = ?err, "listener panicked"); + } + Err(err) => { + warn!(error = ?err, "listener join error"); + } + } + } +} diff --git a/crates/charon-scanner/Cargo.toml b/crates/charon-scanner/Cargo.toml index bc4d67c..ec8601f 100644 --- a/crates/charon-scanner/Cargo.toml +++ b/crates/charon-scanner/Cargo.toml @@ -12,3 +12,5 @@ anyhow = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } futures-util = { workspace = true } +rand = { workspace = true } +metrics = { workspace = true } diff --git a/crates/charon-scanner/src/listener.rs b/crates/charon-scanner/src/listener.rs index ebf8ddc..cbced3a 100644 --- a/crates/charon-scanner/src/listener.rs +++ b/crates/charon-scanner/src/listener.rs @@ -2,8 +2,9 @@ //! //! 
Subscribes to `newHeads` over WebSocket and forwards each block into the //! scanning pipeline via an `mpsc` channel. Wrapped in an outer loop so a -//! dropped WebSocket triggers a reconnect with exponential backoff instead -//! of killing the bot. +//! dropped WebSocket triggers a reconnect with jittered exponential backoff +//! instead of killing the bot. Reconnects backfill any blocks produced +//! during the disconnect window so the scanner never silently skips heads. use std::time::Duration; @@ -11,16 +12,22 @@ use alloy::providers::Provider; use anyhow::{Context, Result}; use charon_core::config::ChainConfig; use futures_util::StreamExt; +use rand::Rng; use tokio::sync::mpsc; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use crate::provider::ChainProvider; +/// Maximum reconnect backoff. BSC blocks every ~3 s, so 30 s is ~10 missed +/// blocks — the ceiling for how many we are willing to backfill at once. +const MAX_BACKOFF: Duration = Duration::from_secs(30); + /// Upstream chain events produced by the listener. /// /// Held in a dedicated enum so the pipeline can grow new event kinds /// (`ProtocolLog`, `OraclePriceUpdate`, …) without changing channel types. #[derive(Debug, Clone)] +#[non_exhaustive] pub enum ChainEvent { /// A new head block arrived on the chain. NewBlock { @@ -29,6 +36,8 @@ pub enum ChainEvent { number: u64, /// Unix timestamp from the block header. timestamp: u64, + /// `true` if the block was synthesised via reconnect backfill. + backfill: bool, }, } @@ -41,39 +50,54 @@ pub struct BlockListener { name: String, config: ChainConfig, tx: mpsc::Sender, + last_seen: Option, } impl BlockListener { /// Build a listener for a single chain. - pub fn new(name: impl Into, config: ChainConfig, tx: mpsc::Sender) -> Self { + pub fn new( + name: impl Into, + config: ChainConfig, + tx: mpsc::Sender, + ) -> Self { Self { name: name.into(), config, tx, + last_seen: None, } } - /// Run the listener forever. 
Reconnects with exponential backoff on - /// any connection or subscription error. Returns `Ok(())` only if the + /// Run the listener forever. Reconnects with jittered exponential backoff + /// on any connection or subscription error. Returns `Ok(())` only if the /// receiving side of the channel is dropped. - pub async fn run(self) -> Result<()> { + pub async fn run(mut self) -> Result<()> { let mut backoff = Duration::from_secs(1); loop { + metrics::counter!("charon_listener_connects_total", "chain" => self.name.clone()) + .increment(1); match self.run_once().await { Ok(()) => { - // Receiver dropped — no one is listening, exit quietly. info!(chain = %self.name, "listener channel closed, exiting"); return Ok(()); } Err(err) => { + metrics::counter!( + "charon_listener_disconnects_total", + "chain" => self.name.clone() + ) + .increment(1); + let jitter_ms = rand::thread_rng() + .gen_range(0..=(backoff.as_millis() as u64).saturating_div(4)); + let wait = backoff + Duration::from_millis(jitter_ms); warn!( chain = %self.name, error = ?err, - backoff_secs = backoff.as_secs(), - "listener error, reconnecting after backoff" + wait_ms = wait.as_millis() as u64, + "listener error, reconnecting after jittered backoff" ); - tokio::time::sleep(backoff).await; - backoff = (backoff * 2).min(Duration::from_secs(30)); + tokio::time::sleep(wait).await; + backoff = (backoff * 2).min(MAX_BACKOFF); } } } @@ -82,43 +106,94 @@ impl BlockListener { /// One connect → subscribe → drain cycle. Returns `Ok(())` if the /// stream ends cleanly (receiver dropped); returns `Err` on any /// connection or subscription failure so `run` can retry. - async fn run_once(&self) -> Result<()> { + async fn run_once(&mut self) -> Result<()> { let provider = ChainProvider::connect(&self.name, &self.config).await?; + // Backfill blocks produced during any prior disconnect window. 
+ if let Some(last) = self.last_seen { + let head = provider + .provider() + .get_block_number() + .await + .with_context(|| format!("chain '{}': get_block_number failed", self.name))?; + if head > last + 1 { + let gap = head - (last + 1); + warn!( + chain = %self.name, + from = last + 1, + to = head - 1, + gap, + "reconnect gap detected — backfilling" + ); + for number in (last + 1)..head { + let header = provider + .provider() + .get_block_by_number(number.into(), false.into()) + .await + .with_context(|| { + format!("chain '{}': get_block_by_number({number}) failed", self.name) + })?; + let ts = header.map(|b| b.header.timestamp).unwrap_or_default(); + self.publish(number, ts, true); + } + } + } + let sub = provider .provider() .subscribe_blocks() .await .with_context(|| format!("chain '{}': subscribe_blocks failed", self.name))?; - info!(chain = %self.name, "block subscription established"); let mut stream = sub.into_stream(); while let Some(header) = stream.next().await { - let number = header.number; - let timestamp = header.timestamp; - - info!( - chain = %self.name, - block = number, - timestamp = timestamp, - "new block" - ); + self.publish(header.number, header.timestamp, false); + } - let event = ChainEvent::NewBlock { - chain: self.name.clone(), - number, - timestamp, - }; + anyhow::bail!("chain '{}': subscription stream ended", self.name) + } - if self.tx.send(event).await.is_err() { - // Receiver dropped; propagate clean shutdown up to `run`. - return Ok(()); + /// Emit a `ChainEvent::NewBlock` into the channel. Non-blocking so a + /// stalled consumer cannot stall the WebSocket drain loop; full channel + /// drops the event with a warning (back-pressure visible to ops). 
+ fn publish(&mut self, number: u64, timestamp: u64, backfill: bool) { + metrics::counter!("charon_blocks_received_total", "chain" => self.name.clone()) + .increment(1); + debug!( + chain = %self.name, + block = number, + timestamp, + backfill, + "new block" + ); + self.last_seen = Some(match self.last_seen { + Some(prev) => prev.max(number), + None => number, + }); + let event = ChainEvent::NewBlock { + chain: self.name.clone(), + number, + timestamp, + backfill, + }; + match self.tx.try_send(event) { + Ok(()) => {} + Err(mpsc::error::TrySendError::Full(_)) => { + metrics::counter!( + "charon_listener_dropped_events_total", + "chain" => self.name.clone() + ) + .increment(1); + warn!( + chain = %self.name, + block = number, + "channel full — block dropped" + ); + } + Err(mpsc::error::TrySendError::Closed(_)) => { + debug!(chain = %self.name, "receiver closed, stop publishing"); } } - - // Stream terminated without error — the underlying ws likely closed. - // Surface as an error so `run` reconnects instead of exiting. - anyhow::bail!("chain '{}': subscription stream ended", self.name) } }