diff --git a/Cargo.lock b/Cargo.lock index 19958bf7e5..0ffb0cf7bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9189,6 +9189,7 @@ dependencies = [ "alloy-primitives 1.2.0", "alloy-u256-literal", "anyhow", + "async-trait", "blocksense-anomaly-detection", "blocksense-blockchain-data-model", "blocksense-config", diff --git a/apps/sequencer/Cargo.toml b/apps/sequencer/Cargo.toml index 16a4629e56..acb7dd4bb7 100644 --- a/apps/sequencer/Cargo.toml +++ b/apps/sequencer/Cargo.toml @@ -48,6 +48,7 @@ alloy-primitives = { workspace = true } alloy-u256-literal = { workspace = true } anyhow = { workspace = true } +async-trait = "0.1" bridgetree = { git = "https://github.com/zcash/incrementalmerkletree", package = "bridgetree" } bytes = { workspace = true } chrono = { workspace = true } diff --git a/apps/sequencer/README.md b/apps/sequencer/README.md index 7664c14da6..7acc66d328 100644 --- a/apps/sequencer/README.md +++ b/apps/sequencer/README.md @@ -40,6 +40,7 @@ echo -n 0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a > /tm ### Optional WebSocket newHeads subscription - Provider field: add `websocket_url` (optional) under each provider in `SequencerConfig.providers.`. +- Provider field: add `websocket_reconnect` (optional) to override the WS reconnection policy (`initial_backoff_ms`, `backoff_multiplier`, `max_backoff_ms`). Defaults retry forever with exponential backoff capped at 5 minutes. - Format: must start with `ws://` or `wss://` (validated at startup). - Behavior: when set, the reorg tracker subscribes to `newHeads` over WS to trigger checks; all reads (blocks, receipts, finalized blocks, contract state) remain over HTTP. - Fallback: if WS connect/subscribe fails or drops, the tracker falls back to polling with exponential backoff (1s → 30s cap, ±10% jitter) and periodically retries WS. 
diff --git a/apps/sequencer/src/http_handlers/admin.rs b/apps/sequencer/src/http_handlers/admin.rs index e3299a154d..bf93a4dcfc 100644 --- a/apps/sequencer/src/http_handlers/admin.rs +++ b/apps/sequencer/src/http_handlers/admin.rs @@ -964,6 +964,7 @@ mod tests { private_key_path: key_path.to_str().unwrap().to_owned(), url, websocket_url: None, + websocket_reconnect: None, transaction_retries_count_limit: 42, transaction_retry_timeout_secs: 20, retry_fee_increment_fraction: 0.1, diff --git a/apps/sequencer/src/providers/Reorg.md b/apps/sequencer/src/providers/Reorg.md new file mode 100644 index 0000000000..220fca5c04 --- /dev/null +++ b/apps/sequencer/src/providers/Reorg.md @@ -0,0 +1,142 @@ +# Reorg Tracking + +This document describes how `apps/sequencer/src/providers/reorg_tracking.rs` monitors +canonical chain progress, detects reorganizations, and recovers the Sequencer’s +state. + +## Tracker State and Inputs + +- `ReorgTracker` keeps per-network state: the most recent finalized height, + the highest height we have observed locally, an iteration counter, and the + RPC timeout derived from `ReorgConfig`. +- Access to providers is shared via `SharedRpcProviders`, where each + `RpcProvider` owns the HTTP transport, optional websocket transport, and an + `InflightObservations` cache (`observed_block_hashes` plus + `non_finalized_updates`). +- Reintroduced batches are sent through a `CountedSender` + so the relayer loop can replay messages that were lost on a fork. +- When websocket support is configured (`Provider.websocket_url` with optional + `WebsocketReconnectConfig`) the tracker builds a `ResilientWsConnect` to + subscribe to `eth_subscribe:newHeads`. + +## Control Loop (`loop_tracking_for_reorg_in_network`) + +1. **Transport setup** – If a websocket URL is present we attempt to connect + and subscribe to `newHeads`. 
The handshake and subsequent reconnects use the + `WsReconnectPolicy` backoff helpers (`should_attempt_ws`, `schedule_ws_retry`, + `reset_ws_backoff`). On failure the tracker logs the reason, schedules a + retry, and temporarily falls back to HTTP polling. +2. **Polling cadence** – For pure HTTP operation we call + `calculate_block_generation_time_in_network` (look back 100 blocks) to + estimate the poll interval. All block reads are wrapped in + `actix_web::rt::time::timeout`; warnings are emitted when a 5‑second deadline + is exceeded. +3. **Per-iteration work** – Each loop iteration: + - Clones the provider handle and a snapshot of `observed_block_hashes`. + - Selects the websocket provider when available (HTTP otherwise) for reads. + - Fetches the latest head (`BlockNumberOrTag::Latest`) and, if successful, + passes it to `process_new_block`. + - Reads the on-chain ADFS root via `rpc_get_storage_at`. If the storage slot + differs from the locally tracked Merkle root we update + `RpcProvider.merkle_root_in_contract` and mark that the ring buffer indices + must be resynchronized. + - Fetches `BlockNumberOrTag::Finalized` to advance + `observer_finalized_height`. When finalization moves forward we prune both + `non_finalized_updates` and `observed_block_hashes` through + `InflightObservations::prune_observed_up_to`. If the tracker falls behind + finalization we reset `observed_latest_height` to the finalized height and + seed the observed hash from the finalized block. + - When `need_resync_indices` is true we call `try_to_sync` so the ring-buffer + indices are refreshed directly from the contract state. + - If the provider disappears from the shared map we log and terminate the + loop for that network. + +## Detecting Divergence (`process_new_block`) + +- **Tip pre-check** – Before ingesting new blocks we refetch the chain block at + `observed_latest_height`. 
A hash mismatch signals that the tracked tip was + replaced, so we increment `ProviderMetrics.observed_reorgs` and delegate to + `handle_reorg`. +- **Parent mismatch** – When new blocks appear we load the first new block and + compare its parent hash to the stored hash for `observed_latest_height`. Any + difference indicates a fork at or above that height. +- **Same-height hash change** – Even if the height does not advance, the latest + header fetched from RPC is compared with the cached hash. If it changes we + treat it as a reorg. +- Successful ingestion stores fresh hashes in + `provider.inflight.observed_block_hashes` for every block seen, ensuring we + can later walk backwards to locate a common ancestor. + +All block and storage reads funnel through `rpc_get_block_by_number` and +`rpc_get_storage_at`. These helpers prefer the websocket provider when it is +healthy and fall back to HTTP otherwise, still enforcing the per-call timeout. + +## Handling a Reorg (`handle_reorg`) + +1. Walk the cached heights in descending order (starting from the newest cached + block) and refetch each block from the chain until we locate the first height + where the stored hash matches the canonical hash. The function logs any + diverged heights it sees along the way. +2. The fork point is `first_common + 1`. We log both the ancestor and the fork + height for operators. +3. Holding the provider lock, we remove every entry in + `non_finalized_updates` with a height ≥ fork height. Each batch is sent back + to the relayer channel in ascending order and the resend outcome is logged. + Pre-fork entries stay untouched so they can be + pruned only when the network finalizes them. +4. If no common ancestor is found within the cached history we warn, but the + loop continues on the canonical head revealed by RPC. + +## Finalization and Cleanup + +- `observer_finalized_height` tracks the latest finalized block we have seen. 
+ Advancing it triggers a pruning pass in `InflightObservations` which clears + outdated block hashes and relayer batches that can never reorg again. +- If we discover that `observed_latest_height` lags behind finalization we + fast-forward it to the finalized height and overwrite the cached hash to keep + the tracker anchored to known-final data. +- Additional blocks observed after a fork are appended to + `observed_block_hashes`, giving the tracker the history it needs for future + reorg detection. + +## Websocket Strategy and Fallbacks + +- When websockets are configured the tracker continuously consumes the + `newHeads` stream. Every received header simply wakes the loop so the same + verification logic runs against HTTP (to reuse existing RPC primitives). +- Disconnects or subscription failures trigger a configurable backoff sequence. + While waiting for the next retry the loop sleeps for the remaining backoff delay + between iterations; the block-time cadence applies only without a websocket URL. +- If no websocket URL is configured we never try to connect; the loop purely + relies on the adaptive polling interval. + +## Metrics and Observability + +- `ProviderMetrics.observed_reorgs` (an `IntCounterVec` keyed by network) is the + primary signal that a reorg was detected. Every detection increments it before + the corrective flow begins. +- Logs include network, observed/latest heights, finalized checkpoints, and + loop counters, mirroring the legacy `eth_send_utils` diagnostics so existing + dashboards remain useful. Diverged block hashes, fork points, and resend + activity are explicitly printed. + +## Test Coverage + +- `test_loop_tracking_reorg_detect_and_resync_indices_http` and + `test_loop_tracking_reorg_detect_and_resync_indices_websock` + (`apps/sequencer/src/providers/reorg_tracking.rs:1573`) spin up an Anvil node, + inject synthetic non-finalized batches (tagged via `EncodedFeedId`), and drive + a deterministic reorg using snapshot/revert. 
+- The tests assert that: + - `observed_reorgs` increments for the network. + - Only the batches at or above the fork height are replayed through the + relayer channel and they arrive in ascending order. + - Pre-fork batches remain cached until later finalization causes pruning. + - The on-chain ADFS root drift triggers a `try_to_sync` resync, even without + deploying contracts. + - Both the HTTP polling path and websocket-triggered path exercise the same + logic by running the scenario twice. + +Together these pieces ensure the Sequencer can withstand short-lived forks, +recover the state required to keep publishing updates, and provide operators the +signals they need to observe the system’s behavior. diff --git a/apps/sequencer/src/providers/eth_send_utils.rs b/apps/sequencer/src/providers/eth_send_utils.rs index d80f3e5057..e5a5c7021a 100644 --- a/apps/sequencer/src/providers/eth_send_utils.rs +++ b/apps/sequencer/src/providers/eth_send_utils.rs @@ -331,12 +331,16 @@ pub async fn create_per_network_reorg_trackers( let net_clone = net.clone(); let sequencer_state_providers_clone = sequencer_state.providers.clone(); let sequencer_config = sequencer_state.sequencer_config.read().await; - let (reorg_tracker_config, websocket_url_opt) = + let (reorg_tracker_config, websocket_url_opt, websocket_reconnect_opt) = match sequencer_config.providers.get(net.as_str()) { - Some(c) => (c.reorg.clone(), c.websocket_url.clone()), + Some(c) => ( + c.reorg.clone(), + c.websocket_url.clone(), + c.websocket_reconnect.clone(), + ), None => { error!("No config for provider for network {net} will set to default!"); - (ReorgConfig::default(), None) + (ReorgConfig::default(), None, None) } }; let relayer_send_channel = match sequencer_state @@ -356,6 +360,7 @@ pub async fn create_per_network_reorg_trackers( sequencer_state_providers_clone, relayer_send_channel, websocket_url_opt, + websocket_reconnect_opt, ); collected_futures.push( tokio::task::Builder::new() diff --git 
a/apps/sequencer/src/providers/inflight_observations.rs b/apps/sequencer/src/providers/inflight_observations.rs index 43716e96e8..58743a358e 100644 --- a/apps/sequencer/src/providers/inflight_observations.rs +++ b/apps/sequencer/src/providers/inflight_observations.rs @@ -84,6 +84,7 @@ mod tests { private_key_path: "dummy".to_string(), url: "http://localhost:8545".to_string(), websocket_url: None, + websocket_reconnect: None, transaction_retries_count_limit: 3, transaction_retry_timeout_secs: 10, retry_fee_increment_fraction: 0.1, diff --git a/apps/sequencer/src/providers/mod.rs b/apps/sequencer/src/providers/mod.rs index 6a9983a8f2..bea4e456dd 100644 --- a/apps/sequencer/src/providers/mod.rs +++ b/apps/sequencer/src/providers/mod.rs @@ -2,3 +2,4 @@ pub mod eth_send_utils; pub mod inflight_observations; pub mod provider; pub mod reorg_tracking; +pub mod ws; diff --git a/apps/sequencer/src/providers/provider.rs b/apps/sequencer/src/providers/provider.rs index ca25b4e8b8..a00d446ea4 100644 --- a/apps/sequencer/src/providers/provider.rs +++ b/apps/sequencer/src/providers/provider.rs @@ -10,6 +10,7 @@ use alloy::{ Identity, ProviderBuilder, RootProvider, }, signers::local::PrivateKeySigner, + transports::ws::WsConnect, }; use alloy_primitives::{keccak256, B256, U256}; use alloy_u256_literal::u256; @@ -40,14 +41,15 @@ use std::collections::HashMap; use std::sync::Arc; use std::{fs, mem}; use tokio::sync::{Mutex, RwLock}; -use tokio::time::error::Elapsed; -use tokio::time::Duration; +use tokio::time::{error::Elapsed, Duration}; use tracing::{debug, error, info, warn}; use crate::providers::eth_send_utils::{ get_gas_limit, get_tx_retry_params, BatchOfUpdatesToProcess, GasFees, }; use crate::providers::inflight_observations::InflightObservations; +use crate::providers::ws::{ResilientWsConnect, WsReconnectMetrics, WsReconnectPolicy}; +use async_trait::async_trait; use std::time::Instant; pub type ProviderType = @@ -106,6 +108,49 @@ impl Hashable for HashValue { } } +struct 
ProviderWsRecorder { + metrics: Arc>, + network: String, +} + +impl ProviderWsRecorder { + fn new(metrics: Arc>, network: &str) -> Self { + Self { + metrics, + network: network.to_owned(), + } + } +} + +#[async_trait] +impl WsReconnectMetrics for ProviderWsRecorder { + async fn on_disconnect(&self) { + self.metrics + .read() + .await + .ws_disconnects_detected + .with_label_values(&[self.network.as_str()]) + .inc(); + } + + async fn on_attempt(&self) { + self.metrics + .read() + .await + .ws_reconnect_attempts + .with_label_values(&[self.network.as_str()]) + .inc(); + } + + async fn on_success(&self) { + self.metrics + .read() + .await + .ws_reconnect_successes + .with_label_values(&[self.network.as_str()]) + .inc(); + } +} pub struct RpcProvider { pub calldata_merkle_tree_frontier: Frontier, pub merkle_root_in_contract: Option, @@ -309,10 +354,34 @@ impl RpcProvider { provider_metrics: &Arc>, feeds_config: &AllFeedsConfig, ) -> RpcProvider { - let provider = ProviderBuilder::new() - .disable_recommended_fillers() - .wallet(EthereumWallet::from(signer.clone())) - .connect_http(rpc_url.clone()); + let provider = match rpc_url.scheme() { + "ws" | "wss" => { + let metrics_recorder = Arc::new(ProviderWsRecorder::new( + Arc::clone(provider_metrics), + network, + )); + let resilient_connect = ResilientWsConnect::new( + WsConnect::new(rpc_url.as_str().to_owned()), + WsReconnectPolicy::from_config(p.websocket_reconnect.as_ref()), + metrics_recorder, + network, + ); + + ProviderBuilder::new() + .disable_recommended_fillers() + .wallet(EthereumWallet::from(signer.clone())) + .connect_pubsub_with(resilient_connect) + .await + .unwrap_or_else(|err| { + panic!("Failed to connect to provider over websocket {rpc_url}: {err:?}") + }) + } + "http" | "https" => ProviderBuilder::new() + .disable_recommended_fillers() + .wallet(EthereumWallet::from(signer.clone())) + .connect_http(rpc_url.clone()), + other => panic!("Unsupported RPC URL scheme `{other}` for network {network}"), + }; 
let impersonated_anvil_account = p .impersonated_anvil_account diff --git a/apps/sequencer/src/providers/reorg_tracking.rs b/apps/sequencer/src/providers/reorg_tracking.rs index c2dfced05d..def746fdc5 100644 --- a/apps/sequencer/src/providers/reorg_tracking.rs +++ b/apps/sequencer/src/providers/reorg_tracking.rs @@ -1,18 +1,18 @@ use crate::providers::provider::{RpcProvider, SharedRpcProviders}; use actix_web::rt::time::timeout; +use alloy::eips::BlockNumberOrTag; use alloy::hex; -use alloy::providers::ProviderBuilder; +use alloy::providers::{Provider, ProviderBuilder, RootProvider}; use alloy::rpc::types::Block; use alloy::transports::ws::WsConnect; -use alloy::{eips::BlockNumberOrTag, providers::Provider}; use alloy_primitives::{Address, B256}; -use blocksense_config::ReorgConfig; +use async_trait::async_trait; +use blocksense_config::{ReorgConfig, WebsocketReconnectConfig}; use blocksense_utils::counter_unbounded_channel::CountedSender; -use eyre::Report; -use rand::Rng; +use eyre::{eyre, Report}; use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, RwLock}; use tokio::time::{Duration, Instant}; use tokio_stream::StreamExt; use tracing::{debug, error, info, trace, warn}; @@ -23,6 +23,7 @@ use blocksense_utils::await_time; // Local helpers from eth_send_utils we need to call use crate::providers::eth_send_utils::{try_to_sync, BatchOfUpdatesToProcess}; +use crate::providers::ws::{ResilientWsConnect, WsReconnectMetrics, WsReconnectPolicy}; async fn rpc_get_block_by_number( rpc_http: &dyn Provider, @@ -49,81 +50,71 @@ async fn rpc_get_storage_at( .map_err(Report::from) } -struct ReconnectBackoff { - backoff_idx: usize, - next_retry_at: Option, - jittered_ms: u64, +pub struct ReorgTracker { + observer_finalized_height: u64, + observed_latest_height: u64, + loop_count: u64, + rpc_timeout: Duration, + net: String, + providers_mutex: SharedRpcProviders, + updates_relayer_send_chan: CountedSender, + websocket_url: Option, 
+ websocket_policy: WsReconnectPolicy, + websocket_retry_attempt: u64, + next_websocket_retry_at: Option, } -impl ReconnectBackoff { - pub fn new() -> ReconnectBackoff { - ReconnectBackoff { - backoff_idx: 0, - next_retry_at: None, - jittered_ms: 0, - } - } - - pub fn get_next_retry_at(&self) -> Option { - self.next_retry_at +impl ReorgTracker { + fn should_attempt_ws(&self) -> bool { + self.next_websocket_retry_at + .is_none_or(|deadline| Instant::now() >= deadline) } - pub fn get_next_retry_ms(&self) -> u64 { - self.jittered_ms + fn reset_ws_backoff(&mut self) { + self.websocket_retry_attempt = 0; + self.next_websocket_retry_at = None; } - pub fn connection_established(&mut self) { - self.backoff_idx = 0; - self.next_retry_at = None; - self.jittered_ms = 0; + fn schedule_ws_retry(&mut self) -> Duration { + self.websocket_retry_attempt = self.websocket_retry_attempt.saturating_add(1); + let delay = self + .websocket_policy + .backoff_delay(self.websocket_retry_attempt); + self.next_websocket_retry_at = Some(Instant::now() + delay); + delay } - fn set_next_retry_at(&mut self) { - let base_ms = Self::get_retry_periods_ms()[self.backoff_idx]; - self.jittered_ms = Self::jitter_backoff_ms(base_ms); - self.next_retry_at = Some(Instant::now() + Duration::from_millis(self.jittered_ms)); - } + async fn build_ws_provider( + &self, + ws_url: &str, + providers_mutex: &SharedRpcProviders, + ) -> eyre::Result { + let provider_mutex = { + let providers = providers_mutex.read().await; + providers.get(self.net.as_str()).cloned() + } + .ok_or_else(|| eyre!("No active provider found for network {}", self.net))?; - fn get_retry_periods_ms() -> &'static [u64; 9] { - &[ - 1_000, 2_000, 5_000, 10_000, 20_000, 40_000, 80_000, 160_000, 320_000, - ] - } + let provider_metrics = { + let provider = provider_mutex.lock().await; + provider.provider_metrics.clone() + }; - fn jitter_backoff_ms(base_ms: u64) -> u64 { - if base_ms == 0 { - return 0; - } - let jitter = base_ms / 10; - if jitter == 
0 { - return base_ms; - } - let mut rng = rand::thread_rng(); - let min = base_ms.saturating_sub(jitter); - let max = base_ms + jitter; - rng.gen_range(min..=max) - } + let recorder = Arc::new(ReorgWsRecorder::new(provider_metrics, &self.net)); + let ws_connect = ResilientWsConnect::new( + WsConnect::new(ws_url.to_owned()), + self.websocket_policy.clone(), + recorder, + &self.net, + ); - fn inc_retries_count(&mut self) { - self.set_next_retry_at(); - if self.backoff_idx < Self::get_retry_periods_ms().len() - 1 { - self.backoff_idx += 1; - } + ProviderBuilder::new() + .disable_recommended_fillers() + .connect_pubsub_with(ws_connect) + .await + .map_err(Report::from) } -} -pub struct ReorgTracker { - observer_finalized_height: u64, - observed_latest_height: u64, - loop_count: u64, - rpc_timeout: Duration, - net: String, - providers_mutex: SharedRpcProviders, - updates_relayer_send_chan: CountedSender, - websocket_url: Option, -} - -impl ReorgTracker { // Helper to handle reorg once already detected. Finds fork point and prints // discarded observations, mirroring the existing log messages and structure. 
async fn handle_reorg( @@ -280,39 +271,31 @@ impl ReorgTracker { const DEFAULT_POLL_INTERVAL_MS: u64 = 60 * 1000; - let mut reconnect_backoff_tracker = ReconnectBackoff::new(); - if let Some(ref ws_url) = websocket_url { info!("Attempting WS connect for {net} to {ws_url}"); - match ProviderBuilder::new() - .disable_recommended_fillers() - .connect_ws(WsConnect::new(ws_url)) - .await - { - Ok(provider_ws) => { - info!("WS connected for {net}; subscribing to newHeads"); - match provider_ws.subscribe_blocks().await { - Ok(sub) => { - stream_opt = Some(sub.into_stream()); - provider_ws_opt = Some(provider_ws); - reconnect_backoff_tracker.connection_established(); - } - Err(e) => { - warn!("WS subscribe_blocks failed for {net}: {e:?}"); - reconnect_backoff_tracker.inc_retries_count(); - warn!( - "Will retry WS subscribe for {net} in {jittered_ms:?}ms after error: {e:?}", - jittered_ms = reconnect_backoff_tracker.get_next_retry_ms() - ); - } + match self.build_ws_provider(ws_url, &providers_mutex).await { + Ok(provider_ws) => match provider_ws.subscribe_blocks().await { + Ok(sub) => { + info!("WS connected for {net}; subscribed to newHeads"); + stream_opt = Some(sub.into_stream()); + provider_ws_opt = Some(provider_ws); + self.reset_ws_backoff(); } - } + Err(e) => { + warn!("WS subscribe_blocks failed for {net}: {e:?}"); + let delay = self.schedule_ws_retry(); + warn!( + "Will retry WS setup for {net} in {delay_ms}ms after subscribe error", + delay_ms = delay.as_millis(), + ); + } + }, Err(e) => { warn!("WS connect failed for {net}: {e:?}"); - reconnect_backoff_tracker.inc_retries_count(); + let delay = self.schedule_ws_retry(); warn!( - "Will retry WS connect for {net} in {jittered_ms:?}ms after error: {e:?}", - jittered_ms = reconnect_backoff_tracker.get_next_retry_ms() + "Will retry WS setup for {net} in {delay_ms}ms after connect error", + delay_ms = delay.as_millis(), ); } } @@ -340,42 +323,31 @@ impl ReorgTracker { loop { if stream_opt.is_none() { if let Some(ref 
ws_url) = websocket_url { - let should_attempt = match reconnect_backoff_tracker.get_next_retry_at() { - Some(deadline) => Instant::now() >= deadline, - None => true, - }; - if should_attempt { + if self.should_attempt_ws() { info!("Attempting WS reconnect for {net} to {ws_url}"); - - match ProviderBuilder::new() - .disable_recommended_fillers() - .connect_ws(WsConnect::new(ws_url)) - .await - { - Ok(provider_ws) => { - info!("WS reconnected for {net}; subscribing to newHeads"); - match provider_ws.subscribe_blocks().await { - Ok(sub) => { - stream_opt = Some(sub.into_stream()); - provider_ws_opt = Some(provider_ws); - reconnect_backoff_tracker.connection_established(); - } - Err(e) => { - warn!("WS subscribe_blocks failed for {net}: {e:?}"); - reconnect_backoff_tracker.inc_retries_count(); - warn!( - "Will retry WS subscribe for {net} in {jittered_ms:?}ms after error: {e:?}", - jittered_ms = reconnect_backoff_tracker.get_next_retry_ms() - ); - } + match self.build_ws_provider(ws_url, &providers_mutex).await { + Ok(provider_ws) => match provider_ws.subscribe_blocks().await { + Ok(sub) => { + info!("WS reconnected for {net}; subscribed to newHeads"); + stream_opt = Some(sub.into_stream()); + provider_ws_opt = Some(provider_ws); + self.reset_ws_backoff(); } - } + Err(e) => { + warn!("WS subscribe_blocks failed for {net}: {e:?}"); + let delay = self.schedule_ws_retry(); + warn!( + "Will retry WS setup for {net} in {delay_ms}ms after subscribe error", + delay_ms = delay.as_millis(), + ); + } + }, Err(e) => { warn!("WS reconnect failed for {net}: {e:?}"); - reconnect_backoff_tracker.inc_retries_count(); + let delay = self.schedule_ws_retry(); warn!( - "Will retry WS connect for {net} in {jittered_ms:?}ms after error: {e:?}", - jittered_ms = reconnect_backoff_tracker.get_next_retry_ms() + "Will retry WS setup for {net} in {delay_ms}ms after connect error", + delay_ms = delay.as_millis(), ); } } @@ -402,15 +374,38 @@ impl ReorgTracker { ); stream_opt = None; 
provider_ws_opt = None; - reconnect_backoff_tracker.inc_retries_count(); + let delay = self.schedule_ws_retry(); warn!( - "Falling back to polling for {jittered_ms:?}ms before retrying WS in network {net}", - jittered_ms = reconnect_backoff_tracker.get_next_retry_ms() + "Will retry WS setup for {net} in {delay_ms}ms after stream closed", + delay_ms = delay.as_millis(), ); } }, None => { - if average_block_generation_time == 0 { + if websocket_url.is_some() { + let delay = match self.next_websocket_retry_at { + Some(deadline) => { + let now = Instant::now(); + if deadline > now { + deadline - now + } else { + Duration::from_millis(0) + } + } + None => { + let attempt = if self.websocket_retry_attempt == 0 { + 1 + } else { + self.websocket_retry_attempt + }; + self.websocket_policy.backoff_delay(attempt) + } + }; + + if !delay.is_zero() { + tokio::time::sleep(delay).await; + } + } else if average_block_generation_time == 0 { match self .calculate_block_generation_time_in_network( net.as_str(), @@ -442,7 +437,9 @@ impl ReorgTracker { } } } - await_time(average_block_generation_time).await; + if websocket_url.is_none() { + await_time(average_block_generation_time).await; + } } } @@ -866,6 +863,7 @@ impl ReorgTracker { providers_mutex: SharedRpcProviders, updates_relayer_send_chan: CountedSender, websocket_url: Option, + websocket_reconnect: Option, ) -> ReorgTracker { ReorgTracker { observer_finalized_height: 0, @@ -876,6 +874,9 @@ impl ReorgTracker { providers_mutex, updates_relayer_send_chan, websocket_url, + websocket_policy: WsReconnectPolicy::from_config(websocket_reconnect.as_ref()), + websocket_retry_attempt: 0, + next_websocket_retry_at: None, } } /// Calculates the average block generation time over the last `num_blocks` @@ -968,6 +969,51 @@ impl ReorgTracker { Some(total_span_ms / num_blocks) } } + +struct ReorgWsRecorder { + metrics: Arc>, + network: String, +} + +impl ReorgWsRecorder { + fn new(metrics: Arc>, network: &str) -> Self { + Self { + metrics, + 
network: network.to_owned(), + } + } +} + +#[async_trait] +impl WsReconnectMetrics for ReorgWsRecorder { + async fn on_disconnect(&self) { + self.metrics + .read() + .await + .reorg_ws_disconnects_detected + .with_label_values(&[self.network.as_str()]) + .inc(); + } + + async fn on_attempt(&self) { + self.metrics + .read() + .await + .reorg_ws_reconnect_attempts + .with_label_values(&[self.network.as_str()]) + .inc(); + } + + async fn on_success(&self) { + self.metrics + .read() + .await + .reorg_ws_reconnect_successes + .with_label_values(&[self.network.as_str()]) + .inc(); + } +} + #[cfg(test)] mod tests { use super::*; @@ -1232,6 +1278,7 @@ mod tests { providers_clone, feed_updates_send, websocket_url_clone, + None, ); reorg_tracker.loop_tracking_for_reorg_in_network().await; }) diff --git a/apps/sequencer/src/providers/ws.rs b/apps/sequencer/src/providers/ws.rs new file mode 100644 index 0000000000..528436f1a1 --- /dev/null +++ b/apps/sequencer/src/providers/ws.rs @@ -0,0 +1,363 @@ +use alloy::{ + pubsub::{ConnectionHandle, PubSubConnect}, + transports::{ws::WsConnect, TransportResult}, +}; +use async_trait::async_trait; +use blocksense_config::WebsocketReconnectConfig; +use std::{future::Future, sync::Arc}; +#[cfg(not(test))] +use tokio::time::sleep; +use tokio::time::Duration; +use tracing::{info, warn}; + +#[async_trait] +pub trait WsReconnectMetrics: Send + Sync { + async fn on_disconnect(&self); + async fn on_attempt(&self); + async fn on_success(&self); +} + +#[derive(Debug, Clone)] +pub struct WsReconnectPolicy { + initial: Duration, + max: Duration, + multiplier: f64, +} + +impl WsReconnectPolicy { + pub fn from_config(config: Option<&WebsocketReconnectConfig>) -> Self { + let cfg = config.cloned().unwrap_or_default(); + let initial = Duration::from_millis(cfg.initial_backoff_ms); + let max = Duration::from_millis(cfg.max_backoff_ms); + let multiplier = cfg.backoff_multiplier; + let clamped_initial = initial.min(max); + Self { + initial: 
clamped_initial, + max, + multiplier, + } + } + + pub fn backoff_delay(&self, attempt: u64) -> Duration { + if attempt == 0 { + return Duration::ZERO; + } + let exponent = (attempt - 1) as f64; + let scaled = self.initial.mul_f64(self.multiplier.powf(exponent)); + scaled.min(self.max) + } +} + +impl Default for WsReconnectPolicy { + fn default() -> Self { + Self::from_config(None) + } +} + +#[derive(Clone)] +pub struct ResilientWsConnect { + inner: WsConnect, + policy: WsReconnectPolicy, + metrics: Arc, + network: Arc, +} + +#[cfg(test)] +mod backoff_recorder { + use std::{future::Future, sync::Arc, time::Duration}; + use tokio::sync::Mutex; + + tokio::task_local! { + static RECORDER: Arc>>; + } + + // Run fut with RECORDER set as task local var for the current task. + pub(super) async fn with_recorder(recorder: Arc>>, fut: F) -> R + where + F: Future, + { + RECORDER.scope(recorder, fut).await + } + + pub(super) async fn record_backoff(delay: Duration) { + if let Ok(recorder) = RECORDER.try_with(|r| r.clone()) { + recorder.lock().await.push(delay); + } + } +} + +impl ResilientWsConnect { + pub fn new( + inner: WsConnect, + policy: WsReconnectPolicy, + metrics: Arc, + network: &str, + ) -> Self { + Self { + inner, + policy, + metrics, + network: Arc::new(network.to_owned()), + } + } +} + +impl PubSubConnect for ResilientWsConnect { + fn is_local(&self) -> bool { + self.inner.is_local() + } + + async fn connect(&self) -> TransportResult { + let inner = self.inner.clone(); + let handle = PubSubConnect::connect(&inner).await?; + Ok(handle + .with_max_retries(u32::MAX) + .with_retry_interval(Duration::from_secs(0))) + } + + async fn try_reconnect(&self) -> TransportResult { + let inner = self.inner.clone(); + self.reconnect_loop(|| PubSubConnect::connect(&inner)).await + } +} + +impl ResilientWsConnect { + async fn reconnect_loop(&self, mut connect: F) -> TransportResult + where + F: FnMut() -> Fut + Send, + Fut: Future> + Send, + { + self.metrics.on_disconnect().await; 
+ + warn!( + network = self.network.as_str(), + initial_backoff_ms = self.policy.initial.as_millis(), + max_backoff_ms = self.policy.max.as_millis(), + multiplier = self.policy.multiplier, + "WS transport disconnected; starting exponential reconnect attempts" + ); + + let mut attempt: u64 = 0; + loop { + attempt = attempt.saturating_add(1); + self.metrics.on_attempt().await; + + match connect().await { + Ok(handle) => { + self.metrics.on_success().await; + info!( + network = self.network.as_str(), + attempt, "WS transport reconnected after {attempt} attempt(s)" + ); + return Ok(handle + .with_max_retries(u32::MAX) + .with_retry_interval(Duration::from_secs(0))); + } + Err(err) => { + let delay = self.policy.backoff_delay(attempt); + warn!( + network = self.network.as_str(), + attempt, + backoff_ms = delay.as_millis(), + capped = delay == self.policy.max, + error = %err, + "WS reconnect attempt failed; will retry" + ); + self.wait_before_retry(delay).await; + } + } + } + } + + #[cfg(not(test))] + async fn wait_before_retry(&self, delay: Duration) { + sleep(delay).await; + } + + #[cfg(test)] + async fn wait_before_retry(&self, delay: Duration) { + backoff_recorder::record_backoff(delay).await; + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy::transports::TransportErrorKind; + use async_trait::async_trait; + use std::{ + collections::VecDeque, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + }; + use tokio::sync::Mutex; + + #[derive(Clone)] + struct MockConnector { + outcomes: Arc>>, + attempts: Arc, + } + + #[derive(Clone)] + enum MockOutcome { + Ok, + Err(&'static str), + } + + impl MockConnector { + fn new(outcomes: Vec) -> Self { + Self { + outcomes: Arc::new(Mutex::new(outcomes.into())), + attempts: Arc::new(AtomicUsize::new(0)), + } + } + + fn next(&self) -> impl Future> + Send + 'static { + let outcomes = Arc::clone(&self.outcomes); + let attempts = Arc::clone(&self.attempts); + async move { + attempts.fetch_add(1, Ordering::SeqCst); + 
let outcome = outcomes + .lock() + .await + .pop_front() + .expect("mock connector exhausted"); + match outcome { + MockOutcome::Ok => { + let (handle, _iface) = ConnectionHandle::new(); + Ok(handle) + } + MockOutcome::Err(msg) => Err(TransportErrorKind::custom_str(msg)), + } + } + } + + fn attempts(&self) -> usize { + self.attempts.load(Ordering::SeqCst) + } + + async fn remaining(&self) -> usize { + self.outcomes.lock().await.len() + } + } + + #[derive(Default)] + struct RecordingMetrics { + disconnects: AtomicUsize, + attempts: AtomicUsize, + successes: AtomicUsize, + order: Mutex>, + } + + impl RecordingMetrics { + fn counts(&self) -> (usize, usize, usize) { + ( + self.disconnects.load(Ordering::SeqCst), + self.attempts.load(Ordering::SeqCst), + self.successes.load(Ordering::SeqCst), + ) + } + + async fn events(&self) -> Vec<&'static str> { + self.order.lock().await.clone() + } + } + + #[async_trait] + impl WsReconnectMetrics for RecordingMetrics { + async fn on_disconnect(&self) { + self.disconnects.fetch_add(1, Ordering::SeqCst); + self.order.lock().await.push("disconnect"); + } + + async fn on_attempt(&self) { + self.attempts.fetch_add(1, Ordering::SeqCst); + self.order.lock().await.push("attempt"); + } + + async fn on_success(&self) { + self.successes.fetch_add(1, Ordering::SeqCst); + self.order.lock().await.push("success"); + } + } + + #[test] + fn backoff_delay_scales_and_caps() { + let cfg = WebsocketReconnectConfig { + initial_backoff_ms: 100, + backoff_multiplier: 2.0, + max_backoff_ms: 350, + }; + let policy = WsReconnectPolicy::from_config(Some(&cfg)); + + assert_eq!(policy.backoff_delay(0), Duration::ZERO); + assert_eq!(policy.backoff_delay(1), Duration::from_millis(100)); + assert_eq!(policy.backoff_delay(2), Duration::from_millis(200)); + assert_eq!(policy.backoff_delay(3), Duration::from_millis(350)); + assert_eq!(policy.backoff_delay(4), Duration::from_millis(350)); + } + + #[tokio::test] + async fn 
reconnect_loop_retries_metrics_and_stops_after_success() { + let cfg = WebsocketReconnectConfig { + initial_backoff_ms: 100, + backoff_multiplier: 2.0, + max_backoff_ms: 350, + }; + let policy = WsReconnectPolicy::from_config(Some(&cfg)); + let metrics = Arc::new(RecordingMetrics::default()); + let connector = MockConnector::new(vec![ + MockOutcome::Err("fail-1"), + MockOutcome::Err("fail-2"), + MockOutcome::Ok, + MockOutcome::Err("unused"), + ]); + + let recorded_delays = Arc::new(Mutex::new(Vec::new())); + let resilient = ResilientWsConnect::new( + WsConnect::new("ws://example.invalid"), + policy.clone(), + metrics.clone(), + "testnet", + ); + + let expected_delays = [policy.backoff_delay(1), policy.backoff_delay(2)]; + + backoff_recorder::with_recorder(recorded_delays.clone(), async { + let connector_for_task = connector.clone(); + let resilient_for_task = resilient.clone(); + let join = tokio::spawn(backoff_recorder::with_recorder( + recorded_delays.clone(), + async move { + resilient_for_task + .reconnect_loop(move || connector_for_task.next()) + .await + }, + )); + + let result = join.await.expect("task completed"); + let handle = result.expect("reconnect eventually succeeds"); + handle.shutdown(); + }) + .await; + + assert_eq!(connector.attempts(), 3); + assert_eq!(connector.remaining().await, 1); + + let recorded = recorded_delays.lock().await.clone(); + assert_eq!(recorded, expected_delays.to_vec()); + + let (disconnects, attempts, successes) = metrics.counts(); + assert_eq!(disconnects, 1); + assert_eq!(attempts, 3); + assert_eq!(successes, 1); + + let events = metrics.events().await; + assert_eq!( + events, + vec!["disconnect", "attempt", "attempt", "attempt", "success"] + ); + } +} diff --git a/libs/config/src/lib.rs b/libs/config/src/lib.rs index 33cba1b96b..6ae3101962 100644 --- a/libs/config/src/lib.rs +++ b/libs/config/src/lib.rs @@ -204,6 +204,9 @@ pub struct Provider { #[serde(default)] #[serde(skip_serializing_if = "Option::is_none")] pub 
websocket_url: Option, + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub websocket_reconnect: Option, pub transaction_retries_count_limit: u32, pub transaction_retry_timeout_secs: u32, pub retry_fee_increment_fraction: f64, @@ -240,6 +243,53 @@ pub struct Provider { pub reorg: ReorgConfig, } +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct WebsocketReconnectConfig { + #[serde(default = "default_ws_initial_backoff_ms")] + pub initial_backoff_ms: u64, + #[serde(default = "default_ws_backoff_multiplier")] + pub backoff_multiplier: f64, + #[serde(default = "default_ws_max_backoff_ms")] + pub max_backoff_ms: u64, +} + +impl Default for WebsocketReconnectConfig { + fn default() -> Self { + Self { + initial_backoff_ms: default_ws_initial_backoff_ms(), + backoff_multiplier: default_ws_backoff_multiplier(), + max_backoff_ms: default_ws_max_backoff_ms(), + } + } +} + +impl WebsocketReconnectConfig { + pub fn validate(&self, context: &str) -> anyhow::Result<()> { + if self.initial_backoff_ms == 0 { + anyhow::bail!("{context}: initial_backoff_ms must be greater than 0"); + } + if !(1.0..=10.0).contains(&self.backoff_multiplier) { + anyhow::bail!("{context}: backoff_multiplier must be between 1.0 and 10.0 (inclusive)"); + } + if self.max_backoff_ms < self.initial_backoff_ms { + anyhow::bail!("{context}: max_backoff_ms cannot be smaller than initial_backoff_ms"); + } + Ok(()) + } +} + +const fn default_ws_initial_backoff_ms() -> u64 { + 3_000 +} + +const fn default_ws_backoff_multiplier() -> f64 { + 2.0 +} + +const fn default_ws_max_backoff_ms() -> u64 { + 5 * 60 * 1_000 +} + fn default_is_enabled() -> bool { true } @@ -344,6 +394,10 @@ impl Validated for Provider { ); } } + if let Some(reconnect) = &self.websocket_reconnect { + let reconnect_ctx = format!("{context}: websocket_reconnect"); + reconnect.validate(&reconnect_ctx)?; + } Ok(()) } } @@ -612,6 +666,7 @@ pub fn get_test_config_with_multiple_providers( .to_string(), url: 
url.to_string(), websocket_url: None, + websocket_reconnect: None, transaction_retries_count_limit: 10, transaction_retry_timeout_secs: 24, retry_fee_increment_fraction: 0.1, diff --git a/libs/metrics/src/metrics.rs b/libs/metrics/src/metrics.rs index 5f38da421b..0c2d5b8ffc 100644 --- a/libs/metrics/src/metrics.rs +++ b/libs/metrics/src/metrics.rs @@ -173,6 +173,12 @@ pub struct ProviderMetrics { pub num_transactions_in_queue: IntGaugeVec, pub is_enabled: IntGaugeVec, pub observed_reorgs: IntCounterVec, + pub ws_disconnects_detected: IntCounterVec, + pub ws_reconnect_attempts: IntCounterVec, + pub ws_reconnect_successes: IntCounterVec, + pub reorg_ws_disconnects_detected: IntCounterVec, + pub reorg_ws_reconnect_attempts: IntCounterVec, + pub reorg_ws_reconnect_successes: IntCounterVec, } impl ProviderMetrics { @@ -284,6 +290,36 @@ impl ProviderMetrics { "Total number of observed chain reorganizations for the network", &["Network"] )?, + ws_disconnects_detected: register_int_counter_vec!( + format!("{}ws_disconnects_detected", prefix), + "Observed websocket disconnections for the network", + &["Network"] + )?, + ws_reconnect_attempts: register_int_counter_vec!( + format!("{}ws_reconnect_attempts", prefix), + "Attempted websocket reconnections for the network", + &["Network"] + )?, + ws_reconnect_successes: register_int_counter_vec!( + format!("{}ws_reconnect_successes", prefix), + "Successful websocket reconnections for the network", + &["Network"] + )?, + reorg_ws_disconnects_detected: register_int_counter_vec!( + format!("{}reorg_ws_disconnects_detected", prefix), + "Observed websocket disconnections for the reorg tracker", + &["Network"] + )?, + reorg_ws_reconnect_attempts: register_int_counter_vec!( + format!("{}reorg_ws_reconnect_attempts", prefix), + "Attempted websocket reconnections for the reorg tracker", + &["Network"] + )?, + reorg_ws_reconnect_successes: register_int_counter_vec!( + format!("{}reorg_ws_reconnect_successes", prefix), + "Successful 
websocket reconnections for the reorg tracker", + &["Network"] + )?, }) } }