Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

302 changes: 296 additions & 6 deletions crates/charon-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,17 @@
//! and a fresh scan is cheaper than reconciling retroactive bucket
//! transitions.

use std::collections::HashSet;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use alloy::eips::BlockNumberOrTag;
use alloy::primitives::{Address, Bytes, U256};
use alloy::providers::{ProviderBuilder, RootProvider, WsConnect};
use alloy::primitives::{Address, B256, Bytes, U256};
use alloy::providers::{Provider, ProviderBuilder, RootProvider, WsConnect};
use alloy::pubsub::PubSubFrontend;
use alloy::rpc::types::BlockTransactionsKind;
use alloy::signers::local::PrivateKeySigner;
use anyhow::{Context, Result};
use async_trait::async_trait;
Expand All @@ -60,15 +63,29 @@ use charon_executor::{Simulator, TxBuilder};
use charon_flashloan::{AaveFlashLoan, FlashLoanRouter};
use charon_protocols::VenusAdapter;
use charon_scanner::{
BlockListener, ChainEvent, ChainProvider, DEFAULT_MAX_AGE, HealthScanner, PositionBucket,
PriceCache, ScanScheduler,
BlockListener, ChainEvent, ChainProvider, DEFAULT_MAX_AGE, HealthScanner, MempoolMonitor,
OracleUpdate, PendingCache, PositionBucket, PriceCache, ScanScheduler, SimulationVerdict,
};
use clap::{Parser, Subcommand};
use secrecy::ExposeSecret;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
use tracing_subscriber::EnvFilter;

/// Buffer size for the mempool's `OracleUpdate` channel. Sized so a
/// short burst of oracle-write txs at block-boundary time doesn't
/// back-pressure the monitor task.
const ORACLE_UPDATE_CHANNEL: usize = 256;

/// Env var the operator sets to enable the mempool monitor. Expected
/// value is the hex-encoded Venus oracle address whose write
/// selectors the monitor should track. Unset (or empty) skips the
/// mempool path cleanly so the CLI stays usable on profiles that do
/// not have a paid MEV stream. A future config-file knob can replace
/// this env var; for now keeping it env-only avoids a config-schema
/// change on feat/21.
const VENUS_ORACLE_ENV: &str = "CHARON_VENUS_ORACLE";

