diff --git a/minter/src/consolidate/mod.rs b/minter/src/consolidate/mod.rs index dc71f797..a4ecdc89 100644 --- a/minter/src/consolidate/mod.rs +++ b/minter/src/consolidate/mod.rs @@ -52,7 +52,6 @@ pub async fn consolidate_deposits(runtime: R) { .collect(); if batches.is_empty() { - // Nothing to process scopeguard::ScopeGuard::into_inner(reschedule); return; } @@ -65,8 +64,23 @@ pub async fn consolidate_deposits(runtime: R) { } }; - futures::future::join_all(batches.into_iter().map(async |funds| { - match submit_consolidation_transaction(&runtime, funds, slot, recent_blockhash).await { + let results: Vec> = + futures::future::join_all(batches.into_iter().map(|funds| { + submit_consolidation_transaction(&runtime, funds, slot, recent_blockhash) + })) + .await; + + let had_too_many_outcalls = results.iter().any(|r| { + matches!( + r, + Err(ConsolidationError::SubmitTransactionFailed( + SubmitTransactionError::TooManyOutcalls + )) + ) + }); + + for result in results { + match result { Ok(sig) => log!(Priority::Info, "Submitted consolidation transaction {sig}"), Err(ConsolidationError::CreateTransactionFailed(e)) => log!( Priority::Error, @@ -77,11 +91,9 @@ pub async fn consolidate_deposits(runtime: R) { "Failed to submit deposit consolidation transaction (will retry): {e}" ), } - })) - .await; + } - if !more_to_process { - // All work fits in this round + if !more_to_process && !had_too_many_outcalls { scopeguard::ScopeGuard::into_inner(reschedule); } } diff --git a/minter/src/constants.rs b/minter/src/constants.rs index 09ac540c..b61b20fc 100644 --- a/minter/src/constants.rs +++ b/minter/src/constants.rs @@ -1,6 +1,12 @@ /// Maximum number of concurrent calls to the SOL RPC canister. pub const MAX_CONCURRENT_RPC_CALLS: usize = 10; +/// Maximum number of concurrent HTTP outcalls the minter may have in flight at once. +/// +/// Each call to the SOL RPC canister triggers one or more HTTPS outcalls on the IC. +/// Limiting the number of in-flight outcalls prevents resource exhaustion. +pub const MAX_CONCURRENT_HTTP_OUTCALLS: u32 = 50; + /// Matches the ICP HTTPS outcall response limit for variable-length RPC calls /// such as `getTransaction` and `getSignatureStatuses`: /// https://docs.internetcomputer.org/references/ic-interface-spec#ic-http_request diff --git a/minter/src/deposit/automatic/mod.rs b/minter/src/deposit/automatic/mod.rs index 32ee80fd..3ddf3de0 100644 --- a/minter/src/deposit/automatic/mod.rs +++ b/minter/src/deposit/automatic/mod.rs @@ -1,8 +1,8 @@ use crate::{ address::{account_address, lazy_get_schnorr_master_key}, constants::MAX_CONCURRENT_RPC_CALLS, - guard::TimerGuard, - rpc::get_signatures_for_address, + guard::{HttpOutcallGuardError, TimerGuard}, + rpc::{GetSignaturesForAddressError, get_signatures_for_address}, runtime::CanisterRuntime, state::{ SchnorrPublicKey, TaskType, audit::process_event, event::EventType, mutate_state, @@ -91,7 +91,7 @@ pub async fn poll_monitored_addresses(runtime: R) { let master_key = lazy_get_schnorr_master_key(&runtime).await; - futures::future::join_all( + let results: Vec> = futures::future::join_all( all_accounts .into_iter() .take(MAX_CONCURRENT_RPC_CALLS) @@ -99,8 +99,9 @@ pub async fn poll_monitored_addresses(runtime: R) { ) .await; - if !more_to_process { - // All work fits in this round + let had_too_many_outcalls = results.iter().any(Result::is_err); + + if !more_to_process && !had_too_many_outcalls { scopeguard::ScopeGuard::into_inner(reschedule); } } @@ -109,7 +110,7 @@ async fn poll_account( runtime: &R, master_key: &SchnorrPublicKey, account: Account, -) { +) -> Result<(), HttpOutcallGuardError> { let deposit_address = account_address(master_key, &account); let params = GetSignaturesForAddressParams { @@ -126,12 +127,16 @@ async fn poll_account( until: None, }; - match get_signatures_for_address(runtime, params).await { + let result = match get_signatures_for_address(runtime, params).await { + Err(GetSignaturesForAddressError::TooManyOutcalls) => { + Err(HttpOutcallGuardError::TooManyOutcalls) + } Err(e) => { log!( Priority::Info, "Failed to get signatures for address {deposit_address}: {e}" ); + Ok(()) } Ok(signatures) => { let new_sigs: Vec = signatures @@ -148,8 +153,9 @@ async fn poll_account( .extend(new_sigs); }); } + Ok(()) } - } + }; mutate_state(|state| { process_event( @@ -158,6 +164,8 @@ async fn poll_account( runtime, ); }); + + result } #[cfg(any(test, feature = "canbench-rs"))] diff --git a/minter/src/guard/mod.rs b/minter/src/guard/mod.rs index aeaf0b9e..87feefdb 100644 --- a/minter/src/guard/mod.rs +++ b/minter/src/guard/mod.rs @@ -1,4 +1,7 @@ -use crate::state::{State, TaskType, mutate_state}; +use crate::{ + constants::MAX_CONCURRENT_HTTP_OUTCALLS, + state::{State, TaskType, mutate_state}, +}; use cksol_types::{ProcessDepositError, WithdrawalError}; use icrc_ledger_types::icrc1::account::Account; use std::{collections::BTreeSet, marker::PhantomData}; @@ -130,3 +133,40 @@ impl Drop for TimerGuard { }); } } + +#[derive(Debug, Eq, PartialEq)] +pub enum HttpOutcallGuardError { + TooManyOutcalls, +} + +/// Guards a single HTTP outcall to the SOL RPC canister. +/// +/// Acquiring this guard increments the active-outcall counter in canister state; +/// dropping it decrements the counter. [`HttpOutcallGuard::new`] fails with +/// [`HttpOutcallGuardError::TooManyOutcalls`] when +/// [`MAX_CONCURRENT_HTTP_OUTCALLS`] guards are already held. +#[must_use] +pub struct HttpOutcallGuard; + +impl HttpOutcallGuard { + pub fn new() -> Result { + mutate_state(|s| { + if s.active_http_outcalls() >= MAX_CONCURRENT_HTTP_OUTCALLS { + return Err(HttpOutcallGuardError::TooManyOutcalls); + } + *s.active_http_outcalls_mut() += 1; + Ok(Self) + }) + } +} + +impl Drop for HttpOutcallGuard { + fn drop(&mut self) { + mutate_state(|s| { + let count = s.active_http_outcalls_mut(); + *count = count + .checked_sub(1) + .expect("BUG: HTTP outcall counter underflow"); + }); + } +} diff --git a/minter/src/guard/tests.rs b/minter/src/guard/tests.rs index 5d368696..c6e77148 100644 --- a/minter/src/guard/tests.rs +++ b/minter/src/guard/tests.rs @@ -1,6 +1,10 @@ use crate::{ - guard::{GuardError, MAX_CONCURRENT, TimerGuard, TimerGuardError, process_deposit_guard}, - state::TaskType, + constants::MAX_CONCURRENT_HTTP_OUTCALLS, + guard::{ + GuardError, HttpOutcallGuard, HttpOutcallGuardError, MAX_CONCURRENT, TimerGuard, + TimerGuardError, process_deposit_guard, + }, + state::{TaskType, read_state}, test_fixtures::init_state, }; use candid::Principal; @@ -111,3 +115,77 @@ mod timer_guard { assert!(guard2.is_ok()); } } + +mod http_outcall_guard { + use super::*; + + #[test] + fn should_acquire_and_release_guard() { + init_state(); + + assert_eq!(read_state(|s| s.active_http_outcalls()), 0); + { + let _guard = HttpOutcallGuard::new().unwrap(); + assert_eq!(read_state(|s| s.active_http_outcalls()), 1); + } + assert_eq!(read_state(|s| s.active_http_outcalls()), 0); + } + + #[test] + fn should_allow_up_to_max_concurrent_guards() { + init_state(); + + let guards: Vec<_> = (0..MAX_CONCURRENT_HTTP_OUTCALLS) + .map(|_| HttpOutcallGuard::new().expect("should succeed below limit")) + .collect(); + assert_eq!( + read_state(|s| s.active_http_outcalls()), + MAX_CONCURRENT_HTTP_OUTCALLS + ); + drop(guards); + assert_eq!(read_state(|s| s.active_http_outcalls()), 0); + } + + #[test] + fn should_reject_when_limit_reached() { + init_state(); + + let _guards: Vec<_> = (0..MAX_CONCURRENT_HTTP_OUTCALLS) + .map(|_| HttpOutcallGuard::new().expect("should succeed below limit")) + .collect(); + + let result = HttpOutcallGuard::new(); + assert_eq!(result.err(), Some(HttpOutcallGuardError::TooManyOutcalls)); + } + + #[test] + fn should_allow_new_guard_after_one_is_dropped() { + init_state(); + + let guards: Vec<_> = (0..MAX_CONCURRENT_HTTP_OUTCALLS) + .map(|_| HttpOutcallGuard::new().expect("should succeed below limit")) + .collect(); + + // Drop one + drop(guards); + + // Should be able to acquire a new guard + let result = HttpOutcallGuard::new(); + assert!(result.is_ok()); + } + + #[test] + fn should_track_multiple_concurrent_guards_independently() { + init_state(); + + let guard1 = HttpOutcallGuard::new().unwrap(); + let guard2 = HttpOutcallGuard::new().unwrap(); + assert_eq!(read_state(|s| s.active_http_outcalls()), 2); + + drop(guard1); + assert_eq!(read_state(|s| s.active_http_outcalls()), 1); + + drop(guard2); + assert_eq!(read_state(|s| s.active_http_outcalls()), 0); + } +} diff --git a/minter/src/monitor/mod.rs b/minter/src/monitor/mod.rs index b3ef33b0..afb16a03 100644 --- a/minter/src/monitor/mod.rs +++ b/minter/src/monitor/mod.rs @@ -1,10 +1,10 @@ use crate::{ address::derivation_path, constants::MAX_CONCURRENT_RPC_CALLS, - guard::TimerGuard, + guard::{HttpOutcallGuardError, TimerGuard}, rpc::{ - SubmitTransactionError, get_recent_slot_and_blockhash, get_signature_statuses, - submit_transaction, + GetSignatureStatusesError, SubmitTransactionError, get_recent_slot_and_blockhash, + get_signature_statuses, submit_transaction, }, runtime::CanisterRuntime, signer::sign_bytes, @@ -124,8 +124,7 @@ pub async fn finalize_transactions(runtime: R) { } } - if !more_to_process { - // All work fits in this round + if !more_to_process && !statuses.had_too_many_outcalls { scopeguard::ScopeGuard::into_inner(reschedule); } } @@ -154,10 +153,11 @@ pub async fn resubmit_transactions(runtime: R) { runtime.set_timer(Duration::ZERO, resubmit_transactions); }); - resubmit_expired_transactions(&runtime, to_resubmit).await; + let had_too_many_outcalls = resubmit_expired_transactions(&runtime, to_resubmit) + .await + .is_err(); - if !more_to_process { - // All work fits in this round + if !more_to_process && !had_too_many_outcalls { scopeguard::ScopeGuard::into_inner(reschedule); } } @@ -172,6 +172,8 @@ struct TransactionStatuses { errored: BTreeMap, /// Transactions with no on-chain status (safe to resubmit if expired). not_found: BTreeSet, + /// `true` if any batch returned [`GetSignatureStatusesError::TooManyOutcalls`]. + had_too_many_outcalls: bool, } async fn check_transaction_statuses( @@ -190,20 +192,29 @@ async fn check_transaction_statuses( succeeded: BTreeSet::new(), errored: BTreeMap::new(), not_found: BTreeSet::new(), + had_too_many_outcalls: false, }; - let batch_results: Vec<_> = futures::future::join_all(batches.into_iter().map(async |batch| { - match get_signature_statuses(runtime, &batch).await { - Ok(statuses) => Some((batch, statuses)), + let batch_results: Vec> = + futures::future::join_all(batches.into_iter().map(async |batch| { + get_signature_statuses(runtime, &batch) + .await + .map(|statuses| (batch, statuses)) + })) + .await; + + for batch_result in batch_results { + let (sigs, statuses) = match batch_result { + Ok(data) => data, + Err(GetSignatureStatusesError::TooManyOutcalls) => { + result.had_too_many_outcalls = true; + continue; + } Err(e) => { log!(Priority::Info, "Failed to check transaction statuses: {e}"); - None + continue; } - } - })) - .await; - - for (sigs, statuses) in batch_results.into_iter().flatten() { + }; for (signature, status) in sigs.iter().zip(statuses) { match status { Some(s) @@ -229,39 +240,59 @@ async fn check_transaction_statuses( async fn resubmit_expired_transactions( runtime: &R, to_resubmit: Vec<(Signature, VersionedMessage, Vec)>, -) { +) -> Result<(), HttpOutcallGuardError> { let (new_slot, new_blockhash) = match get_recent_slot_and_blockhash(runtime).await { Ok(result) => result, Err(e) => { log!(Priority::Info, "Failed to get recent blockhash: {e}"); - return; + return Ok(()); } }; - futures::future::join_all(to_resubmit.into_iter().take(MAX_CONCURRENT_RPC_CALLS).map( - async |(old_signature, message, signers)| { - match try_resubmit_transaction( - runtime, - old_signature, - message, - signers, - new_slot, - new_blockhash, - ) - .await - { - Ok(new_sig) => log!( - Priority::Info, - "Resubmitted transaction {old_signature} as {new_sig}" - ), - Err(e) => log!( - Priority::Info, - "Failed to resubmit transaction {old_signature}: {e}" - ), - } - }, - )) - .await; + let results: Vec<(Signature, Result)> = + futures::future::join_all(to_resubmit.into_iter().take(MAX_CONCURRENT_RPC_CALLS).map( + async |(old_signature, message, signers)| { + let result = try_resubmit_transaction( + runtime, + old_signature, + message, + signers, + new_slot, + new_blockhash, + ) + .await; + (old_signature, result) + }, + )) + .await; + + let had_too_many_outcalls = results.iter().any(|(_, r)| { + matches!( + r, + Err(ResubmitError::Submit( + SubmitTransactionError::TooManyOutcalls + )) + ) + }); + + for (old_signature, result) in results { + match result { + Ok(new_sig) => log!( + Priority::Info, + "Resubmitted transaction {old_signature} as {new_sig}" + ), + Err(e) => log!( + Priority::Info, + "Failed to resubmit transaction {old_signature}: {e}" + ), + } + } + + if had_too_many_outcalls { + Err(HttpOutcallGuardError::TooManyOutcalls) + } else { + Ok(()) + } } async fn try_resubmit_transaction( diff --git a/minter/src/rpc/mod.rs b/minter/src/rpc/mod.rs index cbbfc860..05c4a992 100644 --- a/minter/src/rpc/mod.rs +++ b/minter/src/rpc/mod.rs @@ -1,5 +1,6 @@ use crate::{ constants::{GET_SIGNATURE_STATUSES_CYCLES, MAX_HTTP_OUTCALL_RESPONSE_BYTES}, + guard::{HttpOutcallGuard, HttpOutcallGuardError}, runtime::CanisterRuntime, state::read_state, }; @@ -24,6 +25,8 @@ pub async fn get_transaction( signature: Signature, cycles_to_attach: u128, ) -> Result, GetTransactionError> { + let _guard = HttpOutcallGuard::new() + .map_err(|_: HttpOutcallGuardError| GetTransactionError::TooManyOutcalls)?; let result = read_state(|state| state.sol_rpc_client(runtime.inter_canister_call_runtime())) .get_transaction(signature) .with_encoding(GetTransactionEncoding::Base64) @@ -48,6 +51,8 @@ pub enum GetTransactionError { RpcError(RpcError), #[error("Inconsistent RPC results for transaction")] InconsistentRpcResults, + #[error("Too many concurrent HTTP outcalls")] + TooManyOutcalls, } impl From for ProcessDepositError { @@ -60,6 +65,8 @@ pub async fn submit_transaction( runtime: &R, transaction: Transaction, ) -> Result { + let _guard = HttpOutcallGuard::new() + .map_err(|_: HttpOutcallGuardError| SubmitTransactionError::TooManyOutcalls)?; let client = read_state(|state| state.sol_rpc_client(runtime.inter_canister_call_runtime())); match client.send_transaction(transaction).try_send().await { Ok(MultiRpcResult::Consistent(Ok(signature))) => Ok(signature), @@ -77,11 +84,15 @@ pub enum SubmitTransactionError { RpcError(RpcError), #[error("Inconsistent RPC results for sendTransaction")] InconsistentRpcResults, + #[error("Too many concurrent HTTP outcalls")] + TooManyOutcalls, } pub async fn get_recent_slot_and_blockhash( runtime: &R, ) -> Result<(Slot, Hash), GetRecentBlockhashError> { + let _guard = HttpOutcallGuard::new() + .map_err(|_: HttpOutcallGuardError| GetRecentBlockhashError::TooManyOutcalls)?; let client = read_state(|state| state.sol_rpc_client(runtime.inter_canister_call_runtime())); match client.get_recent_block().try_send().await { Ok((slot, block)) => { @@ -104,6 +115,8 @@ pub async fn get_recent_slot_and_blockhash( pub enum GetRecentBlockhashError { #[error("Failed to get recent block: {0:?}")] Failed(Vec), + #[error("Too many concurrent HTTP outcalls")] + TooManyOutcalls, } pub async fn get_signature_statuses( @@ -113,6 +126,8 @@ pub async fn get_signature_statuses( Vec>, GetSignatureStatusesError, > { + let _guard = HttpOutcallGuard::new() + .map_err(|_: HttpOutcallGuardError| GetSignatureStatusesError::TooManyOutcalls)?; let client = read_state(|state| state.sol_rpc_client(runtime.inter_canister_call_runtime())); let result = client .get_signature_statuses(signatures) @@ -136,12 +151,16 @@ pub enum GetSignatureStatusesError { RpcError(RpcError), #[error("Inconsistent RPC results for getSignatureStatuses")] InconsistentRpcResults, + #[error("Too many concurrent HTTP outcalls")] + TooManyOutcalls, } pub async fn get_signatures_for_address( runtime: &R, params: GetSignaturesForAddressParams, ) -> Result, GetSignaturesForAddressError> { + let _guard = HttpOutcallGuard::new() + .map_err(|_: HttpOutcallGuardError| GetSignaturesForAddressError::TooManyOutcalls)?; let client = read_state(|state| state.sol_rpc_client(runtime.inter_canister_call_runtime())); let result = client.get_signatures_for_address(params).try_send().await; match result? { @@ -161,4 +180,6 @@ pub enum GetSignaturesForAddressError { RpcError(RpcError), #[error("Inconsistent RPC results for getSignaturesForAddress")] InconsistentRpcResults, + #[error("Too many concurrent HTTP outcalls")] + TooManyOutcalls, } diff --git a/minter/src/state/mod.rs b/minter/src/state/mod.rs index c265d456..82d2559c 100644 --- a/minter/src/state/mod.rs +++ b/minter/src/state/mod.rs @@ -108,6 +108,7 @@ pub struct State { failed_transactions: InsertionOrderedMap, consolidation_transactions: InsertionOrderedMap, active_tasks: BTreeSet, + active_http_outcalls: u32, balance: Lamport, } @@ -321,6 +322,14 @@ impl State { &mut self.active_tasks } + pub fn active_http_outcalls(&self) -> u32 { + self.active_http_outcalls + } + + pub fn active_http_outcalls_mut(&mut self) -> &mut u32 { + &mut self.active_http_outcalls + } + fn transaction_fee(&self, message: &VersionedMessage) -> Lamport { let VersionedMessage::Legacy(msg) = message; FEE_PER_SIGNATURE * msg.header.num_required_signatures as u64 @@ -795,6 +804,7 @@ impl TryFrom for State { failed_transactions: InsertionOrderedMap::new(), consolidation_transactions: InsertionOrderedMap::new(), active_tasks: BTreeSet::new(), + active_http_outcalls: 0, balance: 0, }; state.validate()?; diff --git a/minter/src/state/tests.rs b/minter/src/state/tests.rs index e3ef55e3..ef8a60d3 100644 --- a/minter/src/state/tests.rs +++ b/minter/src/state/tests.rs @@ -245,6 +245,7 @@ mod state_from_init_args { failed_transactions: InsertionOrderedMap::new(), consolidation_transactions: InsertionOrderedMap::new(), active_tasks: BTreeSet::new(), + active_http_outcalls: 0, balance: 0, } ); diff --git a/minter/src/withdraw/mod.rs b/minter/src/withdraw/mod.rs index 19571653..356dfbdc 100644 --- a/minter/src/withdraw/mod.rs +++ b/minter/src/withdraw/mod.rs @@ -15,9 +15,9 @@ use solana_hash::Hash; use crate::{ consolidate::consolidate_deposits, constants::MAX_CONCURRENT_RPC_CALLS, - guard::{TimerGuard, withdrawal_guard}, + guard::{HttpOutcallGuardError, TimerGuard, withdrawal_guard}, ledger::{BurnError, burn}, - rpc::{get_recent_slot_and_blockhash, submit_transaction}, + rpc::{SubmitTransactionError, get_recent_slot_and_blockhash, submit_transaction}, runtime::CanisterRuntime, sol_transfer::{MAX_WITHDRAWALS_PER_TX, create_signed_batch_withdrawal_transaction}, state::{ @@ -149,7 +149,6 @@ pub async fn process_pending_withdrawals(runtime: R) { .collect(); if batches.is_empty() { - // Nothing to process scopeguard::ScopeGuard::into_inner(reschedule); return; } @@ -162,13 +161,16 @@ pub async fn process_pending_withdrawals(runtime: R) { } }; - futures::future::join_all(batches.into_iter().map(async |batch| { - submit_withdrawal_transaction(&runtime, batch, slot, recent_blockhash).await - })) + let results: Vec> = futures::future::join_all( + batches + .into_iter() + .map(|batch| submit_withdrawal_transaction(&runtime, batch, slot, recent_blockhash)), + ) .await; - if !more_to_process { - // All work fits in this round + let had_too_many_outcalls = results.iter().any(Result::is_err); + + if !more_to_process && !had_too_many_outcalls { scopeguard::ScopeGuard::into_inner(reschedule); } } @@ -178,7 +180,7 @@ async fn submit_withdrawal_transaction( requests: Vec, slot: Slot, recent_blockhash: Hash, -) { +) -> Result<(), HttpOutcallGuardError> { let targets: Vec<_> = requests .iter() .map(|request| { @@ -201,7 +203,7 @@ async fn submit_withdrawal_transaction( Priority::Error, "Failed to create batch withdrawal transaction for burn indices {burn_indices:?}: {e}" ); - return; + return Ok(()); } }; @@ -231,12 +233,15 @@ async fn submit_withdrawal_transaction( Priority::Info, "Submitted withdrawal transaction {signature} for burn indices {burn_indices:?}" ); + Ok(()) } + Err(SubmitTransactionError::TooManyOutcalls) => Err(HttpOutcallGuardError::TooManyOutcalls), Err(e) => { log!( Priority::Info, "Failed to send withdrawal transaction {signature} (will be resubmitted): {e}" ); + Ok(()) } } }