From b896266c3948422c975f3a0da82105a47828ef3b Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Tue, 15 Oct 2024 17:08:34 -0300 Subject: [PATCH 01/11] feat: remove disconnected sockets from batch --- batcher/aligned-batcher/src/lib.rs | 53 +++++++++++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/batcher/aligned-batcher/src/lib.rs b/batcher/aligned-batcher/src/lib.rs index eb7f8cb976..6c9c53a672 100644 --- a/batcher/aligned-batcher/src/lib.rs +++ b/batcher/aligned-batcher/src/lib.rs @@ -7,7 +7,7 @@ use ethers::signers::Signer; use types::batch_state::BatchState; use types::user_state::UserState; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::env; use std::net::SocketAddr; use std::sync::Arc; @@ -837,6 +837,8 @@ impl Batcher { finalized_batch: Vec, gas_price: U256, ) -> Result<(), BatcherError> { + let finalized_batch = self.validate_sockets_connection(finalized_batch).await; + let nonced_batch_verifcation_data: Vec = finalized_batch .clone() .into_iter() @@ -913,6 +915,55 @@ impl Batcher { .await } + /// here we go to each entry and make sure the client connection is still alive + /// if not, we remove it from the finalized batch. 
+ /// we also remove all the user proofs in the current batch queue as this will create discrepancies with the nonces + /// finally, we remove the user entry from the cache as to re-query its nonce from ethereum when he sends another proof + async fn validate_sockets_connection( + &self, + finalized_batch: Vec, + ) -> Vec { + let mut filtered_finalized_batch = vec![]; + let mut closed_clients = HashSet::new(); + + let remove_client = |addr: Address, mut batch_state_lock: MutexGuard<'_, BatchState>| { + batch_state_lock.batch_queue = batch_state_lock + .batch_queue + .clone() + .into_iter() + .filter(|entry| (entry.0.sender == addr)) + .collect(); + batch_state_lock.user_states.remove(&addr); + }; + + for batch_entry in finalized_batch { + let addr = batch_entry.sender; + if closed_clients.contains(&addr) { + continue; + } + + let Some(ws_conn) = batch_entry.messaging_sink.clone() else { + closed_clients.insert(addr); + remove_client(addr, self.batch_state.lock().await); + continue; + }; + + // we make sure its still alive by sending a ping message + let ping_msg = Message::Ping(vec![]); + if let Err(e) = ws_conn.clone().write().await.send(ping_msg).await { + // todo: add metric here + error!("Failed to send ping, WebSocket may be closed: {:?}", e); + closed_clients.insert(addr); + remove_client(batch_entry.sender, self.batch_state.lock().await); + continue; + }; + + filtered_finalized_batch.push(batch_entry); + } + + filtered_finalized_batch + } + async fn flush_queue_and_clear_nonce_cache(&self) { warn!("Resetting state... 
Flushing queue and nonces"); let mut batch_state_lock = self.batch_state.lock().await; From dbca94004282fb316143b655cf3e79ef866a5e0c Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Tue, 15 Oct 2024 17:20:46 -0300 Subject: [PATCH 02/11] feat: dismissed sockets metric --- batcher/aligned-batcher/src/lib.rs | 3 ++- batcher/aligned-batcher/src/metrics.rs | 7 +++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/batcher/aligned-batcher/src/lib.rs b/batcher/aligned-batcher/src/lib.rs index 6c9c53a672..29ffd5c6a5 100644 --- a/batcher/aligned-batcher/src/lib.rs +++ b/batcher/aligned-batcher/src/lib.rs @@ -936,6 +936,7 @@ impl Batcher { batch_state_lock.user_states.remove(&addr); }; + self.metrics.dismissed_sockets_latest_batch.set(0); for batch_entry in finalized_batch { let addr = batch_entry.sender; if closed_clients.contains(&addr) { @@ -951,8 +952,8 @@ impl Batcher { // we make sure its still alive by sending a ping message let ping_msg = Message::Ping(vec![]); if let Err(e) = ws_conn.clone().write().await.send(ping_msg).await { - // todo: add metric here error!("Failed to send ping, WebSocket may be closed: {:?}", e); + self.metrics.dismissed_sockets_latest_batch.inc(); closed_clients.insert(addr); remove_client(batch_entry.sender, self.batch_state.lock().await); continue; diff --git a/batcher/aligned-batcher/src/metrics.rs b/batcher/aligned-batcher/src/metrics.rs index d3e83a35d0..9ea6a56caf 100644 --- a/batcher/aligned-batcher/src/metrics.rs +++ b/batcher/aligned-batcher/src/metrics.rs @@ -14,6 +14,7 @@ pub struct BatcherMetrics { pub batcher_started: IntCounter, pub gas_price_used_on_latest_batch: IntGauge, pub broken_sockets_latest_batch: IntGauge, + pub dismissed_sockets_latest_batch: IntGauge, } impl BatcherMetrics { @@ -30,6 +31,10 @@ impl BatcherMetrics { register_int_gauge!(opts!("gas_price_used_on_latest_batch", "Gas Price"))?; let broken_sockets_latest_batch = register_int_gauge!(opts!("broken_sockets_latest_batch", "Broken sockets"))?; + 
let dismissed_sockets_latest_batch = register_int_gauge!(opts!( + "dismissed_sockets_latest_batch", + "Dismissed sockets latest batch" + ))?; registry.register(Box::new(open_connections.clone()))?; registry.register(Box::new(received_proofs.clone()))?; @@ -37,6 +42,7 @@ impl BatcherMetrics { registry.register(Box::new(reverted_batches.clone()))?; registry.register(Box::new(batcher_started.clone()))?; registry.register(Box::new(broken_sockets_latest_batch.clone()))?; + registry.register(Box::new(dismissed_sockets_latest_batch.clone()))?; let metrics_route = warp::path!("metrics") .and(warp::any().map(move || registry.clone())) @@ -56,6 +62,7 @@ impl BatcherMetrics { batcher_started, gas_price_used_on_latest_batch, broken_sockets_latest_batch, + dismissed_sockets_latest_batch, }) } From 019b0e9bdff9d9120712848e334c9e66b4c2ecf0 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Tue, 15 Oct 2024 17:31:05 -0300 Subject: [PATCH 03/11] chore: add prometheus new modules to json --- .../aligned/aggregator_batcher.json | 120 ++++++++++++++++-- 1 file changed, 110 insertions(+), 10 deletions(-) diff --git a/grafana/provisioning/dashboards/aligned/aggregator_batcher.json b/grafana/provisioning/dashboards/aligned/aggregator_batcher.json index b890c4671b..9791be160f 100644 --- a/grafana/provisioning/dashboards/aligned/aggregator_batcher.json +++ b/grafana/provisioning/dashboards/aligned/aggregator_batcher.json @@ -715,7 +715,7 @@ "type": "prometheus", "uid": "prometheus" }, - "description": "Socket connections that were disconnected when sending a batch response.", + "description": "Socket connections that were disconnected when sending a batch response. 
These proofs were included in the batch but the socket connection failed when sending the batch response to the client", "fieldConfig": { "defaults": { "color": { @@ -810,6 +810,106 @@ "title": "Broken sockets", "type": "timeseries" }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "description": "The clients whose proofs have been removed from the batch because their socket was disconnected.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 19 + }, + "id": 21, + "interval": "1m", + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "dismissed_sockets_latest_batch", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Dismissed sockets", + "type": "timeseries" + }, { "collapsed": true, "gridPos": { @@ 
-858,7 +958,7 @@ "h": 7, "w": 10, "x": 0, - "y": 42 + "y": 50 }, "id": 9, "options": { @@ -1010,7 +1110,7 @@ "h": 7, "w": 10, "x": 10, - "y": 42 + "y": 50 }, "id": 1, "options": { @@ -1074,7 +1174,7 @@ "h": 7, "w": 5, "x": 0, - "y": 49 + "y": 57 }, "id": 8, "options": { @@ -1144,7 +1244,7 @@ "h": 7, "w": 5, "x": 5, - "y": 49 + "y": 57 }, "id": 7, "options": { @@ -1209,7 +1309,7 @@ "h": 7, "w": 5, "x": 0, - "y": 56 + "y": 64 }, "id": 2, "options": { @@ -1279,7 +1379,7 @@ "h": 7, "w": 5, "x": 5, - "y": 56 + "y": 64 }, "id": 5, "options": { @@ -1326,7 +1426,7 @@ "type": "row" } ], - "refresh": "5s", + "refresh": false, "schemaVersion": 38, "style": "dark", "tags": [], @@ -1334,8 +1434,8 @@ "list": [] }, "time": { - "from": "now-5m", - "to": "now" + "from": "2024-10-15T20:24:27.682Z", + "to": "2024-10-15T20:25:15.664Z" }, "timepicker": {}, "timezone": "browser", From 04866f4dec2f6d5a6a55240080e861e17b6afb77 Mon Sep 17 00:00:00 2001 From: nicolau Date: Wed, 16 Oct 2024 10:11:23 -0300 Subject: [PATCH 04/11] feat: disconnect other opened connections if another one breaks --- batcher/aligned-batcher/src/connection.rs | 21 ++++++++++++++++++- batcher/aligned-batcher/src/lib.rs | 21 +++++++++++++++++-- .../aligned-batcher/src/types/batch_queue.rs | 2 +- 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/batcher/aligned-batcher/src/connection.rs b/batcher/aligned-batcher/src/connection.rs index 57f38d5bca..9845731f50 100644 --- a/batcher/aligned-batcher/src/connection.rs +++ b/batcher/aligned-batcher/src/connection.rs @@ -10,7 +10,10 @@ use log::{error, info}; use serde::Serialize; use tokio::{net::TcpStream, sync::RwLock}; use tokio_tungstenite::{ - tungstenite::{Error, Message}, + tungstenite::{ + protocol::{frame::coding::CloseCode, CloseFrame}, + Error, Message, + }, WebSocketStream, }; @@ -75,3 +78,19 @@ pub(crate) async fn send_message(ws_conn_sink: WsMessageSink, mess Err(e) => error!("Error while serializing message: {}", e), } } + +pub(crate) async fn 
drop_connection(ws_conn_sink: WsMessageSink, reason: Option<&str>) { + let close_frame = CloseFrame { + code: CloseCode::Normal, + reason: "Closing connection".into(), + }; + + ws_conn_sink + .write() + .await + .send(tokio_tungstenite::tungstenite::Message::Close(Some( + close_frame, + ))) + .await + .expect("Failed to send close frame"); +} diff --git a/batcher/aligned-batcher/src/lib.rs b/batcher/aligned-batcher/src/lib.rs index 1fc47c14ad..e7b6ab1765 100644 --- a/batcher/aligned-batcher/src/lib.rs +++ b/batcher/aligned-batcher/src/lib.rs @@ -1,6 +1,6 @@ use aligned_sdk::communication::serialization::{cbor_deserialize, cbor_serialize}; use config::NonPayingConfig; -use connection::{send_message, WsMessageSink}; +use connection::{drop_connection, send_message, WsMessageSink}; use dotenvy::dotenv; use ethers::contract::ContractError; use ethers::signers::Signer; @@ -931,8 +931,25 @@ impl Batcher { .batch_queue .clone() .into_iter() - .filter(|entry| (entry.0.sender == addr)) + .filter(|(entry, _)| { + let should_remove = entry.sender == addr; + + // also disconnect any other socket connection + if should_remove { + if let Some(ws_conn) = &entry.messaging_sink { + drop_connection( + ws_conn.clone(), + Some( + "Another opened connection from your address was disconnected", + ), + ) + }; + }; + + should_remove + }) .collect(); + batch_state_lock.user_states.remove(&addr); }; diff --git a/batcher/aligned-batcher/src/types/batch_queue.rs b/batcher/aligned-batcher/src/types/batch_queue.rs index c90219d237..9e3981f902 100644 --- a/batcher/aligned-batcher/src/types/batch_queue.rs +++ b/batcher/aligned-batcher/src/types/batch_queue.rs @@ -12,7 +12,7 @@ use std::{ use super::errors::BatcherError; use crate::connection::WsMessageSink; -#[derive(Clone)] +#[derive(Clone, Debug)] pub(crate) struct BatchQueueEntry { pub(crate) nonced_verification_data: NoncedVerificationData, pub(crate) verification_data_commitment: VerificationDataCommitment, From 
6938df1336a8f006dcd0be8a6ca0f864d21336c2 Mon Sep 17 00:00:00 2001 From: nicolau Date: Wed, 16 Oct 2024 10:36:28 -0300 Subject: [PATCH 05/11] fix: remove drop connection --- batcher/aligned-batcher/src/connection.rs | 4 +- batcher/aligned-batcher/src/lib.rs | 56 +++++++++++++---------- 2 files changed, 33 insertions(+), 27 deletions(-) diff --git a/batcher/aligned-batcher/src/connection.rs b/batcher/aligned-batcher/src/connection.rs index 9845731f50..8507f86e7f 100644 --- a/batcher/aligned-batcher/src/connection.rs +++ b/batcher/aligned-batcher/src/connection.rs @@ -79,10 +79,10 @@ pub(crate) async fn send_message(ws_conn_sink: WsMessageSink, mess } } -pub(crate) async fn drop_connection(ws_conn_sink: WsMessageSink, reason: Option<&str>) { +pub(crate) async fn drop_connection(ws_conn_sink: WsMessageSink, reason: String) { let close_frame = CloseFrame { code: CloseCode::Normal, - reason: "Closing connection".into(), + reason: reason.into(), }; ws_conn_sink diff --git a/batcher/aligned-batcher/src/lib.rs b/batcher/aligned-batcher/src/lib.rs index e7b6ab1765..f351b11a7c 100644 --- a/batcher/aligned-batcher/src/lib.rs +++ b/batcher/aligned-batcher/src/lib.rs @@ -4,6 +4,7 @@ use connection::{drop_connection, send_message, WsMessageSink}; use dotenvy::dotenv; use ethers::contract::ContractError; use ethers::signers::Signer; +use sha3::digest::generic_array::iter; use types::batch_state::BatchState; use types::user_state::UserState; @@ -21,7 +22,7 @@ use eth::{try_create_new_task, BatcherPaymentService, CreateNewTaskFeeParams, Si use ethers::prelude::{Middleware, Provider}; use ethers::providers::Ws; use ethers::types::{Address, Signature, TransactionReceipt, U256}; -use futures_util::{future, SinkExt, StreamExt, TryStreamExt}; +use futures_util::{future, stream, SinkExt, StreamExt, TryStreamExt}; use lambdaworks_crypto::merkle_tree::merkle::MerkleTree; use lambdaworks_crypto::merkle_tree::traits::IsMerkleTreeBackend; use log::{debug, error, info, warn}; @@ -925,33 +926,30 
@@ impl Batcher { ) -> Vec { let mut filtered_finalized_batch = vec![]; let mut closed_clients = HashSet::new(); - - let remove_client = |addr: Address, mut batch_state_lock: MutexGuard<'_, BatchState>| { - batch_state_lock.batch_queue = batch_state_lock - .batch_queue - .clone() - .into_iter() - .filter(|(entry, _)| { - let should_remove = entry.sender == addr; - - // also disconnect any other socket connection - if should_remove { - if let Some(ws_conn) = &entry.messaging_sink { - drop_connection( - ws_conn.clone(), - Some( - "Another opened connection from your address was disconnected", - ), - ) + let mut connections_to_drop = vec![]; + + let mut remove_client = + |addr: Address, mut batch_state_lock: MutexGuard<'_, BatchState>| { + batch_state_lock.batch_queue = batch_state_lock + .batch_queue + .clone() + .into_iter() + .filter(|(entry, _)| { + let should_remove = entry.sender == addr; + + // also disconnect any other socket connection from the same address + if should_remove { + if let Some(ws_conn) = &entry.messaging_sink { + connections_to_drop.push(ws_conn.clone()); + }; }; - }; - should_remove - }) - .collect(); + should_remove + }) + .collect(); - batch_state_lock.user_states.remove(&addr); - }; + batch_state_lock.user_states.remove(&addr); + }; self.metrics.dismissed_sockets_latest_batch.set(0); for batch_entry in finalized_batch { @@ -979,6 +977,14 @@ impl Batcher { filtered_finalized_batch.push(batch_entry); } + for ws_conn in connections_to_drop { + drop_connection( + ws_conn, + "Another connection of yours has disconnected".into(), + ) + .await; + } + filtered_finalized_batch } From 4c4dc7dc4638ba5ec69f694250604544b3d2138b Mon Sep 17 00:00:00 2001 From: nicolau Date: Wed, 16 Oct 2024 10:57:40 -0300 Subject: [PATCH 06/11] chore: add new errs to cli and sdk --- batcher/aligned-sdk/src/communication/messaging.rs | 10 ++++++++++ batcher/aligned-sdk/src/core/errors.rs | 2 ++ batcher/aligned/src/main.rs | 1 + 3 files changed, 13 insertions(+) diff --git 
a/batcher/aligned-sdk/src/communication/messaging.rs b/batcher/aligned-sdk/src/communication/messaging.rs index 6c64ea961e..e1e1e020e3 100644 --- a/batcher/aligned-sdk/src/communication/messaging.rs +++ b/batcher/aligned-sdk/src/communication/messaging.rs @@ -75,6 +75,16 @@ pub async fn send_messages( } }; + if msg.is_close() { + if let Message::Close(Some(frame)) = msg { + return Err(SubmitError::ConnectionClose(frame.reason.to_string())); + } else { + return Err(SubmitError::ConnectionClose( + "Connection was closed".to_string(), + )); + } + }; + let response_msg: ValidityResponseMessage = cbor_deserialize(msg.into_data().as_slice()) .map_err(SubmitError::SerializationError)?; diff --git a/batcher/aligned-sdk/src/core/errors.rs b/batcher/aligned-sdk/src/core/errors.rs index 56327c4515..b60e7d85da 100644 --- a/batcher/aligned-sdk/src/core/errors.rs +++ b/batcher/aligned-sdk/src/core/errors.rs @@ -91,6 +91,7 @@ pub enum SubmitError { InvalidPaymentServiceAddress(H160, H160), BatchSubmissionFailed(String), AddToBatchError, + ConnectionClose(String), GenericError(String), } @@ -200,6 +201,7 @@ impl fmt::Display for SubmitError { } SubmitError::ProofQueueFlushed => write!(f, "Batch reset"), SubmitError::AddToBatchError => write!(f, "Error while adding entry to batch"), + SubmitError::ConnectionClose(reason) => write!(f, "Connection closed: {}", reason), } } } diff --git a/batcher/aligned/src/main.rs b/batcher/aligned/src/main.rs index d1ffa511d0..650d66e2d3 100644 --- a/batcher/aligned/src/main.rs +++ b/batcher/aligned/src/main.rs @@ -554,6 +554,7 @@ async fn handle_submit_err(err: SubmitError, nonce_file: &str) { SubmitError::InsufficientBalance => { error!("Insufficient balance to pay for the transaction") } + SubmitError::ConnectionClose(reason) => error!("Connection closed: {}", reason), _ => {} } From 496bd95537188a18137191270e73c7ed3359af18 Mon Sep 17 00:00:00 2001 From: nicolau Date: Wed, 16 Oct 2024 10:57:50 -0300 Subject: [PATCH 07/11] chore: clippy --- 
batcher/aligned-batcher/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/batcher/aligned-batcher/src/lib.rs b/batcher/aligned-batcher/src/lib.rs index f351b11a7c..60715b25fb 100644 --- a/batcher/aligned-batcher/src/lib.rs +++ b/batcher/aligned-batcher/src/lib.rs @@ -4,7 +4,6 @@ use connection::{drop_connection, send_message, WsMessageSink}; use dotenvy::dotenv; use ethers::contract::ContractError; use ethers::signers::Signer; -use sha3::digest::generic_array::iter; use types::batch_state::BatchState; use types::user_state::UserState; @@ -22,7 +21,7 @@ use eth::{try_create_new_task, BatcherPaymentService, CreateNewTaskFeeParams, Si use ethers::prelude::{Middleware, Provider}; use ethers::providers::Ws; use ethers::types::{Address, Signature, TransactionReceipt, U256}; -use futures_util::{future, stream, SinkExt, StreamExt, TryStreamExt}; +use futures_util::{future, SinkExt, StreamExt, TryStreamExt}; use lambdaworks_crypto::merkle_tree::merkle::MerkleTree; use lambdaworks_crypto::merkle_tree::traits::IsMerkleTreeBackend; use log::{debug, error, info, warn}; From 2f3fdc3242f0d19e5c8f794918080f5870d43516 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Wed, 16 Oct 2024 14:58:16 -0300 Subject: [PATCH 08/11] fix: non_paying removal --- batcher/aligned-batcher/src/lib.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/batcher/aligned-batcher/src/lib.rs b/batcher/aligned-batcher/src/lib.rs index 60715b25fb..e163bc6479 100644 --- a/batcher/aligned-batcher/src/lib.rs +++ b/batcher/aligned-batcher/src/lib.rs @@ -947,7 +947,9 @@ impl Batcher { }) .collect(); - batch_state_lock.user_states.remove(&addr); + if !self.is_nonpaying(&addr) { + batch_state_lock.user_states.remove(&addr); + } }; self.metrics.dismissed_sockets_latest_batch.set(0); From 53350cdcefe2149c69fb6aead8864ff824f24bd0 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Wed, 16 Oct 2024 17:45:06 -0300 Subject: [PATCH 09/11] refactor: comments and fn names --- 
batcher/aligned-batcher/src/lib.rs | 33 ++++++++++++++++++------------ 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/batcher/aligned-batcher/src/lib.rs b/batcher/aligned-batcher/src/lib.rs index e163bc6479..0e0db695a4 100644 --- a/batcher/aligned-batcher/src/lib.rs +++ b/batcher/aligned-batcher/src/lib.rs @@ -917,17 +917,16 @@ impl Batcher { /// here we go to each entry and make sure the client connection is still alive /// if not, we remove it from the finalized batch. - /// we also remove all the user proofs in the current batch queue as this will create discrepancies with the nonces - /// finally, we remove the user entry from the cache as to re-query its nonce from ethereum when he sends another proof + /// we also remove all the user proofs in the current batch as this will create discrepancies with the nonces async fn validate_sockets_connection( &self, finalized_batch: Vec, ) -> Vec { let mut filtered_finalized_batch = vec![]; let mut closed_clients = HashSet::new(); - let mut connections_to_drop = vec![]; + let mut conns_to_drop = vec![]; - let mut remove_client = + let mut remove_client_proofs_from_batch = |addr: Address, mut batch_state_lock: MutexGuard<'_, BatchState>| { batch_state_lock.batch_queue = batch_state_lock .batch_queue @@ -936,32 +935,40 @@ impl Batcher { .filter(|(entry, _)| { let should_remove = entry.sender == addr; - // also disconnect any other socket connection from the same address + // we can't use async predicates in iterators + // so we push the connection to drop later as they require futures if should_remove { if let Some(ws_conn) = &entry.messaging_sink { - connections_to_drop.push(ws_conn.clone()); + conns_to_drop.push(ws_conn.clone()); }; + // remove the entry so if the user sends a new proof we re-query the nonce from eth + batch_state_lock.user_states.remove(&addr); }; should_remove }) .collect(); - - if !self.is_nonpaying(&addr) { - batch_state_lock.user_states.remove(&addr); - } }; 
self.metrics.dismissed_sockets_latest_batch.set(0); for batch_entry in finalized_batch { let addr = batch_entry.sender; + + // if the wallet is a non_paying we don't want to remove the proof + // as this is used only in test environments and removing it would require handling the nonce + // effectively, adding a lot of overhead + if self.is_nonpaying(&addr) { + filtered_finalized_batch.push(batch_entry); + continue; + } + if closed_clients.contains(&addr) { continue; } let Some(ws_conn) = batch_entry.messaging_sink.clone() else { closed_clients.insert(addr); - remove_client(addr, self.batch_state.lock().await); + remove_client_proofs_from_batch(addr, self.batch_state.lock().await); continue; }; @@ -971,14 +978,14 @@ impl Batcher { error!("Failed to send ping, WebSocket may be closed: {:?}", e); self.metrics.dismissed_sockets_latest_batch.inc(); closed_clients.insert(addr); - remove_client(batch_entry.sender, self.batch_state.lock().await); + remove_client_proofs_from_batch(batch_entry.sender, self.batch_state.lock().await); continue; }; filtered_finalized_batch.push(batch_entry); } - for ws_conn in connections_to_drop { + for ws_conn in conns_to_drop { drop_connection( ws_conn, "Another connection of yours has disconnected".into(), From d43527f88cf1ca5e00bb07cac5dd4bc43e8b522f Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Wed, 16 Oct 2024 17:45:32 -0300 Subject: [PATCH 10/11] fix: ignore ping messages in sdk response --- batcher/aligned-sdk/src/communication/messaging.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/batcher/aligned-sdk/src/communication/messaging.rs b/batcher/aligned-sdk/src/communication/messaging.rs index e1e1e020e3..b779c8024c 100644 --- a/batcher/aligned-sdk/src/communication/messaging.rs +++ b/batcher/aligned-sdk/src/communication/messaging.rs @@ -175,6 +175,9 @@ pub async fn receive( .to_string(), )); } + if msg.is_ping() { + continue; + } process_batch_inclusion_data( msg, &mut aligned_verification_data, From 
743aeca9224708bb2c9ad4b8b8a1f525dd1dea6a Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Wed, 16 Oct 2024 17:53:37 -0300 Subject: [PATCH 11/11] chore: add traces --- batcher/aligned-batcher/src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/batcher/aligned-batcher/src/lib.rs b/batcher/aligned-batcher/src/lib.rs index 04b0f2eebc..794a5aebce 100644 --- a/batcher/aligned-batcher/src/lib.rs +++ b/batcher/aligned-batcher/src/lib.rs @@ -923,6 +923,7 @@ impl Batcher { &self, finalized_batch: Vec, ) -> Vec { + info!("Finalized batch: verifying that clients are still connected"); let mut filtered_finalized_batch = vec![]; let mut closed_clients = HashSet::new(); let mut conns_to_drop = vec![]; @@ -974,6 +975,7 @@ impl Batcher { }; // we make sure its still alive by sending a ping message + debug!("Sending pig message"); let ping_msg = Message::Ping(vec![]); if let Err(e) = ws_conn.clone().write().await.send(ping_msg).await { error!("Failed to send ping, WebSocket may be closed: {:?}", e); @@ -987,6 +989,7 @@ impl Batcher { } for ws_conn in conns_to_drop { + debug!("Connection dropped"); drop_connection( ws_conn, "Another connection of yours has disconnected".into(), @@ -994,6 +997,7 @@ impl Batcher { .await; } + info!("Finalized batch: clients connection verification ended"); filtered_finalized_batch }