/// Size of the fan-in channel from listeners to the scanner pipeline.
/// One slot per ~5 blocks across all chains covers short stalls without
/// back-pressuring the listener task.
Expand Down Expand Up @@ -415,6 +432,98 @@ async fn run_listen(config: &Config, borrowers: Vec<Address>) -> Result<()> {
let (tx, mut rx) = mpsc::channel::<ChainEvent>(CHAIN_EVENT_CHANNEL);
let mut listeners: tokio::task::JoinSet<(String, Result<()>)> = tokio::task::JoinSet::new();

// ── Mempool monitor (#46 / #299) ──────────────────────────────────
// Spawn the pending-tx monitor alongside `BlockListener` on the
// Venus pipeline's shared provider. Enabled only when the operator
// sets `CHARON_VENUS_ORACLE` to a hex-encoded oracle address — most
// public BSC RPCs do not expose `newPendingTransactions` (see the
// mempool module's RPC-requirements docs). The returned
// [`PendingCache`] is retained so the block-event drain can call
// `drain_for_block` with the real confirmed-tx set each tick; the
// [`OracleUpdate`] channel is currently logged only (pre-sign
// builder wiring is explicitly non-goal for #299, so updates are
// observed and dropped until the signer + deployed liquidator
// bridge lands in a follow-up).
//
// The monitor is only wired when a Venus pipeline exists; without
// one there is no consumer for either the cache drain or the
// oracle-update channel.
let mempool_cache: Option<Arc<PendingCache>> =
match (venus.as_ref(), std::env::var(VENUS_ORACLE_ENV)) {
(Some(pipeline), Ok(hex)) if !hex.is_empty() => {
match Address::from_str(hex.trim()) {
Ok(oracle) => {
let monitor = Arc::new(MempoolMonitor::with_defaults(
pipeline.provider.clone(),
oracle,
));
let cache = monitor.cache();
let (oracle_tx, mut oracle_rx) =
mpsc::channel::<OracleUpdate>(ORACLE_UPDATE_CHANNEL);
let monitor_for_task = monitor.clone();
let mempool_task_name = format!("mempool/{}", pipeline.chain_name);
listeners.spawn(async move {
let name = mempool_task_name;
let res: Result<()> = monitor_for_task
.run(oracle_tx)
.await
.map_err(|err| anyhow::anyhow!("mempool monitor: {err}"));
(name, res)
});
let watch_task_name = format!("oracle-watch/{}", pipeline.chain_name);
listeners.spawn(async move {
let name = watch_task_name;
// Non-goal: forwarding OracleUpdate into a
// pre-sign builder or into PriceCache
// refresh (signer + liquidator bridge and
// price-cache push-update API tracked
// separately). Log at debug so operators
// can verify the monitor is actually
// decoding oracle writes on their upstream
// without the flood reaching info.
while let Some(update) = oracle_rx.recv().await {
debug!(
tx = %update.tx_hash(),
asset = %update.asset(),
kind = update.kind(),
"oracle update observed (pre-sign builder not yet wired)"
);
}
(name, Ok::<(), anyhow::Error>(()))
});
info!(
oracle = %oracle,
chain = %pipeline.chain_name,
"mempool monitor spawned"
);
Some(cache)
}
Err(err) => {
warn!(
env = VENUS_ORACLE_ENV,
error = ?err,
"mempool oracle env var set but unparseable; mempool monitor disabled"
);
None
}
}
}
(None, _) => {
info!(
env = VENUS_ORACLE_ENV,
"mempool monitor disabled (no venus pipeline configured)"
);
None
}
_ => {
info!(
env = VENUS_ORACLE_ENV,
"mempool monitor disabled (no oracle address configured)"
);
None
}
};

// `ChainConfig: Clone` — we only borrow `config`, so each listener
// task gets its own owned copy.
for (name, chain_cfg) in &config.chain {
Expand Down Expand Up @@ -446,18 +555,29 @@ async fn run_listen(config: &Config, borrowers: Vec<Address>) -> Result<()> {
_ = async {
while let Some(event) = rx.recv().await {
match event {
ChainEvent::NewBlock { chain, number, timestamp, backfill } => {
ChainEvent::NewBlock {
chain,
number,
timestamp,
block_hash,
backfill,
} => {
tracing::debug!(
chain = %chain,
block = number,
timestamp = timestamp,
%block_hash,
backfill,
"cli drained event"
);
if backfill {
// Skip backfill — the next real head will
// snapshot the final state of the missed
// range.
// range. The mempool drain is intentionally
// skipped here too: backfilled blocks are
// already several heads behind, so any
// pre-signed tx tied to them would have
// long since expired via cache TTL.
continue;
}
let Some(pipeline) = venus.as_ref() else {
Expand All @@ -466,6 +586,20 @@ async fn run_listen(config: &Config, borrowers: Vec<Address>) -> Result<()> {
if pipeline.chain_name != chain {
continue;
}

// Drain any pre-signed liquidations whose
// oracle trigger landed in this block before
// running the main scan pass. Independent of
// the scan — a mempool hiccup must not block
// the block pipeline.
drain_mempool_for_block(
pipeline.as_ref(),
block_hash,
mempool_cache.as_deref(),
signer_key.as_ref(),
)
.await;

// Per-block deadline: a stalled adapter /
// router / simulator must not block the event
// drain across multiple heads.
Expand Down Expand Up @@ -913,3 +1047,159 @@ async fn wait_sigterm() {
async fn wait_sigterm() {
std::future::pending::<()>().await
}

/// Drain pre-signed liquidations whose oracle trigger confirmed in
/// `block_hash` and run each through the executor's simulation gate
/// before the broadcast step (still non-goal per #299).
///
/// Fetches the block's confirmed tx-hash set via
/// `eth_getBlockByHash` (hashes-only payload), calls
/// [`PendingCache::drain_for_block`], and for each returned
/// [`charon_scanner::UnverifiedPreSigned`] rebuilds the liquidator
/// calldata via the adapter + builder, runs it through
/// [`Simulator::simulate`], and only hands the pre-sign a
/// [`SimulationVerdict::Ok`] proof token when the simulator returns
/// success. `verify(Ok)` unwraps the pre-sign into a full
/// `PreSignedLiquidation`; broadcast is explicitly out of scope
/// (signer + liquidator bridge tracked separately) so the drained
/// tx is logged and dropped.
///
/// Silently no-ops when the cache is `None` (mempool monitor is
/// disabled) or when the builder/simulator/params for a pre-sign
/// are unavailable — there is no way to honour the eth_call gate
/// without them, so the safer action is to re-insert-or-drop per
/// the cache's TTL and surface a warning.
///
/// Never panics. Every RPC/encode/sim failure is logged and the
/// drain loop continues with the next pre-sign; the block-scanner
/// path is independent and must not be blocked by mempool hiccups.
async fn drain_mempool_for_block(
pipeline: &VenusPipeline,
block_hash: B256,
cache: Option<&PendingCache>,
signer_key: Option<&secrecy::SecretString>,
) {
let Some(cache) = cache else {
return;
};
let chain = pipeline.chain_name.as_str();

// Fetch the block with hashes-only payload. `Hashes` keeps the
// response small — we only need the set membership check for
// `drain_for_block`, not full transaction envelopes.
let block = match pipeline
.provider
.get_block_by_hash(block_hash, BlockTransactionsKind::Hashes)
.await
{
Ok(Some(b)) => b,
Ok(None) => {
warn!(%block_hash, "block not found when draining mempool cache");
return;
}
Err(err) => {
warn!(%block_hash, ?err, "get_block_by_hash failed when draining mempool cache");
return;
}
};
let confirmed: HashSet<B256> = block.transactions.hashes().collect();

let drained = cache.drain_for_block(block_hash, &confirmed);
if drained.is_empty() {
return;
}
debug!(
chain,
%block_hash,
drained = drained.len(),
confirmed_tx_count = confirmed.len(),
"mempool cache drained for block"
);

// Materialise the executor pair lazily — if the operator runs
// scan-only (no signer) we cannot honour the eth_call gate, so we
// drop drained pre-signs with a warning. Same contract as
// `process_opportunity`: no signer → no simulation → no
// broadcast-ready artefact.
let Some((builder, sim)) = ensure_executor(pipeline, signer_key).await else {
warn!(
chain,
drained = drained.len(),
"pre-signs drained but no signer configured — dropping (sim gate cannot be honoured)"
);
return;
};

for presigned in drained {
let borrower = presigned.borrower();
let trigger = presigned.trigger_tx();
let opp = presigned.opportunity().clone();

// Rebuild calldata from the opportunity via the protocol
// adapter + builder — the pre-sign's own `raw_tx` is the
// signed envelope, which is intentionally unreachable without
// a `SimulationVerdict`.
let params = match pipeline.adapter.get_liquidation_params(&opp.position) {
Ok(p) => p,
Err(err) => {
warn!(
chain,
%borrower,
error = ?err,
"failed to rebuild liquidation params for drained pre-sign"
);
continue;
}
};
let calldata: Bytes = match builder.encode_calldata(&opp, &params) {
Ok(c) => c,
Err(err) => {
warn!(
chain,
%borrower,
error = ?err,
"failed to encode calldata for drained pre-sign"
);
continue;
}
};
match sim
.simulate(pipeline.provider.as_ref(), calldata, SIMULATION_GAS_LIMIT)
.await
{
Ok(()) => match presigned.verify(SimulationVerdict::approve()) {
Ok(ready) => {
// Non-goal: eth_sendRawTransaction. The
// `PreSignedLiquidation` is fully verified and
// ready for the future broadcast call site; log
// loudly so operators running the monitor end-to-end
// can see the gate opening.
info!(
chain,
%borrower,
%trigger,
raw_tx_len = ready.raw_tx.len(),
"pre-sign simulated OK — ready for broadcast (broadcast wiring follow-up)"
);
}
Err((returned, verdict)) => {
warn!(
chain,
borrower = %returned.borrower(),
?verdict,
"simulation verdict inconsistent with simulate outcome — dropping"
);
}
},
Err(err) => {
debug!(
chain,
%borrower,
%trigger,
error = ?err,
"pre-sign simulation reverted — dropping"
);
}
}
}
}
1 change: 1 addition & 0 deletions crates/charon-scanner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ description = "Chain listener and health-factor scanner for Charon"
charon-core = { workspace = true }
alloy = { workspace = true }
anyhow = { workspace = true }
thiserror = { workspace = true }
async-trait = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
Expand Down
Loading