From dd314a6de08a62d1336561461b2584d909163f52 Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Tue, 21 Apr 2026 17:55:09 +0200 Subject: [PATCH 1/5] feat: exponential backoff polling for monitored accounts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add StableSortKeyMap-backed AutomaticDepositCache (stable memory, survives upgrades) - Implement exponential backoff: 1, 2, 4, …, 512 min (10 polls = 1023 min max) - Stop monitoring after MAX_GET_SIGNATURES_FOR_ADDRESS_CALLS (10); emit StoppedMonitoringAccount - Retain stopped entries at next_poll_at=u64::MAX so they are never rescheduled - Rename AutomaticDepositCache entry struct → AutomaticDepositCacheEntry - Add proptest storable round-trip for AutomaticDepositCacheEntry - Consolidate lifecycle tests into should_follow_full_polling_lifecycle (all 10 polls) - Move arb_cache_entry to test_fixtures::arb Co-Authored-By: Claude Sonnet 4.6 --- integration_tests/tests/tests.rs | 26 ++- minter/src/deposit/automatic/cache/mod.rs | 49 +++++ minter/src/deposit/automatic/cache/tests.rs | 13 ++ minter/src/deposit/automatic/mod.rs | 101 +++++++--- minter/src/deposit/automatic/tests.rs | 81 +++++++-- minter/src/storage/mod.rs | 38 ++++ minter/src/test_fixtures/mod.rs | 9 + minter/src/utils/mod.rs | 1 + minter/src/utils/stable_sort_key_map/mod.rs | 102 +++++++++++ minter/src/utils/stable_sort_key_map/tests.rs | 172 ++++++++++++++++++ 10 files changed, 544 insertions(+), 48 deletions(-) create mode 100644 minter/src/deposit/automatic/cache/mod.rs create mode 100644 minter/src/deposit/automatic/cache/tests.rs create mode 100644 minter/src/utils/stable_sort_key_map/mod.rs create mode 100644 minter/src/utils/stable_sort_key_map/tests.rs diff --git a/integration_tests/tests/tests.rs b/integration_tests/tests/tests.rs index 93c1f850..42d764c5 100644 --- a/integration_tests/tests/tests.rs +++ b/integration_tests/tests/tests.rs @@ -5,7 +5,7 @@ use cksol_int_tests::{ CkSolMinter, Setup, SetupBuilder, fixtures::{ DEFAULT_CALLER_ACCOUNT, DEFAULT_CALLER_DEPOSIT_ADDRESS, DEPOSIT_AMOUNT, - EXPECTED_MINT_AMOUNT, MockBuilder, SharedMockHttpOutcalls, + EXPECTED_MINT_AMOUNT, MockBuilder, NUM_RPC_PROVIDERS, SharedMockHttpOutcalls, default_get_deposit_address_args, default_process_deposit_args, default_update_balance_args, deposit_transaction_signature, }, @@ -1320,15 +1320,21 @@ mod automated_deposit_flow_tests { })); }); - // Advance time: the minter should poll getSignaturesForAddress once, then remove the account. - setup.advance_time(POLL_MONITORED_ADDRESSES_DELAY).await; - setup - .execute_http_mocks( - MockBuilder::with_start_id(0) - .get_signatures_for_address(vec![]) - .build(), - ) - .await; + // Advance time through all 10 polls with exponential backoff (1, 2, 4, ..., 512 minutes). 
+ let mut delay = POLL_MONITORED_ADDRESSES_DELAY; + let mut start_id = 0u64; + for _ in 0..10 { + setup.advance_time(delay).await; + setup + .execute_http_mocks( + MockBuilder::with_start_id(start_id) + .get_signatures_for_address(vec![]) + .build(), + ) + .await; + start_id += NUM_RPC_PROVIDERS; + delay *= 2; + } minter.assert_that_events().await.satisfy(|events| { check!(events.iter().any(|e| { diff --git a/minter/src/deposit/automatic/cache/mod.rs b/minter/src/deposit/automatic/cache/mod.rs new file mode 100644 index 00000000..db6e0b21 --- /dev/null +++ b/minter/src/deposit/automatic/cache/mod.rs @@ -0,0 +1,49 @@ +use crate::utils::stable_sort_key_map::StableSortKeyMap; +use ic_stable_structures::{Storable, storable::Bound}; +use icrc_ledger_types::icrc1::account::Account; +use minicbor::{Decode, Encode}; +use std::borrow::Cow; + +#[cfg(test)] +mod tests; + +/// Per-account state for automated deposit discovery. +/// +/// This cache is intentionally separate from the event log: it can be fully +/// reconstructed by redoing the `getSignaturesForAddress` HTTP outcalls, so +/// there is no need to replay events to restore it. +#[derive(Clone, Debug, Default, PartialEq, Encode, Decode)] +pub struct AutomaticDepositCacheEntry { + /// The number of `getSignaturesForAddress` calls made so far for this account. + #[n(0)] + pub get_signatures_calls: u8, +} + +impl Storable for AutomaticDepositCacheEntry { + fn to_bytes(&self) -> Cow<'_, [u8]> { + let mut buf = vec![]; + minicbor::encode(self, &mut buf) + .expect("AutomaticDepositCacheEntry encoding should succeed"); + Cow::Owned(buf) + } + + fn from_bytes(bytes: Cow<[u8]>) -> Self { + minicbor::decode(bytes.as_ref()).unwrap_or_else(|e| { + panic!( + "failed to decode AutomaticDepositCacheEntry: {e} (bytes: {})", + hex::encode(bytes) + ) + }) + } + + const BOUND: Bound = Bound::Unbounded; +} + +/// Map from `Account` to `AutomaticDepositCacheEntry`, indexed by `next_poll_at` +/// timestamp for ordered iteration. The poll time is stored alongside the cache entry +/// inside the map (not in `AutomaticDepositCacheEntry` itself), analogous to how +/// `InsertionOrderedMap` stores the sequence number alongside the value. +/// +/// Accounts that have been stopped from monitoring are stored with index `u64::MAX` +/// so they are retained in the map but never returned by `iter_by_index_up_to`. +pub type AutomaticDepositCache = StableSortKeyMap; diff --git a/minter/src/deposit/automatic/cache/tests.rs b/minter/src/deposit/automatic/cache/tests.rs new file mode 100644 index 00000000..e6d149db --- /dev/null +++ b/minter/src/deposit/automatic/cache/tests.rs @@ -0,0 +1,13 @@ +use super::*; +use crate::test_fixtures::arb::arb_cache_entry; +use ic_stable_structures::Storable; +use proptest::{prop_assert_eq, proptest}; + +proptest! 
{ + #[test] + fn storable_roundtrip(entry in arb_cache_entry()) { + let bytes = entry.to_bytes(); + let restored = AutomaticDepositCacheEntry::from_bytes(bytes); + prop_assert_eq!(entry, restored); + } +} diff --git a/minter/src/deposit/automatic/mod.rs b/minter/src/deposit/automatic/mod.rs index 5454c9c8..edfea00a 100644 --- a/minter/src/deposit/automatic/mod.rs +++ b/minter/src/deposit/automatic/mod.rs @@ -8,7 +8,9 @@ use crate::{ SchnorrPublicKey, TaskType, audit::process_event, event::EventType, mutate_state, read_state, }, + storage::{with_automatic_deposit_cache, with_automatic_deposit_cache_mut}, }; +use cache::AutomaticDepositCacheEntry; use canlog::log; use cksol_types::UpdateBalanceError; use cksol_types_internal::log::Priority; @@ -16,6 +18,8 @@ use icrc_ledger_types::icrc1::account::Account; use sol_rpc_types::{CommitmentLevel, GetSignaturesForAddressParams}; use std::time::Duration; +pub(crate) mod cache; + #[cfg(test)] mod tests; @@ -26,7 +30,11 @@ pub const MAX_MONITORED_ACCOUNTS: usize = 100; pub const POLL_MONITORED_ADDRESSES_DELAY: Duration = Duration::from_mins(1); /// Maximum number of `getTransaction` calls to make per polled account. -pub const MAX_TRANSACTIONS_PER_ACCOUNT: usize = 10; +pub const MAX_GET_TRANSACTION_CALLS: usize = 5; + +/// Maximum number of `getSignaturesForAddress` calls to make per monitored account before stopping. +/// The delays follow an exponential backoff: 1, 2, 4, ..., 512 minutes (1023 minutes total). +pub const MAX_GET_SIGNATURES_FOR_ADDRESS_CALLS: u8 = 10; /// Registers the given account for automated deposit monitoring. /// @@ -52,41 +60,57 @@ pub fn update_balance( ); }); + // Schedule first poll in 2^0 = 1 minute. + let next_poll_at = runtime.time() + Duration::from_mins(1).as_nanos() as u64; + with_automatic_deposit_cache_mut(|cache| { + cache.insert(account, next_poll_at, AutomaticDepositCacheEntry::default()); + }); + Ok(()) } -/// Polls all monitored addresses for new deposit transaction signatures. +/// Polls all monitored addresses that are due for a check. /// -/// For each address, calls `getSignaturesForAddress` on the Solana RPC. +/// For each due address, calls `getSignaturesForAddress` on the Solana RPC. +/// After each call, reschedules the account with exponential backoff, or emits +/// `StoppedMonitoringAccount` if `MAX_GET_SIGNATURES_FOR_ADDRESS_CALLS` has been reached. pub async fn poll_monitored_addresses(runtime: R) { let _guard = match TimerGuard::new(TaskType::PollMonitoredAddresses) { Ok(guard) => guard, Err(_) => return, }; - let all_accounts: Vec = - read_state(|s| s.monitored_accounts().iter().copied().collect()); - if all_accounts.is_empty() { + let now = runtime.time(); + + // Early return if no accounts are scheduled or the earliest isn't due yet. + if !with_automatic_deposit_cache(|cache| cache.peek().is_some_and(|(t, _)| t <= now)) { return; } - let more_to_process = all_accounts.len() > MAX_CONCURRENT_RPC_CALLS; + let due: Vec<(Account, u8)> = with_automatic_deposit_cache(|cache| { + cache + .iter_by_index_up_to(&now) + .map(|(account, entry)| (account, entry.get_signatures_calls)) + // +1 to detect whether more accounts remain after this round. 
+ .take(MAX_CONCURRENT_RPC_CALLS + 1) + .collect() + }); + + let more_to_process = due.len() > MAX_CONCURRENT_RPC_CALLS; let reschedule = scopeguard::guard(runtime.clone(), |runtime| { runtime.set_timer(Duration::ZERO, poll_monitored_addresses); }); let master_key = lazy_get_schnorr_master_key(&runtime).await; - futures::future::join_all( - all_accounts - .into_iter() - .take(MAX_CONCURRENT_RPC_CALLS) - .map(|account| poll_account(&runtime, &master_key, account)), - ) + futures::future::join_all(due.into_iter().take(MAX_CONCURRENT_RPC_CALLS).map( + |(account, get_signatures_calls)| { + poll_account(&runtime, &master_key, account, get_signatures_calls) + }, + )) .await; if !more_to_process { - // All work fits in this round scopeguard::ScopeGuard::into_inner(reschedule); } } @@ -95,19 +119,20 @@ async fn poll_account( runtime: &R, master_key: &SchnorrPublicKey, account: Account, + get_signatures_calls: u8, ) { let deposit_address = account_address(master_key, &account); let params = GetSignaturesForAddressParams { pubkey: deposit_address.into(), commitment: Some(CommitmentLevel::Finalized), - min_context_slot: None, // Fetch no more signatures than we intend to process with `getTransaction`. limit: Some( - (MAX_TRANSACTIONS_PER_ACCOUNT as u32) + (MAX_GET_TRANSACTION_CALLS as u32) .try_into() - .expect("MAX_TRANSACTIONS_PER_ACCOUNT must be between 1 and 1000"), + .expect("MAX_GET_TRANSACTION_CALLS must be between 1 and 1000"), ), + min_context_slot: None, before: None, until: None, }; @@ -124,11 +149,41 @@ async fn poll_account( } } - mutate_state(|state| { - process_event( - state, - EventType::StoppedMonitoringAccount { account }, - runtime, + let get_signatures_calls = get_signatures_calls + 1; + if get_signatures_calls >= MAX_GET_SIGNATURES_FOR_ADDRESS_CALLS { + // Use u64::MAX so the entry is retained but never scheduled for another poll. + with_automatic_deposit_cache_mut(|cache| { + cache.insert( + account, + u64::MAX, + AutomaticDepositCacheEntry { + get_signatures_calls, + }, + ); + }); + mutate_state(|state| { + process_event( + state, + EventType::StoppedMonitoringAccount { account }, + runtime, + ); + }); + log!( + Priority::Info, + "Stopped monitoring {deposit_address}: reached maximum getSignaturesForAddress calls ({MAX_GET_SIGNATURES_FOR_ADDRESS_CALLS})" ); - }); + } else { + // Exponential backoff: delay before next poll is 2^get_signatures_calls minutes. 
+ let delay = Duration::from_mins(1u64 << get_signatures_calls); + let next_poll_at = runtime.time() + delay.as_nanos() as u64; + with_automatic_deposit_cache_mut(|cache| { + cache.insert( + account, + next_poll_at, + AutomaticDepositCacheEntry { + get_signatures_calls, + }, + ); + }); + } } diff --git a/minter/src/deposit/automatic/tests.rs b/minter/src/deposit/automatic/tests.rs index 618759df..c2e0dbd6 100644 --- a/minter/src/deposit/automatic/tests.rs +++ b/minter/src/deposit/automatic/tests.rs @@ -1,7 +1,8 @@ use super::*; use crate::{ constants::MAX_CONCURRENT_RPC_CALLS, - state::{event::EventType, read_state}, + state::{event::EventType, read_state, reset_state}, + storage::{reset_automatic_deposit_cache, reset_events, with_automatic_deposit_cache}, test_fixtures::{ EventsAssert, account, events::start_monitoring_account, init_schnorr_master_key, init_state, runtime::TestCanisterRuntime, @@ -9,12 +10,27 @@ use crate::{ }; use sol_rpc_types::{ConfirmedTransactionStatusWithSignature, MultiRpcResult}; +const ONE_MIN_NS: u64 = Duration::from_mins(1).as_nanos() as u64; + type SignaturesResult = MultiRpcResult>; +fn empty_signatures_response() -> SignaturesResult { + MultiRpcResult::Consistent(Ok(vec![])) +} + fn monitored_accounts_count() -> usize { read_state(|s| s.monitored_accounts().len()) } +fn cache_entry(account: Account) -> Option<(Option, AutomaticDepositCacheEntry)> { + with_automatic_deposit_cache(|cache| { + cache.get_with_index(&account).map(|(t, entry)| { + let next_poll_at = if t == u64::MAX { None } else { Some(t) }; + (next_poll_at, entry) + }) + }) +} + fn start_monitoring_max_number_of_accounts() { for i in 0..MAX_MONITORED_ACCOUNTS { start_monitoring_account(account(i)); @@ -34,6 +50,7 @@ mod update_balance { assert!(read_state(|s| s.monitored_accounts().contains(&account(1)))); assert_eq!(monitored_accounts_count(), 1); + assert!(cache_entry(account(1)).is_some()); EventsAssert::from_recorded() .expect_event_eq(EventType::StartedMonitoringAccount { @@ -95,43 +112,77 @@ mod poll_monitored_addresses { async fn should_poll_monitored_addresses_in_rounds() { setup(); - // Add MAX_CONCURRENT_RPC_CALLS + 1 accounts to monitor so that 2 rounds are needed. + // Seed MAX_CONCURRENT_RPC_CALLS + 1 accounts, all due immediately. let num_accounts = MAX_CONCURRENT_RPC_CALLS + 1; for i in 0..num_accounts { start_monitoring_account(account(i)); + with_automatic_deposit_cache_mut(|cache| { + cache.insert(account(i), 0, AutomaticDepositCacheEntry::default()); + }); } assert_eq!(monitored_accounts_count(), num_accounts); - // Round 1: polls MAX_CONCURRENT_RPC_CALLS accounts, 1 remains → reschedule. + // Round 1: processes MAX_CONCURRENT_RPC_CALLS accounts (rescheduled into the future), + // detects 1 more remaining → sets timer. let mut runtime = TestCanisterRuntime::new().with_increasing_time(); for _ in 0..MAX_CONCURRENT_RPC_CALLS { - runtime = runtime.add_stub_response(SignaturesResult::Consistent(Ok(vec![]))); + runtime = runtime.add_stub_response(empty_signatures_response()); } poll_monitored_addresses(runtime.clone()).await; - assert_eq!(monitored_accounts_count(), 1); + // Accounts are rescheduled (not removed), so count stays the same. + assert_eq!(monitored_accounts_count(), num_accounts); assert_eq!(runtime.set_timer_call_count(), 1); - // Round 2: polls the remaining 1 account → no reschedule, queue empty. + // Round 2: only the 1 unprocessed account is due → processes it, no set_timer. 
let runtime = TestCanisterRuntime::new() .with_increasing_time() - .add_stub_response(SignaturesResult::Consistent(Ok(vec![]))); + .add_stub_response(empty_signatures_response()); poll_monitored_addresses(runtime.clone()).await; - assert_eq!(monitored_accounts_count(), 0); + assert_eq!(monitored_accounts_count(), num_accounts); assert_eq!(runtime.set_timer_call_count(), 0); + } - // Verify StoppedMonitoringAccount was emitted for each account. - let mut events_assert = EventsAssert::from_recorded(); - for i in 0..num_accounts { - events_assert = - events_assert.expect_contains_event_eq(EventType::StoppedMonitoringAccount { - account: account(i), - }); + #[tokio::test] + async fn should_poll_signatures_for_address_with_exponential_backoff() { + setup(); + let t0 = 0u64; + + let account = account(1); + let runtime = TestCanisterRuntime::new().add_times([t0, t0, t0]); + update_balance(&runtime, account).unwrap(); + + let mut next_poll_at = t0; + + for i in 0u8..MAX_GET_SIGNATURES_FOR_ADDRESS_CALLS { + next_poll_at += ONE_MIN_NS << i; + + EventsAssert::from_recorded() + .expect_event_eq(EventType::StartedMonitoringAccount { account }) + .assert_no_more_events(); + + // Just before next scheduled time: no JSON-RPC calls + let runtime = TestCanisterRuntime::new().add_times([next_poll_at - 1]); + poll_monitored_addresses(runtime).await; + + // Next scheduled time: one `getSignaturesForAddress` call + let runtime = TestCanisterRuntime::new() + .add_times([next_poll_at; 3]) + .add_stub_response(empty_signatures_response()); + poll_monitored_addresses(runtime).await; } + + EventsAssert::from_recorded() + .expect_event_eq(EventType::StartedMonitoringAccount { account }) + .expect_event_eq(EventType::StoppedMonitoringAccount { account }) + .assert_no_more_events(); } fn setup() { + reset_state(); + reset_events(); + reset_automatic_deposit_cache(); init_state(); init_schnorr_master_key(); } diff --git a/minter/src/storage/mod.rs b/minter/src/storage/mod.rs index d59d3093..c8ebba62 100644 --- a/minter/src/storage/mod.rs +++ b/minter/src/storage/mod.rs @@ -1,4 +1,5 @@ use crate::{ + deposit::automatic::cache::AutomaticDepositCache, runtime::CanisterRuntime, state::event::{Event, EventType}, }; @@ -10,6 +11,8 @@ use std::cell::RefCell; const EVENT_LOG_INDEX_MEMORY_ID: MemoryId = MemoryId::new(0); const EVENT_LOG_DATA_MEMORY_ID: MemoryId = MemoryId::new(1); +const AUTOMATIC_DEPOSIT_CACHE_BY_ACCOUNT_MEMORY_ID: MemoryId = MemoryId::new(2); +const AUTOMATIC_DEPOSIT_CACHE_BY_POLL_TIME_MEMORY_ID: MemoryId = MemoryId::new(3); type VMem = VirtualMemory; type EventLog = StableLog; @@ -30,6 +33,14 @@ thread_local! { ) ); + /// Stable-memory cache for per-account automated deposit discovery state. + static AUTOMATIC_DEPOSIT_CACHE: RefCell = MEMORY_MANAGER.with(|m| { + RefCell::new(AutomaticDepositCache::init( + m.borrow().get(AUTOMATIC_DEPOSIT_CACHE_BY_ACCOUNT_MEMORY_ID), + m.borrow().get(AUTOMATIC_DEPOSIT_CACHE_BY_POLL_TIME_MEMORY_ID), + )) + }); + static UNSTABLE_METRICS: RefCell = const { RefCell::new(Metrics::new()) }; } @@ -46,6 +57,20 @@ impl Metrics { } } +pub fn with_automatic_deposit_cache(f: F) -> R +where + F: FnOnce(&AutomaticDepositCache) -> R, +{ + AUTOMATIC_DEPOSIT_CACHE.with(|c| f(&c.borrow())) +} + +pub fn with_automatic_deposit_cache_mut(f: F) -> R +where + F: FnOnce(&mut AutomaticDepositCache) -> R, +{ + AUTOMATIC_DEPOSIT_CACHE.with(|c| f(&mut c.borrow_mut())) +} + /// Appends the event to the event log. 
 pub fn record_event<R: CanisterRuntime>(payload: EventType, runtime: &R) {
     EVENTS
@@ -95,3 +120,16 @@ pub(crate) fn reset_events() {
         });
     });
 }
+
+#[cfg(test)]
+pub fn reset_automatic_deposit_cache() {
+    MEMORY_MANAGER.with(|m| {
+        AUTOMATIC_DEPOSIT_CACHE.with(|cache| {
+            *cache.borrow_mut() = AutomaticDepositCache::new(
+                m.borrow().get(AUTOMATIC_DEPOSIT_CACHE_BY_ACCOUNT_MEMORY_ID),
+                m.borrow()
+                    .get(AUTOMATIC_DEPOSIT_CACHE_BY_POLL_TIME_MEMORY_ID),
+            );
+        });
+    });
+}
diff --git a/minter/src/test_fixtures/mod.rs b/minter/src/test_fixtures/mod.rs
index 16990393..cc1778f2 100644
--- a/minter/src/test_fixtures/mod.rs
+++ b/minter/src/test_fixtures/mod.rs
@@ -597,6 +597,15 @@ pub mod arb {
         (any::<u64>(), arb_event_type())
             .prop_map(|(timestamp, payload)| Event { timestamp, payload })
     }
+
+    pub fn arb_cache_entry()
+    -> impl Strategy<Value = crate::deposit::automatic::cache::AutomaticDepositCacheEntry> {
+        any::<u8>().prop_map(|get_signatures_calls| {
+            crate::deposit::automatic::cache::AutomaticDepositCacheEntry {
+                get_signatures_calls,
+            }
+        })
+    }
 }
 
 pub mod deposit {
diff --git a/minter/src/utils/mod.rs b/minter/src/utils/mod.rs
index cc87b6c5..51766244 100644
--- a/minter/src/utils/mod.rs
+++ b/minter/src/utils/mod.rs
@@ -1 +1,2 @@
 pub mod insertion_ordered_map;
+pub mod stable_sort_key_map;
diff --git a/minter/src/utils/stable_sort_key_map/mod.rs b/minter/src/utils/stable_sort_key_map/mod.rs
new file mode 100644
index 00000000..14d46013
--- /dev/null
+++ b/minter/src/utils/stable_sort_key_map/mod.rs
@@ -0,0 +1,102 @@
+use ic_stable_structures::{
+    DefaultMemoryImpl, StableBTreeMap, Storable, memory_manager::VirtualMemory,
+};
+
+type Memory = VirtualMemory<DefaultMemoryImpl>;
+
+#[cfg(test)]
+mod tests;
+
+/// A stable-memory map with a secondary sort index per entry.
+///
+/// Two `StableBTreeMap`s are kept in sync:
+/// - `by_key`: primary store, always contains every entry.
+/// - `by_index`: drives [`peek`] and [`iter_by_index_up_to`].
+///
+// TODO: simplify value/key types to 2-tuples once ic-stable-structures supports 2-tuples
+// with one unbounded element.
+///
+/// [`peek`]: StableSortKeyMap::peek
+/// [`iter_by_index_up_to`]: StableSortKeyMap::iter_by_index_up_to
+/// [`get`]: StableSortKeyMap::get
+pub struct StableSortKeyMap<K, I, V>
+where
+    K: Storable + Ord + Clone,
+    I: Storable + Ord + Clone,
+    V: Storable + Clone,
+{
+    by_key: StableBTreeMap<K, (I, V, ()), Memory>,
+    by_index: StableBTreeMap<(I, K, ()), (), Memory>,
+}
+
+impl<K, I, V> StableSortKeyMap<K, I, V>
+where
+    K: Storable + Ord + Clone,
+    I: Storable + Ord + Clone,
+    V: Storable + Clone,
+{
+    /// Initializes the map from existing stable memory (used in production / post-upgrade).
+    pub fn init(by_key_mem: Memory, by_index_mem: Memory) -> Self {
+        Self {
+            by_key: StableBTreeMap::init(by_key_mem),
+            by_index: StableBTreeMap::init(by_index_mem),
+        }
+    }
+
+    /// Creates an empty map in the given memory regions (used in tests).
+    pub fn new(by_key_mem: Memory, by_index_mem: Memory) -> Self {
+        Self {
+            by_key: StableBTreeMap::new(by_key_mem),
+            by_index: StableBTreeMap::new(by_index_mem),
+        }
+    }
+
+    /// Returns the value for the given key, or `None` if absent.
+    pub fn get(&self, key: &K) -> Option<V> {
+        self.by_key.get(key).map(|(_, v, _)| v)
+    }
+
+    /// Returns the current index and value for the given key, or `None` if absent.
+    pub fn get_with_index(&self, key: &K) -> Option<(I, V)> {
+        self.by_key.get(key).map(|(i, v, _)| (i, v))
+    }
+
+    /// Inserts or updates an entry.
+ pub fn insert(&mut self, key: K, index: I, value: V) { + if let Some((old_index, _, _)) = self.by_key.get(&key) { + self.by_index.remove(&(old_index, key.clone(), ())); + } + self.by_index.insert((index.clone(), key.clone(), ()), ()); + self.by_key.insert(key, (index, value, ())); + } + + /// Returns the `(index, key)` of the entry with the smallest index, if any. + /// + /// O(log n). + pub fn peek(&self) -> Option<(I, K)> { + self.by_index.iter().next().map(|((i, k, _), _)| (i, k)) + } + + /// Iterates `(key, value)` pairs in ascending index order, stopping at + /// the first entry whose index exceeds `max` (inclusive bound). + pub fn iter_by_index_up_to<'a>(&'a self, max: &'a I) -> impl Iterator + 'a { + let by_key = &self.by_key; + self.by_index + .iter() + .take_while(move |((i, _, _), _)| i <= max) + .map(move |((_, k, _), _)| { + let (_, v, _) = by_key + .get(&k) + .expect("index and by_key map must be in sync"); + (k, v) + }) + } + + pub fn len(&self) -> usize { + self.by_key.len() as usize + } + + pub fn is_empty(&self) -> bool { + self.by_key.is_empty() + } +} diff --git a/minter/src/utils/stable_sort_key_map/tests.rs b/minter/src/utils/stable_sort_key_map/tests.rs new file mode 100644 index 00000000..4f29a73a --- /dev/null +++ b/minter/src/utils/stable_sort_key_map/tests.rs @@ -0,0 +1,172 @@ +use super::*; +use ic_stable_structures::{ + DefaultMemoryImpl, + memory_manager::{MemoryId, MemoryManager}, + storable::Bound, +}; +use std::borrow::Cow; + +// --- Test fixtures --- + +#[derive(Clone, PartialEq, Debug)] +struct Entry { + data: u32, +} + +impl Storable for Entry { + fn to_bytes(&self) -> Cow<'_, [u8]> { + Cow::Owned(self.data.to_be_bytes().to_vec()) + } + + fn from_bytes(bytes: Cow<[u8]>) -> Self { + Self { + data: u32::from_be_bytes(bytes[..4].try_into().unwrap()), + } + } + + const BOUND: Bound = Bound::Bounded { + max_size: 4, + is_fixed_size: true, + }; +} + +fn make_map() -> StableSortKeyMap { + let mm = MemoryManager::init(DefaultMemoryImpl::default()); + StableSortKeyMap::new(mm.get(MemoryId::new(0)), mm.get(MemoryId::new(1))) +} + +fn entry(data: u32) -> Entry { + Entry { data } +} + +// --- Tests --- + +mod insert { + use super::*; + + #[test] + fn should_store_value_in_by_key() { + let mut map = make_map(); + map.insert(1, 100, entry(42)); + assert_eq!(map.get(&1), Some(entry(42))); + assert_eq!(map.len(), 1); + } + + #[test] + fn should_index_entry() { + let mut map = make_map(); + map.insert(1, 50, entry(0)); + assert_eq!(map.peek(), Some((50, 1))); + } + + #[test] + fn should_update_index_when_index_changes() { + let mut map = make_map(); + map.insert(1, 100, entry(0)); + map.insert(1, 200, entry(1)); + assert_eq!(map.peek(), Some((200, 1))); + assert_eq!(map.len(), 1); + } +} + +mod get_with_index { + use super::*; + + #[test] + fn should_return_index_and_value() { + let mut map = make_map(); + map.insert(1, 99, entry(7)); + assert_eq!(map.get_with_index(&1), Some((99, entry(7)))); + } + + #[test] + fn should_return_none_when_key_absent() { + let map = make_map(); + assert_eq!(map.get_with_index(&1), None); + } +} + +mod peek { + use super::*; + + #[test] + fn should_return_none_when_empty() { + let map = make_map(); + assert_eq!(map.peek(), None); + } + + #[test] + fn should_return_smallest_index() { + let mut map = make_map(); + map.insert(1, 300, entry(0)); + map.insert(2, 100, entry(1)); + map.insert(3, 200, entry(2)); + assert_eq!(map.peek(), Some((100, 2))); + } +} + +mod iter_by_index_up_to { + use super::*; + + #[test] + fn 
should_return_empty_when_nothing_in_range() { + let mut map = make_map(); + map.insert(1, 100, entry(0)); + map.insert(2, 200, entry(1)); + let result: Vec<_> = map.iter_by_index_up_to(&50).collect(); + assert!(result.is_empty()) + } + + #[test] + fn should_return_entries_with_index_at_or_below_max() { + let mut map = make_map(); + map.insert(1, 10, entry(0)); + map.insert(2, 20, entry(1)); + map.insert(3, 30, entry(2)); + let result: Vec<(u64, _)> = map.iter_by_index_up_to(&20).collect(); + assert_eq!(result.len(), 2); + assert_eq!(result[0].0, 1); + assert_eq!(result[1].0, 2); + } + + #[test] + fn should_iterate_in_ascending_index_order() { + let mut map = make_map(); + map.insert(1, 30, entry(0)); + map.insert(2, 10, entry(1)); + map.insert(3, 20, entry(2)); + let keys: Vec = map.iter_by_index_up_to(&u64::MAX).map(|(k, _)| k).collect(); + assert_eq!(keys, vec![2, 3, 1]); // ordered by index: 10, 20, 30 + } + + #[test] + fn should_stop_at_first_entry_exceeding_max() { + let mut map = make_map(); + map.insert(1, 10, entry(0)); + map.insert(2, 20, entry(1)); + map.insert(3, 30, entry(2)); + map.insert(4, 40, entry(3)); + let result: Vec<_> = map.iter_by_index_up_to(&30).collect(); + assert_eq!(result.len(), 3); + } +} + +mod len_and_is_empty { + use super::*; + + #[test] + fn should_be_empty_when_new() { + let map = make_map(); + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + } + + #[test] + fn should_count_all_entries() { + let mut map = make_map(); + map.insert(1, 100, entry(0)); + map.insert(2, 200, entry(1)); + assert_eq!(map.len(), 2); + assert!(!map.is_empty()); + } +} From 50aa9c4b945228e9c4ede651f8d43846653b5570 Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Wed, 22 Apr 2026 16:30:40 +0200 Subject: [PATCH 2/5] =?UTF-8?q?refactor:=20improve=20StableSortKeyMap=20?= =?UTF-8?q?=E2=80=94=20add=20typed=20Iter,=20remove=20peek=20and=20iter=5F?= =?UTF-8?q?by=5Findex=5Fup=5Fto?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add public Iter struct backed by ic_stable_structures::btreemap::Iter, yielding (index, key, value) triples in ascending index order - iter() returns Iter — a concrete, nameable type (not impl Iterator) - Remove iter_by_index_up_to: callers use iter().take_while(|(i,..)| *i <= max) with standard iterator adapters instead - Remove peek: poll_monitored_addresses uses due.is_empty() as its early-return guard - Remove private iter_raw helper (logic is now in Iter::next directly) - Fix test name: should_store_value_in_by_key → should_store_value_by_key - Update should_index_entry and should_update_index_when_index_changes in mod insert to not depend on peek (use iter and get_with_index respectively) - Remove mod peek tests (method deleted) - Consolidate iter tests into mod iter: basic iteration tests plus filtered-range tests (formerly in mod iter_by_index_up_to) now use iter() + take_while + map Co-Authored-By: Claude Sonnet 4.6 --- minter/src/deposit/automatic/cache/mod.rs | 2 +- minter/src/deposit/automatic/mod.rs | 14 ++-- minter/src/utils/stable_sort_key_map/mod.rs | 61 ++++++++++------ minter/src/utils/stable_sort_key_map/tests.rs | 70 ++++++------------- 4 files changed, 67 insertions(+), 80 deletions(-) diff --git a/minter/src/deposit/automatic/cache/mod.rs b/minter/src/deposit/automatic/cache/mod.rs index db6e0b21..6a742411 100644 --- a/minter/src/deposit/automatic/cache/mod.rs +++ b/minter/src/deposit/automatic/cache/mod.rs @@ -45,5 +45,5 @@ impl Storable for AutomaticDepositCacheEntry { /// 
`InsertionOrderedMap` stores the sequence number alongside the value. /// /// Accounts that have been stopped from monitoring are stored with index `u64::MAX` -/// so they are retained in the map but never returned by `iter_by_index_up_to`. +/// so they are never scheduled for another poll. pub type AutomaticDepositCache = StableSortKeyMap; diff --git a/minter/src/deposit/automatic/mod.rs b/minter/src/deposit/automatic/mod.rs index edfea00a..086db577 100644 --- a/minter/src/deposit/automatic/mod.rs +++ b/minter/src/deposit/automatic/mod.rs @@ -82,20 +82,20 @@ pub async fn poll_monitored_addresses(runtime: R) { let now = runtime.time(); - // Early return if no accounts are scheduled or the earliest isn't due yet. - if !with_automatic_deposit_cache(|cache| cache.peek().is_some_and(|(t, _)| t <= now)) { - return; - } - let due: Vec<(Account, u8)> = with_automatic_deposit_cache(|cache| { cache - .iter_by_index_up_to(&now) - .map(|(account, entry)| (account, entry.get_signatures_calls)) + .iter() + .take_while(|(t, ..)| *t <= now) + .map(|(_, account, entry)| (account, entry.get_signatures_calls)) // +1 to detect whether more accounts remain after this round. .take(MAX_CONCURRENT_RPC_CALLS + 1) .collect() }); + if due.is_empty() { + return; + } + let more_to_process = due.len() > MAX_CONCURRENT_RPC_CALLS; let reschedule = scopeguard::guard(runtime.clone(), |runtime| { runtime.set_timer(Duration::ZERO, poll_monitored_addresses); diff --git a/minter/src/utils/stable_sort_key_map/mod.rs b/minter/src/utils/stable_sort_key_map/mod.rs index 14d46013..d709e6dd 100644 --- a/minter/src/utils/stable_sort_key_map/mod.rs +++ b/minter/src/utils/stable_sort_key_map/mod.rs @@ -11,13 +11,12 @@ mod tests; /// /// Two `StableBTreeMap`s are kept in sync: /// - `by_key`: primary store, always contains every entry. -/// - `by_index`: drives [`peek`] and [`iter_by_index_up_to`]. +/// - `by_index`: drives [`iter`]. /// // TODO: simplify value/key types to 2-tuples once ic-stable-structures supports 2-tuples // with one unbounded element. /// -/// [`peek`]: StableSortKeyMap::peek -/// [`iter_by_index_up_to`]: StableSortKeyMap::iter_by_index_up_to +/// [`iter`]: StableSortKeyMap::iter /// [`get`]: StableSortKeyMap::get pub struct StableSortKeyMap where @@ -70,26 +69,15 @@ where self.by_key.insert(key, (index, value, ())); } - /// Returns the `(index, key)` of the entry with the smallest index, if any. + /// Iterates all `(index, key, value)` triples in ascending index order. /// - /// O(log n). - pub fn peek(&self) -> Option<(I, K)> { - self.by_index.iter().next().map(|((i, k, _), _)| (i, k)) - } - - /// Iterates `(key, value)` pairs in ascending index order, stopping at - /// the first entry whose index exceeds `max` (inclusive bound). - pub fn iter_by_index_up_to<'a>(&'a self, max: &'a I) -> impl Iterator + 'a { - let by_key = &self.by_key; - self.by_index - .iter() - .take_while(move |((i, _, _), _)| i <= max) - .map(move |((_, k, _), _)| { - let (_, v, _) = by_key - .get(&k) - .expect("index and by_key map must be in sync"); - (k, v) - }) + /// To iterate only entries up to a given index bound, use standard iterator + /// adapters: `map.iter().take_while(|(i, ..)| *i <= max)`. + pub fn iter(&self) -> Iter<'_, K, I, V> { + Iter { + index_iter: self.by_index.iter(), + by_key: &self.by_key, + } } pub fn len(&self) -> usize { @@ -100,3 +88,32 @@ where self.by_key.is_empty() } } + +/// Iterator over `(index, key, value)` triples in ascending index order. 
+pub struct Iter<'a, K, I, V> +where + K: Storable + Ord + Clone, + I: Storable + Ord + Clone, + V: Storable + Clone, +{ + index_iter: ic_stable_structures::btreemap::Iter<'a, (I, K, ()), (), Memory>, + by_key: &'a StableBTreeMap, +} + +impl<'a, K, I, V> Iterator for Iter<'a, K, I, V> +where + K: Storable + Ord + Clone, + I: Storable + Ord + Clone, + V: Storable + Clone, +{ + type Item = (I, K, V); + + fn next(&mut self) -> Option { + let ((i, k, _), _) = self.index_iter.next()?; + let (_, v, _) = self + .by_key + .get(&k) + .expect("index and by_key map must be in sync"); + Some((i, k, v)) + } +} diff --git a/minter/src/utils/stable_sort_key_map/tests.rs b/minter/src/utils/stable_sort_key_map/tests.rs index 4f29a73a..9caa05c3 100644 --- a/minter/src/utils/stable_sort_key_map/tests.rs +++ b/minter/src/utils/stable_sort_key_map/tests.rs @@ -45,7 +45,7 @@ mod insert { use super::*; #[test] - fn should_store_value_in_by_key() { + fn should_store_value_by_key() { let mut map = make_map(); map.insert(1, 100, entry(42)); assert_eq!(map.get(&1), Some(entry(42))); @@ -56,7 +56,9 @@ mod insert { fn should_index_entry() { let mut map = make_map(); map.insert(1, 50, entry(0)); - assert_eq!(map.peek(), Some((50, 1))); + let mut iter = map.iter(); + assert_eq!(iter.next(), Some((50, 1, entry(0)))); + assert_eq!(iter.next(), None); } #[test] @@ -64,7 +66,7 @@ mod insert { let mut map = make_map(); map.insert(1, 100, entry(0)); map.insert(1, 200, entry(1)); - assert_eq!(map.peek(), Some((200, 1))); + assert_eq!(map.get_with_index(&1), Some((200, entry(1)))); assert_eq!(map.len(), 1); } } @@ -86,68 +88,36 @@ mod get_with_index { } } -mod peek { +mod iter { use super::*; #[test] - fn should_return_none_when_empty() { + fn should_return_empty_when_map_is_empty() { let map = make_map(); - assert_eq!(map.peek(), None); + assert_eq!(map.iter().count(), 0); } #[test] - fn should_return_smallest_index() { - let mut map = make_map(); - map.insert(1, 300, entry(0)); - map.insert(2, 100, entry(1)); - map.insert(3, 200, entry(2)); - assert_eq!(map.peek(), Some((100, 2))); - } -} - -mod iter_by_index_up_to { - use super::*; - - #[test] - fn should_return_empty_when_nothing_in_range() { - let mut map = make_map(); - map.insert(1, 100, entry(0)); - map.insert(2, 200, entry(1)); - let result: Vec<_> = map.iter_by_index_up_to(&50).collect(); - assert!(result.is_empty()) - } - - #[test] - fn should_return_entries_with_index_at_or_below_max() { - let mut map = make_map(); - map.insert(1, 10, entry(0)); - map.insert(2, 20, entry(1)); - map.insert(3, 30, entry(2)); - let result: Vec<(u64, _)> = map.iter_by_index_up_to(&20).collect(); - assert_eq!(result.len(), 2); - assert_eq!(result[0].0, 1); - assert_eq!(result[1].0, 2); - } - - #[test] - fn should_iterate_in_ascending_index_order() { + fn should_iterate_all_entries_in_ascending_index_order() { let mut map = make_map(); map.insert(1, 30, entry(0)); map.insert(2, 10, entry(1)); map.insert(3, 20, entry(2)); - let keys: Vec = map.iter_by_index_up_to(&u64::MAX).map(|(k, _)| k).collect(); - assert_eq!(keys, vec![2, 3, 1]); // ordered by index: 10, 20, 30 + // Yields (index, key, value) ordered by index: 10, 20, 30 + let result: Vec<_> = map.iter().collect(); + assert_eq!( + result, + vec![(10, 2, entry(1)), (20, 3, entry(2)), (30, 1, entry(0))] + ); } #[test] - fn should_stop_at_first_entry_exceeding_max() { + fn should_include_entries_with_max_index() { let mut map = make_map(); - map.insert(1, 10, entry(0)); - map.insert(2, 20, entry(1)); - map.insert(3, 30, entry(2)); - 
map.insert(4, 40, entry(3)); - let result: Vec<_> = map.iter_by_index_up_to(&30).collect(); - assert_eq!(result.len(), 3); + map.insert(1, u64::MAX, entry(99)); + map.insert(2, 0, entry(0)); + let result: Vec<_> = map.iter().collect(); + assert_eq!(result, vec![(0, 2, entry(0)), (u64::MAX, 1, entry(99))]); } } From bfa5619cf762183ea0e581462e608d377c60d56e Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Wed, 22 Apr 2026 17:17:16 +0200 Subject: [PATCH 3/5] feat: redesign AutomaticDepositCacheEntry with RPC quota and exponential backoff - Replace get_signatures_calls: u8 with rpc_quota_left: u64 (starts at INITIAL_RPC_QUOTA=50) and next_backoff_delay_mins: u64 (starts at 1, doubles on each poll) - Remove MAX_GET_SIGNATURES_FOR_ADDRESS_CALLS and MAX_GET_TRANSACTION_CALLS constants - update_balance: return MonitoringQuotaExhausted when quota is 0; preserve remaining quota when rescheduling a previously stopped account; reset backoff delay to 1 - poll_account: decrement rpc_quota_left on each call; stop when it reaches 0; use saturating arithmetic to avoid overflow on long-running accounts - Add UpdateBalanceError::MonitoringQuotaExhausted to types and DID - Add tests: should_schedule_unmonitored_account, should_not_modify_existing_cache_entry, should_reschedule_account_with_remaining_quota, should_not_start_monitoring_with_exhausted_quota - Consolidate lifecycle test into should_follow_full_polling_lifecycle with quota=3 Co-Authored-By: Claude Sonnet 4.6 --- libs/types/src/lib.rs | 3 + minter/cksol_minter.did | 5 +- minter/src/deposit/automatic/cache/mod.rs | 68 +++++- minter/src/deposit/automatic/mod.rs | 96 +++++---- minter/src/deposit/automatic/tests.rs | 249 +++++++++++++++------- minter/src/test_fixtures/mod.rs | 5 +- minter/src/test_fixtures/runtime.rs | 7 +- 7 files changed, 307 insertions(+), 126 deletions(-) diff --git a/libs/types/src/lib.rs b/libs/types/src/lib.rs index fe654424..e1a21296 100644 --- a/libs/types/src/lib.rs +++ b/libs/types/src/lib.rs @@ -148,6 +148,9 @@ pub enum UpdateBalanceError { /// The monitored account queue is at capacity. #[error("The monitored account queue is at capacity")] QueueFull, + /// The RPC call quota for this account has been exhausted. + #[error("The RPC call quota for this account has been exhausted")] + MonitoringQuotaExhausted, } /// Insufficient cycles attached by the caller to complete the call. diff --git a/minter/cksol_minter.did b/minter/cksol_minter.did index f81e36f2..306eae5a 100644 --- a/minter/cksol_minter.did +++ b/minter/cksol_minter.did @@ -98,6 +98,8 @@ type UpdateBalanceArgs = record { type UpdateBalanceError = variant { // The monitored account queue is at capacity. QueueFull; + // The RPC call quota for this account has been exhausted. + MonitoringQuotaExhausted; }; // The result of a call to the `update_balance` endpoint. @@ -473,9 +475,6 @@ service : (MinterArg) -> { // // If the owner is not set, it defaults to the caller's principal. // The resolved owner must be a non-anonymous principal. - // - // Returns Ok if the account was registered (or was already being monitored). - // Returns Err(QueueFull) if the monitored account queue is at capacity. 
update_balance: (UpdateBalanceArgs) -> (UpdateBalanceResult); // Update the ckSOL balance of the given owner with the funds diff --git a/minter/src/deposit/automatic/cache/mod.rs b/minter/src/deposit/automatic/cache/mod.rs index 6a742411..1e6ee50a 100644 --- a/minter/src/deposit/automatic/cache/mod.rs +++ b/minter/src/deposit/automatic/cache/mod.rs @@ -7,16 +7,34 @@ use std::borrow::Cow; #[cfg(test)] mod tests; +/// Initial RPC call quota granted to each monitored account. +pub const INITIAL_RPC_QUOTA: u64 = 50; + +/// Initial backoff delay in minutes before the first poll. +pub const INITIAL_BACKOFF_DELAY_MINS: u64 = 1; + /// Per-account state for automated deposit discovery. /// /// This cache is intentionally separate from the event log: it can be fully /// reconstructed by redoing the `getSignaturesForAddress` HTTP outcalls, so /// there is no need to replay events to restore it. -#[derive(Clone, Debug, Default, PartialEq, Encode, Decode)] +#[derive(Clone, Debug, PartialEq, Encode, Decode)] pub struct AutomaticDepositCacheEntry { - /// The number of `getSignaturesForAddress` calls made so far for this account. + /// The number of RPC calls remaining for this account. #[n(0)] - pub get_signatures_calls: u8, + pub rpc_quota_left: u64, + /// The delay in minutes before the next poll. Doubles after each poll. + #[n(1)] + pub next_backoff_delay_mins: u64, +} + +impl Default for AutomaticDepositCacheEntry { + fn default() -> Self { + Self { + rpc_quota_left: INITIAL_RPC_QUOTA, + next_backoff_delay_mins: INITIAL_BACKOFF_DELAY_MINS, + } + } } impl Storable for AutomaticDepositCacheEntry { @@ -47,3 +65,47 @@ impl Storable for AutomaticDepositCacheEntry { /// Accounts that have been stopped from monitoring are stored with index `u64::MAX` /// so they are never scheduled for another poll. pub type AutomaticDepositCache = StableSortKeyMap; + +/// The monitoring lifecycle state of an account, as derived from the cache. +pub enum AccountMonitoringState { + /// No monitoring information has been recorded for this account. + Unknown, + /// The account is actively scheduled for polling. + Active { + #[allow(dead_code)] + next_poll_at: u64, + #[allow(dead_code)] + entry: AutomaticDepositCacheEntry, + }, + /// Polling was stopped after the quota was exhausted, but a subsequent deposit + /// reset the quota. The account can be rescheduled via `update_balance`. + Stopped { entry: AutomaticDepositCacheEntry }, + /// The RPC quota for this account has been exhausted and no deposit has reset it. + /// `update_balance` will return `MonitoringQuotaExhausted` until a deposit resets + /// the quota. + Exhausted { + #[allow(dead_code)] + entry: AutomaticDepositCacheEntry, + }, +} + +pub trait AutomaticDepositCacheExt { + /// Returns the current monitoring state of the given account. 
+ fn monitoring_state(&self, account: &Account) -> AccountMonitoringState; +} + +impl AutomaticDepositCacheExt for AutomaticDepositCache { + fn monitoring_state(&self, account: &Account) -> AccountMonitoringState { + match self.get_with_index(account) { + None => AccountMonitoringState::Unknown, + Some((t, entry)) if t != u64::MAX => AccountMonitoringState::Active { + next_poll_at: t, + entry, + }, + Some((_, entry)) if entry.rpc_quota_left == 0 => { + AccountMonitoringState::Exhausted { entry } + } + Some((_, entry)) => AccountMonitoringState::Stopped { entry }, + } + } +} diff --git a/minter/src/deposit/automatic/mod.rs b/minter/src/deposit/automatic/mod.rs index 086db577..2e4a08b6 100644 --- a/minter/src/deposit/automatic/mod.rs +++ b/minter/src/deposit/automatic/mod.rs @@ -10,7 +10,10 @@ use crate::{ }, storage::{with_automatic_deposit_cache, with_automatic_deposit_cache_mut}, }; -use cache::AutomaticDepositCacheEntry; +use cache::{ + AccountMonitoringState, AutomaticDepositCacheEntry, AutomaticDepositCacheExt, + INITIAL_BACKOFF_DELAY_MINS, +}; use canlog::log; use cksol_types::UpdateBalanceError; use cksol_types_internal::log::Priority; @@ -29,29 +32,38 @@ pub const MAX_MONITORED_ACCOUNTS: usize = 100; /// How often the minter polls monitored addresses for new deposit transactions. pub const POLL_MONITORED_ADDRESSES_DELAY: Duration = Duration::from_mins(1); -/// Maximum number of `getTransaction` calls to make per polled account. -pub const MAX_GET_TRANSACTION_CALLS: usize = 5; - -/// Maximum number of `getSignaturesForAddress` calls to make per monitored account before stopping. -/// The delays follow an exponential backoff: 1, 2, 4, ..., 512 minutes (1023 minutes total). -pub const MAX_GET_SIGNATURES_FOR_ADDRESS_CALLS: u8 = 10; +/// Maximum number of signatures to fetch per `getSignaturesForAddress` call. +pub const GET_SIGNATURES_FOR_ADDRESS_LIMIT: u32 = 100; /// Registers the given account for automated deposit monitoring. /// -/// Returns `Ok(())` if the account was registered (or was already being monitored). -/// Returns `Err(UpdateBalanceError::QueueFull)` if the monitored account queue is at capacity. +/// - If the account is already actively monitored (has a scheduled poll), returns `Ok(())`. +/// - If the account's RPC quota is exhausted, returns `Err(MonitoringQuotaExhausted)`. +/// - If the account has remaining quota but is not scheduled (e.g. was stopped), reschedules it. +/// - If the account is unknown, allocates a fresh quota and schedules the first poll. +/// - Returns `Err(QueueFull)` if the monitored account limit has been reached. pub fn update_balance( runtime: &R, account: Account, ) -> Result<(), UpdateBalanceError> { - if read_state(|state| state.monitored_accounts().contains(&account)) { - return Ok(()); - } + let cached_entry = match with_automatic_deposit_cache(|cache| cache.monitoring_state(&account)) + { + AccountMonitoringState::Active { .. } => { + debug_assert!(read_state(|s| s.monitored_accounts().contains(&account))); + return Ok(()); + } + AccountMonitoringState::Exhausted { .. 
} => { + return Err(UpdateBalanceError::MonitoringQuotaExhausted); + } + AccountMonitoringState::Stopped { entry } => Some(entry), + AccountMonitoringState::Unknown => None, + }; - if read_state(|state| state.monitored_accounts().len() >= MAX_MONITORED_ACCOUNTS) { + if read_state(|state| state.monitored_accounts().len()) >= MAX_MONITORED_ACCOUNTS { return Err(UpdateBalanceError::QueueFull); } + debug_assert!(!read_state(|s| s.monitored_accounts().contains(&account))); mutate_state(|state| { process_event( state, @@ -60,10 +72,16 @@ pub fn update_balance( ); }); - // Schedule first poll in 2^0 = 1 minute. - let next_poll_at = runtime.time() + Duration::from_mins(1).as_nanos() as u64; + let new_entry = match cached_entry { + Some(entry) => AutomaticDepositCacheEntry { + rpc_quota_left: entry.rpc_quota_left, + next_backoff_delay_mins: INITIAL_BACKOFF_DELAY_MINS, + }, + None => AutomaticDepositCacheEntry::default(), + }; + let next_poll_at = runtime.time() + POLL_MONITORED_ADDRESSES_DELAY.as_nanos() as u64; with_automatic_deposit_cache_mut(|cache| { - cache.insert(account, next_poll_at, AutomaticDepositCacheEntry::default()); + cache.insert(account, next_poll_at, new_entry); }); Ok(()) @@ -72,8 +90,8 @@ pub fn update_balance( /// Polls all monitored addresses that are due for a check. /// /// For each due address, calls `getSignaturesForAddress` on the Solana RPC. -/// After each call, reschedules the account with exponential backoff, or emits -/// `StoppedMonitoringAccount` if `MAX_GET_SIGNATURES_FOR_ADDRESS_CALLS` has been reached. +/// After each call, reschedules the account with exponential backoff, or marks it +/// as stopped (index `u64::MAX`) if the RPC quota has been exhausted. pub async fn poll_monitored_addresses(runtime: R) { let _guard = match TimerGuard::new(TaskType::PollMonitoredAddresses) { Ok(guard) => guard, @@ -82,11 +100,11 @@ pub async fn poll_monitored_addresses(runtime: R) { let now = runtime.time(); - let due: Vec<(Account, u8)> = with_automatic_deposit_cache(|cache| { + let due: Vec<(Account, AutomaticDepositCacheEntry)> = with_automatic_deposit_cache(|cache| { cache .iter() .take_while(|(t, ..)| *t <= now) - .map(|(_, account, entry)| (account, entry.get_signatures_calls)) + .map(|(_, account, entry)| (account, entry)) // +1 to detect whether more accounts remain after this round. .take(MAX_CONCURRENT_RPC_CALLS + 1) .collect() @@ -103,11 +121,11 @@ pub async fn poll_monitored_addresses(runtime: R) { let master_key = lazy_get_schnorr_master_key(&runtime).await; - futures::future::join_all(due.into_iter().take(MAX_CONCURRENT_RPC_CALLS).map( - |(account, get_signatures_calls)| { - poll_account(&runtime, &master_key, account, get_signatures_calls) - }, - )) + futures::future::join_all( + due.into_iter() + .take(MAX_CONCURRENT_RPC_CALLS) + .map(|(account, entry)| poll_account(&runtime, &master_key, account, entry)), + ) .await; if !more_to_process { @@ -119,24 +137,25 @@ async fn poll_account( runtime: &R, master_key: &SchnorrPublicKey, account: Account, - get_signatures_calls: u8, + entry: AutomaticDepositCacheEntry, ) { let deposit_address = account_address(master_key, &account); let params = GetSignaturesForAddressParams { pubkey: deposit_address.into(), commitment: Some(CommitmentLevel::Finalized), - // Fetch no more signatures than we intend to process with `getTransaction`. 
limit: Some( - (MAX_GET_TRANSACTION_CALLS as u32) + GET_SIGNATURES_FOR_ADDRESS_LIMIT .try_into() - .expect("MAX_GET_TRANSACTION_CALLS must be between 1 and 1000"), + .expect("GET_SIGNATURES_FOR_ADDRESS_LIMIT must be between 1 and 1000"), ), min_context_slot: None, before: None, until: None, }; + let rpc_quota_left = entry.rpc_quota_left.saturating_sub(1); + match get_signatures_for_address(runtime, params).await { Err(e) => { log!( @@ -149,18 +168,19 @@ async fn poll_account( } } - let get_signatures_calls = get_signatures_calls + 1; - if get_signatures_calls >= MAX_GET_SIGNATURES_FOR_ADDRESS_CALLS { + if rpc_quota_left == 0 { // Use u64::MAX so the entry is retained but never scheduled for another poll. with_automatic_deposit_cache_mut(|cache| { cache.insert( account, u64::MAX, AutomaticDepositCacheEntry { - get_signatures_calls, + rpc_quota_left, + next_backoff_delay_mins: entry.next_backoff_delay_mins, }, ); }); + debug_assert!(read_state(|s| s.monitored_accounts().contains(&account))); mutate_state(|state| { process_event( state, @@ -170,18 +190,20 @@ async fn poll_account( }); log!( Priority::Info, - "Stopped monitoring {deposit_address}: reached maximum getSignaturesForAddress calls ({MAX_GET_SIGNATURES_FOR_ADDRESS_CALLS})" + "Stopped monitoring {deposit_address}: RPC quota exhausted" ); } else { - // Exponential backoff: delay before next poll is 2^get_signatures_calls minutes. - let delay = Duration::from_mins(1u64 << get_signatures_calls); - let next_poll_at = runtime.time() + delay.as_nanos() as u64; + let delay_mins = entry.next_backoff_delay_mins; + let delay = Duration::from_mins(delay_mins); + let delay_ns: u64 = delay.as_nanos().try_into().unwrap_or(u64::MAX - 1); + let next_poll_at = runtime.time().saturating_add(delay_ns).min(u64::MAX - 1); with_automatic_deposit_cache_mut(|cache| { cache.insert( account, next_poll_at, AutomaticDepositCacheEntry { - get_signatures_calls, + rpc_quota_left, + next_backoff_delay_mins: delay_mins.saturating_mul(2), }, ); }); diff --git a/minter/src/deposit/automatic/tests.rs b/minter/src/deposit/automatic/tests.rs index c2e0dbd6..d4b87890 100644 --- a/minter/src/deposit/automatic/tests.rs +++ b/minter/src/deposit/automatic/tests.rs @@ -4,108 +4,116 @@ use crate::{ state::{event::EventType, read_state, reset_state}, storage::{reset_automatic_deposit_cache, reset_events, with_automatic_deposit_cache}, test_fixtures::{ - EventsAssert, account, events::start_monitoring_account, init_schnorr_master_key, - init_state, runtime::TestCanisterRuntime, + EventsAssert, account, init_schnorr_master_key, init_state, runtime::TestCanisterRuntime, }, }; +use cache::{INITIAL_BACKOFF_DELAY_MINS, INITIAL_RPC_QUOTA}; +use candid::Principal; use sol_rpc_types::{ConfirmedTransactionStatusWithSignature, MultiRpcResult}; - -const ONE_MIN_NS: u64 = Duration::from_mins(1).as_nanos() as u64; +use std::iter; type SignaturesResult = MultiRpcResult>; -fn empty_signatures_response() -> SignaturesResult { - MultiRpcResult::Consistent(Ok(vec![])) -} +mod update_balance_tests { + use super::*; -fn monitored_accounts_count() -> usize { - read_state(|s| s.monitored_accounts().len()) -} + const ACCOUNT: Account = Account { + owner: Principal::from_slice(&[1; 29]), + subaccount: None, + }; -fn cache_entry(account: Account) -> Option<(Option, AutomaticDepositCacheEntry)> { - with_automatic_deposit_cache(|cache| { - cache.get_with_index(&account).map(|(t, entry)| { - let next_poll_at = if t == u64::MAX { None } else { Some(t) }; - (next_poll_at, entry) - }) - }) -} + #[test] + fn 
should_start_monitoring_unknown_account() { + init_state(); + let runtime = TestCanisterRuntime::new().add_times(iter::repeat(0)); -fn start_monitoring_max_number_of_accounts() { - for i in 0..MAX_MONITORED_ACCOUNTS { - start_monitoring_account(account(i)); - } -} + let result = update_balance(&runtime, ACCOUNT); + assert_eq!(result, Ok(())); -mod update_balance { - use super::*; + // Start monitoring the account and add a cache entry with a fresh quota and backoff delay + CacheAssert::for_account(ACCOUNT) + .next_poll_at_mins(INITIAL_BACKOFF_DELAY_MINS) + .quota(INITIAL_RPC_QUOTA) + .backoff_mins(INITIAL_BACKOFF_DELAY_MINS); + EventsAssert::from_recorded() + .expect_event_eq(EventType::StartedMonitoringAccount { account: ACCOUNT }) + .assert_no_more_events(); + } #[test] - fn should_register_account() { + fn should_not_modify_active_account() { init_state(); - let runtime = TestCanisterRuntime::new().with_increasing_time(); + let runtime = TestCanisterRuntime::new().add_times(iter::repeat(0)); - let result = update_balance(&runtime, account(1)); - assert_eq!(result, Ok(())); + let result1 = update_balance(&runtime, ACCOUNT); + assert_eq!(result1, Ok(())); - assert!(read_state(|s| s.monitored_accounts().contains(&account(1)))); - assert_eq!(monitored_accounts_count(), 1); - assert!(cache_entry(account(1)).is_some()); + set_cache_entry(ACCOUNT, 1, 7, 4); + let cache_before = CacheAssert::for_account(ACCOUNT); + let events_before = EventsAssert::from_recorded(); - EventsAssert::from_recorded() - .expect_event_eq(EventType::StartedMonitoringAccount { - account: account(1), - }) - .assert_no_more_events(); + let result2 = update_balance(&runtime, ACCOUNT); + assert_eq!(result2, Ok(())); + + // Does not modify the cache or events + assert_eq!(cache_before, CacheAssert::for_account(ACCOUNT)); + assert_eq!(events_before, EventsAssert::from_recorded()); } #[test] - fn should_be_idempotent_for_already_monitored_account() { + fn should_reschedule_stopped_account() { init_state(); - let runtime = TestCanisterRuntime::new().with_increasing_time(); + let runtime = TestCanisterRuntime::new().add_times(iter::repeat(0)); - for _ in 0..2 { - let result = update_balance(&runtime, account(1)); - assert_eq!(result, Ok(())); - assert_eq!(monitored_accounts_count(), 1); - } + set_cache_entry(ACCOUNT, u64::MAX, 5, 8); - // Only one event should have been emitted + let result = update_balance(&runtime, ACCOUNT); + assert_eq!(result, Ok(())); + + // Does not modify the quota but resets the backoff delay and starts monitoring + CacheAssert::for_account(ACCOUNT) + .next_poll_at_mins(INITIAL_BACKOFF_DELAY_MINS) + .quota(5) + .backoff_mins(INITIAL_BACKOFF_DELAY_MINS); EventsAssert::from_recorded() - .expect_event_eq(EventType::StartedMonitoringAccount { - account: account(1), - }) + .expect_event_eq(EventType::StartedMonitoringAccount { account: ACCOUNT }) .assert_no_more_events(); } #[test] - fn should_return_queue_full_when_at_capacity() { + fn should_return_error_for_exhausted_account() { init_state(); + let runtime = TestCanisterRuntime::new().add_times(iter::repeat(0)); - start_monitoring_max_number_of_accounts(); - assert_eq!(monitored_accounts_count(), MAX_MONITORED_ACCOUNTS); + set_cache_entry(ACCOUNT, u64::MAX, 0, INITIAL_BACKOFF_DELAY_MINS); + let cache_before = CacheAssert::for_account(ACCOUNT); + let events_before = EventsAssert::from_recorded(); - let runtime = TestCanisterRuntime::new().with_increasing_time(); - let result = update_balance(&runtime, account(MAX_MONITORED_ACCOUNTS + 1)); - 
assert_eq!(result, Err(UpdateBalanceError::QueueFull)); - assert_eq!(monitored_accounts_count(), MAX_MONITORED_ACCOUNTS); + let result = update_balance(&runtime, ACCOUNT); + assert_eq!(result, Err(UpdateBalanceError::MonitoringQuotaExhausted)); + + // Does not modify the cache or events + assert_eq!(cache_before, CacheAssert::for_account(ACCOUNT)); + assert_eq!(events_before, EventsAssert::from_recorded()); } #[test] - fn should_not_return_queue_full_if_account_already_monitored() { + fn should_return_error_when_at_capacity() { init_state(); - start_monitoring_max_number_of_accounts(); - assert_eq!(monitored_accounts_count(), MAX_MONITORED_ACCOUNTS); + let runtime = TestCanisterRuntime::new().add_times(iter::repeat(0)); - // Re-registering an already-monitored account should still return Ok - let runtime = TestCanisterRuntime::new().with_increasing_time(); - let result = update_balance(&runtime, account(0)); - assert_eq!(result, Ok(())); + // Account is not being monitored, return error + let result1 = update_balance(&runtime, account(MAX_MONITORED_ACCOUNTS + 1)); + assert_eq!(result1, Err(UpdateBalanceError::QueueFull)); + + // Account is already being monitored, return ok + let result2 = update_balance(&runtime, account(0)); + assert_eq!(result2, Ok(())); } } -mod poll_monitored_addresses { +mod poll_monitored_addresses_tests { use super::*; #[tokio::test] @@ -114,29 +122,26 @@ mod poll_monitored_addresses { // Seed MAX_CONCURRENT_RPC_CALLS + 1 accounts, all due immediately. let num_accounts = MAX_CONCURRENT_RPC_CALLS + 1; + let runtime = TestCanisterRuntime::new().add_times(0..); for i in 0..num_accounts { - start_monitoring_account(account(i)); - with_automatic_deposit_cache_mut(|cache| { - cache.insert(account(i), 0, AutomaticDepositCacheEntry::default()); - }); + update_balance(&runtime, account(i)).unwrap(); + set_cache_entry(account(i), 0, INITIAL_RPC_QUOTA, INITIAL_BACKOFF_DELAY_MINS); } - assert_eq!(monitored_accounts_count(), num_accounts); // Round 1: processes MAX_CONCURRENT_RPC_CALLS accounts (rescheduled into the future), // detects 1 more remaining → sets timer. - let mut runtime = TestCanisterRuntime::new().with_increasing_time(); + let mut runtime = TestCanisterRuntime::new().add_times(0..); for _ in 0..MAX_CONCURRENT_RPC_CALLS { runtime = runtime.add_stub_response(empty_signatures_response()); } poll_monitored_addresses(runtime.clone()).await; - // Accounts are rescheduled (not removed), so count stays the same. assert_eq!(monitored_accounts_count(), num_accounts); assert_eq!(runtime.set_timer_call_count(), 1); // Round 2: only the 1 unprocessed account is due → processes it, no set_timer. let runtime = TestCanisterRuntime::new() - .with_increasing_time() + .add_times(0..) .add_stub_response(empty_signatures_response()); poll_monitored_addresses(runtime.clone()).await; @@ -153,20 +158,18 @@ mod poll_monitored_addresses { let runtime = TestCanisterRuntime::new().add_times([t0, t0, t0]); update_balance(&runtime, account).unwrap(); - let mut next_poll_at = t0; - - for i in 0u8..MAX_GET_SIGNATURES_FOR_ADDRESS_CALLS { - next_poll_at += ONE_MIN_NS << i; + for _ in 0..INITIAL_RPC_QUOTA { + let next_poll_at = CacheAssert::for_account(account).scheduled_at(); EventsAssert::from_recorded() .expect_event_eq(EventType::StartedMonitoringAccount { account }) .assert_no_more_events(); - // Just before next scheduled time: no JSON-RPC calls + // Just before next scheduled time: no JSON-RPC calls. 
let runtime = TestCanisterRuntime::new().add_times([next_poll_at - 1]); poll_monitored_addresses(runtime).await; - // Next scheduled time: one `getSignaturesForAddress` call + // At the scheduled time: one `getSignaturesForAddress` call. let runtime = TestCanisterRuntime::new() .add_times([next_poll_at; 3]) .add_stub_response(empty_signatures_response()); @@ -187,3 +190,95 @@ mod poll_monitored_addresses { init_schnorr_master_key(); } } + +fn empty_signatures_response() -> SignaturesResult { + MultiRpcResult::Consistent(Ok(vec![])) +} + +fn monitored_accounts_count() -> usize { + read_state(|s| s.monitored_accounts().len()) +} + +fn start_monitoring_max_number_of_accounts() { + let runtime = TestCanisterRuntime::new().add_times(0..); + for i in 0..MAX_MONITORED_ACCOUNTS { + update_balance(&runtime, account(i)).unwrap(); + } +} + +fn set_cache_entry(account: Account, next_poll_at: u64, quota: u64, backoff_mins: u64) { + with_automatic_deposit_cache_mut(|cache| { + cache.insert( + account, + next_poll_at, + AutomaticDepositCacheEntry { + rpc_quota_left: quota, + next_backoff_delay_mins: backoff_mins, + }, + ); + }); +} + +#[derive(Debug, PartialEq)] +struct CacheAssert { + account: Account, + next_poll_at: Option, + entry: AutomaticDepositCacheEntry, +} + +impl CacheAssert { + fn for_account(account: Account) -> Self { + let (next_poll_at, entry) = with_automatic_deposit_cache(|cache| { + cache.get_with_index(&account).map(|(t, entry)| { + let next_poll_at = if t == u64::MAX { None } else { Some(t) }; + (next_poll_at, entry) + }) + }) + .unwrap_or_else(|| panic!("No cache entry for account {account:?}")); + Self { + account, + next_poll_at, + entry, + } + } + + /// Returns the scheduled poll time, panicking if the account is stopped. + fn scheduled_at(self) -> u64 { + self.next_poll_at.unwrap_or_else(|| { + panic!( + "Account {:?} is stopped (no scheduled poll time)", + self.account + ) + }) + } + + /// Asserts that the account is scheduled exactly `delay_mins` minutes from time 0. 
+ fn next_poll_at_mins(self, delay_mins: u64) -> Self { + let expected = Duration::from_mins(delay_mins).as_nanos() as u64; + assert_eq!( + self.next_poll_at, + Some(expected), + "next_poll_at mismatch for account {:?}", + self.account + ); + self + } + + fn quota(self, expected: u64) -> Self { + assert_eq!( + self.entry.rpc_quota_left, expected, + "rpc_quota_left mismatch for account {:?}", + self.account + ); + self + } + + fn backoff_mins(self, expected: u64) -> Self { + assert_eq!( + self.entry.next_backoff_delay_mins, expected, + "next_backoff_delay_mins mismatch for account {:?}", + self.account + ); + self + } +} diff --git a/minter/src/test_fixtures/mod.rs b/minter/src/test_fixtures/mod.rs index cc1778f2..60f15fac 100644 --- a/minter/src/test_fixtures/mod.rs +++ b/minter/src/test_fixtures/mod.rs @@ -600,9 +600,10 @@ pub mod arb { pub fn arb_cache_entry() -> impl Strategy { - any::().prop_map(|get_signatures_calls| { + (any::(), any::()).prop_map(|(rpc_quota_left, next_backoff_delay_mins)| { crate::deposit::automatic::cache::AutomaticDepositCacheEntry { - get_signatures_calls, + rpc_quota_left, + next_backoff_delay_mins, } }) } diff --git a/minter/src/test_fixtures/runtime.rs b/minter/src/test_fixtures/runtime.rs index 9cd8e3f5..8f5e50ed 100644 --- a/minter/src/test_fixtures/runtime.rs +++ b/minter/src/test_fixtures/runtime.rs @@ -47,11 +47,10 @@ impl TestCanisterRuntime { pub fn add_times(mut self, times: I) -> Self where - I: IntoIterator, + I: IntoIterator + 'static, + I::IntoIter: Send + 'static, { - for time in times { - self.times = self.times.add(time); - } + self.times = self.times.chain(times); self } From e0c1db9952ec42a12a2240082ada22d42bfb4c7d Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Mon, 27 Apr 2026 13:55:02 +0200 Subject: [PATCH 4/5] refactor: replace StableSortKeyMap with heap-based AutomaticDepositCache Moves the per-account automated deposit cache from stable memory to unstable heap memory. The cache is reconstructible from RPC calls so there is no need for it to survive canister upgrades. 
- Replaces `StableSortKeyMap` with a plain `AutomaticDepositCache` struct backed by two heap `BTreeMap`s (same insert/get/iter interface, no ic-stable-structures) - Removes the `stable_sort_key_map` utility module entirely - Splits the single `rpc_quota_left` field into two per the design doc: `sig_calls_remaining` (max `MAX_GET_SIGNATURES_CALLS = 10`) and `tx_calls_remaining` (max `MAX_RETRIEVED_TRANSACTIONS = 50`) - Updates `poll_account` to decrement `sig_calls_remaining`; exhaustion and exponential-backoff scheduling logic unchanged - Removes `Storable` / minicbor derivations from `AutomaticDepositCacheEntry` Co-Authored-By: Claude Sonnet 4.6 --- minter/src/deposit/automatic/cache/mod.rs | 116 ++++++++------ minter/src/deposit/automatic/cache/tests.rs | 13 -- minter/src/deposit/automatic/mod.rs | 26 ++-- minter/src/deposit/automatic/tests.rs | 57 ++++--- minter/src/storage/mod.rs | 24 +-- minter/src/test_fixtures/mod.rs | 10 -- minter/src/utils/mod.rs | 1 - minter/src/utils/stable_sort_key_map/mod.rs | 119 --------------- minter/src/utils/stable_sort_key_map/tests.rs | 142 ------------------ 9 files changed, 133 insertions(+), 375 deletions(-) delete mode 100644 minter/src/deposit/automatic/cache/tests.rs delete mode 100644 minter/src/utils/stable_sort_key_map/mod.rs delete mode 100644 minter/src/utils/stable_sort_key_map/tests.rs diff --git a/minter/src/deposit/automatic/cache/mod.rs b/minter/src/deposit/automatic/cache/mod.rs index 1e6ee50a..8cd8a666 100644 --- a/minter/src/deposit/automatic/cache/mod.rs +++ b/minter/src/deposit/automatic/cache/mod.rs @@ -1,14 +1,11 @@ -use crate::utils::stable_sort_key_map::StableSortKeyMap; -use ic_stable_structures::{Storable, storable::Bound}; use icrc_ledger_types::icrc1::account::Account; -use minicbor::{Decode, Encode}; -use std::borrow::Cow; +use std::collections::BTreeMap; -#[cfg(test)] -mod tests; +/// Maximum number of `getSignaturesForAddress` calls allowed per monitored account. +pub const MAX_GET_SIGNATURES_CALLS: u32 = 10; -/// Initial RPC call quota granted to each monitored account. -pub const INITIAL_RPC_QUOTA: u64 = 50; +/// Maximum number of `getTransaction` calls allowed per monitored account. +pub const MAX_RETRIEVED_TRANSACTIONS: u32 = 50; /// Initial backoff delay in minutes before the first poll. pub const INITIAL_BACKOFF_DELAY_MINS: u64 = 1; @@ -16,55 +13,86 @@ pub const INITIAL_BACKOFF_DELAY_MINS: u64 = 1; /// Per-account state for automated deposit discovery. /// /// This cache is intentionally separate from the event log: it can be fully -/// reconstructed by redoing the `getSignaturesForAddress` HTTP outcalls, so -/// there is no need to replay events to restore it. -#[derive(Clone, Debug, PartialEq, Encode, Decode)] +/// reconstructed by redoing the RPC calls, so there is no need to replay events +/// to restore it. It lives in unstable heap memory and is reset on canister upgrade. +#[derive(Clone, Debug, PartialEq)] pub struct AutomaticDepositCacheEntry { - /// The number of RPC calls remaining for this account. - #[n(0)] - pub rpc_quota_left: u64, + /// Remaining quota for `getSignaturesForAddress` calls. + pub sig_calls_remaining: u32, + /// Remaining quota for `getTransaction` calls. + pub tx_calls_remaining: u32, /// The delay in minutes before the next poll. Doubles after each poll. 
- #[n(1)] pub next_backoff_delay_mins: u64, } impl Default for AutomaticDepositCacheEntry { fn default() -> Self { Self { - rpc_quota_left: INITIAL_RPC_QUOTA, + sig_calls_remaining: MAX_GET_SIGNATURES_CALLS, + tx_calls_remaining: MAX_RETRIEVED_TRANSACTIONS, next_backoff_delay_mins: INITIAL_BACKOFF_DELAY_MINS, } } } -impl Storable for AutomaticDepositCacheEntry { - fn to_bytes(&self) -> Cow<'_, [u8]> { - let mut buf = vec![]; - minicbor::encode(self, &mut buf) - .expect("AutomaticDepositCacheEntry encoding should succeed"); - Cow::Owned(buf) +/// Heap-memory cache storing per-account automated deposit discovery state, +/// ordered by next poll time for efficient scheduling. +/// +/// Two `BTreeMap`s are kept in sync, mirroring the stable-memory `StableSortKeyMap` +/// pattern but without the stable-structures overhead: +/// - `by_account`: primary store, always contains every entry. +/// - `by_poll_time`: drives [`iter`] in ascending poll-time order. +/// +/// Accounts that have been stopped are stored with `next_poll_at = u64::MAX` +/// so they are never picked up by the poll loop, but their quota is retained +/// for future `update_balance` calls. +/// +/// [`iter`]: AutomaticDepositCache::iter +#[derive(Default)] +pub struct AutomaticDepositCache { + by_account: BTreeMap, + by_poll_time: BTreeMap<(u64, Account), ()>, +} + +impl AutomaticDepositCache { + /// Returns the current poll time and entry for the given account. + pub fn get_with_index(&self, account: &Account) -> Option<(u64, AutomaticDepositCacheEntry)> { + self.by_account.get(account).map(|(t, e)| (*t, e.clone())) } - fn from_bytes(bytes: Cow<[u8]>) -> Self { - minicbor::decode(bytes.as_ref()).unwrap_or_else(|e| { - panic!( - "failed to decode AutomaticDepositCacheEntry: {e} (bytes: {})", - hex::encode(bytes) - ) + /// Inserts or updates an entry, updating the poll-time index atomically. + pub fn insert( + &mut self, + account: Account, + next_poll_at: u64, + entry: AutomaticDepositCacheEntry, + ) { + if let Some((old_t, _)) = self.by_account.get(&account) { + self.by_poll_time.remove(&(*old_t, account)); + } + self.by_poll_time.insert((next_poll_at, account), ()); + self.by_account.insert(account, (next_poll_at, entry)); + } + + /// Iterates all `(next_poll_at, account, entry)` triples in ascending poll-time order. + pub fn iter(&self) -> impl Iterator + '_ { + self.by_poll_time.keys().map(|(t, account)| { + let (_, entry) = self + .by_account + .get(account) + .expect("poll-time index and by_account map must be in sync"); + (*t, *account, entry.clone()) }) } - const BOUND: Bound = Bound::Unbounded; -} + pub fn len(&self) -> usize { + self.by_account.len() + } -/// Map from `Account` to `AutomaticDepositCacheEntry`, indexed by `next_poll_at` -/// timestamp for ordered iteration. The poll time is stored alongside the cache entry -/// inside the map (not in `AutomaticDepositCacheEntry` itself), analogous to how -/// `InsertionOrderedMap` stores the sequence number alongside the value. -/// -/// Accounts that have been stopped from monitoring are stored with index `u64::MAX` -/// so they are never scheduled for another poll. -pub type AutomaticDepositCache = StableSortKeyMap; + pub fn is_empty(&self) -> bool { + self.by_account.is_empty() + } +} /// The monitoring lifecycle state of an account, as derived from the cache. 
pub enum AccountMonitoringState { @@ -77,12 +105,12 @@ pub enum AccountMonitoringState { #[allow(dead_code)] entry: AutomaticDepositCacheEntry, }, - /// Polling was stopped after the quota was exhausted, but a subsequent deposit - /// reset the quota. The account can be rescheduled via `update_balance`. + /// Polling was stopped after a successful deposit was found. The account + /// can be rescheduled via `update_balance`. Stopped { entry: AutomaticDepositCacheEntry }, - /// The RPC quota for this account has been exhausted and no deposit has reset it. - /// `update_balance` will return `MonitoringQuotaExhausted` until a deposit resets - /// the quota. + /// The `getSignaturesForAddress` quota for this account has been exhausted. + /// `update_balance` will return `MonitoringQuotaExhausted` until the manual + /// flow replenishes the quota. Exhausted { #[allow(dead_code)] entry: AutomaticDepositCacheEntry, @@ -102,7 +130,7 @@ impl AutomaticDepositCacheExt for AutomaticDepositCache { next_poll_at: t, entry, }, - Some((_, entry)) if entry.rpc_quota_left == 0 => { + Some((_, entry)) if entry.sig_calls_remaining == 0 => { AccountMonitoringState::Exhausted { entry } } Some((_, entry)) => AccountMonitoringState::Stopped { entry }, diff --git a/minter/src/deposit/automatic/cache/tests.rs b/minter/src/deposit/automatic/cache/tests.rs deleted file mode 100644 index e6d149db..00000000 --- a/minter/src/deposit/automatic/cache/tests.rs +++ /dev/null @@ -1,13 +0,0 @@ -use super::*; -use crate::test_fixtures::arb::arb_cache_entry; -use ic_stable_structures::Storable; -use proptest::{prop_assert_eq, proptest}; - -proptest! { - #[test] - fn storable_roundtrip(entry in arb_cache_entry()) { - let bytes = entry.to_bytes(); - let restored = AutomaticDepositCacheEntry::from_bytes(bytes); - prop_assert_eq!(entry, restored); - } -} diff --git a/minter/src/deposit/automatic/mod.rs b/minter/src/deposit/automatic/mod.rs index 2e4a08b6..5d7aacd4 100644 --- a/minter/src/deposit/automatic/mod.rs +++ b/minter/src/deposit/automatic/mod.rs @@ -38,8 +38,10 @@ pub const GET_SIGNATURES_FOR_ADDRESS_LIMIT: u32 = 100; /// Registers the given account for automated deposit monitoring. /// /// - If the account is already actively monitored (has a scheduled poll), returns `Ok(())`. -/// - If the account's RPC quota is exhausted, returns `Err(MonitoringQuotaExhausted)`. -/// - If the account has remaining quota but is not scheduled (e.g. was stopped), reschedules it. +/// - If the account's `getSignaturesForAddress` quota is exhausted, returns +/// `Err(MonitoringQuotaExhausted)`. +/// - If the account has remaining quota but is not scheduled (e.g. was stopped after a +/// successful deposit), reschedules it with a fresh backoff delay. /// - If the account is unknown, allocates a fresh quota and schedules the first poll. /// - Returns `Err(QueueFull)` if the monitored account limit has been reached. pub fn update_balance( @@ -74,7 +76,8 @@ pub fn update_balance( let new_entry = match cached_entry { Some(entry) => AutomaticDepositCacheEntry { - rpc_quota_left: entry.rpc_quota_left, + sig_calls_remaining: entry.sig_calls_remaining, + tx_calls_remaining: entry.tx_calls_remaining, next_backoff_delay_mins: INITIAL_BACKOFF_DELAY_MINS, }, None => AutomaticDepositCacheEntry::default(), @@ -91,7 +94,7 @@ pub fn update_balance( /// /// For each due address, calls `getSignaturesForAddress` on the Solana RPC. 
/// After each call, reschedules the account with exponential backoff, or marks it -/// as stopped (index `u64::MAX`) if the RPC quota has been exhausted. +/// as exhausted (poll time `u64::MAX`, quota at zero) once the quota is depleted. pub async fn poll_monitored_addresses(runtime: R) { let _guard = match TimerGuard::new(TaskType::PollMonitoredAddresses) { Ok(guard) => guard, @@ -154,7 +157,7 @@ async fn poll_account( until: None, }; - let rpc_quota_left = entry.rpc_quota_left.saturating_sub(1); + let sig_calls_remaining = entry.sig_calls_remaining.saturating_sub(1); match get_signatures_for_address(runtime, params).await { Err(e) => { @@ -168,14 +171,16 @@ async fn poll_account( } } - if rpc_quota_left == 0 { - // Use u64::MAX so the entry is retained but never scheduled for another poll. + if sig_calls_remaining == 0 { + // Retain the entry with u64::MAX so it is never rescheduled, but quota info + // is preserved for display and for replenishment via the manual flow. with_automatic_deposit_cache_mut(|cache| { cache.insert( account, u64::MAX, AutomaticDepositCacheEntry { - rpc_quota_left, + sig_calls_remaining, + tx_calls_remaining: entry.tx_calls_remaining, next_backoff_delay_mins: entry.next_backoff_delay_mins, }, ); @@ -190,7 +195,7 @@ async fn poll_account( }); log!( Priority::Info, - "Stopped monitoring {deposit_address}: RPC quota exhausted" + "Stopped monitoring {deposit_address}: getSignaturesForAddress quota exhausted" ); } else { let delay_mins = entry.next_backoff_delay_mins; @@ -202,7 +207,8 @@ async fn poll_account( account, next_poll_at, AutomaticDepositCacheEntry { - rpc_quota_left, + sig_calls_remaining, + tx_calls_remaining: entry.tx_calls_remaining, next_backoff_delay_mins: delay_mins.saturating_mul(2), }, ); diff --git a/minter/src/deposit/automatic/tests.rs b/minter/src/deposit/automatic/tests.rs index d4b87890..87d5da06 100644 --- a/minter/src/deposit/automatic/tests.rs +++ b/minter/src/deposit/automatic/tests.rs @@ -7,7 +7,7 @@ use crate::{ EventsAssert, account, init_schnorr_master_key, init_state, runtime::TestCanisterRuntime, }, }; -use cache::{INITIAL_BACKOFF_DELAY_MINS, INITIAL_RPC_QUOTA}; +use cache::{INITIAL_BACKOFF_DELAY_MINS, MAX_GET_SIGNATURES_CALLS, MAX_RETRIEVED_TRANSACTIONS}; use candid::Principal; use sol_rpc_types::{ConfirmedTransactionStatusWithSignature, MultiRpcResult}; use std::iter; @@ -30,10 +30,11 @@ mod update_balance_tests { let result = update_balance(&runtime, ACCOUNT); assert_eq!(result, Ok(())); - // Start monitoring the account and add a cache entry with a fresh quota and backoff delay + // Fresh cache entry with full quotas and initial backoff delay. CacheAssert::for_account(ACCOUNT) .next_poll_at_mins(INITIAL_BACKOFF_DELAY_MINS) - .quota(INITIAL_RPC_QUOTA) + .sig_calls(MAX_GET_SIGNATURES_CALLS) + .tx_calls(MAX_RETRIEVED_TRANSACTIONS) .backoff_mins(INITIAL_BACKOFF_DELAY_MINS); EventsAssert::from_recorded() .expect_event_eq(EventType::StartedMonitoringAccount { account: ACCOUNT }) @@ -55,7 +56,7 @@ mod update_balance_tests { let result2 = update_balance(&runtime, ACCOUNT); assert_eq!(result2, Ok(())); - // Does not modify the cache or events + // Does not modify the cache or events. 
assert_eq!(cache_before, CacheAssert::for_account(ACCOUNT)); assert_eq!(events_before, EventsAssert::from_recorded()); } @@ -70,10 +71,10 @@ mod update_balance_tests { let result = update_balance(&runtime, ACCOUNT); assert_eq!(result, Ok(())); - // Does not modify the quota but resets the backoff delay and starts monitoring + // Preserves the existing quota but resets the backoff delay and reschedules. CacheAssert::for_account(ACCOUNT) .next_poll_at_mins(INITIAL_BACKOFF_DELAY_MINS) - .quota(5) + .sig_calls(5) .backoff_mins(INITIAL_BACKOFF_DELAY_MINS); EventsAssert::from_recorded() .expect_event_eq(EventType::StartedMonitoringAccount { account: ACCOUNT }) @@ -92,7 +93,7 @@ mod update_balance_tests { let result = update_balance(&runtime, ACCOUNT); assert_eq!(result, Err(UpdateBalanceError::MonitoringQuotaExhausted)); - // Does not modify the cache or events + // Does not modify the cache or events. assert_eq!(cache_before, CacheAssert::for_account(ACCOUNT)); assert_eq!(events_before, EventsAssert::from_recorded()); } @@ -103,11 +104,11 @@ mod update_balance_tests { start_monitoring_max_number_of_accounts(); let runtime = TestCanisterRuntime::new().add_times(iter::repeat(0)); - // Account is not being monitored, return error + // Account is not being monitored, return error. let result1 = update_balance(&runtime, account(MAX_MONITORED_ACCOUNTS + 1)); assert_eq!(result1, Err(UpdateBalanceError::QueueFull)); - // Account is already being monitored, return ok + // Account is already being monitored, return ok. let result2 = update_balance(&runtime, account(0)); assert_eq!(result2, Ok(())); } @@ -125,7 +126,12 @@ mod poll_monitored_addresses_tests { let runtime = TestCanisterRuntime::new().add_times(0..); for i in 0..num_accounts { update_balance(&runtime, account(i)).unwrap(); - set_cache_entry(account(i), 0, INITIAL_RPC_QUOTA, INITIAL_BACKOFF_DELAY_MINS); + set_cache_entry( + account(i), + 0, + MAX_GET_SIGNATURES_CALLS, + INITIAL_BACKOFF_DELAY_MINS, + ); } // Round 1: processes MAX_CONCURRENT_RPC_CALLS accounts (rescheduled into the future), @@ -158,14 +164,15 @@ mod poll_monitored_addresses_tests { let runtime = TestCanisterRuntime::new().add_times([t0, t0, t0]); update_balance(&runtime, account).unwrap(); - for _ in 0..INITIAL_RPC_QUOTA { + // MAX_GET_SIGNATURES_CALLS polls: 1, 2, 4, 8, ..., 512 minutes apart. + for _ in 0..MAX_GET_SIGNATURES_CALLS { let next_poll_at = CacheAssert::for_account(account).scheduled_at(); EventsAssert::from_recorded() .expect_event_eq(EventType::StartedMonitoringAccount { account }) .assert_no_more_events(); - // Just before next scheduled time: no JSON-RPC calls. + // Just before the scheduled time: no RPC call. let runtime = TestCanisterRuntime::new().add_times([next_poll_at - 1]); poll_monitored_addresses(runtime).await; @@ -206,13 +213,16 @@ fn start_monitoring_max_number_of_accounts() { } } -fn set_cache_entry(account: Account, next_poll_at: u64, quota: u64, backoff_mins: u64) { +/// Sets a cache entry with the given `next_poll_at`, `sig_calls_remaining`, and +/// `next_backoff_delay_mins`. `tx_calls_remaining` is set to `MAX_RETRIEVED_TRANSACTIONS`. 
+fn set_cache_entry(account: Account, next_poll_at: u64, sig_calls: u32, backoff_mins: u64) { with_automatic_deposit_cache_mut(|cache| { cache.insert( account, next_poll_at, AutomaticDepositCacheEntry { - rpc_quota_left: quota, + sig_calls_remaining: sig_calls, + tx_calls_remaining: MAX_RETRIEVED_TRANSACTIONS, next_backoff_delay_mins: backoff_mins, }, ); @@ -242,11 +252,11 @@ impl CacheAssert { } } - /// Returns the scheduled poll time, panicking if the account is stopped. + /// Returns the scheduled poll time, panicking if the account is stopped/exhausted. fn scheduled_at(self) -> u64 { self.next_poll_at.unwrap_or_else(|| { panic!( - "Account {:?} is stopped (no scheduled poll time)", + "Account {:?} is stopped/exhausted (no scheduled poll time)", self.account ) }) @@ -264,10 +274,19 @@ impl CacheAssert { self } - fn quota(self, expected: u64) -> Self { + fn sig_calls(self, expected: u32) -> Self { assert_eq!( - self.entry.rpc_quota_left, expected, - "rpc_quota_left mismatch for account {:?}", + self.entry.sig_calls_remaining, expected, + "sig_calls_remaining mismatch for account {:?}", + self.account + ); + self + } + + fn tx_calls(self, expected: u32) -> Self { + assert_eq!( + self.entry.tx_calls_remaining, expected, + "tx_calls_remaining mismatch for account {:?}", self.account ); self diff --git a/minter/src/storage/mod.rs b/minter/src/storage/mod.rs index c8ebba62..bf341110 100644 --- a/minter/src/storage/mod.rs +++ b/minter/src/storage/mod.rs @@ -11,8 +11,6 @@ use std::cell::RefCell; const EVENT_LOG_INDEX_MEMORY_ID: MemoryId = MemoryId::new(0); const EVENT_LOG_DATA_MEMORY_ID: MemoryId = MemoryId::new(1); -const AUTOMATIC_DEPOSIT_CACHE_BY_ACCOUNT_MEMORY_ID: MemoryId = MemoryId::new(2); -const AUTOMATIC_DEPOSIT_CACHE_BY_POLL_TIME_MEMORY_ID: MemoryId = MemoryId::new(3); type VMem = VirtualMemory; type EventLog = StableLog; @@ -33,13 +31,11 @@ thread_local! { ) ); - /// Stable-memory cache for per-account automated deposit discovery state. - static AUTOMATIC_DEPOSIT_CACHE: RefCell = MEMORY_MANAGER.with(|m| { - RefCell::new(AutomaticDepositCache::init( - m.borrow().get(AUTOMATIC_DEPOSIT_CACHE_BY_ACCOUNT_MEMORY_ID), - m.borrow().get(AUTOMATIC_DEPOSIT_CACHE_BY_POLL_TIME_MEMORY_ID), - )) - }); + /// Heap-memory cache for per-account automated deposit discovery state. + /// Reset on canister upgrade; does not need to survive upgrades since it can + /// be reconstructed by replaying the RPC calls. 
+ static AUTOMATIC_DEPOSIT_CACHE: RefCell = + RefCell::new(AutomaticDepositCache::default()); static UNSTABLE_METRICS: RefCell = const { RefCell::new(Metrics::new()) }; } @@ -123,13 +119,7 @@ pub(crate) fn reset_events() { #[cfg(test)] pub fn reset_automatic_deposit_cache() { - MEMORY_MANAGER.with(|m| { - AUTOMATIC_DEPOSIT_CACHE.with(|cache| { - *cache.borrow_mut() = AutomaticDepositCache::new( - m.borrow().get(AUTOMATIC_DEPOSIT_CACHE_BY_ACCOUNT_MEMORY_ID), - m.borrow() - .get(AUTOMATIC_DEPOSIT_CACHE_BY_POLL_TIME_MEMORY_ID), - ); - }); + AUTOMATIC_DEPOSIT_CACHE.with(|cache| { + *cache.borrow_mut() = AutomaticDepositCache::default(); }); } diff --git a/minter/src/test_fixtures/mod.rs b/minter/src/test_fixtures/mod.rs index 60f15fac..16990393 100644 --- a/minter/src/test_fixtures/mod.rs +++ b/minter/src/test_fixtures/mod.rs @@ -597,16 +597,6 @@ pub mod arb { (any::(), arb_event_type()) .prop_map(|(timestamp, payload)| Event { timestamp, payload }) } - - pub fn arb_cache_entry() - -> impl Strategy { - (any::(), any::()).prop_map(|(rpc_quota_left, next_backoff_delay_mins)| { - crate::deposit::automatic::cache::AutomaticDepositCacheEntry { - rpc_quota_left, - next_backoff_delay_mins, - } - }) - } } pub mod deposit { diff --git a/minter/src/utils/mod.rs b/minter/src/utils/mod.rs index 51766244..cc87b6c5 100644 --- a/minter/src/utils/mod.rs +++ b/minter/src/utils/mod.rs @@ -1,2 +1 @@ pub mod insertion_ordered_map; -pub mod stable_sort_key_map; diff --git a/minter/src/utils/stable_sort_key_map/mod.rs b/minter/src/utils/stable_sort_key_map/mod.rs deleted file mode 100644 index d709e6dd..00000000 --- a/minter/src/utils/stable_sort_key_map/mod.rs +++ /dev/null @@ -1,119 +0,0 @@ -use ic_stable_structures::{ - DefaultMemoryImpl, StableBTreeMap, Storable, memory_manager::VirtualMemory, -}; - -type Memory = VirtualMemory; - -#[cfg(test)] -mod tests; - -/// A stable-memory map with a secondary sort index per entry. -/// -/// Two `StableBTreeMap`s are kept in sync: -/// - `by_key`: primary store, always contains every entry. -/// - `by_index`: drives [`iter`]. -/// -// TODO: simplify value/key types to 2-tuples once ic-stable-structures supports 2-tuples -// with one unbounded element. -/// -/// [`iter`]: StableSortKeyMap::iter -/// [`get`]: StableSortKeyMap::get -pub struct StableSortKeyMap -where - K: Storable + Ord + Clone, - I: Storable + Ord + Clone, - V: Storable + Clone, -{ - by_key: StableBTreeMap, - by_index: StableBTreeMap<(I, K, ()), (), Memory>, -} - -impl StableSortKeyMap -where - K: Storable + Ord + Clone, - I: Storable + Ord + Clone, - V: Storable + Clone, -{ - /// Initializes the map from existing stable memory (used in production / post-upgrade). - pub fn init(by_key_mem: Memory, by_index_mem: Memory) -> Self { - Self { - by_key: StableBTreeMap::init(by_key_mem), - by_index: StableBTreeMap::init(by_index_mem), - } - } - - /// Creates an empty map in the given memory regions (used in tests). - pub fn new(by_key_mem: Memory, by_index_mem: Memory) -> Self { - Self { - by_key: StableBTreeMap::new(by_key_mem), - by_index: StableBTreeMap::new(by_index_mem), - } - } - - /// Returns the value for the given key, or `None` if absent. - pub fn get(&self, key: &K) -> Option { - self.by_key.get(key).map(|(_, v, _)| v) - } - - /// Returns the current index and value for the given key, or `None` if absent. - pub fn get_with_index(&self, key: &K) -> Option<(I, V)> { - self.by_key.get(key).map(|(i, v, _)| (i, v)) - } - - /// Inserts or updates an entry. 
- pub fn insert(&mut self, key: K, index: I, value: V) { - if let Some((old_index, _, _)) = self.by_key.get(&key) { - self.by_index.remove(&(old_index, key.clone(), ())); - } - self.by_index.insert((index.clone(), key.clone(), ()), ()); - self.by_key.insert(key, (index, value, ())); - } - - /// Iterates all `(index, key, value)` triples in ascending index order. - /// - /// To iterate only entries up to a given index bound, use standard iterator - /// adapters: `map.iter().take_while(|(i, ..)| *i <= max)`. - pub fn iter(&self) -> Iter<'_, K, I, V> { - Iter { - index_iter: self.by_index.iter(), - by_key: &self.by_key, - } - } - - pub fn len(&self) -> usize { - self.by_key.len() as usize - } - - pub fn is_empty(&self) -> bool { - self.by_key.is_empty() - } -} - -/// Iterator over `(index, key, value)` triples in ascending index order. -pub struct Iter<'a, K, I, V> -where - K: Storable + Ord + Clone, - I: Storable + Ord + Clone, - V: Storable + Clone, -{ - index_iter: ic_stable_structures::btreemap::Iter<'a, (I, K, ()), (), Memory>, - by_key: &'a StableBTreeMap, -} - -impl<'a, K, I, V> Iterator for Iter<'a, K, I, V> -where - K: Storable + Ord + Clone, - I: Storable + Ord + Clone, - V: Storable + Clone, -{ - type Item = (I, K, V); - - fn next(&mut self) -> Option { - let ((i, k, _), _) = self.index_iter.next()?; - let (_, v, _) = self - .by_key - .get(&k) - .expect("index and by_key map must be in sync"); - Some((i, k, v)) - } -} diff --git a/minter/src/utils/stable_sort_key_map/tests.rs b/minter/src/utils/stable_sort_key_map/tests.rs deleted file mode 100644 index 9caa05c3..00000000 --- a/minter/src/utils/stable_sort_key_map/tests.rs +++ /dev/null @@ -1,142 +0,0 @@ -use super::*; -use ic_stable_structures::{ - DefaultMemoryImpl, - memory_manager::{MemoryId, MemoryManager}, - storable::Bound, -}; -use std::borrow::Cow; - -// --- Test fixtures --- - -#[derive(Clone, PartialEq, Debug)] -struct Entry { - data: u32, -} - -impl Storable for Entry { - fn to_bytes(&self) -> Cow<'_, [u8]> { - Cow::Owned(self.data.to_be_bytes().to_vec()) - } - - fn from_bytes(bytes: Cow<[u8]>) -> Self { - Self { - data: u32::from_be_bytes(bytes[..4].try_into().unwrap()), - } - } - - const BOUND: Bound = Bound::Bounded { - max_size: 4, - is_fixed_size: true, - }; -} - -fn make_map() -> StableSortKeyMap { - let mm = MemoryManager::init(DefaultMemoryImpl::default()); - StableSortKeyMap::new(mm.get(MemoryId::new(0)), mm.get(MemoryId::new(1))) -} - -fn entry(data: u32) -> Entry { - Entry { data } -} - -// --- Tests --- - -mod insert { - use super::*; - - #[test] - fn should_store_value_by_key() { - let mut map = make_map(); - map.insert(1, 100, entry(42)); - assert_eq!(map.get(&1), Some(entry(42))); - assert_eq!(map.len(), 1); - } - - #[test] - fn should_index_entry() { - let mut map = make_map(); - map.insert(1, 50, entry(0)); - let mut iter = map.iter(); - assert_eq!(iter.next(), Some((50, 1, entry(0)))); - assert_eq!(iter.next(), None); - } - - #[test] - fn should_update_index_when_index_changes() { - let mut map = make_map(); - map.insert(1, 100, entry(0)); - map.insert(1, 200, entry(1)); - assert_eq!(map.get_with_index(&1), Some((200, entry(1)))); - assert_eq!(map.len(), 1); - } -} - -mod get_with_index { - use super::*; - - #[test] - fn should_return_index_and_value() { - let mut map = make_map(); - map.insert(1, 99, entry(7)); - assert_eq!(map.get_with_index(&1), Some((99, entry(7)))); - } - - #[test] - fn should_return_none_when_key_absent() { - let map = make_map(); - assert_eq!(map.get_with_index(&1), None); - } 
-} - -mod iter { - use super::*; - - #[test] - fn should_return_empty_when_map_is_empty() { - let map = make_map(); - assert_eq!(map.iter().count(), 0); - } - - #[test] - fn should_iterate_all_entries_in_ascending_index_order() { - let mut map = make_map(); - map.insert(1, 30, entry(0)); - map.insert(2, 10, entry(1)); - map.insert(3, 20, entry(2)); - // Yields (index, key, value) ordered by index: 10, 20, 30 - let result: Vec<_> = map.iter().collect(); - assert_eq!( - result, - vec![(10, 2, entry(1)), (20, 3, entry(2)), (30, 1, entry(0))] - ); - } - - #[test] - fn should_include_entries_with_max_index() { - let mut map = make_map(); - map.insert(1, u64::MAX, entry(99)); - map.insert(2, 0, entry(0)); - let result: Vec<_> = map.iter().collect(); - assert_eq!(result, vec![(0, 2, entry(0)), (u64::MAX, 1, entry(99))]); - } -} - -mod len_and_is_empty { - use super::*; - - #[test] - fn should_be_empty_when_new() { - let map = make_map(); - assert!(map.is_empty()); - assert_eq!(map.len(), 0); - } - - #[test] - fn should_count_all_entries() { - let mut map = make_map(); - map.insert(1, 100, entry(0)); - map.insert(2, 200, entry(1)); - assert_eq!(map.len(), 2); - assert!(!map.is_empty()); - } -} From 597a17ee85c1486a365c2ec90e174299fde8aed4 Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Mon, 27 Apr 2026 14:19:29 +0200 Subject: [PATCH 5/5] refactor: unify AutomaticDepositCache and InsertionOrderedMap via SortedKeyMap Introduces `SortedKeyMap`, a generic two-BTreeMap structure that drives both `AutomaticDepositCache` (sorted by poll time) and `InsertionOrderedMap` (sorted by auto-incrementing sequence number). `InsertionOrderedMap` now wraps `SortedKeyMap` with `next_seq` and `AutomaticDepositCache` is a newtype over `SortedKeyMap`. Co-Authored-By: Claude Sonnet 4.6 --- minter/src/deposit/automatic/cache/mod.rs | 37 ++-- minter/src/utils/insertion_ordered_map/mod.rs | 60 ++---- minter/src/utils/mod.rs | 1 + minter/src/utils/sorted_key_map/mod.rs | 126 +++++++++++++ minter/src/utils/sorted_key_map/tests.rs | 172 ++++++++++++++++++ 5 files changed, 326 insertions(+), 70 deletions(-) create mode 100644 minter/src/utils/sorted_key_map/mod.rs create mode 100644 minter/src/utils/sorted_key_map/tests.rs diff --git a/minter/src/deposit/automatic/cache/mod.rs b/minter/src/deposit/automatic/cache/mod.rs index 8cd8a666..04789f2d 100644 --- a/minter/src/deposit/automatic/cache/mod.rs +++ b/minter/src/deposit/automatic/cache/mod.rs @@ -1,5 +1,5 @@ +use crate::utils::sorted_key_map::SortedKeyMap; use icrc_ledger_types::icrc1::account::Account; -use std::collections::BTreeMap; /// Maximum number of `getSignaturesForAddress` calls allowed per monitored account. pub const MAX_GET_SIGNATURES_CALLS: u32 = 10; @@ -38,26 +38,19 @@ impl Default for AutomaticDepositCacheEntry { /// Heap-memory cache storing per-account automated deposit discovery state, /// ordered by next poll time for efficient scheduling. /// -/// Two `BTreeMap`s are kept in sync, mirroring the stable-memory `StableSortKeyMap` -/// pattern but without the stable-structures overhead: -/// - `by_account`: primary store, always contains every entry. -/// - `by_poll_time`: drives [`iter`] in ascending poll-time order. +/// Backed by a [`SortedKeyMap`] with `Account` as key and `u64` (nanosecond timestamp) +/// as the sort index. /// /// Accounts that have been stopped are stored with `next_poll_at = u64::MAX` /// so they are never picked up by the poll loop, but their quota is retained /// for future `update_balance` calls. 
-/// -/// [`iter`]: AutomaticDepositCache::iter #[derive(Default)] -pub struct AutomaticDepositCache { - by_account: BTreeMap, - by_poll_time: BTreeMap<(u64, Account), ()>, -} +pub struct AutomaticDepositCache(SortedKeyMap); impl AutomaticDepositCache { /// Returns the current poll time and entry for the given account. pub fn get_with_index(&self, account: &Account) -> Option<(u64, AutomaticDepositCacheEntry)> { - self.by_account.get(account).map(|(t, e)| (*t, e.clone())) + self.0.get_with_index(account).map(|(t, e)| (*t, e.clone())) } /// Inserts or updates an entry, updating the poll-time index atomically. @@ -67,30 +60,22 @@ impl AutomaticDepositCache { next_poll_at: u64, entry: AutomaticDepositCacheEntry, ) { - if let Some((old_t, _)) = self.by_account.get(&account) { - self.by_poll_time.remove(&(*old_t, account)); - } - self.by_poll_time.insert((next_poll_at, account), ()); - self.by_account.insert(account, (next_poll_at, entry)); + self.0.insert(account, next_poll_at, entry); } /// Iterates all `(next_poll_at, account, entry)` triples in ascending poll-time order. pub fn iter(&self) -> impl Iterator + '_ { - self.by_poll_time.keys().map(|(t, account)| { - let (_, entry) = self - .by_account - .get(account) - .expect("poll-time index and by_account map must be in sync"); - (*t, *account, entry.clone()) - }) + self.0 + .iter() + .map(|(t, account, entry)| (*t, *account, entry.clone())) } pub fn len(&self) -> usize { - self.by_account.len() + self.0.len() } pub fn is_empty(&self) -> bool { - self.by_account.is_empty() + self.0.is_empty() } } diff --git a/minter/src/utils/insertion_ordered_map/mod.rs b/minter/src/utils/insertion_ordered_map/mod.rs index 24720b7b..a074db71 100644 --- a/minter/src/utils/insertion_ordered_map/mod.rs +++ b/minter/src/utils/insertion_ordered_map/mod.rs @@ -1,15 +1,12 @@ -use std::collections::BTreeMap; +use crate::utils::sorted_key_map::{self, SortedKeyMap}; #[cfg(test)] mod tests; /// A map that preserves insertion order while providing O(log n) key lookup. /// -/// Internally uses two `BTreeMap`s: -/// - `entries`: key → (sequence number, value) -/// - `order`: sequence number → key -/// -/// Iteration via [`iter`], [`keys`], and [`values`] is in insertion order (oldest first). +/// Backed by a [`SortedKeyMap`] keyed on an auto-incrementing sequence number, +/// so iteration via [`iter`], [`keys`], and [`values`] is in insertion order (oldest first). /// [`DoubleEndedIterator`] is supported on [`Iter`], so callers can call `.rev()` on /// [`iter`] to get newest-first. /// @@ -17,16 +14,14 @@ mod tests; /// [`keys`]: InsertionOrderedMap::keys /// [`values`]: InsertionOrderedMap::values pub struct InsertionOrderedMap { - entries: BTreeMap, - order: BTreeMap, + inner: SortedKeyMap, next_seq: u64, } impl InsertionOrderedMap { pub fn new() -> Self { Self { - entries: BTreeMap::new(), - order: BTreeMap::new(), + inner: SortedKeyMap::new(), next_seq: 0, } } @@ -34,51 +29,35 @@ impl InsertionOrderedMap { /// Inserts a key-value pair. Returns the old value if the key was already present /// (and moves it to the end of the insertion order). 
pub fn insert(&mut self, key: K, value: V) -> Option { - let old_value = if let Some((old_seq, old_val)) = self.entries.remove(&key) { - self.order.remove(&old_seq); - Some(old_val) - } else { - None - }; let seq = self.next_seq; self.next_seq += 1; - self.order.insert(seq, key.clone()); - self.entries.insert(key, (seq, value)); - old_value + self.inner.insert(key, seq, value) } /// Removes a key and returns its value if it was present. pub fn remove(&mut self, key: &K) -> Option { - if let Some((seq, value)) = self.entries.remove(key) { - self.order.remove(&seq); - Some(value) - } else { - None - } + self.inner.remove(key) } pub fn get(&self, key: &K) -> Option<&V> { - self.entries.get(key).map(|(_, v)| v) + self.inner.get(key) } pub fn contains_key(&self, key: &K) -> bool { - self.entries.contains_key(key) + self.inner.contains_key(key) } pub fn len(&self) -> usize { - self.entries.len() + self.inner.len() } pub fn is_empty(&self) -> bool { - self.entries.is_empty() + self.inner.is_empty() } /// Returns an iterator over `(&K, &V)` pairs in insertion order. pub fn iter(&self) -> Iter<'_, K, V> { - Iter { - order_iter: self.order.values(), - entries: &self.entries, - } + Iter(self.inner.iter()) } /// Returns an iterator over keys in insertion order. @@ -93,7 +72,7 @@ impl InsertionOrderedMap { /// Returns a mutable iterator over values. Iteration order is unspecified. pub fn values_mut(&mut self) -> impl Iterator + '_ { - self.entries.values_mut().map(|(_, v)| v) + self.inner.values_mut() } } @@ -130,26 +109,19 @@ impl<'a, K: Ord + Clone, V> IntoIterator for &'a InsertionOrderedMap { // --- Iterator types --- -pub struct Iter<'a, K, V> { - order_iter: std::collections::btree_map::Values<'a, u64, K>, - entries: &'a BTreeMap, -} +pub struct Iter<'a, K, V>(sorted_key_map::Iter<'a, K, u64, V>); impl<'a, K: Ord, V> Iterator for Iter<'a, K, V> { type Item = (&'a K, &'a V); fn next(&mut self) -> Option { - let key = self.order_iter.next()?; - let (_, value) = self.entries.get(key)?; - Some((key, value)) + self.0.next().map(|(_, k, v)| (k, v)) } } impl<'a, K: Ord, V> DoubleEndedIterator for Iter<'a, K, V> { fn next_back(&mut self) -> Option { - let key = self.order_iter.next_back()?; - let (_, value) = self.entries.get(key)?; - Some((key, value)) + self.0.next_back().map(|(_, k, v)| (k, v)) } } diff --git a/minter/src/utils/mod.rs b/minter/src/utils/mod.rs index cc87b6c5..927c6f4c 100644 --- a/minter/src/utils/mod.rs +++ b/minter/src/utils/mod.rs @@ -1 +1,2 @@ pub mod insertion_ordered_map; +pub mod sorted_key_map; diff --git a/minter/src/utils/sorted_key_map/mod.rs b/minter/src/utils/sorted_key_map/mod.rs new file mode 100644 index 00000000..d34cfd87 --- /dev/null +++ b/minter/src/utils/sorted_key_map/mod.rs @@ -0,0 +1,126 @@ +use std::collections::BTreeMap; + +#[cfg(test)] +mod tests; + +/// A map with a user-supplied sort index, backed by two `BTreeMap`s. +/// +/// Two `BTreeMap`s are kept in sync: +/// - `by_key`: primary store, key → (index, value) +/// - `by_index`: secondary index, (index, key) → (), drives [`iter`] in ascending index order +/// +/// Unlike a secondary index keyed only by `I`, composite `(I, K)` keys allow +/// multiple entries to share the same index value (e.g. several accounts scheduled +/// at the same timestamp). 
+/// +/// [`iter`]: SortedKeyMap::iter +pub struct SortedKeyMap { + by_key: BTreeMap, + by_index: BTreeMap<(I, K), ()>, +} + +impl Default for SortedKeyMap { + fn default() -> Self { + Self { + by_key: BTreeMap::new(), + by_index: BTreeMap::new(), + } + } +} + +impl SortedKeyMap { + pub fn new() -> Self { + Self::default() + } + + /// Inserts or updates an entry, atomically updating the sort index. + /// Returns the old value if the key was already present. + pub fn insert(&mut self, key: K, index: I, value: V) -> Option { + let old_value = if let Some((old_index, old_val)) = self.by_key.remove(&key) { + self.by_index.remove(&(old_index, key.clone())); + Some(old_val) + } else { + None + }; + self.by_index.insert((index.clone(), key.clone()), ()); + self.by_key.insert(key, (index, value)); + old_value + } + + /// Removes a key and returns its value if present. + pub fn remove(&mut self, key: &K) -> Option { + if let Some((index, value)) = self.by_key.remove(key) { + self.by_index.remove(&(index, key.clone())); + Some(value) + } else { + None + } + } + + /// Returns a reference to the value for the given key. + pub fn get(&self, key: &K) -> Option<&V> { + self.by_key.get(key).map(|(_, v)| v) + } + + /// Returns references to the index and value for the given key. + pub fn get_with_index(&self, key: &K) -> Option<(&I, &V)> { + self.by_key.get(key).map(|(i, v)| (i, v)) + } + + pub fn contains_key(&self, key: &K) -> bool { + self.by_key.contains_key(key) + } + + /// Iterates `(index, key, value)` triples in ascending index order. + /// + /// Entries with equal index values are ordered by key. + pub fn iter(&self) -> Iter<'_, K, I, V> { + Iter { + index_iter: self.by_index.keys(), + by_key: &self.by_key, + } + } + + /// Returns a mutable iterator over values. Iteration order is unspecified. + pub fn values_mut(&mut self) -> impl Iterator + '_ { + self.by_key.values_mut().map(|(_, v)| v) + } + + pub fn len(&self) -> usize { + self.by_key.len() + } + + pub fn is_empty(&self) -> bool { + self.by_key.is_empty() + } +} + +/// Iterator over `(index, key, value)` triples in ascending index order. 
+pub struct Iter<'a, K, I, V> { + index_iter: std::collections::btree_map::Keys<'a, (I, K), ()>, + by_key: &'a BTreeMap, +} + +impl<'a, K: Ord, I: Ord, V> Iterator for Iter<'a, K, I, V> { + type Item = (&'a I, &'a K, &'a V); + + fn next(&mut self) -> Option { + let (i, k) = self.index_iter.next()?; + let (_, v) = self + .by_key + .get(k) + .expect("by_index and by_key must be in sync"); + Some((i, k, v)) + } +} + +impl<'a, K: Ord, I: Ord, V> DoubleEndedIterator for Iter<'a, K, I, V> { + fn next_back(&mut self) -> Option { + let (i, k) = self.index_iter.next_back()?; + let (_, v) = self + .by_key + .get(k) + .expect("by_index and by_key must be in sync"); + Some((i, k, v)) + } +} diff --git a/minter/src/utils/sorted_key_map/tests.rs b/minter/src/utils/sorted_key_map/tests.rs new file mode 100644 index 00000000..5fd8cec6 --- /dev/null +++ b/minter/src/utils/sorted_key_map/tests.rs @@ -0,0 +1,172 @@ +use super::SortedKeyMap; + +mod insert { + use super::*; + + #[test] + fn should_insert_new_entry() { + let mut map: SortedKeyMap = SortedKeyMap::new(); + assert_eq!(map.insert(1, 10, "a"), None); + assert_eq!(map.len(), 1); + assert_eq!(map.get(&1), Some(&"a")); + } + + #[test] + fn should_return_old_value_on_reinsertion() { + let mut map: SortedKeyMap = SortedKeyMap::new(); + map.insert(1, 10, "a"); + assert_eq!(map.insert(1, 20, "b"), Some("a")); + assert_eq!(map.get(&1), Some(&"b")); + assert_eq!(map.len(), 1); + } + + #[test] + fn should_update_index_on_reinsertion() { + let mut map: SortedKeyMap = SortedKeyMap::new(); + map.insert(1, 10, "a"); + map.insert(2, 20, "b"); + map.insert(1, 30, "a2"); // re-insert with new index + + let triples: Vec<_> = map.iter().map(|(i, k, v)| (*i, *k, *v)).collect(); + assert_eq!(triples, vec![(20, 2, "b"), (30, 1, "a2")]); + } +} + +mod remove { + use super::*; + + #[test] + fn should_remove_existing_key() { + let mut map: SortedKeyMap = SortedKeyMap::new(); + map.insert(1, 10, "a"); + assert_eq!(map.remove(&1), Some("a")); + assert!(map.is_empty()); + } + + #[test] + fn should_return_none_for_absent_key() { + let mut map: SortedKeyMap = SortedKeyMap::new(); + assert_eq!(map.remove(&99), None); + } + + #[test] + fn should_preserve_order_of_remaining_entries() { + let mut map: SortedKeyMap = SortedKeyMap::new(); + map.insert(1, 10, "a"); + map.insert(2, 20, "b"); + map.insert(3, 30, "c"); + map.remove(&2); + + let keys: Vec<_> = map.iter().map(|(_, k, _)| *k).collect(); + assert_eq!(keys, vec![1, 3]); + } +} + +mod get { + use super::*; + + #[test] + fn should_return_value_for_present_key() { + let mut map: SortedKeyMap = SortedKeyMap::new(); + map.insert(42, 0, "hello"); + assert_eq!(map.get(&42), Some(&"hello")); + } + + #[test] + fn should_return_none_for_absent_key() { + let map: SortedKeyMap = SortedKeyMap::new(); + assert_eq!(map.get(&1), None); + } +} + +mod get_with_index { + use super::*; + + #[test] + fn should_return_index_and_value() { + let mut map: SortedKeyMap = SortedKeyMap::new(); + map.insert(1, 42, "foo"); + assert_eq!(map.get_with_index(&1), Some((&42u64, &"foo"))); + } + + #[test] + fn should_return_none_for_absent_key() { + let map: SortedKeyMap = SortedKeyMap::new(); + assert_eq!(map.get_with_index(&1), None); + } +} + +mod contains_key { + use super::*; + + #[test] + fn should_return_true_for_present_key() { + let mut map: SortedKeyMap = SortedKeyMap::new(); + map.insert(1, 0, "a"); + assert!(map.contains_key(&1)); + } + + #[test] + fn should_return_false_for_absent_key() { + let map: SortedKeyMap = SortedKeyMap::new(); + 
assert!(!map.contains_key(&1)); + } +} + +mod iter { + use super::*; + + #[test] + fn should_iterate_in_ascending_index_order() { + let mut map: SortedKeyMap = SortedKeyMap::new(); + map.insert(3, 30, "c"); + map.insert(1, 10, "a"); + map.insert(2, 20, "b"); + + let triples: Vec<_> = map.iter().map(|(i, k, v)| (*i, *k, *v)).collect(); + assert_eq!(triples, vec![(10, 1, "a"), (20, 2, "b"), (30, 3, "c")]); + } + + #[test] + fn should_order_equal_indices_by_key() { + let mut map: SortedKeyMap = SortedKeyMap::new(); + map.insert(2, 10, "b"); + map.insert(1, 10, "a"); // same index as key 2 + + let triples: Vec<_> = map.iter().map(|(i, k, v)| (*i, *k, *v)).collect(); + assert_eq!(triples, vec![(10, 1, "a"), (10, 2, "b")]); + } + + #[test] + fn should_support_reverse_iteration() { + let mut map: SortedKeyMap = SortedKeyMap::new(); + map.insert(1, 10, "a"); + map.insert(2, 20, "b"); + map.insert(3, 30, "c"); + + let keys: Vec<_> = map.iter().rev().map(|(_, k, _)| *k).collect(); + assert_eq!(keys, vec![3, 2, 1]); + } +} + +mod len_and_is_empty { + use super::*; + + #[test] + fn should_be_empty_when_new() { + let map: SortedKeyMap = SortedKeyMap::new(); + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + } + + #[test] + fn should_track_len_through_inserts_and_removes() { + let mut map: SortedKeyMap = SortedKeyMap::new(); + map.insert(1, 10, "a"); + map.insert(2, 20, "b"); + assert_eq!(map.len(), 2); + map.remove(&1); + assert_eq!(map.len(), 1); + assert!(!map.is_empty()); + } +}
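
As a usage sketch (not part of the patches themselves): the `u64::MAX` parking convention combined with ascending poll-time iteration is what keeps the polling loop simple. Below is a minimal illustration of how a caller might select the batch of due accounts, assuming the `AutomaticDepositCache` API introduced above and the `MAX_CONCURRENT_RPC_CALLS` limit referenced in the tests; `due_accounts` and `now_ns` are hypothetical names, and the actual `poll_monitored_addresses` body in the minter is not reproduced here.

    use crate::deposit::automatic::cache::AutomaticDepositCache;
    use icrc_ledger_types::icrc1::account::Account;

    /// Hypothetical helper: pick the accounts whose scheduled poll time has
    /// passed, capped by the concurrency limit. `iter()` yields owned
    /// (next_poll_at, account, entry) triples in ascending poll-time order,
    /// and stopped/exhausted entries sit at next_poll_at == u64::MAX, so the
    /// take_while predicate never reaches them and no extra filtering is needed.
    fn due_accounts(
        cache: &AutomaticDepositCache,
        now_ns: u64,
        max_concurrent: usize,
    ) -> Vec<Account> {
        cache
            .iter()
            .take_while(|(next_poll_at, _, _)| *next_poll_at <= now_ns)
            .take(max_concurrent)
            .map(|(_, account, _)| account)
            .collect()
    }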