Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 11 additions & 105 deletions minter/src/consolidate/mod.rs
Original file line number Diff line number Diff line change
@@ -1,89 +1,40 @@
use crate::{
constants::MAX_CONCURRENT_RPC_CALLS,
guard::TimerGuard,
numeric::LedgerMintIndex,
rpc::{SubmitTransactionError, get_recent_slot_and_blockhash, submit_transaction},
rpc_executor::{MAX_TRANSFERS_PER_CONSOLIDATION, WorkItem, enqueue, execute_rpc_queue},
runtime::CanisterRuntime,
sol_transfer::{CreateTransferError, MAX_SIGNATURES, create_signed_consolidation_transaction},
state::{
TaskType,
audit::process_event,
event::{EventType, TransactionPurpose},
mutate_state, read_state,
},
state::{TaskType, read_state},
};
use canlog::log;
use cksol_types_internal::log::Priority;
use icrc_ledger_types::icrc1::account::Account;
use itertools::Itertools;
use sol_rpc_types::{Lamport, Slot};
use solana_hash::Hash;
use solana_signature::Signature;
use std::collections::BTreeMap;
use std::time::Duration;
use thiserror::Error;
use sol_rpc_types::Lamport;
use std::{collections::BTreeMap, time::Duration};

#[cfg(test)]
mod tests;

pub const DEPOSIT_CONSOLIDATION_DELAY: Duration = Duration::from_mins(10);

pub(crate) const MAX_TRANSFERS_PER_CONSOLIDATION: usize = MAX_SIGNATURES as usize;

pub async fn consolidate_deposits<R: CanisterRuntime>(runtime: R) {
let _guard = match TimerGuard::new(TaskType::DepositConsolidation) {
Ok(guard) => guard,
Err(_) => return,
};

let all_deposits = read_state(|s| group_deposits_by_account(s.deposits_to_consolidate()));
let more_to_process =
all_deposits.len() > MAX_CONCURRENT_RPC_CALLS * MAX_TRANSFERS_PER_CONSOLIDATION;
let reschedule = scopeguard::guard(runtime.clone(), |runtime| {
runtime.set_timer(Duration::ZERO, consolidate_deposits);
});
if all_deposits.is_empty() {
return;
}

let batches: Vec<Vec<_>> = all_deposits
for batch in all_deposits
.into_iter()
.chunks(MAX_TRANSFERS_PER_CONSOLIDATION)
.into_iter()
.take(MAX_CONCURRENT_RPC_CALLS)
.map(Iterator::collect)
.collect();

if batches.is_empty() {
// Nothing to process
scopeguard::ScopeGuard::into_inner(reschedule);
return;
{
enqueue(WorkItem::SubmitConsolidationBatch(batch.collect()));
}

let (slot, recent_blockhash) = match get_recent_slot_and_blockhash(&runtime).await {
Ok(result) => result,
Err(e) => {
log!(Priority::Info, "Failed to fetch recent blockhash: {e}");
return;
}
};

futures::future::join_all(batches.into_iter().map(async |funds| {
match submit_consolidation_transaction(&runtime, funds, slot, recent_blockhash).await {
Ok(sig) => log!(Priority::Info, "Submitted consolidation transaction {sig}"),
Err(ConsolidationError::CreateTransactionFailed(e)) => log!(
Priority::Error,
"Failed to create deposit consolidation transaction: {e}"
),
Err(ConsolidationError::SubmitTransactionFailed(e)) => log!(
Priority::Info,
"Failed to submit deposit consolidation transaction (will retry): {e}"
),
}
}))
.await;

if !more_to_process {
// All work fits in this round
scopeguard::ScopeGuard::into_inner(reschedule);
}
runtime.set_timer(Duration::ZERO, execute_rpc_queue);
}

fn group_deposits_by_account(
Expand All @@ -97,48 +48,3 @@ fn group_deposits_by_account(
}
by_account.into_iter().collect()
}

#[derive(Debug, Error)]
enum ConsolidationError {
#[error("failed to create transaction: {0}")]
CreateTransactionFailed(#[from] CreateTransferError),
#[error("failed to submit transaction: {0}")]
SubmitTransactionFailed(#[from] SubmitTransactionError),
}

async fn submit_consolidation_transaction<R: CanisterRuntime>(
runtime: &R,
funds_to_consolidate: Vec<(Account, (Lamport, Vec<LedgerMintIndex>))>,
slot: Slot,
recent_blockhash: Hash,
) -> Result<Signature, ConsolidationError> {
let (sources, mint_indices): (Vec<_>, Vec<_>) = funds_to_consolidate
.into_iter()
.map(|(account, (lamport, indices))| ((account, lamport), indices))
.unzip();
let (transaction, signers) =
create_signed_consolidation_transaction(runtime, sources, recent_blockhash).await?;

let signature = transaction.signatures[0];
let message = transaction.message.clone();

mutate_state(|state| {
process_event(
state,
EventType::SubmittedTransaction {
signature,
message: message.into(),
signers,
slot,
purpose: TransactionPurpose::ConsolidateDeposits {
mint_indices: mint_indices.into_iter().flatten().collect(),
},
},
runtime,
)
});

submit_transaction(runtime, transaction).await?;

Ok(signature)
}
21 changes: 14 additions & 7 deletions minter/src/consolidate/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::{MAX_TRANSFERS_PER_CONSOLIDATION, consolidate_deposits};
use crate::rpc_executor::execute_rpc_queue;
use crate::{
constants::MAX_CONCURRENT_RPC_CALLS,
numeric::LedgerMintIndex,
Expand Down Expand Up @@ -88,7 +89,8 @@ async fn should_submit_single_consolidation_request() {
)))
.add_signature(fee_payer_signature.into());

consolidate_deposits(runtime).await;
consolidate_deposits(runtime.clone()).await;
execute_rpc_queue(runtime).await;

EventsAssert::from_recorded()
.expect_event(|e| assert_matches!(e, EventType::AcceptedManualDeposit { .. }))
Expand Down Expand Up @@ -124,7 +126,8 @@ async fn should_record_events_even_if_transaction_submission_fails() {
.add_stub_response(SendTransactionResult::Inconsistent(vec![]))
.add_signature(fee_payer_signature.into());

consolidate_deposits(runtime).await;
consolidate_deposits(runtime.clone()).await;
execute_rpc_queue(runtime).await;

EventsAssert::from_recorded()
.expect_event(|e| assert_matches!(e, EventType::AcceptedManualDeposit { .. }))
Expand Down Expand Up @@ -171,7 +174,8 @@ async fn should_submit_multiple_consolidation_batches() {
runtime = runtime.add_signature(signature(i).into());
}

consolidate_deposits(runtime).await;
consolidate_deposits(runtime.clone()).await;
execute_rpc_queue(runtime).await;

let mut events_assert = EventsAssert::from_recorded();
// Events from setup
Expand Down Expand Up @@ -236,7 +240,8 @@ async fn should_consolidate_multiple_deposits_to_same_account_in_single_transfer
)))
.add_signature(fee_payer_signature.into());

