Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion integration_tests/src/fixtures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ pub fn default_get_deposit_address_args() -> GetDepositAddressArgs {
}
}

pub fn default_update_balance_args() -> cksol_types::UpdateBalanceArgs {
cksol_types::UpdateBalanceArgs {
owner: None,
subaccount: None,
}
}

pub fn default_process_deposit_args() -> ProcessDepositArgs {
ProcessDepositArgs {
owner: None,
Expand Down Expand Up @@ -98,7 +105,7 @@ pub struct MockBuilder {

/// Number of Solana RPC providers used for redundancy.
/// Each logical RPC call generates this many HTTP outcalls with consecutive IDs.
const NUM_RPC_PROVIDERS: u64 = 4;
pub const NUM_RPC_PROVIDERS: u64 = 4;

impl Default for MockBuilder {
fn default() -> Self {
Expand Down Expand Up @@ -198,6 +205,14 @@ impl MockBuilder {
send_transaction_response(tx_signature),
)
}

/// Mock for `getSignaturesForAddress` returning the given list of signature objects.
pub fn get_signatures_for_address(self, signatures: Vec<serde_json::Value>) -> Self {
self.expect(
get_signatures_for_address_request(),
get_signatures_for_address_response(signatures),
)
}
}

// ── JSON-RPC request matchers and response builders ─────────────────────────
Expand Down Expand Up @@ -360,3 +375,15 @@ fn send_transaction_response(signature: &str) -> JsonRpcResponse {
"id": 1
}))
}

fn get_signatures_for_address_request() -> JsonRpcRequestMatcher {
JsonRpcRequestMatcher::with_method("getSignaturesForAddress")
}
Comment thread
lpahlavi marked this conversation as resolved.

fn get_signatures_for_address_response(signatures: Vec<serde_json::Value>) -> JsonRpcResponse {
JsonRpcResponse::from(json!({
"jsonrpc": "2.0",
"result": signatures,
"id": 1
}))
}
57 changes: 55 additions & 2 deletions integration_tests/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use cksol_int_tests::{
CkSolMinter, Setup, SetupBuilder,
fixtures::{
DEFAULT_CALLER_ACCOUNT, DEFAULT_CALLER_DEPOSIT_ADDRESS, DEPOSIT_AMOUNT,
EXPECTED_MINT_AMOUNT, MockBuilder, SharedMockHttpOutcalls, default_process_deposit_args,
deposit_transaction_signature,
EXPECTED_MINT_AMOUNT, MockBuilder, SharedMockHttpOutcalls,
default_get_deposit_address_args, default_process_deposit_args,
default_update_balance_args, deposit_transaction_signature,
},
};
use cksol_types::{
Expand All @@ -30,6 +31,7 @@ const WITHDRAWAL_PROCESSING_DELAY: Duration = Duration::from_mins(1);
const FINALIZE_TRANSACTIONS_DELAY: Duration = Duration::from_mins(2);
const RESUBMIT_TRANSACTIONS_DELAY: Duration = Duration::from_mins(3);
const DEPOSIT_CONSOLIDATION_DELAY: Duration = Duration::from_mins(10);
const POLL_MONITORED_ADDRESSES_DELAY: Duration = Duration::from_mins(1);

/// Deposits funds into the minter via `process_deposit`, consolidates them,
/// and finalizes the consolidation so the minter's internal balance is credited.
Expand Down Expand Up @@ -1288,3 +1290,54 @@ mod metrics_tests {
.await;
}
}

mod automated_deposit_flow_tests {
use super::*;

#[tokio::test]
async fn should_poll_monitored_address() {
let setup = SetupBuilder::new().build().await;
let minter = setup.minter();

// Initialize the minter public key and register the account for monitoring.
assert_eq!(
minter
.get_deposit_address(default_get_deposit_address_args())
.await
.to_string(),
DEFAULT_CALLER_DEPOSIT_ADDRESS
);
minter
.update_balance(default_update_balance_args())
.await
.expect("update_balance should succeed");

minter.assert_that_events().await.satisfy(|events| {
check!(events.iter().any(|e| {
e == &EventType::StartedMonitoringAccount {
account: DEFAULT_CALLER_ACCOUNT,
}
}));
});

// Advance time: the minter should poll getSignaturesForAddress once, then remove the account.
setup.advance_time(POLL_MONITORED_ADDRESSES_DELAY).await;
setup
.execute_http_mocks(
MockBuilder::with_start_id(0)
.get_signatures_for_address(vec![])
.build(),
)
.await;

minter.assert_that_events().await.satisfy(|events| {
check!(events.iter().any(|e| {
e == &EventType::StoppedMonitoringAccount {
account: DEFAULT_CALLER_ACCOUNT,
}
}));
});

setup.drop().await;
}
}
13 changes: 9 additions & 4 deletions libs/types-internal/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use serde::Deserialize;
use sol_rpc_types::{Lamport, Pubkey as Address, Signature, Slot};

