diff --git a/integration_tests/tests/tests.rs b/integration_tests/tests/tests.rs index 93c1f850..42d764c5 100644 --- a/integration_tests/tests/tests.rs +++ b/integration_tests/tests/tests.rs @@ -5,7 +5,7 @@ use cksol_int_tests::{ CkSolMinter, Setup, SetupBuilder, fixtures::{ DEFAULT_CALLER_ACCOUNT, DEFAULT_CALLER_DEPOSIT_ADDRESS, DEPOSIT_AMOUNT, - EXPECTED_MINT_AMOUNT, MockBuilder, SharedMockHttpOutcalls, + EXPECTED_MINT_AMOUNT, MockBuilder, NUM_RPC_PROVIDERS, SharedMockHttpOutcalls, default_get_deposit_address_args, default_process_deposit_args, default_update_balance_args, deposit_transaction_signature, }, @@ -1320,15 +1320,21 @@ mod automated_deposit_flow_tests { })); }); - // Advance time: the minter should poll getSignaturesForAddress once, then remove the account. - setup.advance_time(POLL_MONITORED_ADDRESSES_DELAY).await; - setup - .execute_http_mocks( - MockBuilder::with_start_id(0) - .get_signatures_for_address(vec![]) - .build(), - ) - .await; + // Advance time through all 10 polls with exponential backoff (1, 2, 4, ..., 512 minutes). + let mut delay = POLL_MONITORED_ADDRESSES_DELAY; + let mut start_id = 0u64; + for _ in 0..10 { + setup.advance_time(delay).await; + setup + .execute_http_mocks( + MockBuilder::with_start_id(start_id) + .get_signatures_for_address(vec![]) + .build(), + ) + .await; + start_id += NUM_RPC_PROVIDERS; + delay *= 2; + } minter.assert_that_events().await.satisfy(|events| { check!(events.iter().any(|e| { diff --git a/libs/types/src/lib.rs b/libs/types/src/lib.rs index fe654424..e1a21296 100644 --- a/libs/types/src/lib.rs +++ b/libs/types/src/lib.rs @@ -148,6 +148,9 @@ pub enum UpdateBalanceError { /// The monitored account queue is at capacity. #[error("The monitored account queue is at capacity")] QueueFull, + /// The RPC call quota for this account has been exhausted. + #[error("The RPC call quota for this account has been exhausted")] + MonitoringQuotaExhausted, } /// Insufficient cycles attached by the caller to complete the call. diff --git a/minter/cksol_minter.did b/minter/cksol_minter.did index f81e36f2..306eae5a 100644 --- a/minter/cksol_minter.did +++ b/minter/cksol_minter.did @@ -98,6 +98,8 @@ type UpdateBalanceArgs = record { type UpdateBalanceError = variant { // The monitored account queue is at capacity. QueueFull; + // The RPC call quota for this account has been exhausted. + MonitoringQuotaExhausted; }; // The result of a call to the `update_balance` endpoint. @@ -473,9 +475,6 @@ service : (MinterArg) -> { // // If the owner is not set, it defaults to the caller's principal. // The resolved owner must be a non-anonymous principal. - // - // Returns Ok if the account was registered (or was already being monitored). - // Returns Err(QueueFull) if the monitored account queue is at capacity. update_balance: (UpdateBalanceArgs) -> (UpdateBalanceResult); // Update the ckSOL balance of the given owner with the funds diff --git a/minter/src/deposit/automatic/cache/mod.rs b/minter/src/deposit/automatic/cache/mod.rs new file mode 100644 index 00000000..04789f2d --- /dev/null +++ b/minter/src/deposit/automatic/cache/mod.rs @@ -0,0 +1,124 @@ +use crate::utils::sorted_key_map::SortedKeyMap; +use icrc_ledger_types::icrc1::account::Account; + +/// Maximum number of `getSignaturesForAddress` calls allowed per monitored account. +pub const MAX_GET_SIGNATURES_CALLS: u32 = 10; + +/// Maximum number of `getTransaction` calls allowed per monitored account. 
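+///
+/// Like [`MAX_GET_SIGNATURES_CALLS`], this is a per-account quota stored in the
+/// [`AutomaticDepositCacheEntry`]; it lives only in the heap cache and is not
+/// replayed from the event log.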
+pub const MAX_RETRIEVED_TRANSACTIONS: u32 = 50; + +/// Initial backoff delay in minutes before the first poll. +pub const INITIAL_BACKOFF_DELAY_MINS: u64 = 1; + +/// Per-account state for automated deposit discovery. +/// +/// This cache is intentionally separate from the event log: it can be fully +/// reconstructed by redoing the RPC calls, so there is no need to replay events +/// to restore it. It lives in unstable heap memory and is reset on canister upgrade. +#[derive(Clone, Debug, PartialEq)] +pub struct AutomaticDepositCacheEntry { + /// Remaining quota for `getSignaturesForAddress` calls. + pub sig_calls_remaining: u32, + /// Remaining quota for `getTransaction` calls. + pub tx_calls_remaining: u32, + /// The delay in minutes before the next poll. Doubles after each poll. + pub next_backoff_delay_mins: u64, +} + +impl Default for AutomaticDepositCacheEntry { + fn default() -> Self { + Self { + sig_calls_remaining: MAX_GET_SIGNATURES_CALLS, + tx_calls_remaining: MAX_RETRIEVED_TRANSACTIONS, + next_backoff_delay_mins: INITIAL_BACKOFF_DELAY_MINS, + } + } +} + +/// Heap-memory cache storing per-account automated deposit discovery state, +/// ordered by next poll time for efficient scheduling. +/// +/// Backed by a [`SortedKeyMap`] with `Account` as key and `u64` (nanosecond timestamp) +/// as the sort index. +/// +/// Accounts that have been stopped are stored with `next_poll_at = u64::MAX` +/// so they are never picked up by the poll loop, but their quota is retained +/// for future `update_balance` calls. +#[derive(Default)] +pub struct AutomaticDepositCache(SortedKeyMap); + +impl AutomaticDepositCache { + /// Returns the current poll time and entry for the given account. + pub fn get_with_index(&self, account: &Account) -> Option<(u64, AutomaticDepositCacheEntry)> { + self.0.get_with_index(account).map(|(t, e)| (*t, e.clone())) + } + + /// Inserts or updates an entry, updating the poll-time index atomically. + pub fn insert( + &mut self, + account: Account, + next_poll_at: u64, + entry: AutomaticDepositCacheEntry, + ) { + self.0.insert(account, next_poll_at, entry); + } + + /// Iterates all `(next_poll_at, account, entry)` triples in ascending poll-time order. + pub fn iter(&self) -> impl Iterator + '_ { + self.0 + .iter() + .map(|(t, account, entry)| (*t, *account, entry.clone())) + } + + pub fn len(&self) -> usize { + self.0.len() + } + + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } +} + +/// The monitoring lifecycle state of an account, as derived from the cache. +pub enum AccountMonitoringState { + /// No monitoring information has been recorded for this account. + Unknown, + /// The account is actively scheduled for polling. + Active { + #[allow(dead_code)] + next_poll_at: u64, + #[allow(dead_code)] + entry: AutomaticDepositCacheEntry, + }, + /// Polling was stopped after a successful deposit was found. The account + /// can be rescheduled via `update_balance`. + Stopped { entry: AutomaticDepositCacheEntry }, + /// The `getSignaturesForAddress` quota for this account has been exhausted. + /// `update_balance` will return `MonitoringQuotaExhausted` until the manual + /// flow replenishes the quota. + Exhausted { + #[allow(dead_code)] + entry: AutomaticDepositCacheEntry, + }, +} + +pub trait AutomaticDepositCacheExt { + /// Returns the current monitoring state of the given account. 
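+    ///
+    /// The state is derived from the cached `(next_poll_at, entry)` pair:
+    /// - no cache entry → `Unknown`
+    /// - `next_poll_at != u64::MAX` → `Active`
+    /// - `next_poll_at == u64::MAX` and `sig_calls_remaining == 0` → `Exhausted`
+    /// - `next_poll_at == u64::MAX` and `sig_calls_remaining > 0` → `Stopped`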
+ fn monitoring_state(&self, account: &Account) -> AccountMonitoringState; +} + +impl AutomaticDepositCacheExt for AutomaticDepositCache { + fn monitoring_state(&self, account: &Account) -> AccountMonitoringState { + match self.get_with_index(account) { + None => AccountMonitoringState::Unknown, + Some((t, entry)) if t != u64::MAX => AccountMonitoringState::Active { + next_poll_at: t, + entry, + }, + Some((_, entry)) if entry.sig_calls_remaining == 0 => { + AccountMonitoringState::Exhausted { entry } + } + Some((_, entry)) => AccountMonitoringState::Stopped { entry }, + } + } +} diff --git a/minter/src/deposit/automatic/mod.rs b/minter/src/deposit/automatic/mod.rs index 5454c9c8..5d7aacd4 100644 --- a/minter/src/deposit/automatic/mod.rs +++ b/minter/src/deposit/automatic/mod.rs @@ -8,6 +8,11 @@ use crate::{ SchnorrPublicKey, TaskType, audit::process_event, event::EventType, mutate_state, read_state, }, + storage::{with_automatic_deposit_cache, with_automatic_deposit_cache_mut}, +}; +use cache::{ + AccountMonitoringState, AutomaticDepositCacheEntry, AutomaticDepositCacheExt, + INITIAL_BACKOFF_DELAY_MINS, }; use canlog::log; use cksol_types::UpdateBalanceError; @@ -16,6 +21,8 @@ use icrc_ledger_types::icrc1::account::Account; use sol_rpc_types::{CommitmentLevel, GetSignaturesForAddressParams}; use std::time::Duration; +pub(crate) mod cache; + #[cfg(test)] mod tests; @@ -25,25 +32,40 @@ pub const MAX_MONITORED_ACCOUNTS: usize = 100; /// How often the minter polls monitored addresses for new deposit transactions. pub const POLL_MONITORED_ADDRESSES_DELAY: Duration = Duration::from_mins(1); -/// Maximum number of `getTransaction` calls to make per polled account. -pub const MAX_TRANSACTIONS_PER_ACCOUNT: usize = 10; +/// Maximum number of signatures to fetch per `getSignaturesForAddress` call. +pub const GET_SIGNATURES_FOR_ADDRESS_LIMIT: u32 = 100; /// Registers the given account for automated deposit monitoring. /// -/// Returns `Ok(())` if the account was registered (or was already being monitored). -/// Returns `Err(UpdateBalanceError::QueueFull)` if the monitored account queue is at capacity. +/// - If the account is already actively monitored (has a scheduled poll), returns `Ok(())`. +/// - If the account's `getSignaturesForAddress` quota is exhausted, returns +/// `Err(MonitoringQuotaExhausted)`. +/// - If the account has remaining quota but is not scheduled (e.g. was stopped after a +/// successful deposit), reschedules it with a fresh backoff delay. +/// - If the account is unknown, allocates a fresh quota and schedules the first poll. +/// - Returns `Err(QueueFull)` if the monitored account limit has been reached. pub fn update_balance( runtime: &R, account: Account, ) -> Result<(), UpdateBalanceError> { - if read_state(|state| state.monitored_accounts().contains(&account)) { - return Ok(()); - } + let cached_entry = match with_automatic_deposit_cache(|cache| cache.monitoring_state(&account)) + { + AccountMonitoringState::Active { .. } => { + debug_assert!(read_state(|s| s.monitored_accounts().contains(&account))); + return Ok(()); + } + AccountMonitoringState::Exhausted { .. 
} => { + return Err(UpdateBalanceError::MonitoringQuotaExhausted); + } + AccountMonitoringState::Stopped { entry } => Some(entry), + AccountMonitoringState::Unknown => None, + }; - if read_state(|state| state.monitored_accounts().len() >= MAX_MONITORED_ACCOUNTS) { + if read_state(|state| state.monitored_accounts().len()) >= MAX_MONITORED_ACCOUNTS { return Err(UpdateBalanceError::QueueFull); } + debug_assert!(!read_state(|s| s.monitored_accounts().contains(&account))); mutate_state(|state| { process_event( state, @@ -52,25 +74,50 @@ pub fn update_balance( ); }); + let new_entry = match cached_entry { + Some(entry) => AutomaticDepositCacheEntry { + sig_calls_remaining: entry.sig_calls_remaining, + tx_calls_remaining: entry.tx_calls_remaining, + next_backoff_delay_mins: INITIAL_BACKOFF_DELAY_MINS, + }, + None => AutomaticDepositCacheEntry::default(), + }; + let next_poll_at = runtime.time() + POLL_MONITORED_ADDRESSES_DELAY.as_nanos() as u64; + with_automatic_deposit_cache_mut(|cache| { + cache.insert(account, next_poll_at, new_entry); + }); + Ok(()) } -/// Polls all monitored addresses for new deposit transaction signatures. +/// Polls all monitored addresses that are due for a check. /// -/// For each address, calls `getSignaturesForAddress` on the Solana RPC. +/// For each due address, calls `getSignaturesForAddress` on the Solana RPC. +/// After each call, reschedules the account with exponential backoff, or marks it +/// as exhausted (poll time `u64::MAX`, quota at zero) once the quota is depleted. pub async fn poll_monitored_addresses(runtime: R) { let _guard = match TimerGuard::new(TaskType::PollMonitoredAddresses) { Ok(guard) => guard, Err(_) => return, }; - let all_accounts: Vec = - read_state(|s| s.monitored_accounts().iter().copied().collect()); - if all_accounts.is_empty() { + let now = runtime.time(); + + let due: Vec<(Account, AutomaticDepositCacheEntry)> = with_automatic_deposit_cache(|cache| { + cache + .iter() + .take_while(|(t, ..)| *t <= now) + .map(|(_, account, entry)| (account, entry)) + // +1 to detect whether more accounts remain after this round. + .take(MAX_CONCURRENT_RPC_CALLS + 1) + .collect() + }); + + if due.is_empty() { return; } - let more_to_process = all_accounts.len() > MAX_CONCURRENT_RPC_CALLS; + let more_to_process = due.len() > MAX_CONCURRENT_RPC_CALLS; let reschedule = scopeguard::guard(runtime.clone(), |runtime| { runtime.set_timer(Duration::ZERO, poll_monitored_addresses); }); @@ -78,15 +125,13 @@ pub async fn poll_monitored_addresses(runtime: R) { let master_key = lazy_get_schnorr_master_key(&runtime).await; futures::future::join_all( - all_accounts - .into_iter() + due.into_iter() .take(MAX_CONCURRENT_RPC_CALLS) - .map(|account| poll_account(&runtime, &master_key, account)), + .map(|(account, entry)| poll_account(&runtime, &master_key, account, entry)), ) .await; if !more_to_process { - // All work fits in this round scopeguard::ScopeGuard::into_inner(reschedule); } } @@ -95,23 +140,25 @@ async fn poll_account( runtime: &R, master_key: &SchnorrPublicKey, account: Account, + entry: AutomaticDepositCacheEntry, ) { let deposit_address = account_address(master_key, &account); let params = GetSignaturesForAddressParams { pubkey: deposit_address.into(), commitment: Some(CommitmentLevel::Finalized), - min_context_slot: None, - // Fetch no more signatures than we intend to process with `getTransaction`. 
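+        // Fetch up to GET_SIGNATURES_FOR_ADDRESS_LIMIT signatures per call; `getTransaction`
+        // usage is budgeted separately by the per-account `tx_calls_remaining` quota.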
limit: Some( - (MAX_TRANSACTIONS_PER_ACCOUNT as u32) + GET_SIGNATURES_FOR_ADDRESS_LIMIT .try_into() - .expect("MAX_TRANSACTIONS_PER_ACCOUNT must be between 1 and 1000"), + .expect("GET_SIGNATURES_FOR_ADDRESS_LIMIT must be between 1 and 1000"), ), + min_context_slot: None, before: None, until: None, }; + let sig_calls_remaining = entry.sig_calls_remaining.saturating_sub(1); + match get_signatures_for_address(runtime, params).await { Err(e) => { log!( @@ -124,11 +171,47 @@ async fn poll_account( } } - mutate_state(|state| { - process_event( - state, - EventType::StoppedMonitoringAccount { account }, - runtime, + if sig_calls_remaining == 0 { + // Retain the entry with u64::MAX so it is never rescheduled, but quota info + // is preserved for display and for replenishment via the manual flow. + with_automatic_deposit_cache_mut(|cache| { + cache.insert( + account, + u64::MAX, + AutomaticDepositCacheEntry { + sig_calls_remaining, + tx_calls_remaining: entry.tx_calls_remaining, + next_backoff_delay_mins: entry.next_backoff_delay_mins, + }, + ); + }); + debug_assert!(read_state(|s| s.monitored_accounts().contains(&account))); + mutate_state(|state| { + process_event( + state, + EventType::StoppedMonitoringAccount { account }, + runtime, + ); + }); + log!( + Priority::Info, + "Stopped monitoring {deposit_address}: getSignaturesForAddress quota exhausted" ); - }); + } else { + let delay_mins = entry.next_backoff_delay_mins; + let delay = Duration::from_mins(delay_mins); + let delay_ns: u64 = delay.as_nanos().try_into().unwrap_or(u64::MAX - 1); + let next_poll_at = runtime.time().saturating_add(delay_ns).min(u64::MAX - 1); + with_automatic_deposit_cache_mut(|cache| { + cache.insert( + account, + next_poll_at, + AutomaticDepositCacheEntry { + sig_calls_remaining, + tx_calls_remaining: entry.tx_calls_remaining, + next_backoff_delay_mins: delay_mins.saturating_mul(2), + }, + ); + }); + } } diff --git a/minter/src/deposit/automatic/tests.rs b/minter/src/deposit/automatic/tests.rs index 618759df..87d5da06 100644 --- a/minter/src/deposit/automatic/tests.rs +++ b/minter/src/deposit/automatic/tests.rs @@ -1,138 +1,303 @@ use super::*; use crate::{ constants::MAX_CONCURRENT_RPC_CALLS, - state::{event::EventType, read_state}, + state::{event::EventType, read_state, reset_state}, + storage::{reset_automatic_deposit_cache, reset_events, with_automatic_deposit_cache}, test_fixtures::{ - EventsAssert, account, events::start_monitoring_account, init_schnorr_master_key, - init_state, runtime::TestCanisterRuntime, + EventsAssert, account, init_schnorr_master_key, init_state, runtime::TestCanisterRuntime, }, }; +use cache::{INITIAL_BACKOFF_DELAY_MINS, MAX_GET_SIGNATURES_CALLS, MAX_RETRIEVED_TRANSACTIONS}; +use candid::Principal; use sol_rpc_types::{ConfirmedTransactionStatusWithSignature, MultiRpcResult}; +use std::iter; type SignaturesResult = MultiRpcResult>; -fn monitored_accounts_count() -> usize { - read_state(|s| s.monitored_accounts().len()) -} - -fn start_monitoring_max_number_of_accounts() { - for i in 0..MAX_MONITORED_ACCOUNTS { - start_monitoring_account(account(i)); - } -} - -mod update_balance { +mod update_balance_tests { use super::*; + const ACCOUNT: Account = Account { + owner: Principal::from_slice(&[1; 29]), + subaccount: None, + }; + #[test] - fn should_register_account() { + fn should_start_monitoring_unknown_account() { init_state(); - let runtime = TestCanisterRuntime::new().with_increasing_time(); + let runtime = TestCanisterRuntime::new().add_times(iter::repeat(0)); - let result = 
update_balance(&runtime, account(1)); + let result = update_balance(&runtime, ACCOUNT); assert_eq!(result, Ok(())); - assert!(read_state(|s| s.monitored_accounts().contains(&account(1)))); - assert_eq!(monitored_accounts_count(), 1); - + // Fresh cache entry with full quotas and initial backoff delay. + CacheAssert::for_account(ACCOUNT) + .next_poll_at_mins(INITIAL_BACKOFF_DELAY_MINS) + .sig_calls(MAX_GET_SIGNATURES_CALLS) + .tx_calls(MAX_RETRIEVED_TRANSACTIONS) + .backoff_mins(INITIAL_BACKOFF_DELAY_MINS); EventsAssert::from_recorded() - .expect_event_eq(EventType::StartedMonitoringAccount { - account: account(1), - }) + .expect_event_eq(EventType::StartedMonitoringAccount { account: ACCOUNT }) .assert_no_more_events(); } #[test] - fn should_be_idempotent_for_already_monitored_account() { + fn should_not_modify_active_account() { init_state(); - let runtime = TestCanisterRuntime::new().with_increasing_time(); + let runtime = TestCanisterRuntime::new().add_times(iter::repeat(0)); - for _ in 0..2 { - let result = update_balance(&runtime, account(1)); - assert_eq!(result, Ok(())); - assert_eq!(monitored_accounts_count(), 1); - } + let result1 = update_balance(&runtime, ACCOUNT); + assert_eq!(result1, Ok(())); + + set_cache_entry(ACCOUNT, 1, 7, 4); + let cache_before = CacheAssert::for_account(ACCOUNT); + let events_before = EventsAssert::from_recorded(); + + let result2 = update_balance(&runtime, ACCOUNT); + assert_eq!(result2, Ok(())); - // Only one event should have been emitted + // Does not modify the cache or events. + assert_eq!(cache_before, CacheAssert::for_account(ACCOUNT)); + assert_eq!(events_before, EventsAssert::from_recorded()); + } + + #[test] + fn should_reschedule_stopped_account() { + init_state(); + let runtime = TestCanisterRuntime::new().add_times(iter::repeat(0)); + + set_cache_entry(ACCOUNT, u64::MAX, 5, 8); + + let result = update_balance(&runtime, ACCOUNT); + assert_eq!(result, Ok(())); + + // Preserves the existing quota but resets the backoff delay and reschedules. + CacheAssert::for_account(ACCOUNT) + .next_poll_at_mins(INITIAL_BACKOFF_DELAY_MINS) + .sig_calls(5) + .backoff_mins(INITIAL_BACKOFF_DELAY_MINS); EventsAssert::from_recorded() - .expect_event_eq(EventType::StartedMonitoringAccount { - account: account(1), - }) + .expect_event_eq(EventType::StartedMonitoringAccount { account: ACCOUNT }) .assert_no_more_events(); } #[test] - fn should_return_queue_full_when_at_capacity() { + fn should_return_error_for_exhausted_account() { init_state(); + let runtime = TestCanisterRuntime::new().add_times(iter::repeat(0)); - start_monitoring_max_number_of_accounts(); - assert_eq!(monitored_accounts_count(), MAX_MONITORED_ACCOUNTS); + set_cache_entry(ACCOUNT, u64::MAX, 0, INITIAL_BACKOFF_DELAY_MINS); + let cache_before = CacheAssert::for_account(ACCOUNT); + let events_before = EventsAssert::from_recorded(); + + let result = update_balance(&runtime, ACCOUNT); + assert_eq!(result, Err(UpdateBalanceError::MonitoringQuotaExhausted)); - let runtime = TestCanisterRuntime::new().with_increasing_time(); - let result = update_balance(&runtime, account(MAX_MONITORED_ACCOUNTS + 1)); - assert_eq!(result, Err(UpdateBalanceError::QueueFull)); - assert_eq!(monitored_accounts_count(), MAX_MONITORED_ACCOUNTS); + // Does not modify the cache or events. 
+        assert_eq!(cache_before, CacheAssert::for_account(ACCOUNT));
+        assert_eq!(events_before, EventsAssert::from_recorded());
     }
 
     #[test]
-    fn should_not_return_queue_full_if_account_already_monitored() {
+    fn should_return_error_when_at_capacity() {
         init_state();
         start_monitoring_max_number_of_accounts();
         assert_eq!(monitored_accounts_count(), MAX_MONITORED_ACCOUNTS);
+        let runtime = TestCanisterRuntime::new().add_times(iter::repeat(0));
 
-        // Re-registering an already-monitored account should still return Ok
-        let runtime = TestCanisterRuntime::new().with_increasing_time();
-        let result = update_balance(&runtime, account(0));
-        assert_eq!(result, Ok(()));
+        // A not-yet-monitored account does not fit, return QueueFull.
+        let result1 = update_balance(&runtime, account(MAX_MONITORED_ACCOUNTS + 1));
+        assert_eq!(result1, Err(UpdateBalanceError::QueueFull));
+
+        // An already-monitored account is unaffected by the capacity limit, return Ok.
+        let result2 = update_balance(&runtime, account(0));
+        assert_eq!(result2, Ok(()));
     }
 }
 
-mod poll_monitored_addresses {
+mod poll_monitored_addresses_tests {
     use super::*;
 
     #[tokio::test]
     async fn should_poll_monitored_addresses_in_rounds() {
         setup();
 
-        // Add MAX_CONCURRENT_RPC_CALLS + 1 accounts to monitor so that 2 rounds are needed.
+        // Seed MAX_CONCURRENT_RPC_CALLS + 1 accounts, all due immediately.
         let num_accounts = MAX_CONCURRENT_RPC_CALLS + 1;
+        let runtime = TestCanisterRuntime::new().add_times(0..);
         for i in 0..num_accounts {
-            start_monitoring_account(account(i));
+            update_balance(&runtime, account(i)).unwrap();
+            set_cache_entry(
+                account(i),
+                0,
+                MAX_GET_SIGNATURES_CALLS,
+                INITIAL_BACKOFF_DELAY_MINS,
+            );
         }
-        assert_eq!(monitored_accounts_count(), num_accounts);
 
-        // Round 1: polls MAX_CONCURRENT_RPC_CALLS accounts, 1 remains → reschedule.
-        let mut runtime = TestCanisterRuntime::new().with_increasing_time();
+        // Round 1: processes MAX_CONCURRENT_RPC_CALLS accounts (rescheduled into the future),
+        // detects 1 more remaining → sets timer.
+        let mut runtime = TestCanisterRuntime::new().add_times(0..);
         for _ in 0..MAX_CONCURRENT_RPC_CALLS {
-            runtime = runtime.add_stub_response(SignaturesResult::Consistent(Ok(vec![])));
+            runtime = runtime.add_stub_response(empty_signatures_response());
         }
         poll_monitored_addresses(runtime.clone()).await;
-        assert_eq!(monitored_accounts_count(), 1);
+        assert_eq!(monitored_accounts_count(), num_accounts);
         assert_eq!(runtime.set_timer_call_count(), 1);
 
-        // Round 2: polls the remaining 1 account → no reschedule, queue empty.
+        // Round 2: only the 1 unprocessed account is due → processes it, no set_timer.
         let runtime = TestCanisterRuntime::new()
-            .with_increasing_time()
-            .add_stub_response(SignaturesResult::Consistent(Ok(vec![])));
+            .add_times(0..)
+            .add_stub_response(empty_signatures_response());
         poll_monitored_addresses(runtime.clone()).await;
-        assert_eq!(monitored_accounts_count(), 0);
+        assert_eq!(monitored_accounts_count(), num_accounts);
         assert_eq!(runtime.set_timer_call_count(), 0);
+    }
 
-        // Verify StoppedMonitoringAccount was emitted for each account.
- let mut events_assert = EventsAssert::from_recorded(); - for i in 0..num_accounts { - events_assert = - events_assert.expect_contains_event_eq(EventType::StoppedMonitoringAccount { - account: account(i), - }); + #[tokio::test] + async fn should_poll_signatures_for_address_with_exponential_backoff() { + setup(); + let t0 = 0u64; + + let account = account(1); + let runtime = TestCanisterRuntime::new().add_times([t0, t0, t0]); + update_balance(&runtime, account).unwrap(); + + // MAX_GET_SIGNATURES_CALLS polls: 1, 2, 4, 8, ..., 512 minutes apart. + for _ in 0..MAX_GET_SIGNATURES_CALLS { + let next_poll_at = CacheAssert::for_account(account).scheduled_at(); + + EventsAssert::from_recorded() + .expect_event_eq(EventType::StartedMonitoringAccount { account }) + .assert_no_more_events(); + + // Just before the scheduled time: no RPC call. + let runtime = TestCanisterRuntime::new().add_times([next_poll_at - 1]); + poll_monitored_addresses(runtime).await; + + // At the scheduled time: one `getSignaturesForAddress` call. + let runtime = TestCanisterRuntime::new() + .add_times([next_poll_at; 3]) + .add_stub_response(empty_signatures_response()); + poll_monitored_addresses(runtime).await; } + + EventsAssert::from_recorded() + .expect_event_eq(EventType::StartedMonitoringAccount { account }) + .expect_event_eq(EventType::StoppedMonitoringAccount { account }) + .assert_no_more_events(); } fn setup() { + reset_state(); + reset_events(); + reset_automatic_deposit_cache(); init_state(); init_schnorr_master_key(); } } + +fn empty_signatures_response() -> SignaturesResult { + MultiRpcResult::Consistent(Ok(vec![])) +} + +fn monitored_accounts_count() -> usize { + read_state(|s| s.monitored_accounts().len()) +} + +fn start_monitoring_max_number_of_accounts() { + let runtime = TestCanisterRuntime::new().add_times(0..); + for i in 0..MAX_MONITORED_ACCOUNTS { + update_balance(&runtime, account(i)).unwrap(); + } +} + +/// Sets a cache entry with the given `next_poll_at`, `sig_calls_remaining`, and +/// `next_backoff_delay_mins`. `tx_calls_remaining` is set to `MAX_RETRIEVED_TRANSACTIONS`. +fn set_cache_entry(account: Account, next_poll_at: u64, sig_calls: u32, backoff_mins: u64) { + with_automatic_deposit_cache_mut(|cache| { + cache.insert( + account, + next_poll_at, + AutomaticDepositCacheEntry { + sig_calls_remaining: sig_calls, + tx_calls_remaining: MAX_RETRIEVED_TRANSACTIONS, + next_backoff_delay_mins: backoff_mins, + }, + ); + }); +} + +#[derive(Debug, PartialEq)] +struct CacheAssert { + account: Account, + next_poll_at: Option, + entry: AutomaticDepositCacheEntry, +} + +impl CacheAssert { + fn for_account(account: Account) -> Self { + let (next_poll_at, entry) = with_automatic_deposit_cache(|cache| { + cache.get_with_index(&account).map(|(t, entry)| { + let next_poll_at = if t == u64::MAX { None } else { Some(t) }; + (next_poll_at, entry) + }) + }) + .unwrap_or_else(|| panic!("No cache entry for account {account:?}")); + Self { + account, + next_poll_at, + entry, + } + } + + /// Returns the scheduled poll time, panicking if the account is stopped/exhausted. + fn scheduled_at(self) -> u64 { + self.next_poll_at.unwrap_or_else(|| { + panic!( + "Account {:?} is stopped/exhausted (no scheduled poll time)", + self.account + ) + }) + } + + /// Asserts that the account is scheduled exactly `delay_mins` minutes from time 0. 
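+    ///
+    /// Assumes the test runtime's clock starts at 0 (the tests construct it via `add_times`
+    /// with an initial time of 0), so an absolute `next_poll_at` equals the delay since the
+    /// account was registered.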
+ fn next_poll_at_mins(self, delay_mins: u64) -> Self { + let expected = Duration::from_mins(delay_mins).as_nanos() as u64; + assert_eq!( + self.next_poll_at, + Some(expected), + "next_poll_at mismatch for account {:?}", + self.account + ); + self + } + + fn sig_calls(self, expected: u32) -> Self { + assert_eq!( + self.entry.sig_calls_remaining, expected, + "sig_calls_remaining mismatch for account {:?}", + self.account + ); + self + } + + fn tx_calls(self, expected: u32) -> Self { + assert_eq!( + self.entry.tx_calls_remaining, expected, + "tx_calls_remaining mismatch for account {:?}", + self.account + ); + self + } + + fn backoff_mins(self, expected: u64) -> Self { + assert_eq!( + self.entry.next_backoff_delay_mins, expected, + "next_backoff_delay_mins mismatch for account {:?}", + self.account + ); + self + } +} diff --git a/minter/src/storage/mod.rs b/minter/src/storage/mod.rs index d59d3093..bf341110 100644 --- a/minter/src/storage/mod.rs +++ b/minter/src/storage/mod.rs @@ -1,4 +1,5 @@ use crate::{ + deposit::automatic::cache::AutomaticDepositCache, runtime::CanisterRuntime, state::event::{Event, EventType}, }; @@ -30,6 +31,12 @@ thread_local! { ) ); + /// Heap-memory cache for per-account automated deposit discovery state. + /// Reset on canister upgrade; does not need to survive upgrades since it can + /// be reconstructed by replaying the RPC calls. + static AUTOMATIC_DEPOSIT_CACHE: RefCell = + RefCell::new(AutomaticDepositCache::default()); + static UNSTABLE_METRICS: RefCell = const { RefCell::new(Metrics::new()) }; } @@ -46,6 +53,20 @@ impl Metrics { } } +pub fn with_automatic_deposit_cache(f: F) -> R +where + F: FnOnce(&AutomaticDepositCache) -> R, +{ + AUTOMATIC_DEPOSIT_CACHE.with(|c| f(&c.borrow())) +} + +pub fn with_automatic_deposit_cache_mut(f: F) -> R +where + F: FnOnce(&mut AutomaticDepositCache) -> R, +{ + AUTOMATIC_DEPOSIT_CACHE.with(|c| f(&mut c.borrow_mut())) +} + /// Appends the event to the event log. pub fn record_event(payload: EventType, runtime: &R) { EVENTS @@ -95,3 +116,10 @@ pub(crate) fn reset_events() { }); }); } + +#[cfg(test)] +pub fn reset_automatic_deposit_cache() { + AUTOMATIC_DEPOSIT_CACHE.with(|cache| { + *cache.borrow_mut() = AutomaticDepositCache::default(); + }); +} diff --git a/minter/src/test_fixtures/runtime.rs b/minter/src/test_fixtures/runtime.rs index 9cd8e3f5..8f5e50ed 100644 --- a/minter/src/test_fixtures/runtime.rs +++ b/minter/src/test_fixtures/runtime.rs @@ -47,11 +47,10 @@ impl TestCanisterRuntime { pub fn add_times(mut self, times: I) -> Self where - I: IntoIterator, + I: IntoIterator + 'static, + I::IntoIter: Send + 'static, { - for time in times { - self.times = self.times.add(time); - } + self.times = self.times.chain(times); self } diff --git a/minter/src/utils/insertion_ordered_map/mod.rs b/minter/src/utils/insertion_ordered_map/mod.rs index 24720b7b..a074db71 100644 --- a/minter/src/utils/insertion_ordered_map/mod.rs +++ b/minter/src/utils/insertion_ordered_map/mod.rs @@ -1,15 +1,12 @@ -use std::collections::BTreeMap; +use crate::utils::sorted_key_map::{self, SortedKeyMap}; #[cfg(test)] mod tests; /// A map that preserves insertion order while providing O(log n) key lookup. /// -/// Internally uses two `BTreeMap`s: -/// - `entries`: key → (sequence number, value) -/// - `order`: sequence number → key -/// -/// Iteration via [`iter`], [`keys`], and [`values`] is in insertion order (oldest first). 
+/// Backed by a [`SortedKeyMap`] keyed on an auto-incrementing sequence number,
+/// so iteration via [`iter`], [`keys`], and [`values`] is in insertion order (oldest first).
 /// [`DoubleEndedIterator`] is supported on [`Iter`], so callers can call `.rev()` on
 /// [`iter`] to get newest-first.
 ///
@@ -17,16 +14,14 @@ mod tests;
 /// [`keys`]: InsertionOrderedMap::keys
 /// [`values`]: InsertionOrderedMap::values
 pub struct InsertionOrderedMap<K, V> {
-    entries: BTreeMap<K, (u64, V)>,
-    order: BTreeMap<u64, K>,
+    inner: SortedKeyMap<K, u64, V>,
     next_seq: u64,
 }
 
 impl<K: Ord + Clone, V> InsertionOrderedMap<K, V> {
     pub fn new() -> Self {
         Self {
-            entries: BTreeMap::new(),
-            order: BTreeMap::new(),
+            inner: SortedKeyMap::new(),
             next_seq: 0,
         }
     }
@@ -34,51 +29,35 @@ impl<K: Ord + Clone, V> InsertionOrderedMap<K, V> {
     /// Inserts a key-value pair. Returns the old value if the key was already present
     /// (and moves it to the end of the insertion order).
     pub fn insert(&mut self, key: K, value: V) -> Option<V> {
-        let old_value = if let Some((old_seq, old_val)) = self.entries.remove(&key) {
-            self.order.remove(&old_seq);
-            Some(old_val)
-        } else {
-            None
-        };
         let seq = self.next_seq;
         self.next_seq += 1;
-        self.order.insert(seq, key.clone());
-        self.entries.insert(key, (seq, value));
-        old_value
+        self.inner.insert(key, seq, value)
     }
 
     /// Removes a key and returns its value if it was present.
     pub fn remove(&mut self, key: &K) -> Option<V> {
-        if let Some((seq, value)) = self.entries.remove(key) {
-            self.order.remove(&seq);
-            Some(value)
-        } else {
-            None
-        }
+        self.inner.remove(key)
     }
 
     pub fn get(&self, key: &K) -> Option<&V> {
-        self.entries.get(key).map(|(_, v)| v)
+        self.inner.get(key)
     }
 
     pub fn contains_key(&self, key: &K) -> bool {
-        self.entries.contains_key(key)
+        self.inner.contains_key(key)
     }
 
     pub fn len(&self) -> usize {
-        self.entries.len()
+        self.inner.len()
    }
 
     pub fn is_empty(&self) -> bool {
-        self.entries.is_empty()
+        self.inner.is_empty()
    }
 
     /// Returns an iterator over `(&K, &V)` pairs in insertion order.
     pub fn iter(&self) -> Iter<'_, K, V> {
-        Iter {
-            order_iter: self.order.values(),
-            entries: &self.entries,
-        }
+        Iter(self.inner.iter())
     }
 
     /// Returns an iterator over keys in insertion order.
@@ -93,7 +72,7 @@ impl<K: Ord + Clone, V> InsertionOrderedMap<K, V> {
     /// Returns a mutable iterator over values. Iteration order is unspecified.
     pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> + '_ {
-        self.entries.values_mut().map(|(_, v)| v)
+        self.inner.values_mut()
     }
 }
 
@@ -130,26 +109,19 @@ impl<'a, K: Ord + Clone, V> IntoIterator for &'a InsertionOrderedMap<K, V> {
 
 // --- Iterator types ---
 
-pub struct Iter<'a, K, V> {
-    order_iter: std::collections::btree_map::Values<'a, u64, K>,
-    entries: &'a BTreeMap<K, (u64, V)>,
-}
+pub struct Iter<'a, K, V>(sorted_key_map::Iter<'a, K, u64, V>);
 
 impl<'a, K: Ord, V> Iterator for Iter<'a, K, V> {
     type Item = (&'a K, &'a V);
 
     fn next(&mut self) -> Option<Self::Item> {
-        let key = self.order_iter.next()?;
-        let (_, value) = self.entries.get(key)?;
-        Some((key, value))
+        self.0.next().map(|(_, k, v)| (k, v))
     }
 }
 
 impl<'a, K: Ord, V> DoubleEndedIterator for Iter<'a, K, V> {
     fn next_back(&mut self) -> Option<Self::Item> {
-        let key = self.order_iter.next_back()?;
-        let (_, value) = self.entries.get(key)?;
-        Some((key, value))
+        self.0.next_back().map(|(_, k, v)| (k, v))
     }
 }
diff --git a/minter/src/utils/mod.rs b/minter/src/utils/mod.rs
index cc87b6c5..927c6f4c 100644
--- a/minter/src/utils/mod.rs
+++ b/minter/src/utils/mod.rs
@@ -1 +1,2 @@
 pub mod insertion_ordered_map;
+pub mod sorted_key_map;
diff --git a/minter/src/utils/sorted_key_map/mod.rs b/minter/src/utils/sorted_key_map/mod.rs
new file mode 100644
index 00000000..d34cfd87
--- /dev/null
+++ b/minter/src/utils/sorted_key_map/mod.rs
@@ -0,0 +1,126 @@
+use std::collections::BTreeMap;
+
+#[cfg(test)]
+mod tests;
+
+/// A map with a user-supplied sort index, backed by two `BTreeMap`s.
+///
+/// Two `BTreeMap`s are kept in sync:
+/// - `by_key`: primary store, key → (index, value)
+/// - `by_index`: secondary index, (index, key) → (), drives [`iter`] in ascending index order
+///
+/// Unlike a secondary index keyed only by `I`, composite `(I, K)` keys allow
+/// multiple entries to share the same index value (e.g. several accounts scheduled
+/// at the same timestamp).
+///
+/// [`iter`]: SortedKeyMap::iter
+pub struct SortedKeyMap<K, I, V> {
+    by_key: BTreeMap<K, (I, V)>,
+    by_index: BTreeMap<(I, K), ()>,
+}
+
+impl<K, I, V> Default for SortedKeyMap<K, I, V> {
+    fn default() -> Self {
+        Self {
+            by_key: BTreeMap::new(),
+            by_index: BTreeMap::new(),
+        }
+    }
+}
+
+impl<K: Ord + Clone, I: Ord + Clone, V> SortedKeyMap<K, I, V> {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Inserts or updates an entry, atomically updating the sort index.
+    /// Returns the old value if the key was already present.
+    pub fn insert(&mut self, key: K, index: I, value: V) -> Option<V> {
+        let old_value = if let Some((old_index, old_val)) = self.by_key.remove(&key) {
+            self.by_index.remove(&(old_index, key.clone()));
+            Some(old_val)
+        } else {
+            None
+        };
+        self.by_index.insert((index.clone(), key.clone()), ());
+        self.by_key.insert(key, (index, value));
+        old_value
+    }
+
+    /// Removes a key and returns its value if present.
+    pub fn remove(&mut self, key: &K) -> Option<V> {
+        if let Some((index, value)) = self.by_key.remove(key) {
+            self.by_index.remove(&(index, key.clone()));
+            Some(value)
+        } else {
+            None
+        }
+    }
+
+    /// Returns a reference to the value for the given key.
+    pub fn get(&self, key: &K) -> Option<&V> {
+        self.by_key.get(key).map(|(_, v)| v)
+    }
+
+    /// Returns references to the index and value for the given key.
+    pub fn get_with_index(&self, key: &K) -> Option<(&I, &V)> {
+        self.by_key.get(key).map(|(i, v)| (i, v))
+    }
+
+    pub fn contains_key(&self, key: &K) -> bool {
+        self.by_key.contains_key(key)
+    }
+
+    /// Iterates `(index, key, value)` triples in ascending index order.
+    ///
+    /// Entries with equal index values are ordered by key.
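+    ///
+    /// Illustrative sketch (the concrete type parameters are arbitrary; this mirrors the
+    /// unit tests):
+    ///
+    /// ```ignore
+    /// let mut map: SortedKeyMap<u64, u64, &str> = SortedKeyMap::new();
+    /// map.insert(2, 10, "b");
+    /// map.insert(1, 10, "a"); // same index as key 2
+    /// let triples: Vec<_> = map.iter().map(|(i, k, v)| (*i, *k, *v)).collect();
+    /// assert_eq!(triples, vec![(10, 1, "a"), (10, 2, "b")]);
+    /// ```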
+    pub fn iter(&self) -> Iter<'_, K, I, V> {
+        Iter {
+            index_iter: self.by_index.keys(),
+            by_key: &self.by_key,
+        }
+    }
+
+    /// Returns a mutable iterator over values. Iteration order is unspecified.
+    pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> + '_ {
+        self.by_key.values_mut().map(|(_, v)| v)
+    }
+
+    pub fn len(&self) -> usize {
+        self.by_key.len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.by_key.is_empty()
+    }
+}
+
+/// Iterator over `(index, key, value)` triples in ascending index order.
+pub struct Iter<'a, K, I, V> {
+    index_iter: std::collections::btree_map::Keys<'a, (I, K), ()>,
+    by_key: &'a BTreeMap<K, (I, V)>,
+}
+
+impl<'a, K: Ord, I: Ord, V> Iterator for Iter<'a, K, I, V> {
+    type Item = (&'a I, &'a K, &'a V);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let (i, k) = self.index_iter.next()?;
+        let (_, v) = self
+            .by_key
+            .get(k)
+            .expect("by_index and by_key must be in sync");
+        Some((i, k, v))
+    }
+}
+
+impl<'a, K: Ord, I: Ord, V> DoubleEndedIterator for Iter<'a, K, I, V> {
+    fn next_back(&mut self) -> Option<Self::Item> {
+        let (i, k) = self.index_iter.next_back()?;
+        let (_, v) = self
+            .by_key
+            .get(k)
+            .expect("by_index and by_key must be in sync");
+        Some((i, k, v))
+    }
+}
diff --git a/minter/src/utils/sorted_key_map/tests.rs b/minter/src/utils/sorted_key_map/tests.rs
new file mode 100644
index 00000000..5fd8cec6
--- /dev/null
+++ b/minter/src/utils/sorted_key_map/tests.rs
@@ -0,0 +1,172 @@
+use super::SortedKeyMap;
+
+mod insert {
+    use super::*;
+
+    #[test]
+    fn should_insert_new_entry() {
+        let mut map: SortedKeyMap<u64, u64, &str> = SortedKeyMap::new();
+        assert_eq!(map.insert(1, 10, "a"), None);
+        assert_eq!(map.len(), 1);
+        assert_eq!(map.get(&1), Some(&"a"));
+    }
+
+    #[test]
+    fn should_return_old_value_on_reinsertion() {
+        let mut map: SortedKeyMap<u64, u64, &str> = SortedKeyMap::new();
+        map.insert(1, 10, "a");
+        assert_eq!(map.insert(1, 20, "b"), Some("a"));
+        assert_eq!(map.get(&1), Some(&"b"));
+        assert_eq!(map.len(), 1);
+    }
+
+    #[test]
+    fn should_update_index_on_reinsertion() {
+        let mut map: SortedKeyMap<u64, u64, &str> = SortedKeyMap::new();
+        map.insert(1, 10, "a");
+        map.insert(2, 20, "b");
+        map.insert(1, 30, "a2"); // re-insert with new index
+
+        let triples: Vec<_> = map.iter().map(|(i, k, v)| (*i, *k, *v)).collect();
+        assert_eq!(triples, vec![(20, 2, "b"), (30, 1, "a2")]);
+    }
+}
+
+mod remove {
+    use super::*;
+
+    #[test]
+    fn should_remove_existing_key() {
+        let mut map: SortedKeyMap<u64, u64, &str> = SortedKeyMap::new();
+        map.insert(1, 10, "a");
+        assert_eq!(map.remove(&1), Some("a"));
+        assert!(map.is_empty());
+    }
+
+    #[test]
+    fn should_return_none_for_absent_key() {
+        let mut map: SortedKeyMap<u64, u64, &str> = SortedKeyMap::new();
+        assert_eq!(map.remove(&99), None);
+    }
+
+    #[test]
+    fn should_preserve_order_of_remaining_entries() {
+        let mut map: SortedKeyMap<u64, u64, &str> = SortedKeyMap::new();
+        map.insert(1, 10, "a");
+        map.insert(2, 20, "b");
+        map.insert(3, 30, "c");
+        map.remove(&2);
+
+        let keys: Vec<_> = map.iter().map(|(_, k, _)| *k).collect();
+        assert_eq!(keys, vec![1, 3]);
+    }
+}
+
+mod get {
+    use super::*;
+
+    #[test]
+    fn should_return_value_for_present_key() {
+        let mut map: SortedKeyMap<u64, u64, &str> = SortedKeyMap::new();
+        map.insert(42, 0, "hello");
+        assert_eq!(map.get(&42), Some(&"hello"));
+    }
+
+    #[test]
+    fn should_return_none_for_absent_key() {
+        let map: SortedKeyMap<u64, u64, &str> = SortedKeyMap::new();
+        assert_eq!(map.get(&1), None);
+    }
+}
+
+mod get_with_index {
+    use super::*;
+
+    #[test]
+    fn should_return_index_and_value() {
+        let mut map: SortedKeyMap<u64, u64, &str> = SortedKeyMap::new();
+        map.insert(1, 42, "foo");
+        assert_eq!(map.get_with_index(&1), Some((&42u64, &"foo")));
+    }
+
+    #[test]
+    fn should_return_none_for_absent_key() {
+        let map: SortedKeyMap<u64, u64, &str> = SortedKeyMap::new();
+        assert_eq!(map.get_with_index(&1), None);
+    }
+}
+
+mod contains_key {
+    use super::*;
+
+    #[test]
+    fn should_return_true_for_present_key() {
+        let mut map: SortedKeyMap<u64, u64, &str> = SortedKeyMap::new();
+        map.insert(1, 0, "a");
+        assert!(map.contains_key(&1));
+    }
+
+    #[test]
+    fn should_return_false_for_absent_key() {
+        let map: SortedKeyMap<u64, u64, &str> = SortedKeyMap::new();
+        assert!(!map.contains_key(&1));
+    }
+}
+
+mod iter {
+    use super::*;
+
+    #[test]
+    fn should_iterate_in_ascending_index_order() {
+        let mut map: SortedKeyMap<u64, u64, &str> = SortedKeyMap::new();
+        map.insert(3, 30, "c");
+        map.insert(1, 10, "a");
+        map.insert(2, 20, "b");
+
+        let triples: Vec<_> = map.iter().map(|(i, k, v)| (*i, *k, *v)).collect();
+        assert_eq!(triples, vec![(10, 1, "a"), (20, 2, "b"), (30, 3, "c")]);
+    }
+
+    #[test]
+    fn should_order_equal_indices_by_key() {
+        let mut map: SortedKeyMap<u64, u64, &str> = SortedKeyMap::new();
+        map.insert(2, 10, "b");
+        map.insert(1, 10, "a"); // same index as key 2
+
+        let triples: Vec<_> = map.iter().map(|(i, k, v)| (*i, *k, *v)).collect();
+        assert_eq!(triples, vec![(10, 1, "a"), (10, 2, "b")]);
+    }
+
+    #[test]
+    fn should_support_reverse_iteration() {
+        let mut map: SortedKeyMap<u64, u64, &str> = SortedKeyMap::new();
+        map.insert(1, 10, "a");
+        map.insert(2, 20, "b");
+        map.insert(3, 30, "c");
+
+        let keys: Vec<_> = map.iter().rev().map(|(_, k, _)| *k).collect();
+        assert_eq!(keys, vec![3, 2, 1]);
+    }
+}
+
+mod len_and_is_empty {
+    use super::*;
+
+    #[test]
+    fn should_be_empty_when_new() {
+        let map: SortedKeyMap<u64, u64, &str> = SortedKeyMap::new();
+        assert!(map.is_empty());
+        assert_eq!(map.len(), 0);
+    }
+
+    #[test]
+    fn should_track_len_through_inserts_and_removes() {
+        let mut map: SortedKeyMap<u64, u64, &str> = SortedKeyMap::new();
+        map.insert(1, 10, "a");
+        map.insert(2, 20, "b");
+        assert_eq!(map.len(), 2);
+        map.remove(&1);
+        assert_eq!(map.len(), 1);
+        assert!(!map.is_empty());
+    }
+}