From c419aa4163a1e25b04c55f20be5ad7a2ea730847 Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Wed, 22 Apr 2026 11:22:00 +0200 Subject: [PATCH 1/2] feat: cache last discovered signature to avoid re-fetching seen transactions Store the most recent `getSignaturesForAddress` result in the deposit cache and pass it as the `until` parameter on subsequent calls, so that only new transactions are returned. Also moves the CBOR codecs from `state/event/cbor` to `utils/cbor` (making them reusable), extends the signature codec with an `option` variant, adds round-trip tests for `Option` encoding, and adds `update_index` to `StableSortKeyMap` for rescheduling existing entries. Co-Authored-By: Claude Sonnet 4.6 --- minter/src/deposit/automatic/cache/mod.rs | 8 +++ minter/src/deposit/automatic/mod.rs | 58 ++++++++----------- minter/src/state/event.rs | 7 ++- minter/src/test_fixtures/mod.rs | 13 +++-- minter/src/{state/event => utils}/cbor/mod.rs | 26 +++++++++ .../src/{state/event => utils}/cbor/tests.rs | 33 ++++++++++- minter/src/utils/mod.rs | 1 + minter/src/utils/stable_sort_key_map/mod.rs | 21 +++++++ 8 files changed, 122 insertions(+), 45 deletions(-) rename minter/src/{state/event => utils}/cbor/mod.rs (79%) rename minter/src/{state/event => utils}/cbor/tests.rs (71%) diff --git a/minter/src/deposit/automatic/cache/mod.rs b/minter/src/deposit/automatic/cache/mod.rs index db6e0b21..1b166c13 100644 --- a/minter/src/deposit/automatic/cache/mod.rs +++ b/minter/src/deposit/automatic/cache/mod.rs @@ -2,6 +2,7 @@ use crate::utils::stable_sort_key_map::StableSortKeyMap; use ic_stable_structures::{Storable, storable::Bound}; use icrc_ledger_types::icrc1::account::Account; use minicbor::{Decode, Encode}; +use solana_signature::Signature; use std::borrow::Cow; #[cfg(test)] @@ -17,6 +18,13 @@ pub struct AutomaticDepositCacheEntry { /// The number of `getSignaturesForAddress` calls made so far for this account. #[n(0)] pub get_signatures_calls: u8, + /// The most recent transaction signature seen for this address. + /// + /// Passed as the `until` parameter to the next `getSignaturesForAddress` call + /// so that only transactions newer than this one are returned, avoiding + /// redundant re-fetching of already-seen signatures. + #[cbor(n(1), with = "crate::utils::cbor::signature::option")] + pub last_discovered_signature: Option, } impl Storable for AutomaticDepositCacheEntry { diff --git a/minter/src/deposit/automatic/mod.rs b/minter/src/deposit/automatic/mod.rs index edfea00a..118f6188 100644 --- a/minter/src/deposit/automatic/mod.rs +++ b/minter/src/deposit/automatic/mod.rs @@ -87,10 +87,9 @@ pub async fn poll_monitored_addresses(runtime: R) { return; } - let due: Vec<(Account, u8)> = with_automatic_deposit_cache(|cache| { + let due: Vec<(Account, AutomaticDepositCacheEntry)> = with_automatic_deposit_cache(|cache| { cache .iter_by_index_up_to(&now) - .map(|(account, entry)| (account, entry.get_signatures_calls)) // +1 to detect whether more accounts remain after this round. .take(MAX_CONCURRENT_RPC_CALLS + 1) .collect() @@ -103,11 +102,11 @@ pub async fn poll_monitored_addresses(runtime: R) { let master_key = lazy_get_schnorr_master_key(&runtime).await; - futures::future::join_all(due.into_iter().take(MAX_CONCURRENT_RPC_CALLS).map( - |(account, get_signatures_calls)| { - poll_account(&runtime, &master_key, account, get_signatures_calls) - }, - )) + futures::future::join_all( + due.into_iter() + .take(MAX_CONCURRENT_RPC_CALLS) + .map(|(account, entry)| poll_account(&runtime, &master_key, account, entry)), + ) .await; if !more_to_process { @@ -119,7 +118,7 @@ async fn poll_account( runtime: &R, master_key: &SchnorrPublicKey, account: Account, - get_signatures_calls: u8, + mut entry: AutomaticDepositCacheEntry, ) { let deposit_address = account_address(master_key, &account); @@ -134,7 +133,7 @@ async fn poll_account( ), min_context_slot: None, before: None, - until: None, + until: entry.last_discovered_signature.map(Into::into), }; match get_signatures_for_address(runtime, params).await { @@ -144,23 +143,17 @@ async fn poll_account( "Failed to get signatures for address {deposit_address}: {e}" ); } - Ok(_signatures) => { - // TODO(DEFI-2780): Process discovered deposit signatures. + Ok(signatures) => { + if let Some(newest) = signatures.first() { + entry.last_discovered_signature = + Some(solana_signature::Signature::from(newest.signature.clone())); + } } } - let get_signatures_calls = get_signatures_calls + 1; - if get_signatures_calls >= MAX_GET_SIGNATURES_FOR_ADDRESS_CALLS { - // Use u64::MAX so the entry is retained but never scheduled for another poll. - with_automatic_deposit_cache_mut(|cache| { - cache.insert( - account, - u64::MAX, - AutomaticDepositCacheEntry { - get_signatures_calls, - }, - ); - }); + entry.get_signatures_calls += 1; + let next_poll_at = if entry.get_signatures_calls >= MAX_GET_SIGNATURES_FOR_ADDRESS_CALLS { + // Maximum number of getSignaturesForAddress calls made, stop polling. mutate_state(|state| { process_event( state, @@ -172,18 +165,13 @@ async fn poll_account( Priority::Info, "Stopped monitoring {deposit_address}: reached maximum getSignaturesForAddress calls ({MAX_GET_SIGNATURES_FOR_ADDRESS_CALLS})" ); + u64::MAX } else { // Exponential backoff: delay before next poll is 2^get_signatures_calls minutes. - let delay = Duration::from_mins(1u64 << get_signatures_calls); - let next_poll_at = runtime.time() + delay.as_nanos() as u64; - with_automatic_deposit_cache_mut(|cache| { - cache.insert( - account, - next_poll_at, - AutomaticDepositCacheEntry { - get_signatures_calls, - }, - ); - }); - } + let delay = Duration::from_mins(1u64 << entry.get_signatures_calls); + runtime.time() + delay.as_nanos() as u64 + }; + with_automatic_deposit_cache_mut(|cache| { + cache.update_index(account, next_poll_at, entry); + }); } diff --git a/minter/src/state/event.rs b/minter/src/state/event.rs index 7b647d8f..29489a52 100644 --- a/minter/src/state/event.rs +++ b/minter/src/state/event.rs @@ -1,4 +1,7 @@ -use crate::numeric::{LedgerBurnIndex, LedgerMintIndex}; +use crate::{ + numeric::{LedgerBurnIndex, LedgerMintIndex}, + utils::cbor, +}; use cksol_types_internal::{InitArgs, UpgradeArgs}; use derive_more::From; use ic_stable_structures::{Storable, storable::Bound}; @@ -21,8 +24,6 @@ pub enum VersionedMessage { ), } -mod cbor; - #[derive(Eq, PartialEq, Debug, Decode, Encode)] pub struct Event { /// The canister time at which the minter generated this event. diff --git a/minter/src/test_fixtures/mod.rs b/minter/src/test_fixtures/mod.rs index cc1778f2..9ae01341 100644 --- a/minter/src/test_fixtures/mod.rs +++ b/minter/src/test_fixtures/mod.rs @@ -340,6 +340,7 @@ pub mod events { #[cfg(test)] pub mod arb { use crate::{ + deposit::automatic::cache::AutomaticDepositCacheEntry, numeric::{LedgerBurnIndex, LedgerMintIndex}, state::event::{DepositId, Event, EventType, TransactionPurpose, WithdrawalRequest}, }; @@ -598,13 +599,13 @@ pub mod arb { .prop_map(|(timestamp, payload)| Event { timestamp, payload }) } - pub fn arb_cache_entry() - -> impl Strategy { - any::().prop_map(|get_signatures_calls| { - crate::deposit::automatic::cache::AutomaticDepositCacheEntry { + pub fn arb_cache_entry() -> impl Strategy { + (any::(), prop::option::of(arb_signature())).prop_map( + |(get_signatures_calls, last_discovered_signature)| AutomaticDepositCacheEntry { get_signatures_calls, - } - }) + last_discovered_signature, + }, + ) } } diff --git a/minter/src/state/event/cbor/mod.rs b/minter/src/utils/cbor/mod.rs similarity index 79% rename from minter/src/state/event/cbor/mod.rs rename to minter/src/utils/cbor/mod.rs index 99f1eac1..20ca6dc7 100644 --- a/minter/src/state/event/cbor/mod.rs +++ b/minter/src/utils/cbor/mod.rs @@ -51,6 +51,32 @@ pub mod signature { e.bytes(v.as_ref())?; Ok(()) } + + pub mod option { + use super::*; + + pub fn decode(d: &mut Decoder<'_>, ctx: &mut Ctx) -> Result, Error> { + if d.datatype()? == minicbor::data::Type::Null { + d.null()?; + return Ok(None); + } + super::decode(d, ctx).map(Some) + } + + pub fn encode( + v: &Option, + e: &mut Encoder, + ctx: &mut Ctx, + ) -> Result<(), minicbor::encode::Error> { + match v { + None => { + e.null()?; + Ok(()) + } + Some(sig) => super::encode(sig, e, ctx), + } + } + } } pub mod id_vec { diff --git a/minter/src/state/event/cbor/tests.rs b/minter/src/utils/cbor/tests.rs similarity index 71% rename from minter/src/state/event/cbor/tests.rs rename to minter/src/utils/cbor/tests.rs index f5dc3d87..40db7858 100644 --- a/minter/src/state/event/cbor/tests.rs +++ b/minter/src/utils/cbor/tests.rs @@ -1,6 +1,7 @@ use crate::{ - state::event::{VersionedMessage, cbor}, + state::event::VersionedMessage, test_fixtures::arb::{arb_message, arb_signature}, + utils::cbor, }; use proptest::{prop_assert_eq, proptest}; @@ -29,6 +30,36 @@ mod signature_tests { } } +mod signature_option_tests { + use super::*; + + #[test] + fn none_roundtrips() { + let encoded = encode_opt(None); + assert_eq!(decode_opt(&encoded), None); + } + + proptest! { + #[test] + fn some_minicbor_roundtrip(signature in arb_signature()) { + let encoded = encode_opt(Some(signature)); + prop_assert_eq!(decode_opt(&encoded), Some(signature)); + } + } + + fn encode_opt(v: Option) -> Vec { + let mut buf = Vec::new(); + let mut encoder = minicbor::Encoder::new(&mut buf); + cbor::signature::option::encode(&v, &mut encoder, &mut ()).unwrap(); + buf + } + + fn decode_opt(bytes: &[u8]) -> Option { + let mut decoder = minicbor::Decoder::new(bytes); + cbor::signature::option::decode(&mut decoder, &mut ()).unwrap() + } +} + mod message_tests { use super::*; diff --git a/minter/src/utils/mod.rs b/minter/src/utils/mod.rs index 51766244..e965177b 100644 --- a/minter/src/utils/mod.rs +++ b/minter/src/utils/mod.rs @@ -1,2 +1,3 @@ +pub mod cbor; pub mod insertion_ordered_map; pub mod stable_sort_key_map; diff --git a/minter/src/utils/stable_sort_key_map/mod.rs b/minter/src/utils/stable_sort_key_map/mod.rs index 14d46013..f1b19e98 100644 --- a/minter/src/utils/stable_sort_key_map/mod.rs +++ b/minter/src/utils/stable_sort_key_map/mod.rs @@ -70,6 +70,27 @@ where self.by_key.insert(key, (index, value, ())); } + /// Updates the index and value for an existing entry. + /// + /// Unlike [`insert`], this communicates that the entry already exists and + /// its sort position (index) is being updated — e.g. to reschedule a poll. + /// + /// # Panics + /// + /// Panics if `key` is not present in the map. + /// + /// [`insert`]: Self::insert + pub fn update_index(&mut self, key: K, new_index: I, new_value: V) { + let (old_index, _, _) = self + .by_key + .get(&key) + .expect("update_index called for non-existent key"); + self.by_index.remove(&(old_index, key.clone(), ())); + self.by_index + .insert((new_index.clone(), key.clone(), ()), ()); + self.by_key.insert(key, (new_index, new_value, ())); + } + /// Returns the `(index, key)` of the entry with the smallest index, if any. /// /// O(log n). From f8912098eccfc770fba86e5032a8b15751329c7f Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Wed, 22 Apr 2026 16:58:17 +0200 Subject: [PATCH 2/2] feat: add page_cursor for paginated getSignaturesForAddress scanning With a fixed limit, a single call may not return all transactions in the range (e.g., B at slot 101 is missed when the batch fills up with D and C). Add `page_cursor` to `AutomaticDepositCacheEntry`: - New scan (page_cursor = None): call with before=None, until=last_discovered. Record first() as the new last_discovered; set page_cursor=last() if full. - Continuing (page_cursor = Some): call with before=cursor, until=None. No lower bound is needed since the call budget limits scan depth. Clear page_cursor when the batch is smaller than the limit. Co-Authored-By: Claude Sonnet 4.6 --- minter/src/deposit/automatic/cache/mod.rs | 15 ++++++++++---- minter/src/deposit/automatic/mod.rs | 25 +++++++++++++++++++---- minter/src/test_fixtures/mod.rs | 18 +++++++++++----- 3 files changed, 45 insertions(+), 13 deletions(-) diff --git a/minter/src/deposit/automatic/cache/mod.rs b/minter/src/deposit/automatic/cache/mod.rs index 1b166c13..aa919e08 100644 --- a/minter/src/deposit/automatic/cache/mod.rs +++ b/minter/src/deposit/automatic/cache/mod.rs @@ -18,13 +18,20 @@ pub struct AutomaticDepositCacheEntry { /// The number of `getSignaturesForAddress` calls made so far for this account. #[n(0)] pub get_signatures_calls: u8, - /// The most recent transaction signature seen for this address. + /// The newest transaction signature discovered in the most recently completed scan. /// - /// Passed as the `until` parameter to the next `getSignaturesForAddress` call - /// so that only transactions newer than this one are returned, avoiding - /// redundant re-fetching of already-seen signatures. + /// Passed as `until` (exclusive lower bound) at the start of each new scan so + /// that only transactions newer than this are returned, avoiding re-fetching of + /// already-seen signatures. Not updated mid-scan. #[cbor(n(1), with = "crate::utils::cbor::signature::option")] pub last_discovered_signature: Option, + /// Pagination cursor for an in-progress scan. + /// + /// Set to the oldest signature in the most recent batch when there may be more + /// results. Passed as `before` to continue scanning backwards. `None` means no + /// scan is in progress and the next call should start from the latest block. + #[cbor(n(2), with = "crate::utils::cbor::signature::option")] + pub page_cursor: Option, } impl Storable for AutomaticDepositCacheEntry { diff --git a/minter/src/deposit/automatic/mod.rs b/minter/src/deposit/automatic/mod.rs index 118f6188..f837c6a0 100644 --- a/minter/src/deposit/automatic/mod.rs +++ b/minter/src/deposit/automatic/mod.rs @@ -122,6 +122,7 @@ async fn poll_account( ) { let deposit_address = account_address(master_key, &account); + let is_new_scan = entry.page_cursor.is_none(); let params = GetSignaturesForAddressParams { pubkey: deposit_address.into(), commitment: Some(CommitmentLevel::Finalized), @@ -132,8 +133,15 @@ async fn poll_account( .expect("MAX_GET_TRANSACTION_CALLS must be between 1 and 1000"), ), min_context_slot: None, - before: None, - until: entry.last_discovered_signature.map(Into::into), + // When continuing a scan, page backwards using the cursor as the upper bound. + // No lower bound is needed since the call budget limits how far back we go. + // When starting a new scan, use last_discovered_signature as the lower bound + // to avoid re-fetching already-seen transactions. + before: entry.page_cursor.map(Into::into), + until: is_new_scan + .then(|| entry.last_discovered_signature) + .flatten() + .map(Into::into), }; match get_signatures_for_address(runtime, params).await { @@ -144,10 +152,19 @@ async fn poll_account( ); } Ok(signatures) => { - if let Some(newest) = signatures.first() { + if is_new_scan { + // Record the newest signature at the start of the scan; it becomes + // last_discovered_signature once the scan completes. entry.last_discovered_signature = - Some(solana_signature::Signature::from(newest.signature.clone())); + signatures.first().map(|s| s.signature.clone().into()); } + entry.page_cursor = if signatures.len() >= MAX_GET_TRANSACTION_CALLS { + // Full batch returned — there may be more; continue paginating. + signatures.last().map(|s| s.signature.clone().into()) + } else { + // Fewer results than the limit — this batch exhausts the range. + None + }; } } diff --git a/minter/src/test_fixtures/mod.rs b/minter/src/test_fixtures/mod.rs index 9ae01341..f1cf3c54 100644 --- a/minter/src/test_fixtures/mod.rs +++ b/minter/src/test_fixtures/mod.rs @@ -600,12 +600,20 @@ pub mod arb { } pub fn arb_cache_entry() -> impl Strategy { - (any::(), prop::option::of(arb_signature())).prop_map( - |(get_signatures_calls, last_discovered_signature)| AutomaticDepositCacheEntry { - get_signatures_calls, - last_discovered_signature, - }, + ( + any::(), + prop::option::of(arb_signature()), + prop::option::of(arb_signature()), ) + .prop_map( + |(get_signatures_calls, last_discovered_signature, page_cursor)| { + AutomaticDepositCacheEntry { + get_signatures_calls, + last_discovered_signature, + page_cursor, + } + }, + ) } }