/// A minter event that can be serialized to Candid.
#[derive(Clone, Debug, CandidType, Deserialize)]
#[derive(Clone, Debug, PartialEq, CandidType, Deserialize)]
pub struct Event {
/// The canister time at which the minter generated this event.
pub timestamp: u64,
Expand All @@ -16,7 +16,7 @@ pub struct Event {
}

/// The type of a minter event.
#[derive(Clone, Debug, CandidType, Deserialize)]
#[derive(Clone, Debug, PartialEq, CandidType, Deserialize)]
pub enum EventType {
/// The minter initialization event.
/// Must be the first event in the log.
Expand Down Expand Up @@ -116,10 +116,15 @@ pub enum EventType {
/// The account to monitor for incoming deposits.
account: Account,
},
/// The minter stopped monitoring an account for automated deposits.
StoppedMonitoringAccount {
/// The account that is no longer being monitored.
account: Account,
},
}

/// The purpose of a submitted Solana transaction.
#[derive(Clone, Debug, CandidType, Deserialize)]
#[derive(Clone, Debug, PartialEq, CandidType, Deserialize)]
pub enum TransactionPurpose {
/// Consolidate deposited funds into the minter's main account.
ConsolidateDeposits {
Expand All @@ -134,7 +139,7 @@ pub enum TransactionPurpose {
}

/// A versioned Solana transaction message.
#[derive(Clone, Debug, CandidType, Deserialize)]
#[derive(Clone, Debug, PartialEq, CandidType, Deserialize)]
pub enum VersionedTransactionMessage {
/// A legacy Solana transaction message, serialized with bincode.
Legacy(Vec<u8>),
Expand Down
5 changes: 5 additions & 0 deletions minter/cksol_minter.did
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,11 @@ type EventType = variant {
// The account to monitor for incoming deposits.
account: Account;
};
// The minter stopped monitoring an account for automated deposits.
StoppedMonitoringAccount : record {
// The account that is no longer being monitored.
account: Account;
};
};

// A single transaction can deposit to multiple accounts, so the signature alone
Expand Down
97 changes: 96 additions & 1 deletion minter/src/deposit/automatic/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,33 @@
use crate::{
address::{account_address, lazy_get_schnorr_master_key},
constants::MAX_CONCURRENT_RPC_CALLS,
guard::TimerGuard,
rpc::get_signatures_for_address,
runtime::CanisterRuntime,
state::{audit::process_event, event::EventType, mutate_state, read_state},
state::{
SchnorrPublicKey, TaskType, audit::process_event, event::EventType, mutate_state,
read_state,
},
};
use canlog::log;
use cksol_types::UpdateBalanceError;
use cksol_types_internal::log::Priority;
use icrc_ledger_types::icrc1::account::Account;
use sol_rpc_types::{CommitmentLevel, GetSignaturesForAddressParams};
use std::time::Duration;

#[cfg(test)]
mod tests;

/// Maximum number of accounts the minter will monitor simultaneously for automated deposits.
pub const MAX_MONITORED_ACCOUNTS: usize = 100;

/// How often the minter polls monitored addresses for new deposit transactions.
pub const POLL_MONITORED_ADDRESSES_DELAY: Duration = Duration::from_mins(1);

/// Maximum number of `getTransaction` calls to make per polled account.
pub const MAX_TRANSACTIONS_PER_ACCOUNT: usize = 10;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constant is currently 50 in the design document. We can adapt either number. 🙂

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! I'll merge the PR as-is to not re-run the CI and update the value in the PR that adds the getTransaction calls.


/// Registers the given account for automated deposit monitoring.
///
/// Returns `Ok(())` if the account was registered (or was already being monitored).
Expand All @@ -37,3 +54,81 @@ pub fn update_balance<R: CanisterRuntime>(

Ok(())
}

/// Polls all monitored addresses for new deposit transaction signatures.
///
/// For each address, calls `getSignaturesForAddress` on the Solana RPC.
pub async fn poll_monitored_addresses<R: CanisterRuntime>(runtime: R) {
let _guard = match TimerGuard::new(TaskType::PollMonitoredAddresses) {
Ok(guard) => guard,
Err(_) => return,
};

let all_accounts: Vec<Account> =
read_state(|s| s.monitored_accounts().iter().copied().collect());
if all_accounts.is_empty() {
return;
}

let more_to_process = all_accounts.len() > MAX_CONCURRENT_RPC_CALLS;
Comment thread
lpahlavi marked this conversation as resolved.
let reschedule = scopeguard::guard(runtime.clone(), |runtime| {
runtime.set_timer(Duration::ZERO, poll_monitored_addresses);
});

let master_key = lazy_get_schnorr_master_key(&runtime).await;

futures::future::join_all(
all_accounts
.into_iter()
.take(MAX_CONCURRENT_RPC_CALLS)
.map(|account| poll_account(&runtime, &master_key, account)),
)
Comment thread
lpahlavi marked this conversation as resolved.
.await;

if !more_to_process {
// All work fits in this round
scopeguard::ScopeGuard::into_inner(reschedule);
}
Comment thread
lpahlavi marked this conversation as resolved.
}

async fn poll_account<R: CanisterRuntime>(
runtime: &R,
master_key: &SchnorrPublicKey,
account: Account,
) {
let deposit_address = account_address(master_key, &account);

let params = GetSignaturesForAddressParams {
pubkey: deposit_address.into(),
commitment: Some(CommitmentLevel::Finalized),
min_context_slot: None,
// Fetch no more signatures than we intend to process with `getTransaction`.
limit: Some(
(MAX_TRANSACTIONS_PER_ACCOUNT as u32)
.try_into()
.expect("MAX_TRANSACTIONS_PER_ACCOUNT must be between 1 and 1000"),
),
before: None,
until: None,
};
Comment thread
lpahlavi marked this conversation as resolved.

match get_signatures_for_address(runtime, params).await {
Err(e) => {
log!(
Priority::Info,
"Failed to get signatures for address {deposit_address}: {e}"
);
}
Ok(_signatures) => {
// TODO(DEFI-2780): Process discovered deposit signatures.
}
}

mutate_state(|state| {
process_event(
state,
EventType::StoppedMonitoringAccount { account },
runtime,
);
});
Comment thread
lpahlavi marked this conversation as resolved.
}
53 changes: 51 additions & 2 deletions minter/src/deposit/automatic/tests.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use super::*;
use crate::{
constants::MAX_CONCURRENT_RPC_CALLS,
state::{event::EventType, read_state},
test_fixtures::{
EventsAssert, account, events::start_monitoring_account, init_state,
runtime::TestCanisterRuntime,
EventsAssert, account, events::start_monitoring_account, init_schnorr_master_key,
init_state, runtime::TestCanisterRuntime,
},
};
use sol_rpc_types::{ConfirmedTransactionStatusWithSignature, MultiRpcResult};

type SignaturesResult = MultiRpcResult<Vec<ConfirmedTransactionStatusWithSignature>>;

fn monitored_accounts_count() -> usize {
read_state(|s| s.monitored_accounts().len())
Expand Down Expand Up @@ -79,3 +83,48 @@ fn should_not_return_queue_full_if_account_already_monitored() {
let result = update_balance(&runtime, account(0));
assert_eq!(result, Ok(()));
}

#[tokio::test]
async fn should_poll_monitored_addresses_in_rounds() {
setup();

// Add MAX_CONCURRENT_RPC_CALLS + 1 accounts to monitor so that 2 rounds are needed.
let num_accounts = MAX_CONCURRENT_RPC_CALLS + 1;
for i in 0..num_accounts {
start_monitoring_account(account(i));
}
assert_eq!(monitored_accounts_count(), num_accounts);

// Round 1: polls MAX_CONCURRENT_RPC_CALLS accounts, 1 remains → reschedule.
let mut runtime = TestCanisterRuntime::new().with_increasing_time();
for _ in 0..MAX_CONCURRENT_RPC_CALLS {
runtime = runtime.add_stub_response(SignaturesResult::Consistent(Ok(vec![])));
}
poll_monitored_addresses(runtime.clone()).await;

assert_eq!(monitored_accounts_count(), 1);
assert_eq!(runtime.set_timer_call_count(), 1);

// Round 2: polls the remaining 1 account → no reschedule, queue empty.
let runtime = TestCanisterRuntime::new()
.with_increasing_time()
.add_stub_response(SignaturesResult::Consistent(Ok(vec![])));
poll_monitored_addresses(runtime.clone()).await;

assert_eq!(monitored_accounts_count(), 0);
assert_eq!(runtime.set_timer_call_count(), 0);

// Verify StoppedMonitoringAccount was emitted for each account.
let mut events_assert = EventsAssert::from_recorded();
for i in 0..num_accounts {
events_assert =
events_assert.expect_contains_event_eq(EventType::StoppedMonitoringAccount {
account: account(i),
});
}
}

fn setup() {
init_state();
init_schnorr_master_key();
}
7 changes: 7 additions & 0 deletions minter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use canlog::{Log, Sort};
use cksol_minter::{
address::lazy_get_schnorr_master_key,
consolidate::{DEPOSIT_CONSOLIDATION_DELAY, consolidate_deposits},
deposit::automatic::{POLL_MONITORED_ADDRESSES_DELAY, poll_monitored_addresses},
monitor::{
FINALIZE_TRANSACTIONS_DELAY, RESUBMIT_TRANSACTIONS_DELAY, finalize_transactions,
resubmit_transactions,
Expand Down Expand Up @@ -200,6 +201,9 @@ fn get_events(
EventType::StartedMonitoringAccount { account } => {
event::EventType::StartedMonitoringAccount { account }
}
EventType::StoppedMonitoringAccount { account } => {
event::EventType::StoppedMonitoringAccount { account }
}
}
}

Expand Down Expand Up @@ -356,6 +360,9 @@ fn setup_timers() {
ic_cdk_timers::set_timer_interval(RESUBMIT_TRANSACTIONS_DELAY, async || {
resubmit_transactions(IcCanisterRuntime::new()).await;
});
ic_cdk_timers::set_timer_interval(POLL_MONITORED_ADDRESSES_DELAY, async || {
poll_monitored_addresses(IcCanisterRuntime::new()).await;
});
}

fn main() {}
Expand Down
Loading
Loading