diff --git a/integration_tests/src/fixtures.rs b/integration_tests/src/fixtures.rs index ac1ffc61..0ab9eac8 100644 --- a/integration_tests/src/fixtures.rs +++ b/integration_tests/src/fixtures.rs @@ -46,6 +46,13 @@ pub fn default_get_deposit_address_args() -> GetDepositAddressArgs { } } +pub fn default_update_balance_args() -> cksol_types::UpdateBalanceArgs { + cksol_types::UpdateBalanceArgs { + owner: None, + subaccount: None, + } +} + pub fn default_process_deposit_args() -> ProcessDepositArgs { ProcessDepositArgs { owner: None, @@ -98,7 +105,7 @@ pub struct MockBuilder { /// Number of Solana RPC providers used for redundancy. /// Each logical RPC call generates this many HTTP outcalls with consecutive IDs. -const NUM_RPC_PROVIDERS: u64 = 4; +pub const NUM_RPC_PROVIDERS: u64 = 4; impl Default for MockBuilder { fn default() -> Self { @@ -198,6 +205,14 @@ impl MockBuilder { send_transaction_response(tx_signature), ) } + + /// Mock for `getSignaturesForAddress` returning the given list of signature objects. + pub fn get_signatures_for_address(self, signatures: Vec) -> Self { + self.expect( + get_signatures_for_address_request(), + get_signatures_for_address_response(signatures), + ) + } } // ── JSON-RPC request matchers and response builders ───────────────────────── @@ -360,3 +375,15 @@ fn send_transaction_response(signature: &str) -> JsonRpcResponse { "id": 1 })) } + +fn get_signatures_for_address_request() -> JsonRpcRequestMatcher { + JsonRpcRequestMatcher::with_method("getSignaturesForAddress") +} + +fn get_signatures_for_address_response(signatures: Vec) -> JsonRpcResponse { + JsonRpcResponse::from(json!({ + "jsonrpc": "2.0", + "result": signatures, + "id": 1 + })) +} diff --git a/integration_tests/tests/tests.rs b/integration_tests/tests/tests.rs index 620b33ab..93c1f850 100644 --- a/integration_tests/tests/tests.rs +++ b/integration_tests/tests/tests.rs @@ -5,8 +5,9 @@ use cksol_int_tests::{ CkSolMinter, Setup, SetupBuilder, fixtures::{ DEFAULT_CALLER_ACCOUNT, DEFAULT_CALLER_DEPOSIT_ADDRESS, DEPOSIT_AMOUNT, - EXPECTED_MINT_AMOUNT, MockBuilder, SharedMockHttpOutcalls, default_process_deposit_args, - deposit_transaction_signature, + EXPECTED_MINT_AMOUNT, MockBuilder, SharedMockHttpOutcalls, + default_get_deposit_address_args, default_process_deposit_args, + default_update_balance_args, deposit_transaction_signature, }, }; use cksol_types::{ @@ -30,6 +31,7 @@ const WITHDRAWAL_PROCESSING_DELAY: Duration = Duration::from_mins(1); const FINALIZE_TRANSACTIONS_DELAY: Duration = Duration::from_mins(2); const RESUBMIT_TRANSACTIONS_DELAY: Duration = Duration::from_mins(3); const DEPOSIT_CONSOLIDATION_DELAY: Duration = Duration::from_mins(10); +const POLL_MONITORED_ADDRESSES_DELAY: Duration = Duration::from_mins(1); /// Deposits funds into the minter via `process_deposit`, consolidates them, /// and finalizes the consolidation so the minter's internal balance is credited. @@ -1288,3 +1290,54 @@ mod metrics_tests { .await; } } + +mod automated_deposit_flow_tests { + use super::*; + + #[tokio::test] + async fn should_poll_monitored_address() { + let setup = SetupBuilder::new().build().await; + let minter = setup.minter(); + + // Initialize the minter public key and register the account for monitoring. + assert_eq!( + minter + .get_deposit_address(default_get_deposit_address_args()) + .await + .to_string(), + DEFAULT_CALLER_DEPOSIT_ADDRESS + ); + minter + .update_balance(default_update_balance_args()) + .await + .expect("update_balance should succeed"); + + minter.assert_that_events().await.satisfy(|events| { + check!(events.iter().any(|e| { + e == &EventType::StartedMonitoringAccount { + account: DEFAULT_CALLER_ACCOUNT, + } + })); + }); + + // Advance time: the minter should poll getSignaturesForAddress once, then remove the account. + setup.advance_time(POLL_MONITORED_ADDRESSES_DELAY).await; + setup + .execute_http_mocks( + MockBuilder::with_start_id(0) + .get_signatures_for_address(vec![]) + .build(), + ) + .await; + + minter.assert_that_events().await.satisfy(|events| { + check!(events.iter().any(|e| { + e == &EventType::StoppedMonitoringAccount { + account: DEFAULT_CALLER_ACCOUNT, + } + })); + }); + + setup.drop().await; + } +} diff --git a/libs/types-internal/src/event.rs b/libs/types-internal/src/event.rs index 5948f443..2cf04953 100644 --- a/libs/types-internal/src/event.rs +++ b/libs/types-internal/src/event.rs @@ -7,7 +7,7 @@ use serde::Deserialize; use sol_rpc_types::{Lamport, Pubkey as Address, Signature, Slot}; /// A minter event that can be serialized to Candid. -#[derive(Clone, Debug, CandidType, Deserialize)] +#[derive(Clone, Debug, PartialEq, CandidType, Deserialize)] pub struct Event { /// The canister time at which the minter generated this event. pub timestamp: u64, @@ -16,7 +16,7 @@ pub struct Event { } /// The type of a minter event. -#[derive(Clone, Debug, CandidType, Deserialize)] +#[derive(Clone, Debug, PartialEq, CandidType, Deserialize)] pub enum EventType { /// The minter initialization event. /// Must be the first event in the log. @@ -116,10 +116,15 @@ pub enum EventType { /// The account to monitor for incoming deposits. account: Account, }, + /// The minter stopped monitoring an account for automated deposits. + StoppedMonitoringAccount { + /// The account that is no longer being monitored. + account: Account, + }, } /// The purpose of a submitted Solana transaction. -#[derive(Clone, Debug, CandidType, Deserialize)] +#[derive(Clone, Debug, PartialEq, CandidType, Deserialize)] pub enum TransactionPurpose { /// Consolidate deposited funds into the minter's main account. ConsolidateDeposits { @@ -134,7 +139,7 @@ pub enum TransactionPurpose { } /// A versioned Solana transaction message. -#[derive(Clone, Debug, CandidType, Deserialize)] +#[derive(Clone, Debug, PartialEq, CandidType, Deserialize)] pub enum VersionedTransactionMessage { /// A legacy Solana transaction message, serialized with bincode. Legacy(Vec), diff --git a/minter/cksol_minter.did b/minter/cksol_minter.did index ef01a7e6..f81e36f2 100644 --- a/minter/cksol_minter.did +++ b/minter/cksol_minter.did @@ -416,6 +416,11 @@ type EventType = variant { // The account to monitor for incoming deposits. account: Account; }; + // The minter stopped monitoring an account for automated deposits. + StoppedMonitoringAccount : record { + // The account that is no longer being monitored. + account: Account; + }; }; // A single transaction can deposit to multiple accounts, so the signature alone diff --git a/minter/src/deposit/automatic/mod.rs b/minter/src/deposit/automatic/mod.rs index f9d4dc2e..5454c9c8 100644 --- a/minter/src/deposit/automatic/mod.rs +++ b/minter/src/deposit/automatic/mod.rs @@ -1,9 +1,20 @@ use crate::{ + address::{account_address, lazy_get_schnorr_master_key}, + constants::MAX_CONCURRENT_RPC_CALLS, + guard::TimerGuard, + rpc::get_signatures_for_address, runtime::CanisterRuntime, - state::{audit::process_event, event::EventType, mutate_state, read_state}, + state::{ + SchnorrPublicKey, TaskType, audit::process_event, event::EventType, mutate_state, + read_state, + }, }; +use canlog::log; use cksol_types::UpdateBalanceError; +use cksol_types_internal::log::Priority; use icrc_ledger_types::icrc1::account::Account; +use sol_rpc_types::{CommitmentLevel, GetSignaturesForAddressParams}; +use std::time::Duration; #[cfg(test)] mod tests; @@ -11,6 +22,12 @@ mod tests; /// Maximum number of accounts the minter will monitor simultaneously for automated deposits. pub const MAX_MONITORED_ACCOUNTS: usize = 100; +/// How often the minter polls monitored addresses for new deposit transactions. +pub const POLL_MONITORED_ADDRESSES_DELAY: Duration = Duration::from_mins(1); + +/// Maximum number of `getTransaction` calls to make per polled account. +pub const MAX_TRANSACTIONS_PER_ACCOUNT: usize = 10; + /// Registers the given account for automated deposit monitoring. /// /// Returns `Ok(())` if the account was registered (or was already being monitored). @@ -37,3 +54,81 @@ pub fn update_balance( Ok(()) } + +/// Polls all monitored addresses for new deposit transaction signatures. +/// +/// For each address, calls `getSignaturesForAddress` on the Solana RPC. +pub async fn poll_monitored_addresses(runtime: R) { + let _guard = match TimerGuard::new(TaskType::PollMonitoredAddresses) { + Ok(guard) => guard, + Err(_) => return, + }; + + let all_accounts: Vec = + read_state(|s| s.monitored_accounts().iter().copied().collect()); + if all_accounts.is_empty() { + return; + } + + let more_to_process = all_accounts.len() > MAX_CONCURRENT_RPC_CALLS; + let reschedule = scopeguard::guard(runtime.clone(), |runtime| { + runtime.set_timer(Duration::ZERO, poll_monitored_addresses); + }); + + let master_key = lazy_get_schnorr_master_key(&runtime).await; + + futures::future::join_all( + all_accounts + .into_iter() + .take(MAX_CONCURRENT_RPC_CALLS) + .map(|account| poll_account(&runtime, &master_key, account)), + ) + .await; + + if !more_to_process { + // All work fits in this round + scopeguard::ScopeGuard::into_inner(reschedule); + } +} + +async fn poll_account( + runtime: &R, + master_key: &SchnorrPublicKey, + account: Account, +) { + let deposit_address = account_address(master_key, &account); + + let params = GetSignaturesForAddressParams { + pubkey: deposit_address.into(), + commitment: Some(CommitmentLevel::Finalized), + min_context_slot: None, + // Fetch no more signatures than we intend to process with `getTransaction`. + limit: Some( + (MAX_TRANSACTIONS_PER_ACCOUNT as u32) + .try_into() + .expect("MAX_TRANSACTIONS_PER_ACCOUNT must be between 1 and 1000"), + ), + before: None, + until: None, + }; + + match get_signatures_for_address(runtime, params).await { + Err(e) => { + log!( + Priority::Info, + "Failed to get signatures for address {deposit_address}: {e}" + ); + } + Ok(_signatures) => { + // TODO(DEFI-2780): Process discovered deposit signatures. + } + } + + mutate_state(|state| { + process_event( + state, + EventType::StoppedMonitoringAccount { account }, + runtime, + ); + }); +} diff --git a/minter/src/deposit/automatic/tests.rs b/minter/src/deposit/automatic/tests.rs index fc0e5f7a..0ece8426 100644 --- a/minter/src/deposit/automatic/tests.rs +++ b/minter/src/deposit/automatic/tests.rs @@ -1,11 +1,15 @@ use super::*; use crate::{ + constants::MAX_CONCURRENT_RPC_CALLS, state::{event::EventType, read_state}, test_fixtures::{ - EventsAssert, account, events::start_monitoring_account, init_state, - runtime::TestCanisterRuntime, + EventsAssert, account, events::start_monitoring_account, init_schnorr_master_key, + init_state, runtime::TestCanisterRuntime, }, }; +use sol_rpc_types::{ConfirmedTransactionStatusWithSignature, MultiRpcResult}; + +type SignaturesResult = MultiRpcResult>; fn monitored_accounts_count() -> usize { read_state(|s| s.monitored_accounts().len()) @@ -79,3 +83,48 @@ fn should_not_return_queue_full_if_account_already_monitored() { let result = update_balance(&runtime, account(0)); assert_eq!(result, Ok(())); } + +#[tokio::test] +async fn should_poll_monitored_addresses_in_rounds() { + setup(); + + // Add MAX_CONCURRENT_RPC_CALLS + 1 accounts to monitor so that 2 rounds are needed. + let num_accounts = MAX_CONCURRENT_RPC_CALLS + 1; + for i in 0..num_accounts { + start_monitoring_account(account(i)); + } + assert_eq!(monitored_accounts_count(), num_accounts); + + // Round 1: polls MAX_CONCURRENT_RPC_CALLS accounts, 1 remains → reschedule. + let mut runtime = TestCanisterRuntime::new().with_increasing_time(); + for _ in 0..MAX_CONCURRENT_RPC_CALLS { + runtime = runtime.add_stub_response(SignaturesResult::Consistent(Ok(vec![]))); + } + poll_monitored_addresses(runtime.clone()).await; + + assert_eq!(monitored_accounts_count(), 1); + assert_eq!(runtime.set_timer_call_count(), 1); + + // Round 2: polls the remaining 1 account → no reschedule, queue empty. + let runtime = TestCanisterRuntime::new() + .with_increasing_time() + .add_stub_response(SignaturesResult::Consistent(Ok(vec![]))); + poll_monitored_addresses(runtime.clone()).await; + + assert_eq!(monitored_accounts_count(), 0); + assert_eq!(runtime.set_timer_call_count(), 0); + + // Verify StoppedMonitoringAccount was emitted for each account. + let mut events_assert = EventsAssert::from_recorded(); + for i in 0..num_accounts { + events_assert = + events_assert.expect_contains_event_eq(EventType::StoppedMonitoringAccount { + account: account(i), + }); + } +} + +fn setup() { + init_state(); + init_schnorr_master_key(); +} diff --git a/minter/src/main.rs b/minter/src/main.rs index 5f448f46..d00ef890 100644 --- a/minter/src/main.rs +++ b/minter/src/main.rs @@ -3,6 +3,7 @@ use canlog::{Log, Sort}; use cksol_minter::{ address::lazy_get_schnorr_master_key, consolidate::{DEPOSIT_CONSOLIDATION_DELAY, consolidate_deposits}, + deposit::automatic::{POLL_MONITORED_ADDRESSES_DELAY, poll_monitored_addresses}, monitor::{ FINALIZE_TRANSACTIONS_DELAY, RESUBMIT_TRANSACTIONS_DELAY, finalize_transactions, resubmit_transactions, @@ -200,6 +201,9 @@ fn get_events( EventType::StartedMonitoringAccount { account } => { event::EventType::StartedMonitoringAccount { account } } + EventType::StoppedMonitoringAccount { account } => { + event::EventType::StoppedMonitoringAccount { account } + } } } @@ -356,6 +360,9 @@ fn setup_timers() { ic_cdk_timers::set_timer_interval(RESUBMIT_TRANSACTIONS_DELAY, async || { resubmit_transactions(IcCanisterRuntime::new()).await; }); + ic_cdk_timers::set_timer_interval(POLL_MONITORED_ADDRESSES_DELAY, async || { + poll_monitored_addresses(IcCanisterRuntime::new()).await; + }); } fn main() {} diff --git a/minter/src/rpc/mod.rs b/minter/src/rpc/mod.rs index 2c713ef7..e27165b8 100644 --- a/minter/src/rpc/mod.rs +++ b/minter/src/rpc/mod.rs @@ -6,7 +6,10 @@ use crate::{ use cksol_types::ProcessDepositError; use derive_more::From; use ic_canister_runtime::IcError; -use sol_rpc_types::{CommitmentLevel, GetTransactionEncoding, MultiRpcResult, RpcError, Slot}; +use sol_rpc_types::{ + CommitmentLevel, ConfirmedTransactionStatusWithSignature, GetSignaturesForAddressParams, + GetTransactionEncoding, MultiRpcResult, RpcError, Slot, +}; use solana_hash::Hash; use solana_signature::Signature; use solana_transaction::Transaction; @@ -133,3 +136,28 @@ pub enum GetSignatureStatusesError { #[error("Inconsistent RPC results for getSignatureStatuses")] InconsistentRpcResults, } + +pub async fn get_signatures_for_address( + runtime: &R, + params: GetSignaturesForAddressParams, +) -> Result, GetSignaturesForAddressError> { + let client = read_state(|state| state.sol_rpc_client(runtime.inter_canister_call_runtime())); + let result = client.get_signatures_for_address(params).try_send().await; + match result? { + MultiRpcResult::Consistent(Ok(signatures)) => Ok(signatures), + MultiRpcResult::Consistent(Err(e)) => Err(GetSignaturesForAddressError::RpcError(e)), + MultiRpcResult::Inconsistent(_) => { + Err(GetSignaturesForAddressError::InconsistentRpcResults) + } + } +} + +#[derive(Debug, PartialEq, Error)] +pub enum GetSignaturesForAddressError { + #[error("Error while calling SOL RPC canister: {0}")] + IcError(#[from] IcError), + #[error("RPC error while fetching signatures for address: {0}")] + RpcError(RpcError), + #[error("Inconsistent RPC results for getSignaturesForAddress")] + InconsistentRpcResults, +} diff --git a/minter/src/rpc/tests.rs b/minter/src/rpc/tests.rs index cfc4f183..1da761ca 100644 --- a/minter/src/rpc/tests.rs +++ b/minter/src/rpc/tests.rs @@ -1,7 +1,8 @@ use crate::{ rpc::{ - GetRecentBlockhashError, GetTransactionError, SubmitTransactionError, - get_recent_slot_and_blockhash, get_transaction, submit_transaction, + GetRecentBlockhashError, GetSignaturesForAddressError, GetTransactionError, + SubmitTransactionError, get_recent_slot_and_blockhash, get_signatures_for_address, + get_transaction, submit_transaction, }, test_fixtures::{ PROCESS_DEPOSIT_REQUIRED_CYCLES, confirmed_block, @@ -266,3 +267,129 @@ mod get_recent_slot_and_blockhash_tests { solana_hash::Hash::from([0x42; 32]).into() } } + +mod get_signatures_for_address_tests { + use super::*; + use sol_rpc_types::{ + ConfirmedTransactionStatusWithSignature, GetSignaturesForAddressParams, Pubkey, + }; + + type MultiRpcResult = + sol_rpc_types::MultiRpcResult>; + + fn test_params() -> GetSignaturesForAddressParams { + GetSignaturesForAddressParams { + pubkey: "Cybe9JqZKtmhBoVGNHBxRVMUndZno5vNj5bS9GqTCty1" + .parse::() + .unwrap(), + commitment: None, + min_context_slot: None, + limit: None, + before: None, + until: None, + } + } + + fn confirmed_tx_status(signature_bytes: [u8; 64]) -> ConfirmedTransactionStatusWithSignature { + ConfirmedTransactionStatusWithSignature { + signature: solana_signature::Signature::from(signature_bytes).into(), + slot: 12345, + err: None, + memo: None, + block_time: None, + confirmation_status: None, + } + } + + #[tokio::test] + async fn should_fail_if_get_signatures_for_address_fails() { + init_state(); + + let runtime = TestCanisterRuntime::new().add_stub_error(IcError::CallPerformFailed); + + let result = get_signatures_for_address(&runtime, test_params()).await; + + assert_eq!( + result, + Err(GetSignaturesForAddressError::IcError( + IcError::CallPerformFailed + )) + ); + } + + #[tokio::test] + async fn should_fail_if_get_signatures_for_address_returns_rpc_error() { + init_state(); + + let rpc_error = RpcError::HttpOutcallError(HttpOutcallError::InvalidHttpJsonRpcResponse { + status: 500, + body: "Internal server error".to_string(), + parsing_error: None, + }); + + let runtime = TestCanisterRuntime::new() + .add_stub_response(MultiRpcResult::Consistent(Err(rpc_error.clone()))); + + let result = get_signatures_for_address(&runtime, test_params()).await; + + assert_eq!( + result, + Err(GetSignaturesForAddressError::RpcError(rpc_error)) + ); + } + + #[tokio::test] + async fn should_fail_if_get_signatures_for_address_result_inconsistent() { + init_state(); + + let results = vec![ + ( + RpcSource::Supported(SupportedRpcProviderId::AnkrMainnet), + Err(RpcError::ValidationError("Error 1".to_string())), + ), + ( + RpcSource::Supported(SupportedRpcProviderId::DrpcMainnet), + Err(RpcError::ValidationError("Error 2".to_string())), + ), + ]; + + let runtime = + TestCanisterRuntime::new().add_stub_response(MultiRpcResult::Inconsistent(results)); + + let result = get_signatures_for_address(&runtime, test_params()).await; + + assert_eq!( + result, + Err(GetSignaturesForAddressError::InconsistentRpcResults) + ); + } + + #[tokio::test] + async fn should_return_empty_vec_if_no_signatures() { + init_state(); + + let runtime = + TestCanisterRuntime::new().add_stub_response(MultiRpcResult::Consistent(Ok(vec![]))); + + let result = get_signatures_for_address(&runtime, test_params()).await; + + assert_eq!(result, Ok(vec![])); + } + + #[tokio::test] + async fn should_return_signatures() { + init_state(); + + let expected = vec![ + confirmed_tx_status([0x11; 64]), + confirmed_tx_status([0x22; 64]), + ]; + + let runtime = TestCanisterRuntime::new() + .add_stub_response(MultiRpcResult::Consistent(Ok(expected.clone()))); + + let result = get_signatures_for_address(&runtime, test_params()).await; + + assert_eq!(result, Ok(expected)); + } +} diff --git a/minter/src/state/audit.rs b/minter/src/state/audit.rs index 18951b64..7ad256b6 100644 --- a/minter/src/state/audit.rs +++ b/minter/src/state/audit.rs @@ -69,6 +69,9 @@ fn apply_state_transition(state: &mut State, payload: &EventType, timestamp: u64 EventType::StartedMonitoringAccount { account } => { state.process_started_monitoring_account(account); } + EventType::StoppedMonitoringAccount { account } => { + state.process_stopped_monitoring_account(account); + } } } diff --git a/minter/src/state/event.rs b/minter/src/state/event.rs index e13ef2f9..cc37f44c 100644 --- a/minter/src/state/event.rs +++ b/minter/src/state/event.rs @@ -135,6 +135,13 @@ pub enum EventType { #[n(0)] account: Account, }, + /// The minter stopped monitoring an account for automated deposits. + #[n(12)] + StoppedMonitoringAccount { + /// The account that is no longer being monitored. + #[n(0)] + account: Account, + }, } /// Payload of the `AcceptedWithdrawalRequest` event. diff --git a/minter/src/state/mod.rs b/minter/src/state/mod.rs index 9e625dde..4220505e 100644 --- a/minter/src/state/mod.rs +++ b/minter/src/state/mod.rs @@ -252,6 +252,10 @@ impl State { self.monitored_accounts.insert(*account); } + pub(crate) fn process_stopped_monitoring_account(&mut self, account: &Account) { + self.monitored_accounts.remove(account); + } + pub fn consolidation_transactions( &self, ) -> &InsertionOrderedMap { @@ -836,6 +840,7 @@ pub enum TaskType { FinalizeTransactions, ResubmitTransactions, WithdrawalProcessing, + PollMonitoredAddresses, } /// Details about a consolidation transaction, capturing the individual