Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions integration_tests/src/fixtures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ impl MockBuilder {
)
}

/// Mock for `getSignaturesForAddress` returning the given list of signature objects.
pub fn get_signatures_for_address(self, signatures: Vec<serde_json::Value>) -> Self {
/// Mock for `getSignaturesForAddress` returning the given list of `(signature, slot)` pairs.
pub fn get_signatures_for_address(self, signatures: Vec<(&str, u64)>) -> Self {
self.expect(
get_signatures_for_address_request(),
get_signatures_for_address_response(signatures),
Expand Down Expand Up @@ -380,10 +380,23 @@ fn get_signatures_for_address_request() -> JsonRpcRequestMatcher {
JsonRpcRequestMatcher::with_method("getSignaturesForAddress")
}

fn get_signatures_for_address_response(signatures: Vec<serde_json::Value>) -> JsonRpcResponse {
fn get_signatures_for_address_response(signatures: Vec<(&str, u64)>) -> JsonRpcResponse {
let entries: Vec<serde_json::Value> = signatures
.into_iter()
.map(|(signature, slot)| {
json!({
"signature": signature,
"slot": slot,
"err": null,
"memo": null,
"blockTime": null,
"confirmationStatus": null
})
})
.collect();
JsonRpcResponse::from(json!({
"jsonrpc": "2.0",
"result": signatures,
"result": entries,
"id": 1
}))
}
51 changes: 24 additions & 27 deletions integration_tests/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ use cksol_int_tests::{
CkSolMinter, Setup, SetupBuilder,
fixtures::{
DEFAULT_CALLER_ACCOUNT, DEFAULT_CALLER_DEPOSIT_ADDRESS, DEPOSIT_AMOUNT,
EXPECTED_MINT_AMOUNT, MockBuilder, SharedMockHttpOutcalls,
default_get_deposit_address_args, default_process_deposit_args,
default_update_balance_args, deposit_transaction_signature,
EXPECTED_MINT_AMOUNT, MockBuilder, SharedMockHttpOutcalls, default_process_deposit_args,
default_update_balance_args, deposit_signature_status_json, deposit_transaction_signature,
},
};
use cksol_types::{
Expand All @@ -17,7 +16,7 @@ use cksol_types::{
};
use cksol_types_internal::{
UpgradeArgs,
event::{EventType, TransactionPurpose},
event::{DepositSource, EventType, TransactionPurpose},
log::Priority,
};
use ic_pocket_canister_runtime::{JsonRpcResponse, MockHttpOutcalls};
Expand All @@ -32,6 +31,7 @@ const FINALIZE_TRANSACTIONS_DELAY: Duration = Duration::from_mins(2);
const RESUBMIT_TRANSACTIONS_DELAY: Duration = Duration::from_mins(3);
const DEPOSIT_CONSOLIDATION_DELAY: Duration = Duration::from_mins(10);
const POLL_MONITORED_ADDRESSES_DELAY: Duration = Duration::from_mins(1);
const PROCESS_PENDING_SIGNATURES_DELAY: Duration = Duration::from_secs(5);

/// Deposits funds into the minter via `process_deposit`, consolidates them,
/// and finalizes the consolidation so the minter's internal balance is credited.
Expand Down Expand Up @@ -1295,47 +1295,44 @@ mod automated_deposit_flow_tests {
use super::*;

#[tokio::test]
async fn should_poll_monitored_address() {
async fn should_accept_automatic_deposit() {
let setup = SetupBuilder::new().build().await;
let minter = setup.minter();

// Initialize the minter public key and register the account for monitoring.
assert_eq!(
minter
.get_deposit_address(default_get_deposit_address_args())
.await
.to_string(),
DEFAULT_CALLER_DEPOSIT_ADDRESS
);
// Register the account for monitoring.
minter
.update_balance(default_update_balance_args())
.await
.expect("update_balance should succeed");

minter.assert_that_events().await.satisfy(|events| {
check!(events.iter().any(|e| {
e == &EventType::StartedMonitoringAccount {
account: DEFAULT_CALLER_ACCOUNT,
}
}));
});

// Advance time: the minter should poll getSignaturesForAddress once, then remove the account.
// Poll phase: minter calls getSignaturesForAddress and discovers the deposit.
setup.advance_time(POLL_MONITORED_ADDRESSES_DELAY).await;
setup
.execute_http_mocks(
MockBuilder::with_start_id(0)
.get_signatures_for_address(vec![])
.get_signatures_for_address(vec![deposit_signature_status_json()])
.build(),
)
.await;

// Process phase: minter calls getTransaction and accepts the deposit.
setup.advance_time(PROCESS_PENDING_SIGNATURES_DELAY).await;
setup
.execute_http_mocks(
MockBuilder::with_start_id(4)
.get_deposit_transaction()
.build(),
)
.await;

minter.assert_that_events().await.satisfy(|events| {
check!(events.iter().any(|e| {
e == &EventType::StoppedMonitoringAccount {
account: DEFAULT_CALLER_ACCOUNT,
check!(events.iter().any(|e| matches!(
e,
EventType::AcceptedDeposit {
source: DepositSource::Automatic,
..
}
}));
)));
});

setup.drop().await;
Expand Down
58 changes: 21 additions & 37 deletions minter/src/consolidate/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@ use crate::{
},
};
use assert_matches::assert_matches;
use sol_rpc_types::{ConfirmedBlock, MultiRpcResult, RpcError, Signature, Slot};

type SlotResult = MultiRpcResult<Slot>;
type BlockResult = MultiRpcResult<ConfirmedBlock>;
type SendTransactionResult = MultiRpcResult<Signature>;
use sol_rpc_types::{MultiRpcResult, RpcError, Signature};

#[tokio::test]
async fn should_return_early_if_no_deposits_to_consolidate() {
Expand Down Expand Up @@ -55,11 +51,8 @@ async fn should_return_early_if_fetching_blockhash_fails() {

add_funds_to_consolidate(&[(deposit_id(0), 1_000_000_000)]);

let error = SlotResult::Consistent(Err(RpcError::ValidationError("Error".to_string())));
let runtime = TestCanisterRuntime::new()
.add_stub_response(error.clone())
.add_stub_response(error.clone())
.add_stub_response(error);
.add_n_get_slot_error(RpcError::ValidationError("Error".to_string()), 3);

consolidate_deposits(runtime).await;

Expand All @@ -81,11 +74,9 @@ async fn should_submit_single_consolidation_request() {
let runtime = TestCanisterRuntime::new()
.with_increasing_time()
// get_recent_slot_and_blockhash calls (get_recent_block internally calls getSlot then getBlock)
.add_stub_response(SlotResult::Consistent(Ok(slot)))
.add_stub_response(BlockResult::Consistent(Ok(confirmed_block())))
.add_stub_response(SendTransactionResult::Consistent(Ok(
fee_payer_signature.into()
)))
.add_get_slot_response(slot)
.add_get_block_response(confirmed_block())
.add_send_transaction_response(fee_payer_signature)
.add_signature(fee_payer_signature.into());

consolidate_deposits(runtime).await;
Expand Down Expand Up @@ -118,10 +109,10 @@ async fn should_record_events_even_if_transaction_submission_fails() {
let runtime = TestCanisterRuntime::new()
.with_increasing_time()
// get_recent_slot_and_blockhash calls
.add_stub_response(SlotResult::Consistent(Ok(slot)))
.add_stub_response(BlockResult::Consistent(Ok(confirmed_block())))
.add_get_slot_response(slot)
.add_get_block_response(confirmed_block())
// Transaction submission fails
.add_stub_response(SendTransactionResult::Inconsistent(vec![]))
.add_stub_response(MultiRpcResult::<Signature>::Inconsistent(vec![]))
.add_signature(fee_payer_signature.into());

consolidate_deposits(runtime).await;
Expand Down Expand Up @@ -158,14 +149,10 @@ async fn should_submit_multiple_consolidation_batches() {
let mut runtime = TestCanisterRuntime::new()
.with_increasing_time()
// get_recent_slot_and_blockhash calls
.add_stub_response(SlotResult::Consistent(Ok(slot)))
.add_stub_response(BlockResult::Consistent(Ok(confirmed_block())))
.add_stub_response(SendTransactionResult::Consistent(Ok(
fee_payer_signature_1.into()
)))
.add_stub_response(SendTransactionResult::Consistent(Ok(
fee_payer_signature_2.into()
)));
.add_get_slot_response(slot)
.add_get_block_response(confirmed_block())
.add_send_transaction_response(fee_payer_signature_1)
.add_send_transaction_response(fee_payer_signature_2);

for i in 0..(2 + NUM_DEPOSITS) {
runtime = runtime.add_signature(signature(i).into());
Expand Down Expand Up @@ -229,11 +216,9 @@ async fn should_consolidate_multiple_deposits_to_same_account_in_single_transfer
let slot = 100;
let runtime = TestCanisterRuntime::new()
.with_increasing_time()
.add_stub_response(SlotResult::Consistent(Ok(slot)))
.add_stub_response(BlockResult::Consistent(Ok(confirmed_block())))
.add_stub_response(SendTransactionResult::Consistent(Ok(
fee_payer_signature.into()
)))
.add_get_slot_response(slot)
.add_get_block_response(confirmed_block())
.add_send_transaction_response(fee_payer_signature)
.add_signature(fee_payer_signature.into());

consolidate_deposits(runtime).await;
Expand Down Expand Up @@ -271,11 +256,10 @@ async fn should_reschedule_until_all_deposits_consolidated() {
// Round 1: processes MAX_CONCURRENT_RPC_CALLS batches, 1 deposit remains → reschedule
let mut runtime = TestCanisterRuntime::new()
.with_increasing_time()
.add_stub_response(SlotResult::Consistent(Ok(slot)))
.add_stub_response(BlockResult::Consistent(Ok(confirmed_block())));
.add_get_slot_response(slot)
.add_get_block_response(confirmed_block());
for i in 0..MAX_CONCURRENT_RPC_CALLS {
runtime =
runtime.add_stub_response(SendTransactionResult::Consistent(Ok(signature(i).into())));
runtime = runtime.add_send_transaction_response(signature(i));
}
for i in 0..(MAX_CONCURRENT_RPC_CALLS + num_deposits) {
runtime = runtime.add_signature(signature(i).into());
Expand All @@ -293,9 +277,9 @@ async fn should_reschedule_until_all_deposits_consolidated() {
let last_sig = signature(num_deposits);
let runtime = TestCanisterRuntime::new()
.with_increasing_time()
.add_stub_response(SlotResult::Consistent(Ok(slot)))
.add_stub_response(BlockResult::Consistent(Ok(confirmed_block())))
.add_stub_response(SendTransactionResult::Consistent(Ok(last_sig.into())))
.add_get_slot_response(slot)
.add_get_block_response(confirmed_block())
.add_send_transaction_response(last_sig)
.add_signature(last_sig.into());

consolidate_deposits(runtime.clone()).await;
Expand Down
118 changes: 116 additions & 2 deletions minter/src/deposit/automatic/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use crate::{
address::{account_address, lazy_get_schnorr_master_key},
constants::MAX_CONCURRENT_RPC_CALLS,
deposit::fetch_and_validate_deposit,
guard::TimerGuard,
rpc::get_signatures_for_address,
runtime::CanisterRuntime,
state::{
SchnorrPublicKey, TaskType, audit::process_event, event::EventType, mutate_state,
read_state,
SchnorrPublicKey, TaskType,
audit::process_event,
event::{DepositId, DepositSource, EventType},
mutate_state, read_state,
},
};
use canlog::log;
Expand Down Expand Up @@ -38,6 +41,9 @@ pub const POLL_MONITORED_ADDRESSES_DELAY: Duration = Duration::from_mins(1);
/// Maximum number of `getTransaction` calls to make per polled account.
pub const MAX_TRANSACTIONS_PER_ACCOUNT: usize = 10;

/// How often the minter processes the pending-signatures queue.
pub const PROCESS_PENDING_SIGNATURES_DELAY: Duration = Duration::from_secs(5);

/// Registers the given account for automated deposit monitoring.
///
/// Returns `Ok(())` if the account was registered (or was already being monitored).
Expand Down Expand Up @@ -160,6 +166,114 @@ async fn poll_account<R: CanisterRuntime>(
});
}

/// Processes pending deposit signatures using a round-robin, capacity-filling strategy.
///
/// Each pass takes one signature per account (fair round-robin by `Account` key order). If
/// capacity remains after a full pass, another pass begins — so up to `MAX_CONCURRENT_RPC_CALLS`
/// signatures are dispatched in parallel each call. For each signature, calls `getTransaction`
/// and emits [`EventType::AcceptedDeposit`] with `source: Automatic` if valid. Invalid or
/// already-processed signatures are silently discarded. Reschedules itself immediately if
/// signatures remain after the capacity is exhausted.
pub async fn process_pending_signatures<R: CanisterRuntime>(runtime: R) {
let _guard = match TimerGuard::new(TaskType::ProcessPendingSignatures) {
Ok(guard) => guard,
Err(_) => return,
};

// Round-robin across accounts, refilling capacity with additional passes until exhausted.
let to_process: Vec<(Account, Signature)> = PENDING_SIGNATURES.with(|pending| {
let mut pending = pending.borrow_mut();

// Interleave queues round-robin by iterating column-by-column: round 0 yields
// one item from each account, then round 1, and so on. `take` stops early
// without advancing past the capacity limit.
let max_round = pending.values().map(VecDeque::len).max().unwrap_or(0);
let to_process: Vec<(Account, Signature)> = (0..max_round)
.flat_map(|round| {
pending.iter().filter_map(move |(&account, queue)| {
queue.get(round).map(|&sig| (account, sig))
})
})
.take(MAX_CONCURRENT_RPC_CALLS)
.collect();

// Drain the taken items from each account's queue.
let mut counts: BTreeMap<Account, usize> = BTreeMap::new();
for &(account, _) in &to_process {
*counts.entry(account).or_default() += 1;
}
for (account, count) in counts {
if let Some(queue) = pending.get_mut(&account) {
queue.drain(..count);
}
}
pending.retain(|_, queue| !queue.is_empty());
to_process
});

if to_process.is_empty() {
return;
}

let more_to_process = PENDING_SIGNATURES.with(|p| !p.borrow().is_empty());
let reschedule = scopeguard::guard(runtime.clone(), |runtime| {
runtime.set_timer(Duration::ZERO, process_pending_signatures);
});

let fee = read_state(|s| s.automated_deposit_fee());

futures::future::join_all(
to_process
.into_iter()
.map(|(account, signature)| process_signature(&runtime, account, signature, fee)),
)
.await;

if !more_to_process {
scopeguard::ScopeGuard::into_inner(reschedule);
}
}

async fn process_signature<R: CanisterRuntime>(
runtime: &R,
account: Account,
signature: Signature,
fee: u64,
) {
// Skip signatures that were already accepted or minted (e.g. via manual deposit).
let deposit_id = DepositId { account, signature };
if read_state(|s| s.deposit_status(&deposit_id)).is_some() {
return;
}

match fetch_and_validate_deposit(runtime, account, signature, fee).await {
Ok((deposit_id, deposit_amount, amount_to_mint)) => {
mutate_state(|state| {
process_event(
state,
EventType::AcceptedDeposit {
deposit_id,
deposit_amount,
amount_to_mint,
source: DepositSource::Automatic,
},
runtime,
)
});
log!(
Priority::Info,
"Accepted automatic deposit {deposit_id:?}: {deposit_amount} lamports deposited, minting {amount_to_mint} lamports"
);
}
Err(e) => {
log!(
Priority::Info,
"Discarding automatic deposit signature {signature}: {e}"
);
}
}
}

#[cfg(any(test, feature = "canbench-rs"))]
pub fn pending_signatures_for(account: &Account) -> Vec<Signature> {
PENDING_SIGNATURES.with(|p| {
Expand Down
Loading
Loading