From 8a95d0eba1b19c9cf66019cff479b81220681d41 Mon Sep 17 00:00:00 2001 From: ilya Date: Tue, 25 Nov 2025 20:43:10 +0000 Subject: [PATCH 1/3] Use FuturesUnordered when fetching COW AMM balances --- crates/cow-amm/src/maintainers.rs | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/crates/cow-amm/src/maintainers.rs b/crates/cow-amm/src/maintainers.rs index b5cd5b6a23..ec4792b361 100644 --- a/crates/cow-amm/src/maintainers.rs +++ b/crates/cow-amm/src/maintainers.rs @@ -2,7 +2,10 @@ use { crate::{Amm, cache::Storage}, contracts::alloy::ERC20, ethrpc::Web3, - futures::future::{join_all, select_ok}, + futures::{ + future::{join_all, select_ok}, + stream::{FuturesUnordered, StreamExt}, + }, shared::maintenance::Maintaining, std::sync::Arc, tokio::sync::RwLock, @@ -55,13 +58,21 @@ impl Maintaining for EmptyPoolRemoval { amms_to_check.extend(storage.cow_amms().await); } } - let futures = amms_to_check.iter().map(|amm| async { - self.has_zero_balance(amm.clone()) - .await - .then_some(*amm.address()) - }); - let empty_amms: Vec<_> = join_all(futures).await.into_iter().flatten().collect(); + let empty_amms: Vec<_> = amms_to_check + .iter() + .map(|amm| { + let amm = amm.clone(); + async move { + self.has_zero_balance(amm.clone()) + .await + .then_some(*amm.address()) + } + }) + .collect::>() + .filter_map(|addr| async move { addr }) + .collect() + .await; if !empty_amms.is_empty() { tracing::debug!(amms = ?empty_amms, "removing AMMs with zero token balance"); let lock = self.storage.read().await; From abf85e8ac8472de1edefc3ed24114e43057c87d7 Mon Sep 17 00:00:00 2001 From: ilya Date: Tue, 25 Nov 2025 21:08:50 +0000 Subject: [PATCH 2/3] Fix --- crates/cow-amm/src/maintainers.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/cow-amm/src/maintainers.rs b/crates/cow-amm/src/maintainers.rs index ec4792b361..0629ab538a 100644 --- a/crates/cow-amm/src/maintainers.rs +++ b/crates/cow-amm/src/maintainers.rs @@ -64,9 +64,8 @@ impl Maintaining for EmptyPoolRemoval { .map(|amm| { let amm = amm.clone(); async move { - self.has_zero_balance(amm.clone()) - .await - .then_some(*amm.address()) + let address = *amm.address(); + self.has_zero_balance(amm).await.then_some(address) } }) .collect::>() From 44a73e75ac4bd3b13dbcacfa6c5b1c424fe83bd7 Mon Sep 17 00:00:00 2001 From: ilya Date: Tue, 25 Nov 2025 21:10:14 +0000 Subject: [PATCH 3/3] Fix --- crates/cow-amm/src/maintainers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/cow-amm/src/maintainers.rs b/crates/cow-amm/src/maintainers.rs index 0629ab538a..fe39199a65 100644 --- a/crates/cow-amm/src/maintainers.rs +++ b/crates/cow-amm/src/maintainers.rs @@ -69,7 +69,7 @@ impl Maintaining for EmptyPoolRemoval { } }) .collect::>() - .filter_map(|addr| async move { addr }) + .filter_map(std::future::ready) .collect() .await; if !empty_amms.is_empty() {