consolidate_deposits(runtime).await;
consolidate_deposits(runtime.clone()).await;
execute_rpc_queue(runtime).await;

EventsAssert::from_recorded()
.expect_event(|e| assert_matches!(e, EventType::AcceptedManualDeposit { .. }))
Expand Down Expand Up @@ -282,14 +287,16 @@ async fn should_reschedule_until_all_deposits_consolidated() {
}

consolidate_deposits(runtime.clone()).await;
execute_rpc_queue(runtime.clone()).await;

read_state(|s| {
assert_eq!(s.submitted_transactions().len(), MAX_CONCURRENT_RPC_CALLS);
assert_eq!(s.deposits_to_consolidate().len(), 1);
});
assert_eq!(runtime.set_timer_call_count(), 1);
// consolidate_deposits called set_timer once; execute_rpc_queue called set_timer once
assert_eq!(runtime.set_timer_call_count(), 2);

// Round 2: processes the remaining 1 deposit → no reschedule
// Round 2: process the remaining batch (already in queue from executor's reschedule).
let last_sig = signature(num_deposits);
let runtime = TestCanisterRuntime::new()
.with_increasing_time()
Expand All @@ -298,7 +305,7 @@ async fn should_reschedule_until_all_deposits_consolidated() {
.add_stub_response(SendTransactionResult::Consistent(Ok(last_sig.into())))
.add_signature(last_sig.into());

consolidate_deposits(runtime.clone()).await;
execute_rpc_queue(runtime.clone()).await;

read_state(|s| {
assert_eq!(
Expand Down
9 changes: 8 additions & 1 deletion minter/src/constants.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
/// Maximum number of concurrent calls to the SOL RPC canister.
/// Maximum number of work items the executor processes in a single batch.
pub const MAX_CONCURRENT_RPC_CALLS: usize = 10;

/// Maximum number of concurrent SOL RPC calls from user-facing endpoints
/// (e.g. `process_deposit`) that run outside of the timer-driven executor.
pub const MAX_CONCURRENT_USER_RPC_CALLS: u32 = 10;

/// Maximum number of `getSignaturesForAddress` results to request per polled account.
pub const MAX_TRANSACTIONS_PER_ACCOUNT: usize = 10;

/// Matches the ICP HTTPS outcall response limit for variable-length RPC calls
/// such as `getTransaction` and `getSignatureStatuses`:
/// https://docs.internetcomputer.org/references/ic-interface-spec#ic-http_request
Expand Down
122 changes: 11 additions & 111 deletions minter/src/deposit/automatic/mod.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,14 @@
use crate::{
address::{account_address, lazy_get_schnorr_master_key},
constants::MAX_CONCURRENT_RPC_CALLS,
guard::TimerGuard,
rpc::get_signatures_for_address,
rpc_executor::{WorkItem, enqueue, execute_rpc_queue},
runtime::CanisterRuntime,
state::{
SchnorrPublicKey, TaskType, audit::process_event, event::EventType, mutate_state,
read_state,
},
state::{TaskType, audit::process_event, event::EventType, mutate_state, read_state},
};
use canlog::log;
use cksol_types::UpdateBalanceError;
use cksol_types_internal::log::Priority;
use icrc_ledger_types::icrc1::account::Account;
use sol_rpc_types::{CommitmentLevel, GetSignaturesForAddressParams};
use solana_signature::Signature;
use std::{
cell::RefCell,
collections::{BTreeMap, VecDeque},
time::Duration,
};

thread_local! {
static PENDING_SIGNATURES: RefCell<BTreeMap<Account, VecDeque<Signature>>> =
RefCell::default();
}
use std::time::Duration;

#[cfg(test)]
mod tests;
Expand All @@ -35,8 +19,9 @@ pub const MAX_MONITORED_ACCOUNTS: usize = 100;
/// How often the minter polls monitored addresses for new deposit transactions.
pub const POLL_MONITORED_ADDRESSES_DELAY: Duration = Duration::from_mins(1);

/// Maximum number of `getTransaction` calls to make per polled account.
pub const MAX_TRANSACTIONS_PER_ACCOUNT: usize = 10;
// Re-export test helpers whose backing storage lives in rpc_executor.
#[cfg(any(test, feature = "canbench-rs"))]
pub use crate::rpc_executor::{pending_signatures_for, reset_pending_signatures};

/// Registers the given account for automated deposit monitoring.
///
Expand Down Expand Up @@ -69,9 +54,8 @@ pub fn update_balance<R: CanisterRuntime>(
Ok(())
}

/// Polls all monitored addresses for new deposit transaction signatures.
///
/// For each address, calls `getSignaturesForAddress` on the Solana RPC.
/// Enqueues a [`WorkItem::PollMonitoredAddress`] for every monitored account,
/// then triggers the executor immediately.
pub async fn poll_monitored_addresses<R: CanisterRuntime>(runtime: R) {
let _guard = match TimerGuard::new(TaskType::PollMonitoredAddresses) {
Ok(guard) => guard,
Expand All @@ -84,93 +68,9 @@ pub async fn poll_monitored_addresses<R: CanisterRuntime>(runtime: R) {
return;
}

let more_to_process = all_accounts.len() > MAX_CONCURRENT_RPC_CALLS;
let reschedule = scopeguard::guard(runtime.clone(), |runtime| {
runtime.set_timer(Duration::ZERO, poll_monitored_addresses);
});

let master_key = lazy_get_schnorr_master_key(&runtime).await;

futures::future::join_all(
all_accounts
.into_iter()
.take(MAX_CONCURRENT_RPC_CALLS)
.map(|account| poll_account(&runtime, &master_key, account)),
)
.await;

if !more_to_process {
// All work fits in this round
scopeguard::ScopeGuard::into_inner(reschedule);
for account in all_accounts {
enqueue(WorkItem::PollMonitoredAddress(account));
}
}

async fn poll_account<R: CanisterRuntime>(
runtime: &R,
master_key: &SchnorrPublicKey,
account: Account,
) {
let deposit_address = account_address(master_key, &account);

let params = GetSignaturesForAddressParams {
pubkey: deposit_address.into(),
commitment: Some(CommitmentLevel::Finalized),
min_context_slot: None,
// Fetch no more signatures than we intend to process with `getTransaction`.
limit: Some(
(MAX_TRANSACTIONS_PER_ACCOUNT as u32)
.try_into()
.expect("MAX_TRANSACTIONS_PER_ACCOUNT must be between 1 and 1000"),
),
before: None,
until: None,
};

match get_signatures_for_address(runtime, params).await {
Err(e) => {
log!(
Priority::Info,
"Failed to get signatures for address {deposit_address}: {e}"
);
}
Ok(signatures) => {
let new_sigs: Vec<Signature> = signatures
.into_iter()
.filter(|s| s.err.is_none())
.map(|s| s.signature.into())
.collect();
if !new_sigs.is_empty() {
PENDING_SIGNATURES.with(|pending| {
pending
.borrow_mut()
.entry(account)
.or_default()
.extend(new_sigs);
});
}
}
}

mutate_state(|state| {
process_event(
state,
EventType::StoppedMonitoringAccount { account },
runtime,
);
});
}

#[cfg(any(test, feature = "canbench-rs"))]
pub fn pending_signatures_for(account: &Account) -> Vec<Signature> {
PENDING_SIGNATURES.with(|p| {
p.borrow()
.get(account)
.map(|q| q.iter().copied().collect())
.unwrap_or_default()
})
}

#[cfg(any(test, feature = "canbench-rs"))]
pub fn reset_pending_signatures() {
PENDING_SIGNATURES.with(|p| p.borrow_mut().clear());
runtime.set_timer(Duration::ZERO, execute_rpc_queue);
}
Loading
Loading