From 7b948962be71f382c7db4eb9f539a0ac2886505b Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Mon, 27 Apr 2026 14:07:14 +0200 Subject: [PATCH 1/3] feat: limit concurrent HTTP outcalls with a guard Adds `HttpOutcallGuard`, a count-based guard that tracks the number of in-flight HTTP outcalls to the SOL RPC canister and rejects new ones once `MAX_CONCURRENT_HTTP_OUTCALLS` (50) are already in flight. Every SOL RPC function (`get_transaction`, `submit_transaction`, `get_recent_slot_and_blockhash`, `get_signature_statuses`, `get_signatures_for_address`) now acquires the guard before making the actual outcall and releases it when the response is received. Timer functions additionally check `too_many_http_outcalls()` after confirming there is work to do and reschedule themselves rather than starting new work when the system is at capacity. Co-Authored-By: Claude Sonnet 4.6 --- minter/src/consolidate/mod.rs | 10 +++- minter/src/constants.rs | 6 +++ minter/src/deposit/automatic/mod.rs | 10 +++- minter/src/guard/mod.rs | 51 +++++++++++++++++- minter/src/guard/tests.rs | 84 ++++++++++++++++++++++++++++- minter/src/monitor/mod.rs | 18 ++++++- minter/src/rpc/mod.rs | 21 ++++++++ minter/src/state/mod.rs | 10 ++++ minter/src/state/tests.rs | 1 + minter/src/withdraw/mod.rs | 10 +++- 10 files changed, 214 insertions(+), 7 deletions(-) diff --git a/minter/src/consolidate/mod.rs b/minter/src/consolidate/mod.rs index dc71f797..be182aac 100644 --- a/minter/src/consolidate/mod.rs +++ b/minter/src/consolidate/mod.rs @@ -1,6 +1,6 @@ use crate::{ constants::MAX_CONCURRENT_RPC_CALLS, - guard::TimerGuard, + guard::{TimerGuard, too_many_http_outcalls}, numeric::LedgerMintIndex, rpc::{SubmitTransactionError, get_recent_slot_and_blockhash, submit_transaction}, runtime::CanisterRuntime, @@ -57,6 +57,14 @@ pub async fn consolidate_deposits(runtime: R) { return; } + if too_many_http_outcalls() { + log!( + Priority::Info, + "Too many concurrent HTTP outcalls, rescheduling consolidate_deposits" + 
); + return; + } + let (slot, recent_blockhash) = match get_recent_slot_and_blockhash(&runtime).await { Ok(result) => result, Err(e) => { diff --git a/minter/src/constants.rs b/minter/src/constants.rs index 09ac540c..b61b20fc 100644 --- a/minter/src/constants.rs +++ b/minter/src/constants.rs @@ -1,6 +1,12 @@ /// Maximum number of concurrent calls to the SOL RPC canister. pub const MAX_CONCURRENT_RPC_CALLS: usize = 10; +/// Maximum number of concurrent HTTP outcalls the minter may have in flight at once. +/// +/// Each call to the SOL RPC canister triggers one or more HTTPS outcalls on the IC. +/// Limiting the number of in-flight outcalls prevents resource exhaustion. +pub const MAX_CONCURRENT_HTTP_OUTCALLS: u32 = 50; + /// Matches the ICP HTTPS outcall response limit for variable-length RPC calls /// such as `getTransaction` and `getSignatureStatuses`: /// https://docs.internetcomputer.org/references/ic-interface-spec#ic-http_request diff --git a/minter/src/deposit/automatic/mod.rs b/minter/src/deposit/automatic/mod.rs index 32ee80fd..c8c2a7cc 100644 --- a/minter/src/deposit/automatic/mod.rs +++ b/minter/src/deposit/automatic/mod.rs @@ -1,7 +1,7 @@ use crate::{ address::{account_address, lazy_get_schnorr_master_key}, constants::MAX_CONCURRENT_RPC_CALLS, - guard::TimerGuard, + guard::{TimerGuard, too_many_http_outcalls}, rpc::get_signatures_for_address, runtime::CanisterRuntime, state::{ @@ -89,6 +89,14 @@ pub async fn poll_monitored_addresses(runtime: R) { runtime.set_timer(Duration::ZERO, poll_monitored_addresses); }); + if too_many_http_outcalls() { + log!( + Priority::Info, + "Too many concurrent HTTP outcalls, rescheduling poll_monitored_addresses" + ); + return; + } + let master_key = lazy_get_schnorr_master_key(&runtime).await; futures::future::join_all( diff --git a/minter/src/guard/mod.rs b/minter/src/guard/mod.rs index aeaf0b9e..5978a8e9 100644 --- a/minter/src/guard/mod.rs +++ b/minter/src/guard/mod.rs @@ -1,4 +1,7 @@ -use crate::state::{State, TaskType, 
mutate_state}; +use crate::{ + constants::MAX_CONCURRENT_HTTP_OUTCALLS, + state::{State, TaskType, mutate_state, read_state}, +}; use cksol_types::{ProcessDepositError, WithdrawalError}; use icrc_ledger_types::icrc1::account::Account; use std::{collections::BTreeSet, marker::PhantomData}; @@ -130,3 +133,49 @@ impl Drop for TimerGuard { }); } } + +#[derive(Debug, Eq, PartialEq)] +pub enum HttpOutcallGuardError { + TooManyOutcalls, +} + +/// Guards a single HTTP outcall to the SOL RPC canister. +/// +/// Acquiring this guard increments the active-outcall counter in canister state; +/// dropping it decrements the counter. [`HttpOutcallGuard::new`] fails with +/// [`HttpOutcallGuardError::TooManyOutcalls`] when +/// [`MAX_CONCURRENT_HTTP_OUTCALLS`] guards are already held. +#[must_use] +pub struct HttpOutcallGuard; + +impl HttpOutcallGuard { + pub fn new() -> Result { + mutate_state(|s| { + if s.active_http_outcalls() >= MAX_CONCURRENT_HTTP_OUTCALLS { + return Err(HttpOutcallGuardError::TooManyOutcalls); + } + *s.active_http_outcalls_mut() += 1; + Ok(Self) + }) + } +} + +impl Drop for HttpOutcallGuard { + fn drop(&mut self) { + mutate_state(|s| { + let count = s.active_http_outcalls_mut(); + *count = count + .checked_sub(1) + .expect("BUG: HTTP outcall counter underflow"); + }); + } +} + +/// Returns `true` when the number of in-flight HTTP outcalls has reached the +/// configured limit. +/// +/// Timer functions should check this at startup and reschedule rather than +/// starting new work when the system is already at capacity. 
+pub fn too_many_http_outcalls() -> bool { + read_state(|s| s.active_http_outcalls() >= MAX_CONCURRENT_HTTP_OUTCALLS) +} diff --git a/minter/src/guard/tests.rs b/minter/src/guard/tests.rs index 5d368696..4cb34615 100644 --- a/minter/src/guard/tests.rs +++ b/minter/src/guard/tests.rs @@ -1,6 +1,10 @@ use crate::{ - guard::{GuardError, MAX_CONCURRENT, TimerGuard, TimerGuardError, process_deposit_guard}, - state::TaskType, + constants::MAX_CONCURRENT_HTTP_OUTCALLS, + guard::{ + GuardError, HttpOutcallGuard, HttpOutcallGuardError, MAX_CONCURRENT, TimerGuard, + TimerGuardError, process_deposit_guard, too_many_http_outcalls, + }, + state::{TaskType, read_state}, test_fixtures::init_state, }; use candid::Principal; @@ -111,3 +115,79 @@ mod timer_guard { assert!(guard2.is_ok()); } } + +mod http_outcall_guard { + use super::*; + + #[test] + fn should_acquire_and_release_guard() { + init_state(); + + assert_eq!(read_state(|s| s.active_http_outcalls()), 0); + { + let _guard = HttpOutcallGuard::new().unwrap(); + assert_eq!(read_state(|s| s.active_http_outcalls()), 1); + } + assert_eq!(read_state(|s| s.active_http_outcalls()), 0); + } + + #[test] + fn should_allow_up_to_max_concurrent_guards() { + init_state(); + + let guards: Vec<_> = (0..MAX_CONCURRENT_HTTP_OUTCALLS) + .map(|_| HttpOutcallGuard::new().expect("should succeed below limit")) + .collect(); + assert_eq!( + read_state(|s| s.active_http_outcalls()), + MAX_CONCURRENT_HTTP_OUTCALLS + ); + assert!(too_many_http_outcalls()); + drop(guards); + assert_eq!(read_state(|s| s.active_http_outcalls()), 0); + assert!(!too_many_http_outcalls()); + } + + #[test] + fn should_reject_when_limit_reached() { + init_state(); + + let _guards: Vec<_> = (0..MAX_CONCURRENT_HTTP_OUTCALLS) + .map(|_| HttpOutcallGuard::new().expect("should succeed below limit")) + .collect(); + + let result = HttpOutcallGuard::new(); + assert_eq!(result.err(), Some(HttpOutcallGuardError::TooManyOutcalls)); + } + + #[test] + fn 
should_allow_new_guard_after_one_is_dropped() { + init_state(); + + let guards: Vec<_> = (0..MAX_CONCURRENT_HTTP_OUTCALLS) + .map(|_| HttpOutcallGuard::new().expect("should succeed below limit")) + .collect(); + + // Drop all guards (NOTE(review): this releases every guard, not one; pop a single guard to test the boundary) + drop(guards); + + // Should be able to acquire a new guard + let result = HttpOutcallGuard::new(); + assert!(result.is_ok()); + } + + #[test] + fn should_track_multiple_concurrent_guards_independently() { + init_state(); + + let guard1 = HttpOutcallGuard::new().unwrap(); + let guard2 = HttpOutcallGuard::new().unwrap(); + assert_eq!(read_state(|s| s.active_http_outcalls()), 2); + + drop(guard1); + assert_eq!(read_state(|s| s.active_http_outcalls()), 1); + + drop(guard2); + assert_eq!(read_state(|s| s.active_http_outcalls()), 0); + } +} diff --git a/minter/src/monitor/mod.rs b/minter/src/monitor/mod.rs index b3ef33b0..5fe79b39 100644 --- a/minter/src/monitor/mod.rs +++ b/minter/src/monitor/mod.rs @@ -1,7 +1,7 @@ use crate::{ address::derivation_path, constants::MAX_CONCURRENT_RPC_CALLS, - guard::TimerGuard, + guard::{TimerGuard, too_many_http_outcalls}, rpc::{ SubmitTransactionError, get_recent_slot_and_blockhash, get_signature_statuses, submit_transaction, @@ -63,6 +63,14 @@ pub async fn finalize_transactions(runtime: R) { runtime.set_timer(Duration::ZERO, finalize_transactions); }); + if too_many_http_outcalls() { + log!( + Priority::Info, + "Too many concurrent HTTP outcalls, rescheduling finalize_transactions" + ); + return; + } + // Fetch the current slot before checking statuses: if a transaction finalizes // after we snapshot the slot, the status check will see it as finalized rather // than missing, so it will never be incorrectly marked as expired. 
@@ -154,6 +162,14 @@ pub async fn resubmit_transactions(runtime: R) { runtime.set_timer(Duration::ZERO, resubmit_transactions); }); + if too_many_http_outcalls() { + log!( + Priority::Info, + "Too many concurrent HTTP outcalls, rescheduling resubmit_transactions" + ); + return; + } + resubmit_expired_transactions(&runtime, to_resubmit).await; if !more_to_process { diff --git a/minter/src/rpc/mod.rs b/minter/src/rpc/mod.rs index cbbfc860..05c4a992 100644 --- a/minter/src/rpc/mod.rs +++ b/minter/src/rpc/mod.rs @@ -1,5 +1,6 @@ use crate::{ constants::{GET_SIGNATURE_STATUSES_CYCLES, MAX_HTTP_OUTCALL_RESPONSE_BYTES}, + guard::{HttpOutcallGuard, HttpOutcallGuardError}, runtime::CanisterRuntime, state::read_state, }; @@ -24,6 +25,8 @@ pub async fn get_transaction( signature: Signature, cycles_to_attach: u128, ) -> Result, GetTransactionError> { + let _guard = HttpOutcallGuard::new() + .map_err(|_: HttpOutcallGuardError| GetTransactionError::TooManyOutcalls)?; let result = read_state(|state| state.sol_rpc_client(runtime.inter_canister_call_runtime())) .get_transaction(signature) .with_encoding(GetTransactionEncoding::Base64) @@ -48,6 +51,8 @@ pub enum GetTransactionError { RpcError(RpcError), #[error("Inconsistent RPC results for transaction")] InconsistentRpcResults, + #[error("Too many concurrent HTTP outcalls")] + TooManyOutcalls, } impl From for ProcessDepositError { @@ -60,6 +65,8 @@ pub async fn submit_transaction( runtime: &R, transaction: Transaction, ) -> Result { + let _guard = HttpOutcallGuard::new() + .map_err(|_: HttpOutcallGuardError| SubmitTransactionError::TooManyOutcalls)?; let client = read_state(|state| state.sol_rpc_client(runtime.inter_canister_call_runtime())); match client.send_transaction(transaction).try_send().await { Ok(MultiRpcResult::Consistent(Ok(signature))) => Ok(signature), @@ -77,11 +84,15 @@ pub enum SubmitTransactionError { RpcError(RpcError), #[error("Inconsistent RPC results for sendTransaction")] InconsistentRpcResults, + #[error("Too 
many concurrent HTTP outcalls")] + TooManyOutcalls, } pub async fn get_recent_slot_and_blockhash( runtime: &R, ) -> Result<(Slot, Hash), GetRecentBlockhashError> { + let _guard = HttpOutcallGuard::new() + .map_err(|_: HttpOutcallGuardError| GetRecentBlockhashError::TooManyOutcalls)?; let client = read_state(|state| state.sol_rpc_client(runtime.inter_canister_call_runtime())); match client.get_recent_block().try_send().await { Ok((slot, block)) => { @@ -104,6 +115,8 @@ pub async fn get_recent_slot_and_blockhash( pub enum GetRecentBlockhashError { #[error("Failed to get recent block: {0:?}")] Failed(Vec), + #[error("Too many concurrent HTTP outcalls")] + TooManyOutcalls, } pub async fn get_signature_statuses( @@ -113,6 +126,8 @@ pub async fn get_signature_statuses( Vec>, GetSignatureStatusesError, > { + let _guard = HttpOutcallGuard::new() + .map_err(|_: HttpOutcallGuardError| GetSignatureStatusesError::TooManyOutcalls)?; let client = read_state(|state| state.sol_rpc_client(runtime.inter_canister_call_runtime())); let result = client .get_signature_statuses(signatures) @@ -136,12 +151,16 @@ pub enum GetSignatureStatusesError { RpcError(RpcError), #[error("Inconsistent RPC results for getSignatureStatuses")] InconsistentRpcResults, + #[error("Too many concurrent HTTP outcalls")] + TooManyOutcalls, } pub async fn get_signatures_for_address( runtime: &R, params: GetSignaturesForAddressParams, ) -> Result, GetSignaturesForAddressError> { + let _guard = HttpOutcallGuard::new() + .map_err(|_: HttpOutcallGuardError| GetSignaturesForAddressError::TooManyOutcalls)?; let client = read_state(|state| state.sol_rpc_client(runtime.inter_canister_call_runtime())); let result = client.get_signatures_for_address(params).try_send().await; match result? 
{ @@ -161,4 +180,6 @@ pub enum GetSignaturesForAddressError { RpcError(RpcError), #[error("Inconsistent RPC results for getSignaturesForAddress")] InconsistentRpcResults, + #[error("Too many concurrent HTTP outcalls")] + TooManyOutcalls, } diff --git a/minter/src/state/mod.rs b/minter/src/state/mod.rs index c265d456..82d2559c 100644 --- a/minter/src/state/mod.rs +++ b/minter/src/state/mod.rs @@ -108,6 +108,7 @@ pub struct State { failed_transactions: InsertionOrderedMap, consolidation_transactions: InsertionOrderedMap, active_tasks: BTreeSet, + active_http_outcalls: u32, balance: Lamport, } @@ -321,6 +322,14 @@ impl State { &mut self.active_tasks } + pub fn active_http_outcalls(&self) -> u32 { + self.active_http_outcalls + } + + pub fn active_http_outcalls_mut(&mut self) -> &mut u32 { + &mut self.active_http_outcalls + } + fn transaction_fee(&self, message: &VersionedMessage) -> Lamport { let VersionedMessage::Legacy(msg) = message; FEE_PER_SIGNATURE * msg.header.num_required_signatures as u64 @@ -795,6 +804,7 @@ impl TryFrom for State { failed_transactions: InsertionOrderedMap::new(), consolidation_transactions: InsertionOrderedMap::new(), active_tasks: BTreeSet::new(), + active_http_outcalls: 0, balance: 0, }; state.validate()?; diff --git a/minter/src/state/tests.rs b/minter/src/state/tests.rs index e3ef55e3..ef8a60d3 100644 --- a/minter/src/state/tests.rs +++ b/minter/src/state/tests.rs @@ -245,6 +245,7 @@ mod state_from_init_args { failed_transactions: InsertionOrderedMap::new(), consolidation_transactions: InsertionOrderedMap::new(), active_tasks: BTreeSet::new(), + active_http_outcalls: 0, balance: 0, } ); diff --git a/minter/src/withdraw/mod.rs b/minter/src/withdraw/mod.rs index 19571653..ce16cef2 100644 --- a/minter/src/withdraw/mod.rs +++ b/minter/src/withdraw/mod.rs @@ -15,7 +15,7 @@ use solana_hash::Hash; use crate::{ consolidate::consolidate_deposits, constants::MAX_CONCURRENT_RPC_CALLS, - guard::{TimerGuard, withdrawal_guard}, + guard::{TimerGuard, 
too_many_http_outcalls, withdrawal_guard}, ledger::{BurnError, burn}, rpc::{get_recent_slot_and_blockhash, submit_transaction}, runtime::CanisterRuntime, @@ -154,6 +154,14 @@ pub async fn process_pending_withdrawals(runtime: R) { return; } + if too_many_http_outcalls() { + log!( + Priority::Info, + "Too many concurrent HTTP outcalls, rescheduling process_pending_withdrawals" + ); + return; + } + let (slot, recent_blockhash) = match get_recent_slot_and_blockhash(&runtime).await { Ok(result) => result, Err(e) => { From 98125f3267c7c4580d0a1232ed369c0293af59bb Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Mon, 27 Apr 2026 14:23:39 +0200 Subject: [PATCH 2/3] fixup! feat: limit concurrent HTTP outcalls with a guard Replace the upfront too_many_http_outcalls() check in timers with inspection of the concurrent work's return values. Each timer now detects TooManyOutcalls directly from its join_all results and skips defusing the reschedule guard when any concurrent call was blocked by the limit. 
Co-Authored-By: Claude Sonnet 4.6 --- minter/src/consolidate/mod.rs | 34 +++++--- minter/src/deposit/automatic/mod.rs | 31 +++---- minter/src/guard/mod.rs | 11 +-- minter/src/guard/tests.rs | 4 +- minter/src/monitor/mod.rs | 128 +++++++++++++++------------- minter/src/withdraw/mod.rs | 33 +++---- 6 files changed, 126 insertions(+), 115 deletions(-) diff --git a/minter/src/consolidate/mod.rs b/minter/src/consolidate/mod.rs index be182aac..4d12c264 100644 --- a/minter/src/consolidate/mod.rs +++ b/minter/src/consolidate/mod.rs @@ -1,6 +1,6 @@ use crate::{ constants::MAX_CONCURRENT_RPC_CALLS, - guard::{TimerGuard, too_many_http_outcalls}, + guard::TimerGuard, numeric::LedgerMintIndex, rpc::{SubmitTransactionError, get_recent_slot_and_blockhash, submit_transaction}, runtime::CanisterRuntime, @@ -57,14 +57,6 @@ pub async fn consolidate_deposits(runtime: R) { return; } - if too_many_http_outcalls() { - log!( - Priority::Info, - "Too many concurrent HTTP outcalls, rescheduling consolidate_deposits" - ); - return; - } - let (slot, recent_blockhash) = match get_recent_slot_and_blockhash(&runtime).await { Ok(result) => result, Err(e) => { @@ -73,8 +65,23 @@ pub async fn consolidate_deposits(runtime: R) { } }; - futures::future::join_all(batches.into_iter().map(async |funds| { - match submit_consolidation_transaction(&runtime, funds, slot, recent_blockhash).await { + let results: Vec> = + futures::future::join_all(batches.into_iter().map(|funds| { + submit_consolidation_transaction(&runtime, funds, slot, recent_blockhash) + })) + .await; + + let had_too_many_outcalls = results.iter().any(|r| { + matches!( + r, + Err(ConsolidationError::SubmitTransactionFailed( + SubmitTransactionError::TooManyOutcalls + )) + ) + }); + + for result in results { + match result { Ok(sig) => log!(Priority::Info, "Submitted consolidation transaction {sig}"), Err(ConsolidationError::CreateTransactionFailed(e)) => log!( Priority::Error, @@ -85,10 +92,9 @@ pub async fn consolidate_deposits(runtime: R) 
{ "Failed to submit deposit consolidation transaction (will retry): {e}" ), } - })) - .await; + } - if !more_to_process { + if !more_to_process && !had_too_many_outcalls { // All work fits in this round scopeguard::ScopeGuard::into_inner(reschedule); } diff --git a/minter/src/deposit/automatic/mod.rs b/minter/src/deposit/automatic/mod.rs index c8c2a7cc..4dad2987 100644 --- a/minter/src/deposit/automatic/mod.rs +++ b/minter/src/deposit/automatic/mod.rs @@ -1,8 +1,8 @@ use crate::{ address::{account_address, lazy_get_schnorr_master_key}, constants::MAX_CONCURRENT_RPC_CALLS, - guard::{TimerGuard, too_many_http_outcalls}, - rpc::get_signatures_for_address, + guard::TimerGuard, + rpc::{GetSignaturesForAddressError, get_signatures_for_address}, runtime::CanisterRuntime, state::{ SchnorrPublicKey, TaskType, audit::process_event, event::EventType, mutate_state, @@ -89,17 +89,9 @@ pub async fn poll_monitored_addresses(runtime: R) { runtime.set_timer(Duration::ZERO, poll_monitored_addresses); }); - if too_many_http_outcalls() { - log!( - Priority::Info, - "Too many concurrent HTTP outcalls, rescheduling poll_monitored_addresses" - ); - return; - } - let master_key = lazy_get_schnorr_master_key(&runtime).await; - futures::future::join_all( + let results: Vec = futures::future::join_all( all_accounts .into_iter() .take(MAX_CONCURRENT_RPC_CALLS) @@ -107,17 +99,22 @@ pub async fn poll_monitored_addresses(runtime: R) { ) .await; - if !more_to_process { + let had_too_many_outcalls = results.iter().any(|&b| b); + + if !more_to_process && !had_too_many_outcalls { // All work fits in this round scopeguard::ScopeGuard::into_inner(reschedule); } } +/// Poll a single monitored account for new deposit signatures. Returns `true` +/// if the RPC call was rejected due to [`GetSignaturesForAddressError::TooManyOutcalls`], +/// which the caller uses to decide whether to reschedule immediately. 
async fn poll_account( runtime: &R, master_key: &SchnorrPublicKey, account: Account, -) { +) -> bool { let deposit_address = account_address(master_key, &account); let params = GetSignaturesForAddressParams { @@ -134,12 +131,13 @@ async fn poll_account( until: None, }; - match get_signatures_for_address(runtime, params).await { + let had_too_many_outcalls = match get_signatures_for_address(runtime, params).await { Err(e) => { log!( Priority::Info, "Failed to get signatures for address {deposit_address}: {e}" ); + matches!(e, GetSignaturesForAddressError::TooManyOutcalls) } Ok(signatures) => { let new_sigs: Vec = signatures @@ -156,8 +154,9 @@ async fn poll_account( .extend(new_sigs); }); } + false } - } + }; mutate_state(|state| { process_event( @@ -166,6 +165,8 @@ async fn poll_account( runtime, ); }); + + had_too_many_outcalls } #[cfg(any(test, feature = "canbench-rs"))] diff --git a/minter/src/guard/mod.rs b/minter/src/guard/mod.rs index 5978a8e9..87feefdb 100644 --- a/minter/src/guard/mod.rs +++ b/minter/src/guard/mod.rs @@ -1,6 +1,6 @@ use crate::{ constants::MAX_CONCURRENT_HTTP_OUTCALLS, - state::{State, TaskType, mutate_state, read_state}, + state::{State, TaskType, mutate_state}, }; use cksol_types::{ProcessDepositError, WithdrawalError}; use icrc_ledger_types::icrc1::account::Account; @@ -170,12 +170,3 @@ impl Drop for HttpOutcallGuard { }); } } - -/// Returns `true` when the number of in-flight HTTP outcalls has reached the -/// configured limit. -/// -/// Timer functions should check this at startup and reschedule rather than -/// starting new work when the system is already at capacity. 
-pub fn too_many_http_outcalls() -> bool { - read_state(|s| s.active_http_outcalls() >= MAX_CONCURRENT_HTTP_OUTCALLS) -} diff --git a/minter/src/guard/tests.rs b/minter/src/guard/tests.rs index 4cb34615..c6e77148 100644 --- a/minter/src/guard/tests.rs +++ b/minter/src/guard/tests.rs @@ -2,7 +2,7 @@ use crate::{ constants::MAX_CONCURRENT_HTTP_OUTCALLS, guard::{ GuardError, HttpOutcallGuard, HttpOutcallGuardError, MAX_CONCURRENT, TimerGuard, - TimerGuardError, process_deposit_guard, too_many_http_outcalls, + TimerGuardError, process_deposit_guard, }, state::{TaskType, read_state}, test_fixtures::init_state, @@ -142,10 +142,8 @@ mod http_outcall_guard { read_state(|s| s.active_http_outcalls()), MAX_CONCURRENT_HTTP_OUTCALLS ); - assert!(too_many_http_outcalls()); drop(guards); assert_eq!(read_state(|s| s.active_http_outcalls()), 0); - assert!(!too_many_http_outcalls()); } #[test] diff --git a/minter/src/monitor/mod.rs b/minter/src/monitor/mod.rs index 5fe79b39..0cc62f9e 100644 --- a/minter/src/monitor/mod.rs +++ b/minter/src/monitor/mod.rs @@ -1,10 +1,10 @@ use crate::{ address::derivation_path, constants::MAX_CONCURRENT_RPC_CALLS, - guard::{TimerGuard, too_many_http_outcalls}, + guard::TimerGuard, rpc::{ - SubmitTransactionError, get_recent_slot_and_blockhash, get_signature_statuses, - submit_transaction, + GetSignatureStatusesError, SubmitTransactionError, get_recent_slot_and_blockhash, + get_signature_statuses, submit_transaction, }, runtime::CanisterRuntime, signer::sign_bytes, @@ -63,14 +63,6 @@ pub async fn finalize_transactions(runtime: R) { runtime.set_timer(Duration::ZERO, finalize_transactions); }); - if too_many_http_outcalls() { - log!( - Priority::Info, - "Too many concurrent HTTP outcalls, rescheduling finalize_transactions" - ); - return; - } - // Fetch the current slot before checking statuses: if a transaction finalizes // after we snapshot the slot, the status check will see it as finalized rather // than missing, so it will never be incorrectly 
marked as expired. @@ -132,7 +124,7 @@ pub async fn finalize_transactions(runtime: R) { } } - if !more_to_process { + if !more_to_process && !statuses.had_too_many_outcalls { // All work fits in this round scopeguard::ScopeGuard::into_inner(reschedule); } @@ -162,17 +154,9 @@ pub async fn resubmit_transactions(runtime: R) { runtime.set_timer(Duration::ZERO, resubmit_transactions); }); - if too_many_http_outcalls() { - log!( - Priority::Info, - "Too many concurrent HTTP outcalls, rescheduling resubmit_transactions" - ); - return; - } - - resubmit_expired_transactions(&runtime, to_resubmit).await; + let had_too_many_outcalls = resubmit_expired_transactions(&runtime, to_resubmit).await; - if !more_to_process { + if !more_to_process && !had_too_many_outcalls { // All work fits in this round scopeguard::ScopeGuard::into_inner(reschedule); } @@ -188,6 +172,8 @@ struct TransactionStatuses { errored: BTreeMap, /// Transactions with no on-chain status (safe to resubmit if expired). not_found: BTreeSet, + /// `true` if any batch returned [`GetSignatureStatusesError::TooManyOutcalls`]. 
+ had_too_many_outcalls: bool, } async fn check_transaction_statuses( @@ -206,20 +192,29 @@ async fn check_transaction_statuses( succeeded: BTreeSet::new(), errored: BTreeMap::new(), not_found: BTreeSet::new(), + had_too_many_outcalls: false, }; - let batch_results: Vec<_> = futures::future::join_all(batches.into_iter().map(async |batch| { - match get_signature_statuses(runtime, &batch).await { - Ok(statuses) => Some((batch, statuses)), + let batch_results: Vec> = + futures::future::join_all(batches.into_iter().map(async |batch| { + get_signature_statuses(runtime, &batch) + .await + .map(|statuses| (batch, statuses)) + })) + .await; + + for batch_result in batch_results { + let (sigs, statuses) = match batch_result { + Ok(data) => data, + Err(GetSignatureStatusesError::TooManyOutcalls) => { + result.had_too_many_outcalls = true; + continue; + } Err(e) => { log!(Priority::Info, "Failed to check transaction statuses: {e}"); - None + continue; } - } - })) - .await; - - for (sigs, statuses) in batch_results.into_iter().flatten() { + }; for (signature, status) in sigs.iter().zip(statuses) { match status { Some(s) @@ -242,42 +237,61 @@ async fn check_transaction_statuses( result } +/// Resubmit expired transactions. Returns `true` if any submission was rejected +/// with [`SubmitTransactionError::TooManyOutcalls`], which the caller uses to decide +/// whether to reschedule immediately. 
async fn resubmit_expired_transactions( runtime: &R, to_resubmit: Vec<(Signature, VersionedMessage, Vec)>, -) { +) -> bool { let (new_slot, new_blockhash) = match get_recent_slot_and_blockhash(runtime).await { Ok(result) => result, Err(e) => { log!(Priority::Info, "Failed to get recent blockhash: {e}"); - return; + return false; } }; - futures::future::join_all(to_resubmit.into_iter().take(MAX_CONCURRENT_RPC_CALLS).map( - async |(old_signature, message, signers)| { - match try_resubmit_transaction( - runtime, - old_signature, - message, - signers, - new_slot, - new_blockhash, - ) - .await - { - Ok(new_sig) => log!( - Priority::Info, - "Resubmitted transaction {old_signature} as {new_sig}" - ), - Err(e) => log!( - Priority::Info, - "Failed to resubmit transaction {old_signature}: {e}" - ), - } - }, - )) - .await; + let results: Vec<(Signature, Result)> = + futures::future::join_all(to_resubmit.into_iter().take(MAX_CONCURRENT_RPC_CALLS).map( + async |(old_signature, message, signers)| { + let result = try_resubmit_transaction( + runtime, + old_signature, + message, + signers, + new_slot, + new_blockhash, + ) + .await; + (old_signature, result) + }, + )) + .await; + + let had_too_many_outcalls = results.iter().any(|(_, r)| { + matches!( + r, + Err(ResubmitError::Submit( + SubmitTransactionError::TooManyOutcalls + )) + ) + }); + + for (old_signature, result) in results { + match result { + Ok(new_sig) => log!( + Priority::Info, + "Resubmitted transaction {old_signature} as {new_sig}" + ), + Err(e) => log!( + Priority::Info, + "Failed to resubmit transaction {old_signature}: {e}" + ), + } + } + + had_too_many_outcalls } async fn try_resubmit_transaction( diff --git a/minter/src/withdraw/mod.rs b/minter/src/withdraw/mod.rs index ce16cef2..cbea08a8 100644 --- a/minter/src/withdraw/mod.rs +++ b/minter/src/withdraw/mod.rs @@ -15,9 +15,9 @@ use solana_hash::Hash; use crate::{ consolidate::consolidate_deposits, constants::MAX_CONCURRENT_RPC_CALLS, - guard::{TimerGuard, 
too_many_http_outcalls, withdrawal_guard}, + guard::{TimerGuard, withdrawal_guard}, ledger::{BurnError, burn}, - rpc::{get_recent_slot_and_blockhash, submit_transaction}, + rpc::{SubmitTransactionError, get_recent_slot_and_blockhash, submit_transaction}, runtime::CanisterRuntime, sol_transfer::{MAX_WITHDRAWALS_PER_TX, create_signed_batch_withdrawal_transaction}, state::{ @@ -154,14 +154,6 @@ pub async fn process_pending_withdrawals(runtime: R) { return; } - if too_many_http_outcalls() { - log!( - Priority::Info, - "Too many concurrent HTTP outcalls, rescheduling process_pending_withdrawals" - ); - return; - } - let (slot, recent_blockhash) = match get_recent_slot_and_blockhash(&runtime).await { Ok(result) => result, Err(e) => { @@ -170,23 +162,30 @@ pub async fn process_pending_withdrawals(runtime: R) { } }; - futures::future::join_all(batches.into_iter().map(async |batch| { - submit_withdrawal_transaction(&runtime, batch, slot, recent_blockhash).await - })) + let results: Vec = futures::future::join_all( + batches + .into_iter() + .map(|batch| submit_withdrawal_transaction(&runtime, batch, slot, recent_blockhash)), + ) .await; - if !more_to_process { + let had_too_many_outcalls = results.iter().any(|&b| b); + + if !more_to_process && !had_too_many_outcalls { // All work fits in this round scopeguard::ScopeGuard::into_inner(reschedule); } } +/// Submit a batch withdrawal transaction. Returns `true` if the submission was +/// rejected due to [`SubmitTransactionError::TooManyOutcalls`], which the +/// caller uses to decide whether to reschedule immediately. 
async fn submit_withdrawal_transaction( runtime: &R, requests: Vec, slot: Slot, recent_blockhash: Hash, -) { +) -> bool { let targets: Vec<_> = requests .iter() .map(|request| { @@ -209,7 +208,7 @@ async fn submit_withdrawal_transaction( Priority::Error, "Failed to create batch withdrawal transaction for burn indices {burn_indices:?}: {e}" ); - return; + return false; } }; @@ -239,12 +238,14 @@ async fn submit_withdrawal_transaction( Priority::Info, "Submitted withdrawal transaction {signature} for burn indices {burn_indices:?}" ); + false } Err(e) => { log!( Priority::Info, "Failed to send withdrawal transaction {signature} (will be resubmitted): {e}" ); + matches!(e, SubmitTransactionError::TooManyOutcalls) } } } From 8d04ce40375b5699ecbbf1b84d8810d95f6ce11e Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Mon, 27 Apr 2026 14:55:20 +0200 Subject: [PATCH 3/3] fixup! feat: limit concurrent HTTP outcalls with a guard Co-Authored-By: Claude Sonnet 4.6 --- minter/src/consolidate/mod.rs | 2 -- minter/src/deposit/automatic/mod.rs | 23 +++++++++++------------ minter/src/monitor/mod.rs | 21 +++++++++++---------- minter/src/withdraw/mod.rs | 20 ++++++++------------ 4 files changed, 30 insertions(+), 36 deletions(-) diff --git a/minter/src/consolidate/mod.rs b/minter/src/consolidate/mod.rs index 4d12c264..a4ecdc89 100644 --- a/minter/src/consolidate/mod.rs +++ b/minter/src/consolidate/mod.rs @@ -52,7 +52,6 @@ pub async fn consolidate_deposits(runtime: R) { .collect(); if batches.is_empty() { - // Nothing to process scopeguard::ScopeGuard::into_inner(reschedule); return; } @@ -95,7 +94,6 @@ pub async fn consolidate_deposits(runtime: R) { } if !more_to_process && !had_too_many_outcalls { - // All work fits in this round scopeguard::ScopeGuard::into_inner(reschedule); } } diff --git a/minter/src/deposit/automatic/mod.rs b/minter/src/deposit/automatic/mod.rs index 4dad2987..3ddf3de0 100644 --- a/minter/src/deposit/automatic/mod.rs +++ b/minter/src/deposit/automatic/mod.rs @@ 
-1,7 +1,7 @@ use crate::{ address::{account_address, lazy_get_schnorr_master_key}, constants::MAX_CONCURRENT_RPC_CALLS, - guard::TimerGuard, + guard::{HttpOutcallGuardError, TimerGuard}, rpc::{GetSignaturesForAddressError, get_signatures_for_address}, runtime::CanisterRuntime, state::{ @@ -91,7 +91,7 @@ pub async fn poll_monitored_addresses(runtime: R) { let master_key = lazy_get_schnorr_master_key(&runtime).await; - let results: Vec = futures::future::join_all( + let results: Vec> = futures::future::join_all( all_accounts .into_iter() .take(MAX_CONCURRENT_RPC_CALLS) @@ -99,22 +99,18 @@ pub async fn poll_monitored_addresses(runtime: R) { ) .await; - let had_too_many_outcalls = results.iter().any(|&b| b); + let had_too_many_outcalls = results.iter().any(Result::is_err); if !more_to_process && !had_too_many_outcalls { - // All work fits in this round scopeguard::ScopeGuard::into_inner(reschedule); } } -/// Poll a single monitored account for new deposit signatures. Returns `true` -/// if the RPC call was rejected due to [`GetSignaturesForAddressError::TooManyOutcalls`], -/// which the caller uses to decide whether to reschedule immediately. 
async fn poll_account( runtime: &R, master_key: &SchnorrPublicKey, account: Account, -) -> bool { +) -> Result<(), HttpOutcallGuardError> { let deposit_address = account_address(master_key, &account); let params = GetSignaturesForAddressParams { @@ -131,13 +127,16 @@ async fn poll_account( until: None, }; - let had_too_many_outcalls = match get_signatures_for_address(runtime, params).await { + let result = match get_signatures_for_address(runtime, params).await { + Err(GetSignaturesForAddressError::TooManyOutcalls) => { + Err(HttpOutcallGuardError::TooManyOutcalls) + } Err(e) => { log!( Priority::Info, "Failed to get signatures for address {deposit_address}: {e}" ); - matches!(e, GetSignaturesForAddressError::TooManyOutcalls) + Ok(()) } Ok(signatures) => { let new_sigs: Vec = signatures @@ -154,7 +153,7 @@ async fn poll_account( .extend(new_sigs); }); } - false + Ok(()) } }; @@ -166,7 +165,7 @@ async fn poll_account( ); }); - had_too_many_outcalls + result } #[cfg(any(test, feature = "canbench-rs"))] diff --git a/minter/src/monitor/mod.rs b/minter/src/monitor/mod.rs index 0cc62f9e..afb16a03 100644 --- a/minter/src/monitor/mod.rs +++ b/minter/src/monitor/mod.rs @@ -1,7 +1,7 @@ use crate::{ address::derivation_path, constants::MAX_CONCURRENT_RPC_CALLS, - guard::TimerGuard, + guard::{HttpOutcallGuardError, TimerGuard}, rpc::{ GetSignatureStatusesError, SubmitTransactionError, get_recent_slot_and_blockhash, get_signature_statuses, submit_transaction, @@ -125,7 +125,6 @@ pub async fn finalize_transactions(runtime: R) { } if !more_to_process && !statuses.had_too_many_outcalls { - // All work fits in this round scopeguard::ScopeGuard::into_inner(reschedule); } } @@ -154,10 +153,11 @@ pub async fn resubmit_transactions(runtime: R) { runtime.set_timer(Duration::ZERO, resubmit_transactions); }); - let had_too_many_outcalls = resubmit_expired_transactions(&runtime, to_resubmit).await; + let had_too_many_outcalls = resubmit_expired_transactions(&runtime, to_resubmit) + .await 
+ .is_err(); if !more_to_process && !had_too_many_outcalls { - // All work fits in this round scopeguard::ScopeGuard::into_inner(reschedule); } } @@ -237,18 +237,15 @@ async fn check_transaction_statuses( result } -/// Resubmit expired transactions. Returns `true` if any call was rejected due -/// to [`HttpOutcallGuardError::TooManyOutcalls`], which the caller uses to decide -/// whether to reschedule immediately. async fn resubmit_expired_transactions( runtime: &R, to_resubmit: Vec<(Signature, VersionedMessage, Vec)>, -) -> bool { +) -> Result<(), HttpOutcallGuardError> { let (new_slot, new_blockhash) = match get_recent_slot_and_blockhash(runtime).await { Ok(result) => result, Err(e) => { log!(Priority::Info, "Failed to get recent blockhash: {e}"); - return false; + return Ok(()); } }; @@ -291,7 +288,11 @@ async fn resubmit_expired_transactions( } } - had_too_many_outcalls + if had_too_many_outcalls { + Err(HttpOutcallGuardError::TooManyOutcalls) + } else { + Ok(()) + } } async fn try_resubmit_transaction( diff --git a/minter/src/withdraw/mod.rs b/minter/src/withdraw/mod.rs index cbea08a8..356dfbdc 100644 --- a/minter/src/withdraw/mod.rs +++ b/minter/src/withdraw/mod.rs @@ -15,7 +15,7 @@ use solana_hash::Hash; use crate::{ consolidate::consolidate_deposits, constants::MAX_CONCURRENT_RPC_CALLS, - guard::{TimerGuard, withdrawal_guard}, + guard::{HttpOutcallGuardError, TimerGuard, withdrawal_guard}, ledger::{BurnError, burn}, rpc::{SubmitTransactionError, get_recent_slot_and_blockhash, submit_transaction}, runtime::CanisterRuntime, @@ -149,7 +149,6 @@ pub async fn process_pending_withdrawals(runtime: R) { .collect(); if batches.is_empty() { - // Nothing to process scopeguard::ScopeGuard::into_inner(reschedule); return; } @@ -162,30 +161,26 @@ pub async fn process_pending_withdrawals(runtime: R) { } }; - let results: Vec = futures::future::join_all( + let results: Vec> = futures::future::join_all( batches .into_iter() .map(|batch| 
submit_withdrawal_transaction(&runtime, batch, slot, recent_blockhash)), ) .await; - let had_too_many_outcalls = results.iter().any(|&b| b); + let had_too_many_outcalls = results.iter().any(Result::is_err); if !more_to_process && !had_too_many_outcalls { - // All work fits in this round scopeguard::ScopeGuard::into_inner(reschedule); } } -/// Submit a batch withdrawal transaction. Returns `true` if the submission was -/// rejected due to [`SubmitTransactionError::TooManyOutcalls`], which the -/// caller uses to decide whether to reschedule immediately. async fn submit_withdrawal_transaction( runtime: &R, requests: Vec, slot: Slot, recent_blockhash: Hash, -) -> bool { +) -> Result<(), HttpOutcallGuardError> { let targets: Vec<_> = requests .iter() .map(|request| { @@ -208,7 +203,7 @@ async fn submit_withdrawal_transaction( Priority::Error, "Failed to create batch withdrawal transaction for burn indices {burn_indices:?}: {e}" ); - return false; + return Ok(()); } }; @@ -238,14 +233,15 @@ async fn submit_withdrawal_transaction( Priority::Info, "Submitted withdrawal transaction {signature} for burn indices {burn_indices:?}" ); - false + Ok(()) } + Err(SubmitTransactionError::TooManyOutcalls) => Err(HttpOutcallGuardError::TooManyOutcalls), Err(e) => { log!( Priority::Info, "Failed to send withdrawal transaction {signature} (will be resubmitted): {e}" ); - matches!(e, SubmitTransactionError::TooManyOutcalls) + Ok(()) } } }