1 change: 1 addition & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions apps/sequencer/Cargo.toml
@@ -48,6 +48,7 @@ alloy-primitives = { workspace = true }
alloy-u256-literal = { workspace = true }

anyhow = { workspace = true }
async-trait = "0.1"
bridgetree = { git = "https://github.com/zcash/incrementalmerkletree", package = "bridgetree" }
bytes = { workspace = true }
chrono = { workspace = true }
1 change: 1 addition & 0 deletions apps/sequencer/README.md
@@ -40,6 +40,7 @@ echo -n 0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a > /tm
### Optional WebSocket newHeads subscription

- Provider field: add `websocket_url` (optional) under each provider in `SequencerConfig.providers.<name>`.
- Provider field: add `websocket_reconnect` (optional) to override the WS reconnection policy (`initial_backoff_ms`, `backoff_multiplier`, `max_backoff_ms`). The default policy retries forever, with exponential backoff capped at 5 minutes.
- Format: must start with `ws://` or `wss://` (validated at startup).
- Behavior: when set, the reorg tracker subscribes to `newHeads` over WS to trigger checks; all reads (blocks, receipts, finalized blocks, contract state) remain over HTTP.
- Fallback: if WS connect/subscribe fails or drops, the tracker falls back to polling with exponential backoff (1s → 30s cap, ±10% jitter) and periodically retries WS.
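
Taken together, a provider entry using both websocket fields might look like the following sketch. Only the field names come from this README; the network name, endpoint URLs, and surrounding structure are placeholders:

```json
{
  "providers": {
    "example-network": {
      "url": "https://rpc.example.org",
      "websocket_url": "wss://rpc.example.org/ws",
      "websocket_reconnect": {
        "initial_backoff_ms": 1000,
        "backoff_multiplier": 2.0,
        "max_backoff_ms": 300000
      }
    }
  }
}
```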
1 change: 1 addition & 0 deletions apps/sequencer/src/http_handlers/admin.rs
@@ -964,6 +964,7 @@ mod tests {
private_key_path: key_path.to_str().unwrap().to_owned(),
url,
websocket_url: None,
websocket_reconnect: None,
transaction_retries_count_limit: 42,
transaction_retry_timeout_secs: 20,
retry_fee_increment_fraction: 0.1,
142 changes: 142 additions & 0 deletions apps/sequencer/src/providers/Reorg.md
@@ -0,0 +1,142 @@
# Reorg Tracking

This document describes how `apps/sequencer/src/providers/reorg_tracking.rs` monitors
canonical chain progress, detects reorganizations, and recovers the Sequencer’s
state.

## Tracker State and Inputs

- `ReorgTracker` keeps per-network state: the most recent finalized height,
the highest height we have observed locally, an iteration counter, and the
RPC timeout derived from `ReorgConfig`.
- Access to providers is shared via `SharedRpcProviders`, where each
`RpcProvider` owns the HTTP transport, optional websocket transport, and an
`InflightObservations` cache (`observed_block_hashes` plus
`non_finalized_updates`).
- Reintroduced batches are sent through a `CountedSender<BatchOfUpdatesToProcess>`
so the relayer loop can replay messages that were lost on a fork.
- When websocket support is configured (`Provider.websocket_url` with optional
`WebsocketReconnectConfig`) the tracker builds a `ResilientWsConnect` to
subscribe to `eth_subscribe:newHeads`.
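
The per-network state described above can be sketched as a plain struct. The field names are taken from this document's prose; the real `ReorgTracker` holds more than this (provider handles, channels, metrics):

```rust
use std::time::Duration;

/// Illustrative sketch of the per-network tracker state; not the real struct.
struct ReorgTrackerSketch {
    /// Latest finalized height seen on-chain.
    observer_finalized_height: u64,
    /// Highest height observed locally (the tracked tip).
    observed_latest_height: u64,
    /// Loop iteration counter, used in diagnostics.
    iteration: u64,
    /// Per-call RPC deadline derived from `ReorgConfig`.
    rpc_timeout: Duration,
}

impl ReorgTrackerSketch {
    fn new(rpc_timeout: Duration) -> Self {
        Self {
            observer_finalized_height: 0,
            observed_latest_height: 0,
            iteration: 0,
            rpc_timeout,
        }
    }
}
```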

## Control Loop (`loop_tracking_for_reorg_in_network`)

1. **Transport setup** – If a websocket URL is present we attempt to connect
and subscribe to `newHeads`. The handshake and subsequent reconnects use the
`WsReconnectPolicy` backoff helpers (`should_attempt_ws`, `schedule_ws_retry`,
`reset_ws_backoff`). On failure the tracker logs the reason, schedules a
retry, and temporarily falls back to HTTP polling.
2. **Polling cadence** – For pure HTTP operation we call
`calculate_block_generation_time_in_network` (look back 100 blocks) to
estimate the poll interval. All block reads are wrapped in
`actix_web::rt::time::timeout`; warnings are emitted when a 5‑second deadline
is exceeded.
3. **Per-iteration work** – Each loop iteration:
- Clones the provider handle and a snapshot of `observed_block_hashes`.
- Selects the websocket provider when available (HTTP otherwise) for reads.
- Fetches the latest head (`BlockNumberOrTag::Latest`) and, if successful,
passes it to `process_new_block`.
- Reads the on-chain ADFS root via `rpc_get_storage_at`. If the storage slot
differs from the locally tracked Merkle root we update
`RpcProvider.merkle_root_in_contract` and mark that the ring buffer indices
must be resynchronized.
- Fetches `BlockNumberOrTag::Finalized` to advance
`observer_finalized_height`. When finalization moves forward we prune both
`non_finalized_updates` and `observed_block_hashes` through
`InflightObservations::prune_observed_up_to`. If the tracker falls behind
finalization we reset `observed_latest_height` to the finalized height and
seed the observed hash from the finalized block.
- When `need_resync_indices` is true we call `try_to_sync` so the ring-buffer
indices are refreshed directly from the contract state.
- If the provider disappears from the shared map we log and terminate the
loop for that network.
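
The cadence estimate in step 2 is simple arithmetic over two header timestamps; a minimal sketch of what `calculate_block_generation_time_in_network` is described as computing (the real helper fetches the headers over RPC, and this function name and signature are illustrative):

```rust
use std::time::Duration;

/// Average block time over `lookback` blocks, given the Unix-second
/// timestamps of the latest header and the header `lookback` blocks earlier.
fn estimate_block_time(ts_latest: u64, ts_lookback: u64, lookback: u64) -> Duration {
    assert!(lookback > 0 && ts_latest >= ts_lookback);
    // Work in milliseconds so sub-second averages are not truncated to zero.
    Duration::from_millis((ts_latest - ts_lookback) * 1000 / lookback)
}
```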

## Detecting Divergence (`process_new_block`)

- **Tip pre-check** – Before ingesting new blocks we refetch the chain block at
`observed_latest_height`. A hash mismatch signals that the tracked tip was
replaced, so we increment `ProviderMetrics.observed_reorgs` and delegate to
`handle_reorg`.
- **Parent mismatch** – When new blocks appear we load the first new block and
compare its parent hash to the stored hash for `observed_latest_height`. Any
difference indicates a fork at or above that height.
- **Same-height hash change** – Even if the height does not advance, the latest
header fetched from RPC is compared with the cached hash. If it changes we
treat it as a reorg.
- Successful ingestion stores fresh hashes in
`provider.inflight.observed_block_hashes` for every block seen, ensuring we
can later walk backwards to locate a common ancestor.
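
A hypothetical distillation of these checks, with closures standing in for the RPC reads and `String`s for 32-byte block hashes (names and signatures are illustrative, not the real API):

```rust
use std::collections::BTreeMap;

/// Returns true when the canonical chain has diverged from the cached view.
fn detect_divergence(
    observed: &BTreeMap<u64, String>,
    observed_latest_height: u64,
    chain_tip_height: u64,
    chain_hash_at: impl Fn(u64) -> String,
    parent_of: impl Fn(u64) -> String,
) -> bool {
    let cached_tip = match observed.get(&observed_latest_height) {
        Some(hash) => hash,
        None => return false, // nothing cached yet, nothing to compare
    };
    // Tip pre-check: refetch the block at our tracked height. A mismatch here
    // also covers the same-height hash change case described above.
    if chain_hash_at(observed_latest_height) != *cached_tip {
        return true;
    }
    // Parent mismatch: the first new block must point back at our cached tip.
    if chain_tip_height > observed_latest_height
        && parent_of(observed_latest_height + 1) != *cached_tip
    {
        return true;
    }
    false
}
```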

All block and storage reads funnel through `rpc_get_block_by_number` and
`rpc_get_storage_at`. These helpers prefer the websocket provider when it is
healthy and fall back to HTTP otherwise, still enforcing the per-call timeout.

## Handling a Reorg (`handle_reorg`)

1. Walk the cached heights in descending order (starting from the newest cached
block) and refetch each block from the chain until we locate the first height
where the stored hash matches the canonical hash. The function logs any
diverged heights it sees along the way.
2. The fork point is `first_common + 1`. We log both the ancestor and the fork
height for operators.
3. Holding the provider lock, we remove every entry in
`non_finalized_updates` with a height ≥ fork height. Each batch is sent back
to the relayer channel in ascending order and the resend outcome is logged.
Pre-fork entries stay untouched so they can be
pruned only when the network finalizes them.
4. If no common ancestor is found within the cached history we warn, but the
loop continues on the canonical head revealed by RPC.
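
Steps 1-3 can be sketched over ordered maps, where `BTreeMap::split_off` naturally yields the "at or above the fork height, in ascending order" behavior. All names and signatures here are illustrative, not the real API:

```rust
use std::collections::BTreeMap;

/// Walk cached heights downward to the first height whose cached hash still
/// matches the canonical chain, then drain the batches above it for replay.
/// Returns `None` when no common ancestor exists in the cached history.
fn replay_after_fork(
    observed: &BTreeMap<u64, String>,
    non_finalized_updates: &mut BTreeMap<u64, String>, // height -> batch
    chain_hash_at: impl Fn(u64) -> String,
) -> Option<(u64, Vec<String>)> {
    // Step 1: highest cached height where we still agree with the chain.
    let first_common = observed
        .iter()
        .rev()
        .find(|(height, cached)| chain_hash_at(**height) == **cached)
        .map(|(height, _)| *height)?;
    // Step 2: the fork point is one above the common ancestor.
    let fork_height = first_common + 1;
    // Step 3: remove every batch at or above the fork; pre-fork entries stay
    // cached. BTreeMap iteration yields them in ascending height order.
    let replayed: Vec<String> = non_finalized_updates
        .split_off(&fork_height)
        .into_values()
        .collect();
    Some((fork_height, replayed))
}
```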

## Finalization and Cleanup

- `observer_finalized_height` tracks the latest finalized block we have seen.
Advancing it triggers a pruning pass in `InflightObservations` which clears
outdated block hashes and relayer batches that can never reorg again.
- If we discover that `observed_latest_height` lags behind finalization we
fast-forward it to the finalized height and overwrite the cached hash to keep
the tracker anchored to known-final data.
- Additional blocks observed after a fork are appended to
`observed_block_hashes`, giving the tracker the history it needs for future
reorg detection.
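
A sketch of that pruning pass, assuming ordered maps keyed by height. The real `InflightObservations::prune_observed_up_to` signature may differ; the split between "keep the finalized anchor" and "drop finalized batches" is an assumption based on the prose above:

```rust
use std::collections::BTreeMap;

/// Drop entries that can never reorg again once `finalized_height` is known.
fn prune_observed_up_to(
    observed_block_hashes: &mut BTreeMap<u64, String>,
    non_finalized_updates: &mut BTreeMap<u64, String>,
    finalized_height: u64,
) {
    // Keep the finalized block's own hash as the anchor for future checks;
    // split_off retains keys >= the given height in the returned map.
    *observed_block_hashes = observed_block_hashes.split_off(&finalized_height);
    // Batches at or below the finalized height are final; drop them all.
    *non_finalized_updates = non_finalized_updates.split_off(&(finalized_height + 1));
}
```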

## Websocket Strategy and Fallbacks

- When websockets are configured the tracker continuously consumes the
`newHeads` stream. Every received header simply wakes the loop so the same
verification logic runs against HTTP (to reuse existing RPC primitives).
- Disconnects or subscription failures trigger a configurable backoff sequence.
While waiting for the next retry we revert to the polling cadence determined
by the average block time.
- If no websocket URL is configured we never attempt a connection; the loop
  relies purely on the adaptive polling interval.
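
The backoff sequence can be sketched directly from the `websocket_reconnect` fields documented in the README; the real `WsReconnectPolicy` internals are not shown in this document and may differ:

```rust
use std::time::Duration;

/// Exponential reconnect backoff: start at `initial_backoff_ms`, multiply per
/// failed attempt, cap at `max_backoff_ms` (5 minutes by default).
struct BackoffSketch {
    initial_backoff_ms: u64,
    backoff_multiplier: f64,
    max_backoff_ms: u64,
}

impl BackoffSketch {
    /// Delay before reconnect attempt number `attempt` (0-based).
    fn delay(&self, attempt: u32) -> Duration {
        let raw = self.initial_backoff_ms as f64 * self.backoff_multiplier.powi(attempt as i32);
        Duration::from_millis((raw as u64).min(self.max_backoff_ms))
    }
}
```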

## Metrics and Observability

- `ProviderMetrics.observed_reorgs` (an `IntCounterVec` keyed by network) is the
primary signal that a reorg was detected. Every detection increments it before
the corrective flow begins.
- Logs include network, observed/latest heights, finalized checkpoints, and
loop counters, mirroring the legacy `eth_send_utils` diagnostics so existing
dashboards remain useful. Diverged block hashes, fork points, and resend
activity are explicitly printed.

## Test Coverage

- `test_loop_tracking_reorg_detect_and_resync_indices_http` and
`test_loop_tracking_reorg_detect_and_resync_indices_websock`
(`apps/sequencer/src/providers/reorg_tracking.rs:1573`) spin up an Anvil node,
inject synthetic non-finalized batches (tagged via `EncodedFeedId`), and drive
a deterministic reorg using snapshot/revert.
- The tests assert that:
- `observed_reorgs` increments for the network.
- Only the batches at or above the fork height are replayed through the
relayer channel and they arrive in ascending order.
- Pre-fork batches remain cached until later finalization causes pruning.
- The on-chain ADFS root drift triggers a `try_to_sync` resync, even without
deploying contracts.
- Running the scenario twice exercises both the HTTP polling path and the
  websocket-triggered path against the same verification logic.

Together these pieces ensure the Sequencer can withstand short-lived forks,
recover the state required to keep publishing updates, and provide operators the
signals they need to observe the system’s behavior.
11 changes: 8 additions & 3 deletions apps/sequencer/src/providers/eth_send_utils.rs
@@ -331,12 +331,16 @@ pub async fn create_per_network_reorg_trackers(
let net_clone = net.clone();
let sequencer_state_providers_clone = sequencer_state.providers.clone();
let sequencer_config = sequencer_state.sequencer_config.read().await;
let (reorg_tracker_config, websocket_url_opt) =
let (reorg_tracker_config, websocket_url_opt, websocket_reconnect_opt) =
match sequencer_config.providers.get(net.as_str()) {
Some(c) => (c.reorg.clone(), c.websocket_url.clone()),
Some(c) => (
c.reorg.clone(),
c.websocket_url.clone(),
c.websocket_reconnect.clone(),
),
None => {
error!("No config for provider for network {net} will set to default!");
(ReorgConfig::default(), None)
(ReorgConfig::default(), None, None)
}
};
let relayer_send_channel = match sequencer_state
@@ -356,6 +360,7 @@
sequencer_state_providers_clone,
relayer_send_channel,
websocket_url_opt,
websocket_reconnect_opt,
);
collected_futures.push(
tokio::task::Builder::new()
1 change: 1 addition & 0 deletions apps/sequencer/src/providers/inflight_observations.rs
@@ -84,6 +84,7 @@ mod tests {
private_key_path: "dummy".to_string(),
url: "http://localhost:8545".to_string(),
websocket_url: None,
websocket_reconnect: None,
transaction_retries_count_limit: 3,
transaction_retry_timeout_secs: 10,
retry_fee_increment_fraction: 0.1,
1 change: 1 addition & 0 deletions apps/sequencer/src/providers/mod.rs
@@ -2,3 +2,4 @@ pub mod eth_send_utils;
pub mod inflight_observations;
pub mod provider;
pub mod reorg_tracking;
pub mod ws;
81 changes: 75 additions & 6 deletions apps/sequencer/src/providers/provider.rs
@@ -10,6 +10,7 @@ use alloy::{
Identity, ProviderBuilder, RootProvider,
},
signers::local::PrivateKeySigner,
transports::ws::WsConnect,
};
use alloy_primitives::{keccak256, B256, U256};
use alloy_u256_literal::u256;
@@ -40,14 +41,15 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::{fs, mem};
use tokio::sync::{Mutex, RwLock};
use tokio::time::error::Elapsed;
use tokio::time::Duration;
use tokio::time::{error::Elapsed, Duration};
use tracing::{debug, error, info, warn};

use crate::providers::eth_send_utils::{
get_gas_limit, get_tx_retry_params, BatchOfUpdatesToProcess, GasFees,
};
use crate::providers::inflight_observations::InflightObservations;
use crate::providers::ws::{ResilientWsConnect, WsReconnectMetrics, WsReconnectPolicy};
use async_trait::async_trait;
use std::time::Instant;

pub type ProviderType =
@@ -106,6 +108,49 @@ impl Hashable for HashValue {
}
}

struct ProviderWsRecorder {
metrics: Arc<RwLock<ProviderMetrics>>,
network: String,
}

impl ProviderWsRecorder {
fn new(metrics: Arc<RwLock<ProviderMetrics>>, network: &str) -> Self {
Self {
metrics,
network: network.to_owned(),
}
}
}

#[async_trait]
impl WsReconnectMetrics for ProviderWsRecorder {
async fn on_disconnect(&self) {
self.metrics
.read()
.await
.ws_disconnects_detected
.with_label_values(&[self.network.as_str()])
.inc();
}

async fn on_attempt(&self) {
self.metrics
.read()
.await
.ws_reconnect_attempts
.with_label_values(&[self.network.as_str()])
.inc();
}

async fn on_success(&self) {
self.metrics
.read()
.await
.ws_reconnect_successes
.with_label_values(&[self.network.as_str()])
.inc();
}
}
pub struct RpcProvider {
pub calldata_merkle_tree_frontier: Frontier<HashValue, 32>,
pub merkle_root_in_contract: Option<HashValue>,
@@ -309,10 +354,34 @@ impl RpcProvider {
provider_metrics: &Arc<tokio::sync::RwLock<ProviderMetrics>>,
feeds_config: &AllFeedsConfig,
) -> RpcProvider {
let provider = ProviderBuilder::new()
.disable_recommended_fillers()
.wallet(EthereumWallet::from(signer.clone()))
.connect_http(rpc_url.clone());
let provider = match rpc_url.scheme() {
"ws" | "wss" => {
let metrics_recorder = Arc::new(ProviderWsRecorder::new(
Arc::clone(provider_metrics),
network,
));
let resilient_connect = ResilientWsConnect::new(
WsConnect::new(rpc_url.as_str().to_owned()),
WsReconnectPolicy::from_config(p.websocket_reconnect.as_ref()),
metrics_recorder,
network,
);

ProviderBuilder::new()
.disable_recommended_fillers()
.wallet(EthereumWallet::from(signer.clone()))
.connect_pubsub_with(resilient_connect)
.await
.unwrap_or_else(|err| {
panic!("Failed to connect to provider over websocket {rpc_url}: {err:?}")
})
}
"http" | "https" => ProviderBuilder::new()
.disable_recommended_fillers()
.wallet(EthereumWallet::from(signer.clone()))
.connect_http(rpc_url.clone()),
other => panic!("Unsupported RPC URL scheme `{other}` for network {network}"),
};

let impersonated_anvil_account = p
.impersonated_anvil_account