diff --git a/crates/cow-amm/src/maintainers.rs b/crates/cow-amm/src/maintainers.rs index b5cd5b6a23..fe39199a65 100644 --- a/crates/cow-amm/src/maintainers.rs +++ b/crates/cow-amm/src/maintainers.rs @@ -2,7 +2,10 @@ use { crate::{Amm, cache::Storage}, contracts::alloy::ERC20, ethrpc::Web3, - futures::future::{join_all, select_ok}, + futures::{ + future::{join_all, select_ok}, + stream::{FuturesUnordered, StreamExt}, + }, shared::maintenance::Maintaining, std::sync::Arc, tokio::sync::RwLock, @@ -55,13 +58,20 @@ impl Maintaining for EmptyPoolRemoval { amms_to_check.extend(storage.cow_amms().await); } } - let futures = amms_to_check.iter().map(|amm| async { - self.has_zero_balance(amm.clone()) - .await - .then_some(*amm.address()) - }); - let empty_amms: Vec<_> = join_all(futures).await.into_iter().flatten().collect(); + let empty_amms: Vec<_> = amms_to_check + .iter() + .map(|amm| { + let amm = amm.clone(); + async move { + let address = *amm.address(); + self.has_zero_balance(amm).await.then_some(address) + } + }) + .collect::>() + .filter_map(std::future::ready) + .collect() + .await; if !empty_amms.is_empty() { tracing::debug!(amms = ?empty_amms, "removing AMMs with zero token balance"); let lock = self.storage.read().await;