Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ thiserror = "2"
# Async trait objects
async-trait = "0.1"

# Stream adapters (StreamExt for subscription streams)
futures-util = "0.3"

# Backoff jitter
rand = "0.8"

# Prometheus-style metrics recorder facade
metrics = "0.24"

# CLI
clap = { version = "4", features = ["derive", "env"] }

Expand Down
91 changes: 83 additions & 8 deletions crates/charon-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@ use std::path::PathBuf;

use anyhow::{Context, Result};
use charon_core::Config;
use charon_scanner::ChainProvider;
use charon_scanner::{BlockListener, ChainEvent, ChainProvider};
use clap::{Parser, Subcommand};
use tracing::info;
use tokio::sync::mpsc;
use tracing::{info, warn};
use tracing_subscriber::EnvFilter;

/// Size of the fan-in channel from listeners to the scanner pipeline.
/// One slot per ~5 blocks across all chains covers short stalls without
/// back-pressuring the listener task.
const CHAIN_EVENT_CHANNEL: usize = 1024;

/// Charon — multi-chain flash-loan liquidation bot.
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
Expand All @@ -34,8 +40,10 @@ struct Cli {

#[derive(Subcommand, Debug)]
enum Command {
/// Listen to chain events and track positions.
/// (Scanner wiring lands across multiple M1 issues — currently a stub.)
/// Spawn one block listener per configured chain and drain chain events.
///
/// Downstream pipeline (scanner → profit calc → executor) consumes
/// the same channel once those layers land.
Listen,

/// Connect to a configured chain and print its latest block number.
Expand Down Expand Up @@ -98,23 +106,90 @@ async fn main() -> Result<()> {
Ok(())
}

/// Long-running listener entry point. Exits cleanly on SIGINT or SIGTERM so
/// the Docker `stop` → SIGTERM → SIGKILL sequence never tears mid-operation.
async fn run_listen(_config: &Config) -> Result<()> {
info!("listen: not wired up yet — scanner arrives in Day 2");
/// Long-running listener entry point. Spawns one `BlockListener` per
/// configured chain, drains the shared `ChainEvent` channel, and exits
/// cleanly on SIGINT or SIGTERM so the Docker `stop` → SIGTERM → SIGKILL
/// sequence never tears mid-operation.
async fn run_listen(config: &Config) -> Result<()> {
if config.chain.is_empty() {
anyhow::bail!("no chains configured — nothing to listen to");
}

let (tx, mut rx) = mpsc::channel::<ChainEvent>(CHAIN_EVENT_CHANNEL);
let mut listeners: tokio::task::JoinSet<(String, Result<()>)> =
tokio::task::JoinSet::new();

// `ChainConfig: Clone` — we only borrow `config`, so each listener task
// gets its own owned copy.
for (name, chain_cfg) in &config.chain {
let name = name.clone();
let chain_cfg = chain_cfg.clone();
let listener = BlockListener::new(name.clone(), chain_cfg, tx.clone());
listeners.spawn(async move { (name, listener.run().await) });
}
// Drop our sender so the channel closes when every listener exits.
drop(tx);

info!("listen: draining chain events (Ctrl-C or SIGTERM to stop)");

tokio::select! {
_ = async {
while let Some(event) = rx.recv().await {
match event {
ChainEvent::NewBlock { chain, number, timestamp, backfill } => {
tracing::debug!(
chain = %chain,
block = number,
timestamp = timestamp,
backfill,
"cli drained event"
);
}
_ => {}
}
}
} => {
info!("all listeners exited");
}
_ = supervise(&mut listeners) => {
info!("all listener tasks terminated");
}
_ = tokio::signal::ctrl_c() => {
info!("received SIGINT, shutting down");
listeners.shutdown().await;
}
_ = wait_sigterm() => {
info!("received SIGTERM, shutting down");
listeners.shutdown().await;
}
}

Ok(())
}

/// Drain a `JoinSet` of listener tasks and surface panics / errors per chain.
/// Returns when every listener has exited so the caller can shut down.
async fn supervise(
listeners: &mut tokio::task::JoinSet<(String, Result<()>)>,
) {
while let Some(joined) = listeners.join_next().await {
match joined {
Ok((name, Ok(()))) => {
info!(chain = %name, "listener exited cleanly");
}
Ok((name, Err(err))) => {
warn!(chain = %name, error = ?err, "listener terminated with error");
}
Err(err) if err.is_panic() => {
warn!(error = ?err, "listener panicked");
}
Err(err) => {
warn!(error = ?err, "listener join error");
}
}
}
}

#[cfg(unix)]
async fn wait_sigterm() {
use tokio::signal::unix::{SignalKind, signal};
Expand Down
3 changes: 3 additions & 0 deletions crates/charon-scanner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ anyhow = { workspace = true }
async-trait = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
futures-util = { workspace = true }
rand = { workspace = true }
metrics = { workspace = true }

[lints]
workspace = true
2 changes: 2 additions & 0 deletions crates/charon-scanner/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Charon scanner — chain listener and health-factor scanner.

pub mod listener;
pub mod provider;

pub use listener::{BlockListener, ChainEvent};
pub use provider::{ChainProvider, ChainProviderT, MockChainProvider};
Loading