Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions minter/src/deposit/automatic/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::utils::stable_sort_key_map::StableSortKeyMap;
use ic_stable_structures::{Storable, storable::Bound};
use icrc_ledger_types::icrc1::account::Account;
use minicbor::{Decode, Encode};
use solana_signature::Signature;
use std::borrow::Cow;

#[cfg(test)]
Expand All @@ -17,6 +18,20 @@ pub struct AutomaticDepositCacheEntry {
/// The number of `getSignaturesForAddress` calls made so far for this account.
#[n(0)]
pub get_signatures_calls: u8,
/// The newest transaction signature discovered in the most recently completed scan.
///
/// Passed as `until` (exclusive lower bound) at the start of each new scan so
/// that only transactions newer than this are returned, avoiding re-fetching of
/// already-seen signatures. Not updated mid-scan.
#[cbor(n(1), with = "crate::utils::cbor::signature::option")]
pub last_discovered_signature: Option<Signature>,
/// Pagination cursor for an in-progress scan.
///
/// Set to the oldest signature in the most recent batch when there may be more
/// results. Passed as `before` to continue scanning backwards. `None` means no
/// scan is in progress and the next call should start from the latest block.
#[cbor(n(2), with = "crate::utils::cbor::signature::option")]
pub page_cursor: Option<Signature>,
}

