diff --git a/crates/charon-cli/src/main.rs b/crates/charon-cli/src/main.rs index 6bb3a45..cd587f2 100644 --- a/crates/charon-cli/src/main.rs +++ b/crates/charon-cli/src/main.rs @@ -48,18 +48,22 @@ use std::sync::Arc; use std::time::Duration; use alloy::eips::BlockNumberOrTag; +use alloy::network::TransactionBuilder; use alloy::primitives::{Address, B256, Bytes, U256}; use alloy::providers::{Provider, ProviderBuilder, RootProvider, WsConnect}; use alloy::pubsub::PubSubFrontend; -use alloy::rpc::types::BlockTransactionsKind; +use alloy::rpc::types::{BlockTransactionsKind, TransactionRequest}; use alloy::signers::local::PrivateKeySigner; -use anyhow::{Context, Result}; +use anyhow::{Context, Result, bail}; use async_trait::async_trait; use charon_core::{ Config, FlashLoanQuote, LendingProtocol, LiquidationOpportunity, LiquidationParams, OpportunityQueue, Position, Price, ProfitInputs, calculate_profit, }; -use charon_executor::{Simulator, TxBuilder}; +use charon_executor::{ + DEFAULT_SUBMIT_TIMEOUT, GasDecision, GasOracle, NonceManager, Simulator, SubmitError, + Submitter, TxBuilder, +}; use charon_flashloan::{AaveFlashLoan, FlashLoanRouter}; use charon_metrics::{bucket, drop_stage, sim_result}; use charon_protocols::VenusAdapter; @@ -139,6 +143,26 @@ const PLACEHOLDER_DEBT_DECIMALS: u8 = 18; /// blocking across multiple heads. const PER_BLOCK_TIMEOUT: Duration = Duration::from_millis(2_500); +/// Env var the operator must set (to `1`) before `--execute` is +/// honoured. A purely belt-and-braces second confirmation beyond the +/// CLI flag so a stale shell-history invocation cannot broadcast +/// signed liquidations by accident. Unset or any value other than +/// `1` refuses to build the execution harness, regardless of other +/// safety gates. Checked at startup; a failed check aborts startup +/// with an error instead of silently degrading to scan+simulate. 
+const EXECUTE_CONFIRMATION_ENV: &str = "CHARON_EXECUTE_CONFIRMED"; + +/// Multiplicative broadcast-gas buffer on top of `eth_estimateGas`: +/// 130% (= 13/10). 30% headroom covers state drift between estimate +/// time and inclusion time — vToken index ticks, Chainlink oracle +/// writes landing in the same block, PancakeSwap reserve updates. +/// BSC gas is cheap enough that the extra buffer is worth the +/// reduction in out-of-gas reverts. Tuned alongside the simulation +/// gate's own 2 M hard ceiling so both agree on "tx will fit on +/// chain". +const BROADCAST_GAS_BUFFER_NUM: u64 = 13; +const BROADCAST_GAS_BUFFER_DEN: u64 = 10; + /// Charon — multi-chain flash-loan liquidation bot. #[derive(Parser, Debug)] #[command(version, about, long_about = None)] @@ -172,6 +196,22 @@ enum Command { /// multiple borrowers. #[arg(long = "borrower")] borrowers: Vec
, + + /// Sign and broadcast the liquidation tx for every + /// opportunity that clears the simulation gate. Off by + /// default — the pipeline runs scan + simulate only. Requires + /// all of: + /// * `bot.signer_key` populated (via `BOT_SIGNER_KEY` env), + /// * every chain with a `[liquidator.]` section has + /// a non-zero `contract_address`, + /// * every chain has either `private_rpc_url` configured or + /// `allow_public_mempool = true` (dev only), and + /// * `CHARON_EXECUTE_CONFIRMED=1` in the environment. + /// Any gate failing aborts startup — `--execute` is an + /// explicit operator intent and must not silently degrade to + /// scan-only. + #[arg(long = "execute", default_value_t = false)] + execute: bool, }, /// Connect to a configured chain and print its latest block number. @@ -196,7 +236,10 @@ struct VenusPipeline { liquidator: Address, provider: Arc>, /// Queue for opportunities that pass the simulation gate. The - /// broadcast stage lands on top of this in a follow-up PR. + /// broadcast stage reads from this when the `--execute` harness + /// is populated; entries are pushed *before* the broadcast call + /// so a later submit failure still leaves a record of the ranked + /// candidate. queue: Arc, /// Built lazily on first actionable opportunity so scan-only /// runs (no signer configured) never touch the secret. @@ -204,6 +247,48 @@ struct VenusPipeline { simulator: tokio::sync::OnceCell>, min_profit_usd_1e6: u64, chain_id: u64, + /// Present only when the operator ran `listen --execute` and + /// every safety gate passed. `None` means scan-only or + /// scan+simulate mode — `process_opportunity` observes the + /// simulation gate and queues candidates, but never signs or + /// broadcasts. Eagerly assembled at startup rather than + /// lazy-initialised so a mis-configured private RPC or bad + /// signer key is caught on boot, not on the first liquidatable + /// position to land. 
+ exec_harness: Option>, +} + +/// Bundle of executor components needed to broadcast a simulated +/// opportunity. Present only when the operator ran `listen --execute` +/// and every safety gate passed; `None` means the pipeline is in +/// scan-only or scan+simulate mode. +/// +/// Single-chain scope (BNB) for v0.1, matching the rest of the +/// `VenusPipeline` — a multi-chain harness is a follow-up when a +/// second adapter lands. +struct ExecHarness { + /// Per-chain EIP-1559 fee source. Honours `bot.max_gas_wei` as + /// the ceiling and the chain's `priority_fee_gwei` as the tip. + /// Cached per block so a single tick doesn't spam the RPC with + /// repeated `eth_feeHistory` reads. + gas_oracle: GasOracle, + /// Local atomic nonce counter. Initialised against the pending + /// block on startup, incremented per `next()`, and resynced to + /// the chain on any rejection that leaves the counter ahead of + /// confirmed state. + nonce_manager: Arc, + /// Private-RPC submitter. HTTPS / WSS only. Single-shot per + /// submit — the caller owns retry + staleness decisions via the + /// opportunity queue TTL. + submitter: Arc, + /// Hot-wallet signer address. Pre-materialised here so the + /// broadcast path never re-derives it from the `TxBuilder` on + /// the hot path. + signer_address: Address, + /// Host-only label for logs (scheme + host, no api-key in query + /// string). Logged on every submit so operators can pivot on + /// submit latency per endpoint. 
+ submitter_label: String, } // Explicit multi-thread flavor so the concurrency contract survives any @@ -246,8 +331,8 @@ async fn main() -> Result<()> { ); match cli.command { - Command::Listen { borrowers } => { - run_listen(&config, borrowers).await?; + Command::Listen { borrowers, execute } => { + run_listen(&config, borrowers, execute).await?; } Command::TestConnection { chain } => { let chain_cfg = config @@ -286,11 +371,78 @@ async fn main() -> Result<()> { /// but not scanned — the state they would produce is superseded by /// the next real head and a fresh scan is cheaper than retroactive /// bucket transitions. -async fn run_listen(config: &Config, borrowers: Vec
) -> Result<()> { +async fn run_listen(config: &Config, borrowers: Vec
, execute: bool) -> Result<()> { if config.chain.is_empty() { anyhow::bail!("no chains configured — nothing to listen to"); } + // ── Execute-gate safety checks (#305) ───────────────────────────── + // + // `--execute` is an explicit operator intent to sign and broadcast + // liquidations — it must never silently degrade to scan-only. If + // any gate below fails we abort startup with a descriptive error + // rather than spinning up a half-wired pipeline. + // + // Four gates, all mandatory when `--execute` is set: + // + // 1. `bot.signer_key` is populated (empty strings are already + // collapsed to `None` by `normalize_empty_secrets` — this + // only checks presence, never inspects the value). + // 2. Every chain that has a `[liquidator.]` entry + // references a non-zero `contract_address`. A zero address + // here would route `executeOperation` into the zero address + // on broadcast. + // 3. Every chain has either `private_rpc_url` configured or + // has `allow_public_mempool = true` (dev-only). Enforced + // centrally in `Config::validate()` — re-checked here with + // a precise error message. + // 4. `CHARON_EXECUTE_CONFIRMED=1` is set in the environment. + // Belt-and-braces second confirmation so stale shell + // history cannot broadcast by accident. + // + // These gates run *before* any WS connection or RPC call so a + // misconfigured profile fails fast. 
+ if execute { + if config.bot.signer_key.is_none() { + bail!( + "--execute refuses to start: bot.signer_key is not set (expected via \ + BOT_SIGNER_KEY env in the signer_key = \"${{BOT_SIGNER_KEY}}\" substitution)" + ); + } + for (chain_name, liq_cfg) in &config.liquidator { + if liq_cfg.contract_address == Address::ZERO { + bail!( + "--execute refuses to start: [liquidator.{chain_name}] has zero-address \ + contract_address — deploy the liquidator and set the address before \ + broadcasting" + ); + } + } + for (chain_name, chain_cfg) in &config.chain { + if chain_cfg.private_rpc_url.is_none() && !chain_cfg.allow_public_mempool { + bail!( + "--execute refuses to start: chain '{chain_name}' has no private_rpc_url \ + and allow_public_mempool is false — liquidation txs must not leak to the \ + public mempool" + ); + } + } + let confirmed = std::env::var(EXECUTE_CONFIRMATION_ENV) + .ok() + .map(|v| v.trim().to_string()) + .unwrap_or_default(); + if confirmed != "1" { + bail!( + "--execute refuses to start: set {EXECUTE_CONFIRMATION_ENV}=1 in the environment \ + to confirm you intend to sign and broadcast liquidations" + ); + } + warn!( + "execute mode confirmed — bot will sign and broadcast liquidations on every \ + simulation-passing opportunity" + ); + } + // Venus pipeline state is currently single-chain (BNB) per config // scope. Build it only if `[protocol.venus]` exists and its target // chain plus flashloan+liquidator entries are all configured; @@ -404,25 +556,115 @@ async fn run_listen(config: &Config, borrowers: Vec
) -> Result<()> { ); match router { - Some((router, liquidator)) => Some(Arc::new(VenusPipeline { - chain_name: chain_name.clone(), - adapter, - scanner, - scheduler, - prices, - router, - liquidator, - provider, - queue: Arc::new(OpportunityQueue::with_default_ttl()), - tx_builder: tokio::sync::OnceCell::new(), - simulator: tokio::sync::OnceCell::new(), - min_profit_usd_1e6: config.bot.min_profit_usd_1e6, - chain_id, - })), - None => None, + Some((router, liquidator)) => { + // ── Execution harness (--execute only) ──────────── + // + // Eagerly assemble gas oracle + nonce manager + + // submitter when the operator opted in. Any + // failure here aborts startup: by this point + // `--execute` has already cleared the four safety + // gates above, so a missing private RPC here would + // be a config bug we refuse to paper over. The + // signer is materialised once, used only to derive + // the address for the nonce manager, and dropped + // — the builder+simulator path in + // `ensure_executor` re-parses the key from + // `SecretString` for its own use so the raw bytes + // never outlive either call site. 
+ let exec_harness: Option> = if execute { + let signer_key = config + .bot + .signer_key + .as_ref() + .expect("--execute safety gate guarantees signer_key is Some"); + let raw = signer_key.expose_secret(); + let signer: PrivateKeySigner = raw.parse().context( + "--execute: bot.signer_key failed to parse as a PrivateKeySigner", + )?; + let signer_address = signer.address(); + drop(signer); + + let private_url = chain_cfg.private_rpc_url.as_ref().context( + "--execute: chain has no private_rpc_url (allow_public_mempool \ + is dev-only and is not supported by the Submitter)", + )?; + let submitter = Submitter::connect( + private_url, + chain_cfg.private_rpc_auth.as_ref(), + DEFAULT_SUBMIT_TIMEOUT, + ) + .await + .context("--execute: failed to connect private-RPC submitter")?; + let submitter_label = submitter.endpoint().to_string(); + + let nonce_manager = + NonceManager::init(provider.as_ref(), signer_address) + .await + .context("--execute: failed to initialise nonce manager")?; + + let gas_oracle = GasOracle::new_for_chain( + chain_name.clone(), + config.bot.max_gas_wei, + chain_cfg.priority_fee_gwei, + ); + + warn!( + chain = %chain_name, + signer = %signer_address, + liquidator = %liquidator, + submitter = %submitter_label, + max_gas_wei = %config.bot.max_gas_wei, + priority_fee_gwei = chain_cfg.priority_fee_gwei, + "execute harness ready — liquidations will be signed and broadcast" + ); + + Some(Arc::new(ExecHarness { + gas_oracle, + nonce_manager: Arc::new(nonce_manager), + submitter: Arc::new(submitter), + signer_address, + submitter_label, + })) + } else { + None + }; + + Some(Arc::new(VenusPipeline { + chain_name: chain_name.clone(), + adapter, + scanner, + scheduler, + prices, + router, + liquidator, + provider, + queue: Arc::new(OpportunityQueue::with_default_ttl()), + tx_builder: tokio::sync::OnceCell::new(), + simulator: tokio::sync::OnceCell::new(), + min_profit_usd_1e6: config.bot.min_profit_usd_1e6, + chain_id, + exec_harness, + })) + } + None => { 
+ if execute { + bail!( + "--execute requires a [flashloan.aave_v3_bsc] and \ + [liquidator.] pair — configure both before enabling \ + broadcast" + ); + } + None + } } } None => { + if execute { + bail!( + "--execute requires [protocol.venus] to be configured — refusing to \ + start an execute-mode listener without a protocol pipeline" + ); + } info!( "no [protocol.venus] configured — listener will drain events without scanning" ); @@ -1056,12 +1298,193 @@ async fn process_opportunity( // f. Push to the profit-ordered queue. `simulated = true` because // the production path only reaches here after a successful // `eth_call` gate — dry-run entries never get here. + // **Queue-before-broadcast**: insert into the queue first so a + // later submit failure still leaves a record of the ranked + // candidate, and so the (future) broadcast-retry stage can + // walk the queue without racing the broadcast attempt below. let profit_cents = wei_to_usd_cents(opp.net_profit_wei); - pipeline.queue.push(opp, block).await; + pipeline.queue.push(opp.clone(), block).await; charon_metrics::record_opportunity_queued(chain, profit_cents, true); + + // g. Broadcast stage — only when `--execute` assembled the + // harness on startup. Re-encodes calldata (pure local work; + // no RPC) rather than threading the sim calldata through the + // gate trait. The broadcast is deliberately best-effort: any + // failure is logged and the opportunity stays queued, so a + // future retry stage can pick it up. `Ok(true)` still reports + // "enqueued"; broadcast success is an additional metric label. 
+    if let Some(harness) = pipeline.exec_harness.as_ref() {
+        match broadcast_opportunity(pipeline.as_ref(), harness, &opp, &params, builder).await {
+            Ok(tx_hash) => {
+                info!(
+                    chain = %pipeline.chain_name,
+                    borrower = %pos.borrower,
+                    %tx_hash,
+                    submitter = %harness.submitter_label,
+                    net_profit_cents = profit_cents,
+                    "liquidation broadcast"
+                );
+            }
+            Err(err) => {
+                warn!(
+                    chain = %pipeline.chain_name,
+                    borrower = %pos.borrower,
+                    error = %format!("{err:#}"),
+                    submitter = %harness.submitter_label,
+                    "broadcast failed — opportunity left in queue for future retry"
+                );
+            }
+        }
+    }
+
     Ok(true)
 }
 
+/// Sign and broadcast one simulation-passing opportunity.
+///
+/// Flow:
+/// 1. `GasOracle::fetch_params` for the current block. A
+///    `SkipCeilingExceeded` verdict drops the opportunity (the
+///    current tip of the mempool is too expensive for our
+///    `bot.max_gas_wei` ceiling to make economic sense).
+/// 2. `eth_estimateGas` on a request with the resolved fees, then
+///    scale by [`BROADCAST_GAS_BUFFER_NUM`]/[`BROADCAST_GAS_BUFFER_DEN`]
+///    (30% headroom) to cover state drift between estimate and
+///    inclusion.
+/// 3. `NonceManager::next()` claims a nonce atomically. The local
+///    counter is the source of truth for the in-flight window; the
+///    pending-block read only runs on init + resync, never on the
+///    hot path.
+/// 4. `TxBuilder::build_tx` + `TxBuilder::sign` produce the raw
+///    EIP-2718 envelope.
+/// 5. `Submitter::submit` posts to the private RPC with a single
+///    attempt. Timeout / rejection handling is encoded in
+///    `SubmitError`.
+///
+/// Nonce-gap handling — invariants mirror the submit doc:
+/// * `build_tx` failure: the nonce was already claimed by `next()` and
+///   nothing was sent, so the counter is ahead of the chain. Resync
+///   before returning, exactly as for a sign failure.
+/// * Sign failure: tx never hit the wire but `next()` already
+///   consumed a nonce, so the counter is ahead of the chain. Force a
+///   resync before returning so the next broadcast sees canonical
+///   state.
+/// * `SubmitError::RpcRejected`: node rejected the tx (bad nonce,
+///   revert-on-broadcast, insufficient funds, rate-limit). Counter is
+///   ahead of the chain — resync.
+/// * `SubmitError::Timeout` / `SubmitError::ConnectionLost`: tx may
+///   still land (transport blip, vendor side spike). Leaving the
+///   counter alone is the correct call — a bogus resync here would
+///   reuse a nonce that a later block confirms. The next
+///   rejection-with-nonce-too-low drives a recovery.
+///
+/// Returns the hash handed back by `Submitter::submit` on success.
+/// Every failure is returned as an `anyhow` error carrying a
+/// `broadcast: …` context (or the gas-ceiling message); the caller
+/// logs it and leaves the opportunity queued.
+///
+/// NOTE(review): the `Result` type parameter was lost when this patch
+/// was extracted; `B256` is assumed from the `%tx_hash` logging at the
+/// call site — confirm against `Submitter::submit`.
+async fn broadcast_opportunity(
+    pipeline: &VenusPipeline,
+    harness: &ExecHarness,
+    opp: &LiquidationOpportunity,
+    params: &LiquidationParams,
+    builder: &TxBuilder,
+) -> Result<B256> {
+    let calldata: Bytes = builder
+        .encode_calldata(opp, params)
+        .context("broadcast: re-encode calldata failed")?;
+
+    // 1. Gas params (fee pair + ceiling check).
+    let decision = harness
+        .gas_oracle
+        .fetch_params(pipeline.provider.as_ref(), None)
+        .await
+        .context("broadcast: gas oracle fetch_params failed")?;
+    let gas_params = match decision {
+        GasDecision::Proceed(p) => p,
+        GasDecision::SkipCeilingExceeded {
+            max_fee_wei,
+            ceiling_wei,
+        } => {
+            bail!(
+                "gas ceiling tripped: max_fee_wei={max_fee_wei} exceeds \
+                 bot.max_gas_wei={ceiling_wei}"
+            );
+        }
+        // `GasDecision` is `#[non_exhaustive]`; a new skip reason
+        // added upstream lands here and is treated as a drop until
+        // the broadcast call site is taught to handle it. Safer than
+        // a blanket `SkipCeilingExceeded` mapping that would silently
+        // reinterpret unrelated drops.
+        _ => bail!("broadcast: unknown gas-oracle decision variant"),
+    };
+
+    // 2. Estimate gas on a minimal request — provider needs
+    //    from/to/data + fees to simulate execution. Then apply a
+    //    130% buffer (30% headroom). `provider.estimate_gas` is used
+    //    directly rather than `GasOracle::estimate_gas_units`
+    //    because the oracle's internal 120% buffer would compound
+    //    with ours; we want one explicit buffer at one call site.
+    let est_tx = TransactionRequest::default()
+        .with_from(harness.signer_address)
+        .with_to(builder.liquidator())
+        .with_input(calldata.clone())
+        .with_max_fee_per_gas(gas_params.max_fee_per_gas)
+        .with_max_priority_fee_per_gas(gas_params.max_priority_fee_per_gas);
+    let gas_units = pipeline
+        .provider
+        .estimate_gas(&est_tx)
+        .await
+        .context("broadcast: eth_estimateGas failed")?;
+    let gas_limit = gas_units
+        .saturating_mul(BROADCAST_GAS_BUFFER_NUM)
+        / BROADCAST_GAS_BUFFER_DEN;
+
+    // 3. Claim a nonce locally — atomic, no race with a parallel
+    //    opportunity in the same block. Every early return below this
+    //    line that keeps the tx off the wire must resync the counter.
+    let nonce = harness.nonce_manager.next();
+
+    // 4. Build + sign.
+    let tx = match builder.build_tx(
+        calldata,
+        nonce,
+        gas_params.max_fee_per_gas,
+        gas_params.max_priority_fee_per_gas,
+        gas_limit,
+    ) {
+        Ok(tx) => tx,
+        Err(err) => {
+            // Nonce consumed but the request was never built, let
+            // alone sent — resync so later broadcasts do not queue
+            // behind a gap.
+            resync_nonce_after(pipeline, harness, "build_tx error").await;
+            return Err(anyhow::Error::new(err).context("broadcast: build_tx failed"));
+        }
+    };
+    let raw = match builder.sign(tx).await {
+        Ok(bytes) => bytes,
+        Err(err) => {
+            // Nonce consumed but no tx hit the wire — resync so the
+            // counter doesn't leave a permanent gap.
+            resync_nonce_after(pipeline, harness, "sign error").await;
+            return Err(anyhow::Error::new(err).context("broadcast: sign failed"));
+        }
+    };
+
+    // 5. Submit. Only an explicit node rejection triggers a resync;
+    //    other submit errors leave the counter alone because the tx
+    //    may still land.
+    match harness.submitter.submit(raw).await {
+        Ok(hash) => Ok(hash),
+        Err(err) => {
+            if matches!(err, SubmitError::RpcRejected(_)) {
+                resync_nonce_after(pipeline, harness, "RPC rejection").await;
+            }
+            Err(anyhow::Error::new(err).context("broadcast: submit failed"))
+        }
+    }
+}
+
+/// Best-effort nonce resync after a failure that claimed a local nonce.
+///
+/// Calls `NonceManager::resync` against the pipeline provider. A
+/// resync error is only logged (naming `failure`, the step that
+/// triggered it) so the original broadcast error is what the caller
+/// sees.
+async fn resync_nonce_after(pipeline: &VenusPipeline, harness: &ExecHarness, failure: &str) {
+    if let Err(resync_err) = harness
+        .nonce_manager
+        .resync(pipeline.provider.as_ref())
+        .await
+    {
+        warn!(
+            error = %format!("{resync_err:#}"),
+            "nonce resync failed after {failure}"
+        );
+    }
+}
+
 /// Convert a `net_profit_wei` (debt-token smallest units, assumed
 /// 18-decimal stablecoin for v0.1) to USD cents for the profit
 /// histogram. 
Saturates on overflow so a corrupted upper-bound sample diff --git a/crates/charon-executor/src/builder.rs b/crates/charon-executor/src/builder.rs index 793fa07..9dde2fc 100644 --- a/crates/charon-executor/src/builder.rs +++ b/crates/charon-executor/src/builder.rs @@ -8,17 +8,16 @@ //! Solidity-side `LiquidationParams` struct and ABI-encode the //! `executeLiquidation(...)` call. //! 2. [`TxBuilder::build_tx`] — wrap the calldata in an unsigned -//! [`TransactionRequest`] with EIP-1559 fee fields and the -//! **pending** nonce for the bot's hot wallet. +//! [`TransactionRequest`] with EIP-1559 fee fields and a nonce +//! supplied by the caller (typically [`crate::NonceManager::next`]). //! 3. [`TxBuilder::sign`] — sign the request, returning the raw bytes //! that go into `eth_sendRawTransaction` (or a Flashbots bundle). use std::fmt; -use alloy::eips::{BlockId, BlockNumberOrTag, eip2718::Encodable2718}; +use alloy::eips::eip2718::Encodable2718; use alloy::network::{EthereumWallet, TransactionBuilder}; use alloy::primitives::{Address, Bytes}; -use alloy::providers::Provider; use alloy::rpc::types::TransactionRequest; use alloy::signers::local::PrivateKeySigner; use alloy::sol; @@ -209,31 +208,27 @@ impl TxBuilder { /// Build an unsigned EIP-1559 [`TransactionRequest`] pointing at /// the configured liquidator. /// - /// Pulls the **pending** nonce from `provider` so a broadcast - /// that is still in the mempool does not collide with the newly - /// built transaction. `gas_limit` is supplied by the caller - /// (typically a multiple of `eth_estimateGas` plus a safety - /// buffer). Fee fields are passed through; producing them is - /// the gas oracle's job, not the builder's. + /// The caller supplies the nonce (typically from + /// [`crate::NonceManager::next`]) and gas parameters from the gas + /// oracle. 
This method intentionally does **not** hit the provider + /// — doing so would race against the `NonceManager`'s local counter + /// and hand out duplicate nonces when two opportunities land in the + /// same block. `gas_limit` is supplied by the caller (typically a + /// multiple of `eth_estimateGas` plus a safety buffer). Fee fields + /// are passed through; producing them is the gas oracle's job, not + /// the builder's. /// /// Fee invariant: `max_priority_fee_per_gas <= max_fee_per_gas`. /// Violating it is rejected here rather than letting the node /// reject it after a network round-trip. - // TODO(#43): replace the direct `eth_getTransactionCount` read - // with the upcoming `NonceManager` once PR #43 lands, so bursty - // submission windows don't have to re-RPC for every tx. - pub async fn build_tx( + pub fn build_tx( &self, - provider: &P, calldata: Bytes, + nonce: u64, max_fee_per_gas: u128, max_priority_fee_per_gas: u128, gas_limit: u64, - ) -> Result - where - P: Provider, - T: alloy::transports::Transport + Clone, - { + ) -> Result { if max_priority_fee_per_gas > max_fee_per_gas { return Err(BuilderError::InvalidFees( max_priority_fee_per_gas, @@ -242,12 +237,6 @@ impl TxBuilder { } let from = self.signer.address(); - let nonce = provider - .get_transaction_count(from) - .block_id(BlockId::Number(BlockNumberOrTag::Pending)) - .await - .map_err(BuilderError::NonceFetch)?; - let tx = TransactionRequest::default() .with_from(from) .with_to(self.liquidator)