From 4e14a1137d9b330bc75928925f87c313ff0c75ab Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Tue, 28 Apr 2026 09:01:39 +0200 Subject: [PATCH] feat: introduce centralized RPC executor queue Replaces the per-timer async RPC logic with a single, bounded work queue. Timer functions (finalize, resubmit, consolidate, withdraw, poll) now only enqueue WorkItems and schedule the executor; all RPC calls are made by `execute_rpc_queue`, which drains up to MAX_CONCURRENT_RPC_CALLS items per run, fetches slot/blockhash once when any item needs it, and reschedules itself while the queue is non-empty. User-facing endpoints (process_deposit) are rate-limited independently via `UserRpcQuotaGuard` so they cannot be starved by timer work. Co-Authored-By: Claude Sonnet 4.6 --- minter/src/consolidate/mod.rs | 116 +----- minter/src/consolidate/tests.rs | 21 +- minter/src/constants.rs | 9 +- minter/src/deposit/automatic/mod.rs | 122 +----- minter/src/deposit/automatic/tests.rs | 19 +- minter/src/deposit/manual/mod.rs | 3 +- minter/src/guard/mod.rs | 49 ++- minter/src/lib.rs | 1 + minter/src/monitor/mod.rs | 285 ++------------ minter/src/monitor/tests.rs | 43 ++- minter/src/rpc_executor/mod.rs | 530 ++++++++++++++++++++++++++ minter/src/rpc_executor/tests.rs | 240 ++++++++++++ minter/src/state/mod.rs | 11 + minter/src/state/tests.rs | 1 + minter/src/withdraw/mod.rs | 134 +------ minter/src/withdraw/tests.rs | 26 +- 16 files changed, 981 insertions(+), 629 deletions(-) create mode 100644 minter/src/rpc_executor/mod.rs create mode 100644 minter/src/rpc_executor/tests.rs diff --git a/minter/src/consolidate/mod.rs b/minter/src/consolidate/mod.rs index dc71f797..3674bcdf 100644 --- a/minter/src/consolidate/mod.rs +++ b/minter/src/consolidate/mod.rs @@ -1,35 +1,20 @@ use crate::{ - constants::MAX_CONCURRENT_RPC_CALLS, guard::TimerGuard, numeric::LedgerMintIndex, - rpc::{SubmitTransactionError, get_recent_slot_and_blockhash, submit_transaction}, + rpc_executor::{MAX_TRANSFERS_PER_CONSOLIDATION, WorkItem, enqueue, execute_rpc_queue}, runtime::CanisterRuntime, - sol_transfer::{CreateTransferError, MAX_SIGNATURES, create_signed_consolidation_transaction}, - state::{ - TaskType, - audit::process_event, - event::{EventType, TransactionPurpose}, - mutate_state, read_state, - }, + state::{TaskType, read_state}, }; -use canlog::log; -use cksol_types_internal::log::Priority; use icrc_ledger_types::icrc1::account::Account; use itertools::Itertools; -use sol_rpc_types::{Lamport, Slot}; -use solana_hash::Hash; -use solana_signature::Signature; -use std::collections::BTreeMap; -use std::time::Duration; -use thiserror::Error; +use sol_rpc_types::Lamport; +use std::{collections::BTreeMap, time::Duration}; #[cfg(test)] mod tests; pub const DEPOSIT_CONSOLIDATION_DELAY: Duration = Duration::from_mins(10); -pub(crate) const MAX_TRANSFERS_PER_CONSOLIDATION: usize = MAX_SIGNATURES as usize; - pub async fn consolidate_deposits(runtime: R) { let _guard = match TimerGuard::new(TaskType::DepositConsolidation) { Ok(guard) => guard, @@ -37,53 +22,19 @@ pub async fn consolidate_deposits(runtime: R) { }; let all_deposits = read_state(|s| group_deposits_by_account(s.deposits_to_consolidate())); - let more_to_process = - all_deposits.len() > MAX_CONCURRENT_RPC_CALLS * MAX_TRANSFERS_PER_CONSOLIDATION; - let reschedule = scopeguard::guard(runtime.clone(), |runtime| { - runtime.set_timer(Duration::ZERO, consolidate_deposits); - }); + if all_deposits.is_empty() { + return; + } - let batches: Vec> = all_deposits + for batch in all_deposits .into_iter() .chunks(MAX_TRANSFERS_PER_CONSOLIDATION) .into_iter() - .take(MAX_CONCURRENT_RPC_CALLS) - .map(Iterator::collect) - .collect(); - - if batches.is_empty() { - // Nothing to process - scopeguard::ScopeGuard::into_inner(reschedule); - return; + { + enqueue(WorkItem::SubmitConsolidationBatch(batch.collect())); } - let (slot, recent_blockhash) = match get_recent_slot_and_blockhash(&runtime).await { - Ok(result) => result, - Err(e) => { - log!(Priority::Info, "Failed to fetch recent blockhash: {e}"); - return; - } - }; - - futures::future::join_all(batches.into_iter().map(async |funds| { - match submit_consolidation_transaction(&runtime, funds, slot, recent_blockhash).await { - Ok(sig) => log!(Priority::Info, "Submitted consolidation transaction {sig}"), - Err(ConsolidationError::CreateTransactionFailed(e)) => log!( - Priority::Error, - "Failed to create deposit consolidation transaction: {e}" - ), - Err(ConsolidationError::SubmitTransactionFailed(e)) => log!( - Priority::Info, - "Failed to submit deposit consolidation transaction (will retry): {e}" - ), - } - })) - .await; - - if !more_to_process { - // All work fits in this round - scopeguard::ScopeGuard::into_inner(reschedule); - } + runtime.set_timer(Duration::ZERO, execute_rpc_queue); } fn group_deposits_by_account( @@ -97,48 +48,3 @@ fn group_deposits_by_account( } by_account.into_iter().collect() } - -#[derive(Debug, Error)] -enum ConsolidationError { - #[error("failed to create transaction: {0}")] - CreateTransactionFailed(#[from] CreateTransferError), - #[error("failed to submit transaction: {0}")] - SubmitTransactionFailed(#[from] SubmitTransactionError), -} - -async fn submit_consolidation_transaction( - runtime: &R, - funds_to_consolidate: Vec<(Account, (Lamport, Vec))>, - slot: Slot, - recent_blockhash: Hash, -) -> Result { - let (sources, mint_indices): (Vec<_>, Vec<_>) = funds_to_consolidate - .into_iter() - .map(|(account, (lamport, indices))| ((account, lamport), indices)) - .unzip(); - let (transaction, signers) = - create_signed_consolidation_transaction(runtime, sources, recent_blockhash).await?; - - let signature = transaction.signatures[0]; - let message = transaction.message.clone(); - - mutate_state(|state| { - process_event( - state, - EventType::SubmittedTransaction { - signature, - message: message.into(), - signers, - slot, - purpose: TransactionPurpose::ConsolidateDeposits { - mint_indices: mint_indices.into_iter().flatten().collect(), - }, - }, - runtime, - ) - }); - - submit_transaction(runtime, transaction).await?; - - Ok(signature) -} diff --git a/minter/src/consolidate/tests.rs b/minter/src/consolidate/tests.rs index 28a56be9..dda7796f 100644 --- a/minter/src/consolidate/tests.rs +++ b/minter/src/consolidate/tests.rs @@ -1,4 +1,5 @@ use super::{MAX_TRANSFERS_PER_CONSOLIDATION, consolidate_deposits}; +use crate::rpc_executor::execute_rpc_queue; use crate::{ constants::MAX_CONCURRENT_RPC_CALLS, numeric::LedgerMintIndex, @@ -88,7 +89,8 @@ async fn should_submit_single_consolidation_request() { ))) .add_signature(fee_payer_signature.into()); - consolidate_deposits(runtime).await; + consolidate_deposits(runtime.clone()).await; + execute_rpc_queue(runtime).await; EventsAssert::from_recorded() .expect_event(|e| assert_matches!(e, EventType::AcceptedManualDeposit { .. })) @@ -124,7 +126,8 @@ async fn should_record_events_even_if_transaction_submission_fails() { .add_stub_response(SendTransactionResult::Inconsistent(vec![])) .add_signature(fee_payer_signature.into()); - consolidate_deposits(runtime).await; + consolidate_deposits(runtime.clone()).await; + execute_rpc_queue(runtime).await; EventsAssert::from_recorded() .expect_event(|e| assert_matches!(e, EventType::AcceptedManualDeposit { .. })) @@ -171,7 +174,8 @@ async fn should_submit_multiple_consolidation_batches() { runtime = runtime.add_signature(signature(i).into()); } - consolidate_deposits(runtime).await; + consolidate_deposits(runtime.clone()).await; + execute_rpc_queue(runtime).await; let mut events_assert = EventsAssert::from_recorded(); // Events from setup @@ -236,7 +240,8 @@ async fn should_consolidate_multiple_deposits_to_same_account_in_single_transfer ))) .add_signature(fee_payer_signature.into()); - consolidate_deposits(runtime).await; + consolidate_deposits(runtime.clone()).await; + execute_rpc_queue(runtime).await; EventsAssert::from_recorded() .expect_event(|e| assert_matches!(e, EventType::AcceptedManualDeposit { .. })) @@ -282,14 +287,16 @@ async fn should_reschedule_until_all_deposits_consolidated() { } consolidate_deposits(runtime.clone()).await; + execute_rpc_queue(runtime.clone()).await; read_state(|s| { assert_eq!(s.submitted_transactions().len(), MAX_CONCURRENT_RPC_CALLS); assert_eq!(s.deposits_to_consolidate().len(), 1); }); - assert_eq!(runtime.set_timer_call_count(), 1); + // consolidate_deposits called set_timer once; execute_rpc_queue called set_timer once + assert_eq!(runtime.set_timer_call_count(), 2); - // Round 2: processes the remaining 1 deposit → no reschedule + // Round 2: process the remaining batch (already in queue from executor's reschedule). let last_sig = signature(num_deposits); let runtime = TestCanisterRuntime::new() .with_increasing_time() @@ -298,7 +305,7 @@ async fn should_reschedule_until_all_deposits_consolidated() { .add_stub_response(SendTransactionResult::Consistent(Ok(last_sig.into()))) .add_signature(last_sig.into()); - consolidate_deposits(runtime.clone()).await; + execute_rpc_queue(runtime.clone()).await; read_state(|s| { assert_eq!( diff --git a/minter/src/constants.rs b/minter/src/constants.rs index 42b6198a..534e8711 100644 --- a/minter/src/constants.rs +++ b/minter/src/constants.rs @@ -1,6 +1,13 @@ -/// Maximum number of concurrent calls to the SOL RPC canister. +/// Maximum number of work items the executor processes in a single batch. pub const MAX_CONCURRENT_RPC_CALLS: usize = 10; +/// Maximum number of concurrent SOL RPC calls from user-facing endpoints +/// (e.g. `process_deposit`) that run outside of the timer-driven executor. +pub const MAX_CONCURRENT_USER_RPC_CALLS: u32 = 10; + +/// Maximum number of `getSignaturesForAddress` results to request per polled account. +pub const MAX_TRANSACTIONS_PER_ACCOUNT: usize = 10; + /// Matches the ICP HTTPS outcall response limit for variable-length RPC calls /// such as `getTransaction` and `getSignatureStatuses`: /// https://docs.internetcomputer.org/references/ic-interface-spec#ic-http_request diff --git a/minter/src/deposit/automatic/mod.rs b/minter/src/deposit/automatic/mod.rs index 32ee80fd..51275805 100644 --- a/minter/src/deposit/automatic/mod.rs +++ b/minter/src/deposit/automatic/mod.rs @@ -1,30 +1,14 @@ use crate::{ - address::{account_address, lazy_get_schnorr_master_key}, - constants::MAX_CONCURRENT_RPC_CALLS, guard::TimerGuard, - rpc::get_signatures_for_address, + rpc_executor::{WorkItem, enqueue, execute_rpc_queue}, runtime::CanisterRuntime, - state::{ - SchnorrPublicKey, TaskType, audit::process_event, event::EventType, mutate_state, - read_state, - }, + state::{TaskType, audit::process_event, event::EventType, mutate_state, read_state}, }; use canlog::log; use cksol_types::UpdateBalanceError; use cksol_types_internal::log::Priority; use icrc_ledger_types::icrc1::account::Account; -use sol_rpc_types::{CommitmentLevel, GetSignaturesForAddressParams}; -use solana_signature::Signature; -use std::{ - cell::RefCell, - collections::{BTreeMap, VecDeque}, - time::Duration, -}; - -thread_local! { - static PENDING_SIGNATURES: RefCell>> = - RefCell::default(); -} +use std::time::Duration; #[cfg(test)] mod tests; @@ -35,8 +19,9 @@ pub const MAX_MONITORED_ACCOUNTS: usize = 100; /// How often the minter polls monitored addresses for new deposit transactions. pub const POLL_MONITORED_ADDRESSES_DELAY: Duration = Duration::from_mins(1); -/// Maximum number of `getTransaction` calls to make per polled account. -pub const MAX_TRANSACTIONS_PER_ACCOUNT: usize = 10; +// Re-export test helpers whose backing storage lives in rpc_executor. +#[cfg(any(test, feature = "canbench-rs"))] +pub use crate::rpc_executor::{pending_signatures_for, reset_pending_signatures}; /// Registers the given account for automated deposit monitoring. /// @@ -69,9 +54,8 @@ pub fn update_balance( Ok(()) } -/// Polls all monitored addresses for new deposit transaction signatures. -/// -/// For each address, calls `getSignaturesForAddress` on the Solana RPC. +/// Enqueues a [`WorkItem::PollMonitoredAddress`] for every monitored account, +/// then triggers the executor immediately. pub async fn poll_monitored_addresses(runtime: R) { let _guard = match TimerGuard::new(TaskType::PollMonitoredAddresses) { Ok(guard) => guard, @@ -84,93 +68,9 @@ pub async fn poll_monitored_addresses(runtime: R) { return; } - let more_to_process = all_accounts.len() > MAX_CONCURRENT_RPC_CALLS; - let reschedule = scopeguard::guard(runtime.clone(), |runtime| { - runtime.set_timer(Duration::ZERO, poll_monitored_addresses); - }); - - let master_key = lazy_get_schnorr_master_key(&runtime).await; - - futures::future::join_all( - all_accounts - .into_iter() - .take(MAX_CONCURRENT_RPC_CALLS) - .map(|account| poll_account(&runtime, &master_key, account)), - ) - .await; - - if !more_to_process { - // All work fits in this round - scopeguard::ScopeGuard::into_inner(reschedule); + for account in all_accounts { + enqueue(WorkItem::PollMonitoredAddress(account)); } -} - -async fn poll_account( - runtime: &R, - master_key: &SchnorrPublicKey, - account: Account, -) { - let deposit_address = account_address(master_key, &account); - - let params = GetSignaturesForAddressParams { - pubkey: deposit_address.into(), - commitment: Some(CommitmentLevel::Finalized), - min_context_slot: None, - // Fetch no more signatures than we intend to process with `getTransaction`. - limit: Some( - (MAX_TRANSACTIONS_PER_ACCOUNT as u32) - .try_into() - .expect("MAX_TRANSACTIONS_PER_ACCOUNT must be between 1 and 1000"), - ), - before: None, - until: None, - }; - match get_signatures_for_address(runtime, params).await { - Err(e) => { - log!( - Priority::Info, - "Failed to get signatures for address {deposit_address}: {e}" - ); - } - Ok(signatures) => { - let new_sigs: Vec = signatures - .into_iter() - .filter(|s| s.err.is_none()) - .map(|s| s.signature.into()) - .collect(); - if !new_sigs.is_empty() { - PENDING_SIGNATURES.with(|pending| { - pending - .borrow_mut() - .entry(account) - .or_default() - .extend(new_sigs); - }); - } - } - } - - mutate_state(|state| { - process_event( - state, - EventType::StoppedMonitoringAccount { account }, - runtime, - ); - }); -} - -#[cfg(any(test, feature = "canbench-rs"))] -pub fn pending_signatures_for(account: &Account) -> Vec { - PENDING_SIGNATURES.with(|p| { - p.borrow() - .get(account) - .map(|q| q.iter().copied().collect()) - .unwrap_or_default() - }) -} - -#[cfg(any(test, feature = "canbench-rs"))] -pub fn reset_pending_signatures() { - PENDING_SIGNATURES.with(|p| p.borrow_mut().clear()); + runtime.set_timer(Duration::ZERO, execute_rpc_queue); } diff --git a/minter/src/deposit/automatic/tests.rs b/minter/src/deposit/automatic/tests.rs index a9b2024f..68e5d145 100644 --- a/minter/src/deposit/automatic/tests.rs +++ b/minter/src/deposit/automatic/tests.rs @@ -1,6 +1,7 @@ use super::*; use crate::{ constants::MAX_CONCURRENT_RPC_CALLS, + rpc_executor::execute_rpc_queue, state::{event::EventType, read_state}, test_fixtures::{ EventsAssert, account, events::start_monitoring_account, init_schnorr_master_key, @@ -8,6 +9,7 @@ use crate::{ }, }; use sol_rpc_types::{ConfirmedTransactionStatusWithSignature, MultiRpcResult, TransactionError}; +use solana_signature::Signature; type SignaturesResult = MultiRpcResult>; @@ -124,21 +126,24 @@ mod poll_monitored_addresses { } assert_eq!(monitored_accounts_count(), num_accounts); - // Round 1: polls MAX_CONCURRENT_RPC_CALLS accounts, 1 remains → reschedule. + // Round 1: poll_monitored_addresses enqueues MAX_CONCURRENT_RPC_CALLS+1 items. + // execute_rpc_queue processes MAX_CONCURRENT_RPC_CALLS items, 1 remains → reschedule. let mut runtime = TestCanisterRuntime::new().with_increasing_time(); for _ in 0..MAX_CONCURRENT_RPC_CALLS { runtime = runtime.add_stub_response(SignaturesResult::Consistent(Ok(vec![]))); } poll_monitored_addresses(runtime.clone()).await; + execute_rpc_queue(runtime.clone()).await; assert_eq!(monitored_accounts_count(), 1); - assert_eq!(runtime.set_timer_call_count(), 1); + // poll_monitored_addresses called set_timer once; execute_rpc_queue called set_timer once + assert_eq!(runtime.set_timer_call_count(), 2); - // Round 2: polls the remaining 1 account → no reschedule, queue empty. + // Round 2: process the remaining account (already in queue from executor's reschedule). let runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SignaturesResult::Consistent(Ok(vec![]))); - poll_monitored_addresses(runtime.clone()).await; + execute_rpc_queue(runtime.clone()).await; assert_eq!(monitored_accounts_count(), 0); assert_eq!(runtime.set_timer_call_count(), 0); @@ -170,7 +175,8 @@ mod poll_monitored_addresses { confirmed_tx(s2), ]))); - poll_monitored_addresses(runtime).await; + poll_monitored_addresses(runtime.clone()).await; + execute_rpc_queue(runtime).await; assert_eq!(pending_signatures_for(&acc), vec![s1, s2]); } @@ -192,7 +198,8 @@ mod poll_monitored_addresses { failed_tx(s_fail), ]))); - poll_monitored_addresses(runtime).await; + poll_monitored_addresses(runtime.clone()).await; + execute_rpc_queue(runtime).await; assert_eq!(pending_signatures_for(&acc), vec![s_ok]); } diff --git a/minter/src/deposit/manual/mod.rs b/minter/src/deposit/manual/mod.rs index 767f6c4c..c4100292 100644 --- a/minter/src/deposit/manual/mod.rs +++ b/minter/src/deposit/manual/mod.rs @@ -2,7 +2,7 @@ use crate::{ constants::GET_TRANSACTION_CYCLES, cycles::{charge_caller_cycles, check_caller_available_cycles}, deposit::fetch_and_validate_deposit, - guard::process_deposit_guard, + guard::{UserRpcQuotaGuard, process_deposit_guard}, ledger::mint, runtime::CanisterRuntime, state::{ @@ -27,6 +27,7 @@ pub async fn process_deposit( signature: Signature, ) -> Result { let _guard = process_deposit_guard(account)?; + let _rpc_quota = UserRpcQuotaGuard::new().map_err(|e| ProcessDepositError::from(e))?; let deposit_id = DepositId { account, signature }; diff --git a/minter/src/guard/mod.rs b/minter/src/guard/mod.rs index aeaf0b9e..bd9ecdf9 100644 --- a/minter/src/guard/mod.rs +++ b/minter/src/guard/mod.rs @@ -1,4 +1,7 @@ -use crate::state::{State, TaskType, mutate_state}; +use crate::{ + constants::MAX_CONCURRENT_USER_RPC_CALLS, + state::{State, TaskType, mutate_state}, +}; use cksol_types::{ProcessDepositError, WithdrawalError}; use icrc_ledger_types::icrc1::account::Account; use std::{collections::BTreeSet, marker::PhantomData}; @@ -130,3 +133,47 @@ impl Drop for TimerGuard { }); } } + +/// Error returned when the per-endpoint RPC quota is exhausted. +#[derive(Debug, Eq, PartialEq)] +pub enum UserRpcQuotaError { + TooManyConcurrentRequests, +} + +impl From for ProcessDepositError { + fn from(_: UserRpcQuotaError) -> Self { + Self::TemporarilyUnavailable("too many concurrent requests".to_string()) + } +} + +/// Guards a single SOL RPC call made from a user-facing endpoint (e.g. `process_deposit`). +/// +/// Acquiring this guard increments the active-user-RPC-call counter in canister state; +/// dropping it decrements the counter. [`UserRpcQuotaGuard::new`] fails with +/// [`UserRpcQuotaError::TooManyConcurrentRequests`] when +/// [`MAX_CONCURRENT_USER_RPC_CALLS`] guards are already held. +#[must_use] +pub struct UserRpcQuotaGuard; + +impl UserRpcQuotaGuard { + pub fn new() -> Result { + mutate_state(|s| { + if s.active_user_rpc_calls() >= MAX_CONCURRENT_USER_RPC_CALLS { + return Err(UserRpcQuotaError::TooManyConcurrentRequests); + } + *s.active_user_rpc_calls_mut() += 1; + Ok(Self) + }) + } +} + +impl Drop for UserRpcQuotaGuard { + fn drop(&mut self) { + mutate_state(|s| { + let count = s.active_user_rpc_calls_mut(); + *count = count + .checked_sub(1) + .expect("BUG: user RPC call counter underflow"); + }); + } +} diff --git a/minter/src/lib.rs b/minter/src/lib.rs index 08a6f526..960d6761 100644 --- a/minter/src/lib.rs +++ b/minter/src/lib.rs @@ -11,6 +11,7 @@ pub mod metrics; pub mod monitor; mod numeric; mod rpc; +pub mod rpc_executor; pub mod runtime; mod signer; mod sol_transfer; diff --git a/minter/src/monitor/mod.rs b/minter/src/monitor/mod.rs index b3ef33b0..3c745f90 100644 --- a/minter/src/monitor/mod.rs +++ b/minter/src/monitor/mod.rs @@ -1,45 +1,24 @@ use crate::{ - address::derivation_path, - constants::MAX_CONCURRENT_RPC_CALLS, guard::TimerGuard, - rpc::{ - SubmitTransactionError, get_recent_slot_and_blockhash, get_signature_statuses, - submit_transaction, - }, + rpc_executor::{MAX_SIGNATURES_PER_STATUS_CHECK, WorkItem, enqueue, execute_rpc_queue}, runtime::CanisterRuntime, - signer::sign_bytes, - state::{ - TaskType, - audit::process_event, - event::{EventType, VersionedMessage}, - mutate_state, read_state, - }, + state::{TaskType, event::VersionedMessage, read_state}, }; -use canlog::log; -use cksol_types_internal::log::Priority; -use ic_cdk_management_canister::SignCallError; use icrc_ledger_types::icrc1::account::Account; use itertools::Itertools; use sol_rpc_types::Slot; use solana_signature::Signature; -use solana_transaction::Transaction; -use solana_transaction_status_client_types::TransactionConfirmationStatus; -use std::collections::{BTreeMap, BTreeSet}; -use std::time::Duration; -use thiserror::Error; +use std::{collections::BTreeMap, time::Duration}; #[cfg(test)] mod tests; pub const FINALIZE_TRANSACTIONS_DELAY: Duration = Duration::from_mins(2); pub const RESUBMIT_TRANSACTIONS_DELAY: Duration = Duration::from_mins(3); -const MAX_BLOCKHASH_AGE: Slot = 150; -/// Maximum number of signatures per `getSignatureStatuses` RPC call. -/// See https://solana.com/docs/rpc/http/getsignaturestatuses -const MAX_SIGNATURES_PER_STATUS_CHECK: usize = 256; -/// Check the status of all submitted transactions, finalize succeeded/failed -/// ones, and mark expired transactions for resubmission. +/// Read all submitted transactions from state and enqueue +/// [`WorkItem::CheckSignatureStatuses`] items for the executor, then trigger +/// the executor immediately. pub async fn finalize_transactions(runtime: R) { let _guard = match TimerGuard::new(TaskType::FinalizeTransactions) { Ok(guard) => guard, @@ -57,88 +36,36 @@ pub async fn finalize_transactions(runtime: R) { return; } - let more_to_process = - all_transactions.len() > MAX_CONCURRENT_RPC_CALLS * MAX_SIGNATURES_PER_STATUS_CHECK; - let reschedule = scopeguard::guard(runtime.clone(), |runtime| { - runtime.set_timer(Duration::ZERO, finalize_transactions); - }); - - // Fetch the current slot before checking statuses: if a transaction finalizes - // after we snapshot the slot, the status check will see it as finalized rather - // than missing, so it will never be incorrectly marked as expired. - let (current_slot, _) = match get_recent_slot_and_blockhash(&runtime).await { - Ok(result) => result, - Err(e) => { - log!(Priority::Info, "Failed to get current slot: {e}"); - return; - } - }; - let signatures: Vec = all_transactions.keys().copied().collect(); - let statuses = check_transaction_statuses(&runtime, signatures).await; - - for (signature, error) in &statuses.errored { - log!( - Priority::Error, - "Transaction {signature} finalized with on-chain error: {error}" - ); - mutate_state(|state| { - process_event( - state, - EventType::FailedTransaction { - signature: *signature, - }, - &runtime, - ) - }); - } - - for signature in &statuses.succeeded { - log!(Priority::Info, "Transaction {signature} finalized"); - mutate_state(|state| { - process_event( - state, - EventType::SucceededTransaction { - signature: *signature, - }, - &runtime, - ) + for batch in signatures + .into_iter() + .chunks(MAX_SIGNATURES_PER_STATUS_CHECK) + .into_iter() + { + let batch: Vec = batch.collect(); + let submitted_slots: BTreeMap = batch + .iter() + .map(|sig| (*sig, all_transactions[sig])) + .collect(); + enqueue(WorkItem::CheckSignatureStatuses { + signatures: batch, + submitted_slots, }); } - for signature in &statuses.not_found { - if all_transactions[signature] + MAX_BLOCKHASH_AGE < current_slot { - log!( - Priority::Info, - "Transaction {signature} expired, marking for resubmission" - ); - mutate_state(|state| { - process_event( - state, - EventType::ExpiredTransaction { - signature: *signature, - }, - &runtime, - ) - }); - } - } - - if !more_to_process { - // All work fits in this round - scopeguard::ScopeGuard::into_inner(reschedule); - } + runtime.set_timer(Duration::ZERO, execute_rpc_queue); } -/// Resubmit transactions that have been marked for resubmission by -/// [`finalize_transactions`]. +/// Read all transactions-to-resubmit from state and enqueue +/// [`WorkItem::ResubmitTransaction`] items for the executor, then trigger the +/// executor immediately. pub async fn resubmit_transactions(runtime: R) { let _guard = match TimerGuard::new(TaskType::ResubmitTransactions) { Ok(guard) => guard, Err(_) => return, }; - let to_resubmit: Vec<_> = read_state(|state| { + let to_resubmit: Vec<(Signature, VersionedMessage, Vec)> = read_state(|state| { state .transactions_to_resubmit() .iter() @@ -149,163 +76,13 @@ pub async fn resubmit_transactions(runtime: R) { return; } - let more_to_process = to_resubmit.len() > MAX_CONCURRENT_RPC_CALLS; - let reschedule = scopeguard::guard(runtime.clone(), |runtime| { - runtime.set_timer(Duration::ZERO, resubmit_transactions); - }); - - resubmit_expired_transactions(&runtime, to_resubmit).await; - - if !more_to_process { - // All work fits in this round - scopeguard::ScopeGuard::into_inner(reschedule); - } -} - -/// Result of checking transaction statuses. -// Transactions that are in-flight (Processed/Confirmed) or whose status -// check failed are implicitly excluded from the below sets. -struct TransactionStatuses { - /// Transactions confirmed as finalized on-chain without errors. - succeeded: BTreeSet, - /// Transactions that finalized with an on-chain error. - errored: BTreeMap, - /// Transactions with no on-chain status (safe to resubmit if expired). - not_found: BTreeSet, -} - -async fn check_transaction_statuses( - runtime: &R, - signatures: Vec, -) -> TransactionStatuses { - let batches: Vec> = signatures - .into_iter() - .chunks(MAX_SIGNATURES_PER_STATUS_CHECK) - .into_iter() - .take(MAX_CONCURRENT_RPC_CALLS) - .map(Iterator::collect) - .collect(); - - let mut result = TransactionStatuses { - succeeded: BTreeSet::new(), - errored: BTreeMap::new(), - not_found: BTreeSet::new(), - }; - - let batch_results: Vec<_> = futures::future::join_all(batches.into_iter().map(async |batch| { - match get_signature_statuses(runtime, &batch).await { - Ok(statuses) => Some((batch, statuses)), - Err(e) => { - log!(Priority::Info, "Failed to check transaction statuses: {e}"); - None - } - } - })) - .await; - - for (sigs, statuses) in batch_results.into_iter().flatten() { - for (signature, status) in sigs.iter().zip(statuses) { - match status { - Some(s) - if s.confirmation_status == Some(TransactionConfirmationStatus::Finalized) => - { - if let Some(err) = s.err { - result.errored.insert(*signature, format!("{err:?}")); - } else { - result.succeeded.insert(*signature); - } - } - Some(_) => {} // in-flight (Processed/Confirmed) - None => { - result.not_found.insert(*signature); - } - } - } + for (old_signature, message, signers) in to_resubmit { + enqueue(WorkItem::ResubmitTransaction { + old_signature, + message, + signers, + }); } - result -} - -async fn resubmit_expired_transactions( - runtime: &R, - to_resubmit: Vec<(Signature, VersionedMessage, Vec)>, -) { - let (new_slot, new_blockhash) = match get_recent_slot_and_blockhash(runtime).await { - Ok(result) => result, - Err(e) => { - log!(Priority::Info, "Failed to get recent blockhash: {e}"); - return; - } - }; - - futures::future::join_all(to_resubmit.into_iter().take(MAX_CONCURRENT_RPC_CALLS).map( - async |(old_signature, message, signers)| { - match try_resubmit_transaction( - runtime, - old_signature, - message, - signers, - new_slot, - new_blockhash, - ) - .await - { - Ok(new_sig) => log!( - Priority::Info, - "Resubmitted transaction {old_signature} as {new_sig}" - ), - Err(e) => log!( - Priority::Info, - "Failed to resubmit transaction {old_signature}: {e}" - ), - } - }, - )) - .await; -} - -async fn try_resubmit_transaction( - runtime: &R, - old_signature: Signature, - versioned_message: VersionedMessage, - signers: Vec, - new_slot: Slot, - new_blockhash: solana_hash::Hash, -) -> Result { - let VersionedMessage::Legacy(mut message) = versioned_message; - message.recent_blockhash = new_blockhash; - - let mut transaction = Transaction::new_unsigned(message); - transaction.signatures = sign_bytes( - signers.iter().map(derivation_path), - &runtime.signer(), - transaction.message_data(), - ) - .await?; - - let new_signature = transaction.signatures[0]; - - mutate_state(|state| { - process_event( - state, - EventType::ResubmittedTransaction { - old_signature, - new_signature, - new_slot, - }, - runtime, - ) - }); - - submit_transaction(runtime, transaction).await?; - - Ok(new_signature) -} - -#[derive(Debug, Error)] -enum ResubmitError { - #[error("failed to submit new transaction: {0}")] - Submit(#[from] SubmitTransactionError), - #[error("failed to sign transaction: {0}")] - Signing(#[from] SignCallError), + runtime.set_timer(Duration::ZERO, execute_rpc_queue); } diff --git a/minter/src/monitor/tests.rs b/minter/src/monitor/tests.rs index b737f605..e2d5328a 100644 --- a/minter/src/monitor/tests.rs +++ b/minter/src/monitor/tests.rs @@ -1,7 +1,5 @@ -use super::{ - MAX_BLOCKHASH_AGE, MAX_SIGNATURES_PER_STATUS_CHECK, finalize_transactions, - resubmit_transactions, -}; +use super::{finalize_transactions, resubmit_transactions}; +use crate::rpc_executor::{MAX_BLOCKHASH_AGE, MAX_SIGNATURES_PER_STATUS_CHECK, execute_rpc_queue}; use crate::{ constants::MAX_CONCURRENT_RPC_CALLS, state::{TaskType, event::EventType, mutate_state, read_state, reset_state}, @@ -83,7 +81,8 @@ mod finalization { submit_consolidation_transaction_with_signature(i, RECENT_SLOT); } - // Round 1: finalizes MAX_CONCURRENT_RPC_CALLS batches, 1 transaction unchecked → reschedule + // Round 1: finalize_transactions enqueues MAX_CONCURRENT_RPC_CALLS+1 batches. + // execute_rpc_queue processes MAX_CONCURRENT_RPC_CALLS batches, 1 batch remains → reschedule. let mut runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) @@ -95,11 +94,13 @@ mod finalization { } finalize_transactions(runtime.clone()).await; + execute_rpc_queue(runtime.clone()).await; assert_eq!(read_state(|s| s.submitted_transactions().len()), 1); - assert_eq!(runtime.set_timer_call_count(), 1); + // finalize_transactions called set_timer once; execute_rpc_queue called set_timer once + assert_eq!(runtime.set_timer_call_count(), 2); - // Round 2: finalizes the remaining 1 transaction → no reschedule + // Round 2: process the remaining batch (already in queue from the executor's reschedule). let runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) @@ -108,7 +109,7 @@ mod finalization { finalized_status(), )]))); - finalize_transactions(runtime.clone()).await; + execute_rpc_queue(runtime.clone()).await; assert!(read_state(|s| s.submitted_transactions().is_empty())); assert_eq!(runtime.set_timer_call_count(), 0); @@ -128,7 +129,8 @@ mod finalization { finalized_status(), )]))); - finalize_transactions(runtime).await; + finalize_transactions(runtime.clone()).await; + execute_rpc_queue(runtime).await; EventsAssert::from_recorded() .expect_contains_event_eq(EventType::SucceededTransaction { signature }); @@ -199,7 +201,8 @@ mod finalization { }, )]))); - finalize_transactions(runtime).await; + finalize_transactions(runtime.clone()).await; + execute_rpc_queue(runtime).await; EventsAssert::from_recorded() .expect_contains_event_eq(EventType::FailedTransaction { signature }); @@ -233,7 +236,8 @@ mod finalization { ]))); // sig_b is not_found but RECENT_SLOT is not expired, so no resubmission. - finalize_transactions(runtime).await; + finalize_transactions(runtime.clone()).await; + execute_rpc_queue(runtime).await; EventsAssert::from_recorded() .expect_contains_event_eq(EventType::SucceededTransaction { @@ -328,7 +332,8 @@ mod resubmission { .add_stub_response(SendTransactionResult::Consistent(Ok(new_signature.into()))) .add_signature(new_signature.into()); - resubmit_transactions(resubmit_runtime).await; + resubmit_transactions(resubmit_runtime.clone()).await; + execute_rpc_queue(resubmit_runtime).await; EventsAssert::from_recorded() .expect_contains_event_eq(EventType::ExpiredTransaction { @@ -388,7 +393,8 @@ mod resubmission { .add_stub_response(SendTransactionResult::Inconsistent(vec![])) .add_signature(new_signature.into()); - resubmit_transactions(resubmit_runtime).await; + resubmit_transactions(resubmit_runtime.clone()).await; + execute_rpc_queue(resubmit_runtime).await; EventsAssert::from_recorded() .expect_contains_event_eq(EventType::ExpiredTransaction { @@ -411,7 +417,8 @@ mod resubmission { events::expire_transaction(sig); } - // Round 1: resubmits MAX_CONCURRENT_RPC_CALLS transactions, 1 remain → reschedule + // Round 1: resubmit_transactions enqueues MAX_CONCURRENT_RPC_CALLS+1 items. + // execute_rpc_queue processes MAX_CONCURRENT_RPC_CALLS items, 1 remains → reschedule. let mut runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(RESUBMISSION_SLOT))) @@ -425,6 +432,7 @@ mod resubmission { } resubmit_transactions(runtime.clone()).await; + execute_rpc_queue(runtime.clone()).await; read_state(|s| { assert_eq!(s.submitted_transactions().len(), MAX_CONCURRENT_RPC_CALLS); @@ -433,9 +441,10 @@ mod resubmission { num_transactions - MAX_CONCURRENT_RPC_CALLS ); }); - assert_eq!(runtime.set_timer_call_count(), 1); + // resubmit_transactions called set_timer once; execute_rpc_queue called set_timer once + assert_eq!(runtime.set_timer_call_count(), 2); - // Round 2: resubmits remaining transaction → no reschedule + // Round 2: process the remaining item (already in queue from executor's reschedule). let mut runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(RESUBMISSION_SLOT))) @@ -448,7 +457,7 @@ mod resubmission { .add_signature(signature(0xB0 + i).into()); } - resubmit_transactions(runtime.clone()).await; + execute_rpc_queue(runtime.clone()).await; assert!(read_state(|s| s.transactions_to_resubmit().is_empty())); assert_eq!(runtime.set_timer_call_count(), 0); diff --git a/minter/src/rpc_executor/mod.rs b/minter/src/rpc_executor/mod.rs new file mode 100644 index 00000000..149789c6 --- /dev/null +++ b/minter/src/rpc_executor/mod.rs @@ -0,0 +1,530 @@ +use crate::{ + address::{account_address, derivation_path, lazy_get_schnorr_master_key}, + constants::{MAX_CONCURRENT_RPC_CALLS, MAX_TRANSACTIONS_PER_ACCOUNT}, + guard::TimerGuard, + numeric::LedgerMintIndex, + rpc::{ + get_recent_slot_and_blockhash, get_signature_statuses, get_signatures_for_address, + submit_transaction, + }, + runtime::CanisterRuntime, + signer::sign_bytes, + sol_transfer::{ + MAX_SIGNATURES, create_signed_batch_withdrawal_transaction, + create_signed_consolidation_transaction, + }, + state::{ + TaskType, + audit::process_event, + event::{EventType, TransactionPurpose, VersionedMessage, WithdrawalRequest}, + mutate_state, + }, +}; +use canlog::log; +use cksol_types_internal::log::Priority; +use icrc_ledger_types::icrc1::account::Account; +use sol_rpc_types::{CommitmentLevel, GetSignaturesForAddressParams, Lamport, Slot}; +use solana_address::Address; +use solana_hash::Hash; +use solana_signature::Signature; +use solana_transaction::Transaction; +use solana_transaction_status_client_types::TransactionConfirmationStatus; +use std::{ + cell::RefCell, + collections::{BTreeMap, VecDeque}, + time::Duration, +}; + +#[cfg(test)] +mod tests; + +/// Maximum number of signatures in a single `getSignatureStatuses` RPC call. +/// See +pub const MAX_SIGNATURES_PER_STATUS_CHECK: usize = 256; + +/// Maximum number of transfers included in a single consolidation transaction. +pub const MAX_TRANSFERS_PER_CONSOLIDATION: usize = MAX_SIGNATURES as usize; + +/// How many slots a blockhash remains valid after it was included in a transaction. +/// A transaction whose blockhash is more than this many slots old is considered expired. +pub const MAX_BLOCKHASH_AGE: Slot = 150; + +thread_local! { + static WORK_QUEUE: RefCell> = RefCell::default(); + + /// Pending deposit signatures discovered by the automated polling loop, + /// keyed by the account they belong to. + static PENDING_SIGNATURES: RefCell>> = + RefCell::default(); +} + +/// A unit of work to be executed by [`execute_rpc_queue`]. +#[derive(Clone, Debug)] +pub enum WorkItem { + /// Check the on-chain status of a batch of submitted transactions. + CheckSignatureStatuses { + /// Signatures to check. + signatures: Vec, + /// Slot at which each transaction was submitted, used to detect expiry. + submitted_slots: BTreeMap, + }, + /// Poll a monitored address for new deposit transaction signatures. + PollMonitoredAddress(Account), + /// Submit a consolidation transaction for a batch of minted deposits. + SubmitConsolidationBatch(Vec<(Account, (Lamport, Vec))>), + /// Submit a batch withdrawal transaction. + SubmitWithdrawalBatch(Vec), + /// Re-sign and resubmit an expired transaction with a fresh blockhash. + ResubmitTransaction { + old_signature: Signature, + message: VersionedMessage, + signers: Vec, + }, +} + +impl WorkItem { + /// Returns `true` for items that require a current slot and recent blockhash + /// from the executor before they can be executed. + fn needs_slot_and_blockhash(&self) -> bool { + !matches!(self, WorkItem::PollMonitoredAddress(_)) + } +} + +/// Push a work item onto the back of the executor queue. +pub fn enqueue(item: WorkItem) { + WORK_QUEUE.with(|q| q.borrow_mut().push_back(item)); +} + +/// Drain up to `max` items from the front of the executor queue. +fn dequeue_batch(max: usize) -> Vec { + WORK_QUEUE.with(|q| { + let mut q = q.borrow_mut(); + let n = max.min(q.len()); + q.drain(..n).collect() + }) +} + +/// Returns `true` if the executor queue is empty. +pub fn queue_is_empty() -> bool { + WORK_QUEUE.with(|q| q.borrow().is_empty()) +} + +/// Consume up to [`MAX_CONCURRENT_RPC_CALLS`] items from the work queue and +/// execute them concurrently. +/// +/// If any queued item requires a recent slot or blockhash, one +/// `getLatestBlockhash` call is made first and the result is shared across +/// all items in the batch. Items whose prerequisite fetch fails are +/// re-enqueued for the next run; items that do not need a slot/blockhash are +/// silently dropped from this batch and will be re-enqueued by their +/// respective scheduler timers on their next firing. +pub async fn execute_rpc_queue(runtime: R) { + let _guard = match TimerGuard::new(TaskType::ExecuteRpcQueue) { + Ok(g) => g, + Err(_) => return, + }; + + let items = dequeue_batch(MAX_CONCURRENT_RPC_CALLS); + if items.is_empty() { + return; + } + + let needs_slot_and_blockhash = items.iter().any(WorkItem::needs_slot_and_blockhash); + let slot_and_blockhash = if needs_slot_and_blockhash { + match get_recent_slot_and_blockhash(&runtime).await { + Ok(result) => Some(result), + Err(e) => { + log!( + Priority::Info, + "Executor: failed to fetch slot and blockhash: {e}" + ); + // Re-enqueue only items that needed slot/blockhash so they are + // retried on the next executor run. + for item in items.into_iter().filter(WorkItem::needs_slot_and_blockhash) { + enqueue(item); + } + return; + } + } + } else { + None + }; + + futures::future::join_all( + items + .into_iter() + .map(|item| execute_work_item(&runtime, item, slot_and_blockhash)), + ) + .await; + + if !queue_is_empty() { + runtime.set_timer(Duration::ZERO, execute_rpc_queue); + } +} + +async fn execute_work_item( + runtime: &R, + item: WorkItem, + slot_and_blockhash: Option<(Slot, Hash)>, +) { + match item { + WorkItem::CheckSignatureStatuses { + signatures, + submitted_slots, + } => { + let (current_slot, _) = + slot_and_blockhash.expect("BUG: slot required for CheckSignatureStatuses"); + execute_check_signature_statuses(runtime, signatures, submitted_slots, current_slot) + .await; + } + WorkItem::PollMonitoredAddress(account) => { + execute_poll_monitored_address(runtime, account).await; + } + WorkItem::SubmitConsolidationBatch(funds) => { + let (slot, blockhash) = slot_and_blockhash + .expect("BUG: slot+blockhash required for SubmitConsolidationBatch"); + execute_submit_consolidation_batch(runtime, funds, slot, blockhash).await; + } + WorkItem::SubmitWithdrawalBatch(requests) => { + let (slot, blockhash) = + slot_and_blockhash.expect("BUG: slot+blockhash required for SubmitWithdrawalBatch"); + execute_submit_withdrawal_batch(runtime, requests, slot, blockhash).await; + } + WorkItem::ResubmitTransaction { + old_signature, + message, + signers, + } => { + let (new_slot, new_blockhash) = + slot_and_blockhash.expect("BUG: slot+blockhash required for ResubmitTransaction"); + execute_resubmit_transaction( + runtime, + old_signature, + message, + signers, + new_slot, + new_blockhash, + ) + .await; + } + } +} + +// --------------------------------------------------------------------------- +// Execution logic +// --------------------------------------------------------------------------- + +async fn execute_check_signature_statuses( + runtime: &R, + signatures: Vec, + submitted_slots: BTreeMap, + current_slot: Slot, +) { + match get_signature_statuses(runtime, &signatures).await { + Err(e) => { + log!(Priority::Info, "Failed to check transaction statuses: {e}"); + } + Ok(statuses) => { + for (signature, status) in signatures.iter().zip(statuses) { + match status { + Some(s) + if s.confirmation_status + == Some(TransactionConfirmationStatus::Finalized) => + { + if let Some(err) = s.err { + log!( + Priority::Error, + "Transaction {signature} finalized with on-chain error: {err:?}" + ); + mutate_state(|state| { + process_event( + state, + EventType::FailedTransaction { + signature: *signature, + }, + runtime, + ) + }); + } else { + log!(Priority::Info, "Transaction {signature} finalized"); + mutate_state(|state| { + process_event( + state, + EventType::SucceededTransaction { + signature: *signature, + }, + runtime, + ) + }); + } + } + Some(_) => {} // in-flight (Processed/Confirmed) + None => { + if let Some(&submitted_slot) = submitted_slots.get(signature) { + if submitted_slot + MAX_BLOCKHASH_AGE < current_slot { + log!( + Priority::Info, + "Transaction {signature} expired, marking for resubmission" + ); + mutate_state(|state| { + process_event( + state, + EventType::ExpiredTransaction { + signature: *signature, + }, + runtime, + ) + }); + } + } + } + } + } + } + } +} + +async fn execute_poll_monitored_address(runtime: &R, account: Account) { + let master_key = lazy_get_schnorr_master_key(runtime).await; + let deposit_address = account_address(&master_key, &account); + + let params = GetSignaturesForAddressParams { + pubkey: deposit_address.into(), + commitment: Some(CommitmentLevel::Finalized), + min_context_slot: None, + limit: Some( + (MAX_TRANSACTIONS_PER_ACCOUNT as u32) + .try_into() + .expect("MAX_TRANSACTIONS_PER_ACCOUNT must be between 1 and 1000"), + ), + before: None, + until: None, + }; + + match get_signatures_for_address(runtime, params).await { + Err(e) => { + log!( + Priority::Info, + "Failed to get signatures for address {deposit_address}: {e}" + ); + } + Ok(signatures) => { + let new_sigs: Vec = signatures + .into_iter() + .filter(|s| s.err.is_none()) + .map(|s| s.signature.into()) + .collect(); + if !new_sigs.is_empty() { + PENDING_SIGNATURES.with(|pending| { + pending + .borrow_mut() + .entry(account) + .or_default() + .extend(new_sigs); + }); + } + } + } + + mutate_state(|state| { + process_event( + state, + EventType::StoppedMonitoringAccount { account }, + runtime, + ); + }); +} + +async fn execute_submit_consolidation_batch( + runtime: &R, + funds_to_consolidate: Vec<(Account, (Lamport, Vec))>, + slot: Slot, + recent_blockhash: Hash, +) { + let (sources, mint_indices): (Vec<_>, Vec<_>) = funds_to_consolidate + .into_iter() + .map(|(account, (lamport, indices))| ((account, lamport), indices)) + .unzip(); + + let (transaction, signers) = + match create_signed_consolidation_transaction(runtime, sources, recent_blockhash).await { + Ok(tx) => tx, + Err(e) => { + log!( + Priority::Error, + "Failed to create deposit consolidation transaction: {e}" + ); + return; + } + }; + + let signature = transaction.signatures[0]; + let message = transaction.message.clone(); + + mutate_state(|state| { + process_event( + state, + EventType::SubmittedTransaction { + signature, + message: message.into(), + signers, + slot, + purpose: TransactionPurpose::ConsolidateDeposits { + mint_indices: mint_indices.into_iter().flatten().collect(), + }, + }, + runtime, + ) + }); + + match submit_transaction(runtime, transaction).await { + Ok(_) => log!( + Priority::Info, + "Submitted consolidation transaction {signature}" + ), + Err(e) => log!( + Priority::Info, + "Failed to submit deposit consolidation transaction (will retry): {e}" + ), + } +} + +async fn execute_submit_withdrawal_batch( + runtime: &R, + requests: Vec, + slot: Slot, + recent_blockhash: Hash, +) { + let targets: Vec<_> = requests + .iter() + .map(|r| (Address::from(r.solana_address), r.amount_to_transfer)) + .collect(); + + let (signed_tx, signers) = match create_signed_batch_withdrawal_transaction( + runtime, + &targets, + recent_blockhash, + ) + .await + { + Ok(tx) => tx, + Err(e) => { + let burn_indices: Vec<_> = requests.iter().map(|r| r.burn_block_index).collect(); + log!( + Priority::Error, + "Failed to create batch withdrawal transaction for burn indices {burn_indices:?}: {e}" + ); + return; + } + }; + + let signature = signed_tx.signatures[0]; + let message = VersionedMessage::Legacy(signed_tx.message.clone()); + let burn_indices: Vec<_> = requests.iter().map(|r| r.burn_block_index).collect(); + + mutate_state(|state| { + process_event( + state, + EventType::SubmittedTransaction { + signature, + message, + signers, + slot, + purpose: TransactionPurpose::WithdrawSol { + burn_indices: burn_indices.clone(), + }, + }, + runtime, + ) + }); + + match submit_transaction(runtime, signed_tx).await { + Ok(_) => log!( + Priority::Info, + "Submitted withdrawal transaction {signature} for burn indices {burn_indices:?}" + ), + Err(e) => log!( + Priority::Info, + "Failed to send withdrawal transaction {signature} (will be resubmitted): {e}" + ), + } +} + +async fn execute_resubmit_transaction( + runtime: &R, + old_signature: Signature, + versioned_message: VersionedMessage, + signers: Vec, + new_slot: Slot, + new_blockhash: Hash, +) { + let VersionedMessage::Legacy(mut message) = versioned_message; + message.recent_blockhash = new_blockhash; + + let mut transaction = Transaction::new_unsigned(message); + transaction.signatures = match sign_bytes( + signers.iter().map(derivation_path), + &runtime.signer(), + transaction.message_data(), + ) + .await + { + Ok(sigs) => sigs, + Err(e) => { + log!( + Priority::Info, + "Failed to sign resubmission of {old_signature}: {e}" + ); + return; + } + }; + + let new_signature = transaction.signatures[0]; + + mutate_state(|state| { + process_event( + state, + EventType::ResubmittedTransaction { + old_signature, + new_signature, + new_slot, + }, + runtime, + ) + }); + + match submit_transaction(runtime, transaction).await { + Ok(_) => log!( + Priority::Info, + "Resubmitted transaction {old_signature} as {new_signature}" + ), + Err(e) => log!( + Priority::Info, + "Failed to resubmit transaction {old_signature}: {e}" + ), + } +} + +// --------------------------------------------------------------------------- +// Test / bench helpers +// --------------------------------------------------------------------------- + +#[cfg(any(test, feature = "canbench-rs"))] +pub fn pending_signatures_for(account: &Account) -> Vec { + PENDING_SIGNATURES.with(|p| { + p.borrow() + .get(account) + .map(|q| q.iter().copied().collect()) + .unwrap_or_default() + }) +} + +#[cfg(any(test, feature = "canbench-rs"))] +pub fn reset_pending_signatures() { + PENDING_SIGNATURES.with(|p| p.borrow_mut().clear()); +} + +#[cfg(any(test, feature = "canbench-rs"))] +pub fn reset_work_queue() { + WORK_QUEUE.with(|q| q.borrow_mut().clear()); +} diff --git a/minter/src/rpc_executor/tests.rs b/minter/src/rpc_executor/tests.rs new file mode 100644 index 00000000..cdd9cdba --- /dev/null +++ b/minter/src/rpc_executor/tests.rs @@ -0,0 +1,240 @@ +use super::*; +use crate::{ + rpc_executor::{execute_rpc_queue, reset_work_queue}, + state::{event::EventType, read_state, reset_state}, + storage::reset_events, + test_fixtures::{ + EventsAssert, MINTER_ACCOUNT, confirmed_block, deposit_id, events, init_schnorr_master_key, + init_state, runtime::TestCanisterRuntime, signature, + }, +}; +use sol_rpc_types::{ + ConfirmedBlock, MultiRpcResult, RpcError, Slot, TransactionConfirmationStatus, + TransactionError, TransactionStatus, +}; + +type SlotResult = MultiRpcResult; +type BlockResult = MultiRpcResult; +type SignatureStatusesResult = MultiRpcResult>>; + +const CURRENT_SLOT: Slot = 408_807_102; +const RECENT_SLOT: Slot = CURRENT_SLOT - 10; +const EXPIRED_SLOT: Slot = CURRENT_SLOT - MAX_BLOCKHASH_AGE - 1; + +fn setup() { + reset_state(); + reset_events(); + reset_work_queue(); + init_state(); + init_schnorr_master_key(); +} + +fn finalized_status() -> TransactionStatus { + TransactionStatus { + slot: CURRENT_SLOT, + status: Ok(()), + err: None, + confirmation_status: Some(TransactionConfirmationStatus::Finalized), + } +} + +fn errored_status() -> TransactionStatus { + TransactionStatus { + slot: CURRENT_SLOT, + status: Err(TransactionError::AccountNotFound), + err: Some(TransactionError::AccountNotFound), + confirmation_status: Some(TransactionConfirmationStatus::Finalized), + } +} + +/// Submit a consolidation transaction to state so the executor can transition it. +fn submit_to_state(i: usize, sig: solana_signature::Signature, slot: Slot) { + events::accept_deposit(deposit_id(i), 1_000_000); + events::mint_deposit(deposit_id(i), i as u64); + events::submit_consolidation(sig, MINTER_ACCOUNT, slot, vec![i as u64]); +} + +// --------------------------------------------------------------------------- +// CheckSignatureStatuses +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn should_mark_finalized_transaction_as_succeeded() { + setup(); + + let sig = signature(0); + submit_to_state(0, sig, RECENT_SLOT); + enqueue(WorkItem::CheckSignatureStatuses { + signatures: vec![sig], + submitted_slots: [(sig, RECENT_SLOT)].into_iter().collect(), + }); + + let runtime = TestCanisterRuntime::new() + .with_increasing_time() + .add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) + .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) + .add_stub_response(SignatureStatusesResult::Consistent(Ok(vec![Some( + finalized_status(), + )]))); + + execute_rpc_queue(runtime).await; + + EventsAssert::from_recorded() + .expect_contains_event_eq(EventType::SucceededTransaction { signature: sig }); + + read_state(|s| { + assert!(s.submitted_transactions().is_empty()); + assert!(s.succeeded_transactions().contains(&sig)); + }); +} + +#[tokio::test] +async fn should_mark_finalized_transaction_with_error_as_failed() { + setup(); + + let sig = signature(0); + submit_to_state(0, sig, RECENT_SLOT); + enqueue(WorkItem::CheckSignatureStatuses { + signatures: vec![sig], + submitted_slots: [(sig, RECENT_SLOT)].into_iter().collect(), + }); + + let runtime = TestCanisterRuntime::new() + .with_increasing_time() + .add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) + .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) + .add_stub_response(SignatureStatusesResult::Consistent(Ok(vec![Some( + errored_status(), + )]))); + + execute_rpc_queue(runtime).await; + + EventsAssert::from_recorded() + .expect_contains_event_eq(EventType::FailedTransaction { signature: sig }); + + read_state(|s| { + assert!(s.submitted_transactions().is_empty()); + assert_eq!(s.failed_transactions().len(), 1); + }); +} + +#[tokio::test] +async fn should_mark_missing_transaction_as_expired_when_old_enough() { + setup(); + + let sig = signature(0); + submit_to_state(0, sig, EXPIRED_SLOT); + enqueue(WorkItem::CheckSignatureStatuses { + signatures: vec![sig], + submitted_slots: [(sig, EXPIRED_SLOT)].into_iter().collect(), + }); + + let runtime = TestCanisterRuntime::new() + .with_increasing_time() + .add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) + .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) + .add_stub_response(SignatureStatusesResult::Consistent(Ok(vec![None]))); + + execute_rpc_queue(runtime).await; + + EventsAssert::from_recorded() + .expect_contains_event_eq(EventType::ExpiredTransaction { signature: sig }); + + read_state(|s| { + assert!(s.submitted_transactions().is_empty()); + assert!(s.transactions_to_resubmit().contains_key(&sig)); + }); +} + +#[tokio::test] +async fn should_not_expire_recent_missing_transaction() { + setup(); + + let sig = signature(0); + submit_to_state(0, sig, RECENT_SLOT); + enqueue(WorkItem::CheckSignatureStatuses { + signatures: vec![sig], + submitted_slots: [(sig, RECENT_SLOT)].into_iter().collect(), + }); + + let events_before = EventsAssert::from_recorded(); + + let runtime = TestCanisterRuntime::new() + .with_increasing_time() + .add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) + .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) + .add_stub_response(SignatureStatusesResult::Consistent(Ok(vec![None]))); + + execute_rpc_queue(runtime).await; + + // No new events: transaction remains submitted (not expired or succeeded) + let events_after = EventsAssert::from_recorded(); + assert_eq!(events_before, events_after); + + read_state(|s| { + assert!(s.submitted_transactions().contains_key(&sig)); + }); +} + +#[tokio::test] +async fn should_re_enqueue_on_blockhash_fetch_failure() { + setup(); + + let sig = signature(0); + submit_to_state(0, sig, RECENT_SLOT); + enqueue(WorkItem::CheckSignatureStatuses { + signatures: vec![sig], + submitted_slots: [(sig, RECENT_SLOT)].into_iter().collect(), + }); + + let events_before = EventsAssert::from_recorded(); + + let error = SlotResult::Consistent(Err(RpcError::ValidationError("Error".to_string()))); + let runtime = TestCanisterRuntime::new() + .add_stub_response(error.clone()) + .add_stub_response(error.clone()) + .add_stub_response(error); + + execute_rpc_queue(runtime).await; + + // No new state events; item was re-enqueued for retry + let events_after = EventsAssert::from_recorded(); + assert_eq!(events_before, events_after); + assert!(!queue_is_empty(), "item should have been re-enqueued"); +} + +#[tokio::test] +async fn should_process_multiple_items_in_one_batch() { + setup(); + + let sig0 = signature(0); + let sig1 = signature(1); + submit_to_state(0, sig0, RECENT_SLOT); + submit_to_state(1, sig1, RECENT_SLOT); + + enqueue(WorkItem::CheckSignatureStatuses { + signatures: vec![sig0], + submitted_slots: [(sig0, RECENT_SLOT)].into_iter().collect(), + }); + enqueue(WorkItem::CheckSignatureStatuses { + signatures: vec![sig1], + submitted_slots: [(sig1, RECENT_SLOT)].into_iter().collect(), + }); + + let runtime = TestCanisterRuntime::new() + .with_increasing_time() + .add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) + .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) + .add_stub_response(SignatureStatusesResult::Consistent(Ok(vec![Some( + finalized_status(), + )]))) + .add_stub_response(SignatureStatusesResult::Consistent(Ok(vec![Some( + finalized_status(), + )]))); + + execute_rpc_queue(runtime).await; + + EventsAssert::from_recorded() + .expect_contains_event_eq(EventType::SucceededTransaction { signature: sig0 }) + .expect_contains_event_eq(EventType::SucceededTransaction { signature: sig1 }); +} diff --git a/minter/src/state/mod.rs b/minter/src/state/mod.rs index c265d456..90ecd0d4 100644 --- a/minter/src/state/mod.rs +++ b/minter/src/state/mod.rs @@ -108,6 +108,7 @@ pub struct State { failed_transactions: InsertionOrderedMap, consolidation_transactions: InsertionOrderedMap, active_tasks: BTreeSet, + active_user_rpc_calls: u32, balance: Lamport, } @@ -321,6 +322,14 @@ impl State { &mut self.active_tasks } + pub fn active_user_rpc_calls(&self) -> u32 { + self.active_user_rpc_calls + } + + pub fn active_user_rpc_calls_mut(&mut self) -> &mut u32 { + &mut self.active_user_rpc_calls + } + fn transaction_fee(&self, message: &VersionedMessage) -> Lamport { let VersionedMessage::Legacy(msg) = message; FEE_PER_SIGNATURE * msg.header.num_required_signatures as u64 @@ -795,6 +804,7 @@ impl TryFrom for State { failed_transactions: InsertionOrderedMap::new(), consolidation_transactions: InsertionOrderedMap::new(), active_tasks: BTreeSet::new(), + active_user_rpc_calls: 0, balance: 0, }; state.validate()?; @@ -843,6 +853,7 @@ pub enum TaskType { ResubmitTransactions, WithdrawalProcessing, PollMonitoredAddresses, + ExecuteRpcQueue, } /// Details about a consolidation transaction, capturing the individual diff --git a/minter/src/state/tests.rs b/minter/src/state/tests.rs index e3ef55e3..48e026e1 100644 --- a/minter/src/state/tests.rs +++ b/minter/src/state/tests.rs @@ -245,6 +245,7 @@ mod state_from_init_args { failed_transactions: InsertionOrderedMap::new(), consolidation_transactions: InsertionOrderedMap::new(), active_tasks: BTreeSet::new(), + active_user_rpc_calls: 0, balance: 0, } ); diff --git a/minter/src/withdraw/mod.rs b/minter/src/withdraw/mod.rs index 19571653..670a5621 100644 --- a/minter/src/withdraw/mod.rs +++ b/minter/src/withdraw/mod.rs @@ -1,32 +1,24 @@ -use std::str::FromStr; -use std::time::Duration; - -use cksol_types::{WithdrawalError, WithdrawalOk, WithdrawalStatus}; -use icrc_ledger_types::icrc1::account::Account; -use solana_address::Address; - -use canlog::log; -use cksol_types_internal::log::Priority; - -use itertools::Itertools; -use sol_rpc_types::Slot; -use solana_hash::Hash; - use crate::{ consolidate::consolidate_deposits, - constants::MAX_CONCURRENT_RPC_CALLS, guard::{TimerGuard, withdrawal_guard}, ledger::{BurnError, burn}, - rpc::{get_recent_slot_and_blockhash, submit_transaction}, + rpc_executor::{WorkItem, enqueue, execute_rpc_queue}, runtime::CanisterRuntime, - sol_transfer::{MAX_WITHDRAWALS_PER_TX, create_signed_batch_withdrawal_transaction}, + sol_transfer::MAX_WITHDRAWALS_PER_TX, state::{ TaskType, audit::process_event, - event::{EventType, TransactionPurpose, VersionedMessage, WithdrawalRequest}, + event::{EventType, WithdrawalRequest}, mutate_state, read_state, }, }; +use canlog::log; +use cksol_types::{WithdrawalError, WithdrawalOk, WithdrawalStatus}; +use cksol_types_internal::log::Priority; +use icrc_ledger_types::icrc1::account::Account; +use itertools::Itertools; +use solana_address::Address; +use std::{str::FromStr, time::Duration}; pub const WITHDRAWAL_PROCESSING_DELAY: Duration = Duration::from_mins(1); @@ -134,111 +126,19 @@ pub async fn process_pending_withdrawals(runtime: R) { runtime.set_timer(Duration::ZERO, consolidate_deposits); } - let more_to_process = - affordable_requests.len() > MAX_CONCURRENT_RPC_CALLS * MAX_WITHDRAWALS_PER_TX; - let reschedule = scopeguard::guard(runtime.clone(), |runtime| { - runtime.set_timer(Duration::ZERO, process_pending_withdrawals); - }); + if affordable_requests.is_empty() { + return; + } - let batches: Vec> = affordable_requests + for batch in affordable_requests .into_iter() .chunks(MAX_WITHDRAWALS_PER_TX) .into_iter() - .take(MAX_CONCURRENT_RPC_CALLS) - .map(Iterator::collect) - .collect(); - - if batches.is_empty() { - // Nothing to process - scopeguard::ScopeGuard::into_inner(reschedule); - return; - } - - let (slot, recent_blockhash) = match get_recent_slot_and_blockhash(&runtime).await { - Ok(result) => result, - Err(e) => { - log!(Priority::Info, "Failed to fetch recent blockhash: {e}"); - return; - } - }; - - futures::future::join_all(batches.into_iter().map(async |batch| { - submit_withdrawal_transaction(&runtime, batch, slot, recent_blockhash).await - })) - .await; - - if !more_to_process { - // All work fits in this round - scopeguard::ScopeGuard::into_inner(reschedule); - } -} - -async fn submit_withdrawal_transaction( - runtime: &R, - requests: Vec, - slot: Slot, - recent_blockhash: Hash, -) { - let targets: Vec<_> = requests - .iter() - .map(|request| { - let destination = Address::from(request.solana_address); - (destination, request.amount_to_transfer) - }) - .collect(); - - let (signed_tx, signers) = match create_signed_batch_withdrawal_transaction( - runtime, - &targets, - recent_blockhash, - ) - .await { - Ok(tx) => tx, - Err(e) => { - let burn_indices: Vec<_> = requests.iter().map(|r| r.burn_block_index).collect(); - log!( - Priority::Error, - "Failed to create batch withdrawal transaction for burn indices {burn_indices:?}: {e}" - ); - return; - } - }; - - let signature = signed_tx.signatures[0]; - let message = VersionedMessage::Legacy(signed_tx.message.clone()); - let burn_indices: Vec<_> = requests.iter().map(|r| r.burn_block_index).collect(); - - mutate_state(|state| { - process_event( - state, - EventType::SubmittedTransaction { - signature, - message, - signers, - slot, - purpose: TransactionPurpose::WithdrawSol { - burn_indices: burn_indices.clone(), - }, - }, - runtime, - ) - }); - - match submit_transaction(runtime, signed_tx).await { - Ok(_) => { - log!( - Priority::Info, - "Submitted withdrawal transaction {signature} for burn indices {burn_indices:?}" - ); - } - Err(e) => { - log!( - Priority::Info, - "Failed to send withdrawal transaction {signature} (will be resubmitted): {e}" - ); - } + enqueue(WorkItem::SubmitWithdrawalBatch(batch.collect())); } + + runtime.set_timer(Duration::ZERO, execute_rpc_queue); } pub fn withdrawal_status(block_index: u64) -> WithdrawalStatus { diff --git a/minter/src/withdraw/tests.rs b/minter/src/withdraw/tests.rs index 87b2e8af..9425fe65 100644 --- a/minter/src/withdraw/tests.rs +++ b/minter/src/withdraw/tests.rs @@ -1,6 +1,7 @@ use crate::{ constants::MAX_CONCURRENT_RPC_CALLS, guard::{TimerGuard, withdrawal_guard}, + rpc_executor::execute_rpc_queue, sol_transfer::MAX_WITHDRAWALS_PER_TX, state::{TaskType, read_state}, test_fixtures::{ @@ -319,7 +320,8 @@ mod process_pending_withdrawals_tests { .add_stub_response(SendTransactionResult::Consistent(Ok(tx_signature.into()))) .with_increasing_time(); - process_pending_withdrawals(runtime).await; + process_pending_withdrawals(runtime.clone()).await; + execute_rpc_queue(runtime).await; // First two withdrawals should be submitted, third should remain pending assert_matches!(withdrawal_status(0), WithdrawalStatus::TxSent { .. }); @@ -348,7 +350,8 @@ mod process_pending_withdrawals_tests { .add_signature(tx_signature.into()) .add_stub_response(SendTransactionResult::Consistent(Ok(tx_signature.into()))); - process_pending_withdrawals(runtime).await; + process_pending_withdrawals(runtime.clone()).await; + execute_rpc_queue(runtime).await; assert_matches!(withdrawal_status(1), WithdrawalStatus::TxSent { .. }); } @@ -374,7 +377,8 @@ mod process_pending_withdrawals_tests { "slot unavailable".to_string(), )))); - process_pending_withdrawals(runtime).await; + process_pending_withdrawals(runtime.clone()).await; + execute_rpc_queue(runtime).await; // No withdrawal transaction event should be recorded let events_after = EventsAssert::from_recorded(); @@ -385,7 +389,7 @@ mod process_pending_withdrawals_tests { assert!( log.entries .iter() - .any(|e| e.message.contains("Failed to fetch recent blockhash")), + .any(|e| e.message.contains("failed to fetch slot and blockhash")), "Expected info log about blockhash failure, got: {:?}", log.entries ); @@ -413,7 +417,8 @@ mod process_pending_withdrawals_tests { CallRejected::with_rejection(4, "signing service unavailable".to_string()).into(), )); - process_pending_withdrawals(runtime).await; + process_pending_withdrawals(runtime.clone()).await; + execute_rpc_queue(runtime).await; // No transaction event should be recorded (signing failed) let events_after = EventsAssert::from_recorded(); @@ -457,7 +462,8 @@ mod process_pending_withdrawals_tests { .add_signature(signature(2).into()) .add_stub_response(SendTransactionResult::Consistent(Ok(signature(2).into()))); - process_pending_withdrawals(runtime).await; + process_pending_withdrawals(runtime.clone()).await; + execute_rpc_queue(runtime).await; // All withdrawals should be processed in a single invocation // (2 batches in 1 round, both within MAX_CONCURRENT_RPC_CALLS) @@ -496,14 +502,16 @@ mod process_pending_withdrawals_tests { } process_pending_withdrawals(runtime.clone()).await; + execute_rpc_queue(runtime.clone()).await; read_state(|s| { assert_eq!(s.submitted_transactions().len(), MAX_CONCURRENT_RPC_CALLS); assert_eq!(s.pending_withdrawal_requests().len(), 1); }); - assert_eq!(runtime.set_timer_call_count(), 1); + // process_pending_withdrawals called set_timer once; execute_rpc_queue called set_timer once + assert_eq!(runtime.set_timer_call_count(), 2); - // Round 2: processes the remaining 1 request → no reschedule + // Round 2: process the remaining request (already in queue from executor's reschedule). let last_sig = signature(num_requests); let runtime = TestCanisterRuntime::new() .with_increasing_time() @@ -512,7 +520,7 @@ mod process_pending_withdrawals_tests { .add_signature(last_sig.into()) .add_stub_response(SendTransactionResult::Consistent(Ok(last_sig.into()))); - process_pending_withdrawals(runtime.clone()).await; + execute_rpc_queue(runtime.clone()).await; assert!(read_state(|s| s.pending_withdrawal_requests().is_empty())); assert_eq!(runtime.set_timer_call_count(), 0);