impl Storable for AutomaticDepositCacheEntry {
Expand Down
77 changes: 41 additions & 36 deletions minter/src/deposit/automatic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,9 @@ pub async fn poll_monitored_addresses<R: CanisterRuntime>(runtime: R) {
return;
}

let due: Vec<(Account, u8)> = with_automatic_deposit_cache(|cache| {
let due: Vec<(Account, AutomaticDepositCacheEntry)> = with_automatic_deposit_cache(|cache| {
cache
.iter_by_index_up_to(&now)
.map(|(account, entry)| (account, entry.get_signatures_calls))
// +1 to detect whether more accounts remain after this round.
.take(MAX_CONCURRENT_RPC_CALLS + 1)
.collect()
Expand All @@ -103,11 +102,11 @@ pub async fn poll_monitored_addresses<R: CanisterRuntime>(runtime: R) {

let master_key = lazy_get_schnorr_master_key(&runtime).await;

futures::future::join_all(due.into_iter().take(MAX_CONCURRENT_RPC_CALLS).map(
|(account, get_signatures_calls)| {
poll_account(&runtime, &master_key, account, get_signatures_calls)
},
))
futures::future::join_all(
due.into_iter()
.take(MAX_CONCURRENT_RPC_CALLS)
.map(|(account, entry)| poll_account(&runtime, &master_key, account, entry)),
)
.await;

if !more_to_process {
Expand All @@ -119,10 +118,11 @@ async fn poll_account<R: CanisterRuntime>(
runtime: &R,
master_key: &SchnorrPublicKey,
account: Account,
get_signatures_calls: u8,
mut entry: AutomaticDepositCacheEntry,
) {
let deposit_address = account_address(master_key, &account);

let is_new_scan = entry.page_cursor.is_none();
let params = GetSignaturesForAddressParams {
pubkey: deposit_address.into(),
commitment: Some(CommitmentLevel::Finalized),
Expand All @@ -133,8 +133,15 @@ async fn poll_account<R: CanisterRuntime>(
.expect("MAX_GET_TRANSACTION_CALLS must be between 1 and 1000"),
),
min_context_slot: None,
before: None,
until: None,
// When continuing a scan, page backwards using the cursor as the upper bound.
// No lower bound is needed since the call budget limits how far back we go.
// When starting a new scan, use last_discovered_signature as the lower bound
// to avoid re-fetching already-seen transactions.
before: entry.page_cursor.map(Into::into),
until: is_new_scan
.then(|| entry.last_discovered_signature)
.flatten()
.map(Into::into),
};

match get_signatures_for_address(runtime, params).await {
Expand All @@ -144,23 +151,26 @@ async fn poll_account<R: CanisterRuntime>(
"Failed to get signatures for address {deposit_address}: {e}"
);
}
Ok(_signatures) => {
// TODO(DEFI-2780): Process discovered deposit signatures.
Ok(signatures) => {
if is_new_scan {
// Record the newest signature at the start of the scan; it becomes
// last_discovered_signature once the scan completes.
entry.last_discovered_signature =
signatures.first().map(|s| s.signature.clone().into());
}
entry.page_cursor = if signatures.len() >= MAX_GET_TRANSACTION_CALLS {
// Full batch returned — there may be more; continue paginating.
signatures.last().map(|s| s.signature.clone().into())
} else {
// Fewer results than the limit — this batch exhausts the range.
None
};
}
}

let get_signatures_calls = get_signatures_calls + 1;
if get_signatures_calls >= MAX_GET_SIGNATURES_FOR_ADDRESS_CALLS {
// Use u64::MAX so the entry is retained but never scheduled for another poll.
with_automatic_deposit_cache_mut(|cache| {
cache.insert(
account,
u64::MAX,
AutomaticDepositCacheEntry {
get_signatures_calls,
},
);
});
entry.get_signatures_calls += 1;
let next_poll_at = if entry.get_signatures_calls >= MAX_GET_SIGNATURES_FOR_ADDRESS_CALLS {
// Maximum number of getSignaturesForAddress calls made, stop polling.
mutate_state(|state| {
process_event(
state,
Expand All @@ -172,18 +182,13 @@ async fn poll_account<R: CanisterRuntime>(
Priority::Info,
"Stopped monitoring {deposit_address}: reached maximum getSignaturesForAddress calls ({MAX_GET_SIGNATURES_FOR_ADDRESS_CALLS})"
);
u64::MAX
} else {
// Exponential backoff: delay before next poll is 2^get_signatures_calls minutes.
let delay = Duration::from_mins(1u64 << get_signatures_calls);
let next_poll_at = runtime.time() + delay.as_nanos() as u64;
with_automatic_deposit_cache_mut(|cache| {
cache.insert(
account,
next_poll_at,
AutomaticDepositCacheEntry {
get_signatures_calls,
},
);
});
}
let delay = Duration::from_mins(1u64 << entry.get_signatures_calls);
runtime.time() + delay.as_nanos() as u64
};
with_automatic_deposit_cache_mut(|cache| {
cache.update_index(account, next_poll_at, entry);
});
}
7 changes: 4 additions & 3 deletions minter/src/state/event.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::numeric::{LedgerBurnIndex, LedgerMintIndex};
use crate::{
numeric::{LedgerBurnIndex, LedgerMintIndex},
utils::cbor,
};
use cksol_types_internal::{InitArgs, UpgradeArgs};
use derive_more::From;
use ic_stable_structures::{Storable, storable::Bound};
Expand All @@ -21,8 +24,6 @@ pub enum VersionedMessage {
),
}

mod cbor;

#[derive(Eq, PartialEq, Debug, Decode, Encode)]
pub struct Event {
/// The canister time at which the minter generated this event.
Expand Down
23 changes: 16 additions & 7 deletions minter/src/test_fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ pub mod events {
#[cfg(test)]
pub mod arb {
use crate::{
deposit::automatic::cache::AutomaticDepositCacheEntry,
numeric::{LedgerBurnIndex, LedgerMintIndex},
state::event::{DepositId, Event, EventType, TransactionPurpose, WithdrawalRequest},
};
Expand Down Expand Up @@ -598,13 +599,21 @@ pub mod arb {
.prop_map(|(timestamp, payload)| Event { timestamp, payload })
}

pub fn arb_cache_entry()
-> impl Strategy<Value = crate::deposit::automatic::cache::AutomaticDepositCacheEntry> {
any::<u8>().prop_map(|get_signatures_calls| {
crate::deposit::automatic::cache::AutomaticDepositCacheEntry {
get_signatures_calls,
}
})
pub fn arb_cache_entry() -> impl Strategy<Value = AutomaticDepositCacheEntry> {
(
any::<u8>(),
prop::option::of(arb_signature()),
prop::option::of(arb_signature()),
)
.prop_map(
|(get_signatures_calls, last_discovered_signature, page_cursor)| {
AutomaticDepositCacheEntry {
get_signatures_calls,
last_discovered_signature,
page_cursor,
}
},
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,32 @@ pub mod signature {
e.bytes(v.as_ref())?;
Ok(())
}

pub mod option {
use super::*;

pub fn decode<Ctx>(d: &mut Decoder<'_>, ctx: &mut Ctx) -> Result<Option<Signature>, Error> {
if d.datatype()? == minicbor::data::Type::Null {
d.null()?;
return Ok(None);
}
super::decode(d, ctx).map(Some)
}

pub fn encode<Ctx, W: Write>(
v: &Option<Signature>,
e: &mut Encoder<W>,
ctx: &mut Ctx,
) -> Result<(), minicbor::encode::Error<W::Error>> {
match v {
None => {
e.null()?;
Ok(())
}
Some(sig) => super::encode(sig, e, ctx),
}
}
}
}

pub mod id_vec {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
state::event::{VersionedMessage, cbor},
state::event::VersionedMessage,
test_fixtures::arb::{arb_message, arb_signature},
utils::cbor,
};
use proptest::{prop_assert_eq, proptest};

Expand Down Expand Up @@ -29,6 +30,36 @@ mod signature_tests {
}
}

mod signature_option_tests {
use super::*;

#[test]
fn none_roundtrips() {
let encoded = encode_opt(None);
assert_eq!(decode_opt(&encoded), None);
}

proptest! {
#[test]
fn some_minicbor_roundtrip(signature in arb_signature()) {
let encoded = encode_opt(Some(signature));
prop_assert_eq!(decode_opt(&encoded), Some(signature));
}
}

fn encode_opt(v: Option<solana_signature::Signature>) -> Vec<u8> {
let mut buf = Vec::new();
let mut encoder = minicbor::Encoder::new(&mut buf);
cbor::signature::option::encode(&v, &mut encoder, &mut ()).unwrap();
buf
}

fn decode_opt(bytes: &[u8]) -> Option<solana_signature::Signature> {
let mut decoder = minicbor::Decoder::new(bytes);
cbor::signature::option::decode(&mut decoder, &mut ()).unwrap()
}
}

mod message_tests {
use super::*;

Expand Down
1 change: 1 addition & 0 deletions minter/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod cbor;
pub mod insertion_ordered_map;
pub mod stable_sort_key_map;
21 changes: 21 additions & 0 deletions minter/src/utils/stable_sort_key_map/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,27 @@ where
self.by_key.insert(key, (index, value, ()));
}

/// Updates the index and value for an existing entry.
///
/// Unlike [`insert`], this communicates that the entry already exists and
/// its sort position (index) is being updated — e.g. to reschedule a poll.
///
/// # Panics
///
/// Panics if `key` is not present in the map.
///
/// [`insert`]: Self::insert
pub fn update_index(&mut self, key: K, new_index: I, new_value: V) {
let (old_index, _, _) = self
.by_key
.get(&key)
.expect("update_index called for non-existent key");
self.by_index.remove(&(old_index, key.clone(), ()));
self.by_index
.insert((new_index.clone(), key.clone(), ()), ());
self.by_key.insert(key, (new_index, new_value, ()));
}

/// Returns the `(index, key)` of the entry with the smallest index, if any.
///
/// O(log n).
Expand Down
Loading