Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions minter/src/consolidate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ pub async fn consolidate_deposits<R: CanisterRuntime>(runtime: R) {
.collect();

if batches.is_empty() {
// Nothing to process
scopeguard::ScopeGuard::into_inner(reschedule);
return;
}
Expand All @@ -65,8 +64,23 @@ pub async fn consolidate_deposits<R: CanisterRuntime>(runtime: R) {
}
};

futures::future::join_all(batches.into_iter().map(async |funds| {
match submit_consolidation_transaction(&runtime, funds, slot, recent_blockhash).await {
let results: Vec<Result<Signature, ConsolidationError>> =
futures::future::join_all(batches.into_iter().map(|funds| {
submit_consolidation_transaction(&runtime, funds, slot, recent_blockhash)
}))
.await;

let had_too_many_outcalls = results.iter().any(|r| {
matches!(
r,
Err(ConsolidationError::SubmitTransactionFailed(
SubmitTransactionError::TooManyOutcalls
))
)
});

for result in results {
match result {
Ok(sig) => log!(Priority::Info, "Submitted consolidation transaction {sig}"),
Err(ConsolidationError::CreateTransactionFailed(e)) => log!(
Priority::Error,
Expand All @@ -77,11 +91,9 @@ pub async fn consolidate_deposits<R: CanisterRuntime>(runtime: R) {
"Failed to submit deposit consolidation transaction (will retry): {e}"
),
}
}))
.await;
}

if !more_to_process {
// All work fits in this round
if !more_to_process && !had_too_many_outcalls {
scopeguard::ScopeGuard::into_inner(reschedule);
}
}
Expand Down
6 changes: 6 additions & 0 deletions minter/src/constants.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
/// Maximum number of concurrent calls to the SOL RPC canister.
pub const MAX_CONCURRENT_RPC_CALLS: usize = 10;

/// Maximum number of concurrent HTTP outcalls the minter may have in flight at once.
///
/// Each call to the SOL RPC canister triggers one or more HTTPS outcalls on the IC.
/// Limiting the number of in-flight outcalls prevents resource exhaustion.
pub const MAX_CONCURRENT_HTTP_OUTCALLS: u32 = 50;

/// Matches the ICP HTTPS outcall response limit for variable-length RPC calls
/// such as `getTransaction` and `getSignatureStatuses`:
/// https://docs.internetcomputer.org/references/ic-interface-spec#ic-http_request
Expand Down
24 changes: 16 additions & 8 deletions minter/src/deposit/automatic/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{
address::{account_address, lazy_get_schnorr_master_key},
constants::MAX_CONCURRENT_RPC_CALLS,
guard::TimerGuard,
rpc::get_signatures_for_address,
guard::{HttpOutcallGuardError, TimerGuard},
rpc::{GetSignaturesForAddressError, get_signatures_for_address},
runtime::CanisterRuntime,
state::{
SchnorrPublicKey, TaskType, audit::process_event, event::EventType, mutate_state,
Expand Down Expand Up @@ -91,16 +91,17 @@ pub async fn poll_monitored_addresses<R: CanisterRuntime>(runtime: R) {

let master_key = lazy_get_schnorr_master_key(&runtime).await;

futures::future::join_all(
let results: Vec<Result<(), HttpOutcallGuardError>> = futures::future::join_all(
all_accounts
.into_iter()
.take(MAX_CONCURRENT_RPC_CALLS)
.map(|account| poll_account(&runtime, &master_key, account)),
)
.await;

if !more_to_process {
// All work fits in this round
let had_too_many_outcalls = results.iter().any(Result::is_err);

if !more_to_process && !had_too_many_outcalls {
scopeguard::ScopeGuard::into_inner(reschedule);
}
}
Expand All @@ -109,7 +110,7 @@ async fn poll_account<R: CanisterRuntime>(
runtime: &R,
master_key: &SchnorrPublicKey,
account: Account,
) {
) -> Result<(), HttpOutcallGuardError> {
let deposit_address = account_address(master_key, &account);

let params = GetSignaturesForAddressParams {
Expand All @@ -126,12 +127,16 @@ async fn poll_account<R: CanisterRuntime>(
until: None,
};

match get_signatures_for_address(runtime, params).await {
let result = match get_signatures_for_address(runtime, params).await {
Err(GetSignaturesForAddressError::TooManyOutcalls) => {
Err(HttpOutcallGuardError::TooManyOutcalls)
}
Err(e) => {
log!(
Priority::Info,
"Failed to get signatures for address {deposit_address}: {e}"
);
Ok(())
}
Ok(signatures) => {
let new_sigs: Vec<Signature> = signatures
Expand All @@ -148,8 +153,9 @@ async fn poll_account<R: CanisterRuntime>(
.extend(new_sigs);
});
}
Ok(())
}
}
};

mutate_state(|state| {
process_event(
Expand All @@ -158,6 +164,8 @@ async fn poll_account<R: CanisterRuntime>(
runtime,
);
});

result
}

#[cfg(any(test, feature = "canbench-rs"))]
Expand Down
42 changes: 41 additions & 1 deletion minter/src/guard/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::state::{State, TaskType, mutate_state};
use crate::{
constants::MAX_CONCURRENT_HTTP_OUTCALLS,
state::{State, TaskType, mutate_state},
};
use cksol_types::{ProcessDepositError, WithdrawalError};
use icrc_ledger_types::icrc1::account::Account;
use std::{collections::BTreeSet, marker::PhantomData};
Expand Down Expand Up @@ -130,3 +133,40 @@ impl Drop for TimerGuard {
});
}
}

#[derive(Debug, Eq, PartialEq)]
pub enum HttpOutcallGuardError {
TooManyOutcalls,
}

/// Guards a single HTTP outcall to the SOL RPC canister.
///
/// Acquiring this guard increments the active-outcall counter in canister state;
/// dropping it decrements the counter. [`HttpOutcallGuard::new`] fails with
/// [`HttpOutcallGuardError::TooManyOutcalls`] when
/// [`MAX_CONCURRENT_HTTP_OUTCALLS`] guards are already held.
#[must_use]
pub struct HttpOutcallGuard;

impl HttpOutcallGuard {
pub fn new() -> Result<Self, HttpOutcallGuardError> {
mutate_state(|s| {
if s.active_http_outcalls() >= MAX_CONCURRENT_HTTP_OUTCALLS {
return Err(HttpOutcallGuardError::TooManyOutcalls);
}
*s.active_http_outcalls_mut() += 1;
Ok(Self)
})
}
}

impl Drop for HttpOutcallGuard {
fn drop(&mut self) {
mutate_state(|s| {
let count = s.active_http_outcalls_mut();
*count = count
.checked_sub(1)
.expect("BUG: HTTP outcall counter underflow");
});
}
}
82 changes: 80 additions & 2 deletions minter/src/guard/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use crate::{
guard::{GuardError, MAX_CONCURRENT, TimerGuard, TimerGuardError, process_deposit_guard},
state::TaskType,
constants::MAX_CONCURRENT_HTTP_OUTCALLS,
guard::{
GuardError, HttpOutcallGuard, HttpOutcallGuardError, MAX_CONCURRENT, TimerGuard,
TimerGuardError, process_deposit_guard,
},
state::{TaskType, read_state},
test_fixtures::init_state,
};
use candid::Principal;
Expand Down Expand Up @@ -111,3 +115,77 @@ mod timer_guard {
assert!(guard2.is_ok());
}
}

mod http_outcall_guard {
use super::*;

#[test]
fn should_acquire_and_release_guard() {
init_state();

assert_eq!(read_state(|s| s.active_http_outcalls()), 0);
{
let _guard = HttpOutcallGuard::new().unwrap();
assert_eq!(read_state(|s| s.active_http_outcalls()), 1);
}
assert_eq!(read_state(|s| s.active_http_outcalls()), 0);
}

#[test]
fn should_allow_up_to_max_concurrent_guards() {
init_state();

let guards: Vec<_> = (0..MAX_CONCURRENT_HTTP_OUTCALLS)
.map(|_| HttpOutcallGuard::new().expect("should succeed below limit"))
.collect();
assert_eq!(
read_state(|s| s.active_http_outcalls()),
MAX_CONCURRENT_HTTP_OUTCALLS
);
drop(guards);
assert_eq!(read_state(|s| s.active_http_outcalls()), 0);
}

#[test]
fn should_reject_when_limit_reached() {
init_state();

let _guards: Vec<_> = (0..MAX_CONCURRENT_HTTP_OUTCALLS)
.map(|_| HttpOutcallGuard::new().expect("should succeed below limit"))
.collect();

let result = HttpOutcallGuard::new();
assert_eq!(result.err(), Some(HttpOutcallGuardError::TooManyOutcalls));
}

#[test]
fn should_allow_new_guard_after_one_is_dropped() {
init_state();

let guards: Vec<_> = (0..MAX_CONCURRENT_HTTP_OUTCALLS)
.map(|_| HttpOutcallGuard::new().expect("should succeed below limit"))
.collect();

// Drop one
drop(guards);

// Should be able to acquire a new guard
let result = HttpOutcallGuard::new();
assert!(result.is_ok());
Comment on lines +165 to +174
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test intends to verify that releasing a single guard below the limit allows acquiring a new one, but drop(guards) drops the entire Vec (all guards), so it doesn’t exercise the intended behavior. Consider dropping just one element (e.g., remove one guard from the Vec) and asserting the counter decreases by 1 before acquiring a new guard.

Suggested change
let guards: Vec<_> = (0..MAX_CONCURRENT_HTTP_OUTCALLS)
.map(|_| HttpOutcallGuard::new().expect("should succeed below limit"))
.collect();
// Drop one
drop(guards);
// Should be able to acquire a new guard
let result = HttpOutcallGuard::new();
assert!(result.is_ok());
let mut guards: Vec<_> = (0..MAX_CONCURRENT_HTTP_OUTCALLS)
.map(|_| HttpOutcallGuard::new().expect("should succeed below limit"))
.collect();
// Drop one
drop(guards.pop().expect("expected at least one guard to drop"));
assert_eq!(
read_state(|s| s.active_http_outcalls()),
MAX_CONCURRENT_HTTP_OUTCALLS - 1
);
// Should be able to acquire a new guard
let result = HttpOutcallGuard::new();
assert!(result.is_ok());
assert_eq!(
read_state(|s| s.active_http_outcalls()),
MAX_CONCURRENT_HTTP_OUTCALLS
);

Copilot uses AI. Check for mistakes.
}

#[test]
fn should_track_multiple_concurrent_guards_independently() {
init_state();

let guard1 = HttpOutcallGuard::new().unwrap();
let guard2 = HttpOutcallGuard::new().unwrap();
assert_eq!(read_state(|s| s.active_http_outcalls()), 2);

drop(guard1);
assert_eq!(read_state(|s| s.active_http_outcalls()), 1);

drop(guard2);
assert_eq!(read_state(|s| s.active_http_outcalls()), 0);
}
}
Loading
Loading