diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 5ca240e10..d890f6ad1 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -18,7 +18,7 @@ permissions: env: DASHVERSION: "23.1.0" TEST_DATA_REPO: "dashpay/regtest-blockchain" - TEST_DATA_VERSION: "v0.0.3" + TEST_DATA_VERSION: "v0.0.4" jobs: test: diff --git a/contrib/setup-dashd.py b/contrib/setup-dashd.py index 1ef437bfb..ea056258b 100755 --- a/contrib/setup-dashd.py +++ b/contrib/setup-dashd.py @@ -7,7 +7,7 @@ Environment variables: DASHVERSION - Dash Core version (default: 23.1.0) - TEST_DATA_VERSION - Test data release version (default: v0.0.3) + TEST_DATA_VERSION - Test data release version (default: v0.0.4) TEST_DATA_REPO - GitHub repo for test data (default: dashpay/regtest-blockchain) CACHE_DIR - Cache directory (default: ~/.rust-dashcore-test) """ @@ -22,7 +22,7 @@ # Keep these defaults in sync with .github/workflows/build-and-test.yml DASHVERSION = os.environ.get("DASHVERSION", "23.1.0") -TEST_DATA_VERSION = os.environ.get("TEST_DATA_VERSION", "v0.0.3") +TEST_DATA_VERSION = os.environ.get("TEST_DATA_VERSION", "v0.0.4") TEST_DATA_REPO = os.environ.get("TEST_DATA_REPO", "dashpay/regtest-blockchain") @@ -115,23 +115,26 @@ def setup_dashd(cache_dir): return dashd_bin -VARIANTS = ["regtest-40000", "regtest-200"] +# Each entry maps a variant directory name to a marker path (relative to that +# directory) used as a cache hit / extraction-success check. Single-node +# variants ship a `regtest/blocks` subdirectory; the masternode network ships a +# top-level `network.json` plus per-node datadirs. +VARIANTS = { + "regtest-40000": "regtest/blocks", + "regtest-200": "regtest/blocks", + "regtest-mn": "network.json", +} -def setup_test_data(cache_dir, variant): - """Download and extract a single test blockchain variant. - - Args: - cache_dir: Root cache directory for all test assets. - variant: Directory name of the test data (e.g. "regtest-40000" or "regtest-200"). 
- """ +def setup_test_data(cache_dir, variant, marker_relpath): + """Download and extract a single test blockchain variant.""" parent_dir = os.path.join(cache_dir, f"regtest-blockchain-{TEST_DATA_VERSION}") test_data_dir = os.path.join(parent_dir, variant) - blocks_dir = os.path.join(test_data_dir, "regtest", "blocks") + marker_path = os.path.join(test_data_dir, marker_relpath) - if os.path.isdir(blocks_dir): + if os.path.exists(marker_path): log(f"Test blockchain data {variant} ({TEST_DATA_VERSION}) already available") - return + return test_data_dir log(f"Downloading test blockchain data {variant} ({TEST_DATA_VERSION})...") os.makedirs(parent_dir, exist_ok=True) @@ -143,10 +146,11 @@ def setup_test_data(cache_dir, variant): extract(archive_path, parent_dir) os.remove(archive_path) - if not os.path.isdir(blocks_dir): - sys.exit(f"Expected blocks directory not found after extraction: {blocks_dir}") + if not os.path.exists(marker_path): + sys.exit(f"Expected marker not found after extraction: {marker_path}") log(f"Downloaded test data to {test_data_dir}") + return test_data_dir def main(): @@ -154,8 +158,10 @@ def main(): os.makedirs(cache_dir, exist_ok=True) dashd_path = setup_dashd(cache_dir) - for variant in VARIANTS: - setup_test_data(cache_dir, variant) + variant_dirs = { + variant: setup_test_data(cache_dir, variant, marker) + for variant, marker in VARIANTS.items() + } datadir = os.path.join(cache_dir, f"regtest-blockchain-{TEST_DATA_VERSION}") @@ -163,6 +169,7 @@ def main(): prefix = "" if os.environ.get("GITHUB_ACTIONS") == "true" else "export " print(f"{prefix}DASHD_PATH={dashd_path}") print(f"{prefix}DASHD_TEST_DATA={datadir}") + print(f"{prefix}DASHD_MN_DATADIR={variant_dirs['regtest-mn']}") if __name__ == "__main__": diff --git a/dash-spv/src/test_utils/event_handler.rs b/dash-spv/src/test_utils/event_handler.rs index 25d9f6b19..7a5dd1143 100644 --- a/dash-spv/src/test_utils/event_handler.rs +++ b/dash-spv/src/test_utils/event_handler.rs @@ -23,10 +23,10 @@ pub struct TestEventHandler { impl TestEventHandler { pub fn new() -> Self { - let (sync_tx, _) = broadcast::channel(256); - let (network_tx, _) = broadcast::channel(256); + let (sync_tx, _) = broadcast::channel(10000); + let (network_tx, _) = broadcast::channel(10000); let (progress_tx, _) = watch::channel(SyncProgress::default()); - let (wallet_tx, _) = broadcast::channel(256); + let (wallet_tx, _) = broadcast::channel(10000); Self { sync_tx, network_tx, diff --git a/dash-spv/src/test_utils/masternode_network.rs b/dash-spv/src/test_utils/masternode_network.rs new file mode 100644 index 000000000..0873d9128 --- /dev/null +++ b/dash-spv/src/test_utils/masternode_network.rs @@ -0,0 +1,1099 @@ +//! Masternode network test infrastructure. +//! +//! Manages a pre-generated masternode network (1 controller + N masternodes) +//! for integration testing of masternode list sync against real dashd peers. 
+
+use std::env;
+use std::fs;
+use std::iter;
+use std::net::SocketAddr;
+use std::path::{Path, PathBuf};
+use std::thread;
+use std::time::{Duration, Instant};
+
+use dashcore::sml::llmq_type::LLMQ_TEST_DIP00024;
+use dashcore::BlockHash;
+use dashcore_rpc::{Auth, Client, RpcApi};
+use serde::Deserialize;
+use serde_json::Value;
+use tempfile::TempDir;
+use tokio::time;
+use tracing::{debug, info, warn};
+
+use super::fs_helpers::copy_dir;
+use super::node::find_available_port;
+use super::{retain_test_dir, DashCoreConfig, DashCoreNode, WalletFile};
+
+/// Metadata for a pre-generated masternode network, deserialized from network.json.
+#[derive(Debug, Deserialize)]
+pub struct NetworkMetadata {
+    pub version: String,
+    pub chain_height: u32,
+    pub dkg_cycles_completed: u32,
+    pub dkg_interval: u32,
+    pub controller: ControllerInfo,
+    pub masternodes: Vec<MasternodeInfo>,
+    pub spork_private_key: String,
+    pub dashd_extra_args: Vec<String>,
+}
+
+/// Controller node info from network.json.
+#[derive(Debug, Deserialize)]
+pub struct ControllerInfo {
+    pub datadir: String,
+    pub wallet: String,
+}
+
+/// Individual masternode info from network.json.
+#[derive(Debug, Deserialize)]
+pub struct MasternodeInfo {
+    pub index: u32,
+    pub datadir: String,
+    pub pro_tx_hash: String,
+    pub bls_private_key: String,
+    pub bls_public_key: String,
+    pub owner_address: String,
+    pub voting_address: String,
+    pub payout_address: String,
+}
+
+impl NetworkMetadata {
+    fn from_json(path: &Path) -> Self {
+        let contents = fs::read_to_string(path)
+            .unwrap_or_else(|e| panic!("Failed to read {}: {}", path.display(), e));
+        serde_json::from_str(&contents)
+            .unwrap_or_else(|e| panic!("Failed to parse {}: {}", path.display(), e))
+    }
+}
+
+/// Test context managing a full masternode network (controller + masternodes).
+///
+/// Starts dashd instances from pre-generated blockchain data, connects them,
+/// and provides the controller's P2P address for SPV client testing.
+pub struct MasternodeTestContext {
+    pub controller: DashCoreNode,
+    pub masternodes: Vec<DashCoreNode>,
+    pub metadata: NetworkMetadata,
+    pub controller_addr: SocketAddr,
+    pub wallet: WalletFile,
+    pub expected_height: u32,
+    /// Current mock time used for DKG phase orchestration.
+    mocktime: u64,
+}
+
+impl MasternodeTestContext {
+    /// Create a new masternode test context.
+    ///
+    /// When `controller_only` is true, only the controller node is started
+    /// (sufficient for static masternode list sync tests).
+    ///
+    /// Returns `None` when `SKIP_DASHD_TESTS=1` or when the dashd binary lacks
+    /// the RPC miner (`generatetoaddress`) — every masternode test mines
+    /// blocks, so without the miner none can run. Missing or invalid
+    /// `DASHD_PATH` / `DASHD_MN_DATADIR` env vars panic, matching the
+    /// `DashdTestContext::new` policy in `tests/dashd_sync`: a CI
+    /// misconfiguration must fail loudly rather than silently skipping
+    /// every dashd-backed test.
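+    ///
+    /// Hypothetical call site (sketch, not compiled):
+    ///
+    /// ```ignore
+    /// // Controller-only startup suffices for static masternode list sync.
+    /// let Some(ctx) = MasternodeTestContext::new(true).await else {
+    ///     return; // environment asked us to skip
+    /// };
+    /// assert!(ctx.masternodes.is_empty());
+    /// ```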
+    pub async fn new(controller_only: bool) -> Option<Self> {
+        if env::var("SKIP_DASHD_TESTS").is_ok() {
+            eprintln!("Skipping dashd integration test (SKIP_DASHD_TESTS is set)");
+            return None;
+        }
+
+        let dashd_path = env::var("DASHD_PATH")
+            .ok()
+            .map(PathBuf::from)
+            .expect("DASHD_PATH must be set for masternode tests");
+        assert!(dashd_path.exists(), "DASHD_PATH does not exist: {}", dashd_path.display());
+
+        let mn_datadir = env::var("DASHD_MN_DATADIR")
+            .ok()
+            .map(PathBuf::from)
+            .expect("DASHD_MN_DATADIR must be set for masternode tests");
+        assert!(mn_datadir.exists(), "DASHD_MN_DATADIR does not exist: {}", mn_datadir.display());
+
+        let metadata = NetworkMetadata::from_json(&mn_datadir.join("network.json"));
+        info!(
+            "Loaded masternode network: height={}, dkg_cycles={}, masternodes={}",
+            metadata.chain_height,
+            metadata.dkg_cycles_completed,
+            metadata.masternodes.len()
+        );
+
+        let wallet = WalletFile::from_json(&mn_datadir, &metadata.controller.wallet);
+        info!(
+            "Loaded wallet: {} transactions, balance: {:.8}",
+            wallet.transaction_count, wallet.balance
+        );
+
+        let mut shared_args: Vec<String> = metadata.dashd_extra_args.clone();
+        shared_args.push(format!("-sporkkey={}", metadata.spork_private_key));
+        shared_args.push("-debug=all".to_string());
+        shared_args.push("-debuglogfile=debug.log".to_string());
+
+        let controller_temp = TempDir::new().expect("failed to create controller temp dir");
+        copy_dir(&mn_datadir.join(&metadata.controller.datadir), controller_temp.path())
+            .expect("failed to copy controller datadir");
+        let controller_config = DashCoreConfig {
+            dashd_path: dashd_path.clone(),
+            datadir: controller_temp.path().to_path_buf(),
+            wallet: metadata.controller.wallet.clone(),
+            p2p_port: find_available_port(),
+            rpc_port: find_available_port(),
+            extra_args: shared_args.clone(),
+        };
+        let mut controller = DashCoreNode::with_config(controller_config);
+        let controller_addr = controller.start().await;
+
+        // Every masternode test path mines blocks (DKG cycles, ISLOCK, mocktime
+        // bumps), so a dashd binary without the RPC miner cannot run any of
+        // them. Some Windows release builds ship without `generatetoaddress`
+        // compiled in. Detect this and skip rather than panicking deep inside
+        // a test, mirroring the policy in `DashdTestContext`.
+        if !controller.supports_mining() {
+            eprintln!("Skipping masternode test (dashd RPC miner not available)");
+            return None;
+        }
+
+        // Keep the temp dir so debug.log is accessible after the test.
+ let controller_path = controller_temp.keep(); + info!( + "Controller started at {} | debug.log: {}/regtest/debug.log", + controller_addr, + controller_path.display() + ); + + let mut masternodes = Vec::new(); + if !controller_only { + for mn_info in &metadata.masternodes { + let mn_temp = TempDir::new().expect("failed to create mn temp dir"); + copy_dir(&mn_datadir.join(&mn_info.datadir), mn_temp.path()) + .expect("failed to copy masternode datadir"); + + let mut mn_args = shared_args.clone(); + mn_args.push("-txindex=1".to_string()); + mn_args.push(format!("-masternodeblsprivkey={}", mn_info.bls_private_key)); + + let mn_config = DashCoreConfig { + dashd_path: dashd_path.clone(), + datadir: mn_temp.keep(), + wallet: "".to_string(), + p2p_port: find_available_port(), + rpc_port: find_available_port(), + extra_args: mn_args, + }; + let mut node = DashCoreNode::with_config(mn_config); + let addr = node.start().await; + info!( + "Masternode {} started at {} | debug.log: {}/regtest/debug.log", + mn_info.datadir, + addr, + node.datadir().display() + ); + + masternodes.push(node); + } + + connect_all_nodes(&controller, &masternodes).await; + + // Update each masternode's service address to match its actual P2P port. + // The proTx entries from generation reference the original ports, but the + // nodes now run on different ports. Without this update, quorum connections + // between masternodes would fail (dashd connects to registered addresses). + update_mn_service_addresses(&controller, &masternodes, &metadata); + + // Disable `SPORK_21_QUORUM_ALL_CONNECTED` to match upstream Dash + // Core functional tests (`p2p_instantsend.py`, + // `feature_llmq_is_*.py`, `feature_llmq_rotation.py`) which all + // run with SPORK_21 OFF. The pre-generated chain bakes this spork + // at value=0 (ON), which switches `GetQuorumConnections` into the + // asymmetric `DeterministicOutboundConnection` hub-and-spoke mode. + // With only 4 MNs and our fixed pre-generated proTxHashes, that + // produces a degenerate topology where one MN ends up as the + // outbound initiator against all three others, so none of them + // ever put it in their `masternodeQuorumRelayMembers` set, no + // `QSENDRECSIGS` reaches it, and its recovered input-lock sig + // never propagates back — leaving ISLOCK sessions stuck at a + // single share. Setting the spork to `4070908800` (far-future + // timestamp) disables it per Dash Core convention. + if controller + .try_rpc_call( + "sporkupdate", + &["SPORK_21_QUORUM_ALL_CONNECTED".into(), 4070908800i64.into()], + ) + .is_none() + { + warn!("Failed to disable SPORK_21_QUORUM_ALL_CONNECTED"); + } + // Mine one block so the updated spork value is gossiped to MNs + // before any signing session starts. + { + let addr = controller.get_new_address(); + controller.generate_blocks(1, &addr); + } + } + + let expected_height = controller + .try_rpc_call("getblockcount", &[]) + .and_then(|v| v.as_u64()) + .expect("getblockcount on controller") as u32; + + // DKG orchestration requires initializing mocktime to the latest block timestamp. + let mocktime = { + let hash = controller.get_best_block_hash().to_string(); + let block_info = controller + .try_rpc_call("getblock", &[hash.into()]) + .expect("getblock on controller"); + block_info["time"].as_u64().expect("block time") + 1 + }; + + // Set mocktime on all nodes so DKG timing is consistent with the + // pre-generated data. Without this, nodes use real system time which + // is far ahead of the block timestamps from generation. 
+ controller.set_mocktime(mocktime); + for mn in &masternodes { + mn.set_mocktime(mocktime); + } + + info!("Network ready: controller at height {}, mocktime={}", expected_height, mocktime); + + Some(MasternodeTestContext { + controller, + masternodes, + metadata, + controller_addr, + wallet, + expected_height, + mocktime, + }) + } + + pub fn bump_mocktime(&mut self, seconds: u64) { + self.mocktime += seconds; + let time = self.mocktime; + for node in iter::once(&self.controller).chain(self.masternodes.iter()) { + node.try_rpc_call("setmocktime", &[time.into()]); + node.try_rpc_call("mockscheduler", &[seconds.into()]); + } + } + + /// Generate blocks on the controller and wait for all nodes to sync. + pub fn move_blocks(&mut self, count: u64) { + if count == 0 { + return; + } + self.bump_mocktime(1); + let addr = self.controller.get_new_address(); + self.controller.generate_blocks(count, &addr); + self.wait_for_sync(); + } + + /// Wait for all masternode nodes to reach the same height as the controller. + fn wait_for_sync(&self) { + let target_height = self + .controller + .try_rpc_call("getblockcount", &[]) + .and_then(|v| v.as_u64()) + .expect("getblockcount on controller"); + + for mn in &self.masternodes { + let start = Instant::now(); + loop { + let h = mn.try_rpc_call("getblockcount", &[]).and_then(|v| v.as_u64()).unwrap_or(0); + if h >= target_height { + break; + } + if start.elapsed() > Duration::from_secs(30) { + panic!("Masternode sync timeout: at {}, expected {}", h, target_height); + } + thread::sleep(Duration::from_millis(200)); + } + } + } + + /// Wait for masternodes to reach a specific DKG phase for the given quorum type and hash. + /// + /// Returns true if enough members reached the phase within the timeout. + #[allow(clippy::too_many_arguments)] + fn wait_for_quorum_phase( + &mut self, + llmq_type: &str, + quorum_hash: &str, + phase: u64, + expected_members: usize, + check_received_messages: Option<&str>, + check_received_messages_count: u64, + timeout_secs: u64, + ) -> bool { + let start = Instant::now(); + let timeout = Duration::from_secs(timeout_secs); + let mut poll_iter: u32 = 0; + + while start.elapsed() < timeout { + let mut member_count = 0; + for mn in &self.masternodes { + if let Some(status) = mn.try_rpc_call("quorum", &["dkgstatus".into()]) { + if let Some(sessions) = status.get("session").and_then(|s| s.as_array()) { + for session in sessions { + let session_type = session.get("llmqType").and_then(|t| t.as_str()); + if session_type != Some(llmq_type) { + continue; + } + let qs = session.get("status").unwrap_or(session); + let hash_matches = + qs.get("quorumHash").and_then(|h| h.as_str()) == Some(quorum_hash); + let phase_matches = + qs.get("phase").and_then(|p| p.as_u64()) == Some(phase); + let messages_match = check_received_messages.is_none_or(|field| { + qs.get(field).and_then(|v| v.as_u64()).unwrap_or(0) + >= check_received_messages_count + }); + if hash_matches && phase_matches && messages_match { + member_count += 1; + break; + } + } + } + } + } + if member_count >= expected_members { + return true; + } + if poll_iter.is_multiple_of(5) { + self.bump_mocktime(1); + } + poll_iter += 1; + thread::sleep(Duration::from_millis(100)); + } + false + } + + /// Wait for quorum connections between masternodes to be actually established. + /// + /// Checks the `quorumConnections` field in dkgstatus for masternodes that have + /// outbound connections marked as `connected: true`. 
The TCP connections use + /// real wall-clock time, so mockscheduler alone is not enough. + fn wait_for_quorum_connections( + &mut self, + llmq_type: &str, + quorum_hash: &str, + expected_connections: usize, + expected_members: usize, + timeout_secs: u64, + ) -> bool { + let start = Instant::now(); + let timeout = Duration::from_secs(timeout_secs); + let mut poll_iter: u32 = 0; + + while start.elapsed() < timeout { + let mut connected_members = 0; + + for mn in &self.masternodes { + let Some(status) = mn.try_rpc_call("quorum", &["dkgstatus".into()]) else { + continue; + }; + let Some(sessions) = status.get("session").and_then(|s| s.as_array()) else { + continue; + }; + let has_session = sessions.iter().any(|session| { + session.get("llmqType").and_then(|t| t.as_str()) == Some(llmq_type) + && session + .get("status") + .unwrap_or(session) + .get("quorumHash") + .and_then(|h| h.as_str()) + == Some(quorum_hash) + }); + if !has_session { + continue; + } + + let Some(conn_groups) = status.get("quorumConnections").and_then(|c| c.as_array()) + else { + continue; + }; + + let Some(conn_group) = conn_groups.iter().find(|conn_group| { + conn_group.get("llmqType").and_then(|t| t.as_str()) == Some(llmq_type) + && conn_group.get("quorumHash").and_then(|h| h.as_str()) + == Some(quorum_hash) + }) else { + continue; + }; + + let connected = conn_group + .get("quorumConnections") + .and_then(|p| p.as_array()) + .map(|peers| { + peers + .iter() + .filter(|p| p.get("connected").and_then(|c| c.as_bool()) == Some(true)) + .count() + }) + .unwrap_or(0); + if connected >= expected_connections { + connected_members += 1; + } + } + + if connected_members >= expected_members { + debug!( + "Quorum connections established for {} ({} members with {} peers each)", + llmq_type, connected_members, expected_connections + ); + return true; + } + if poll_iter.is_multiple_of(5) { + self.bump_mocktime(1); + } + poll_iter += 1; + thread::sleep(Duration::from_millis(100)); + } + false + } + + fn wait_for_masternode_probes( + &mut self, + llmq_type: &str, + quorum_hash: &str, + timeout_secs: u64, + ) -> bool { + let start = Instant::now(); + let timeout = Duration::from_secs(timeout_secs); + let mut poll_iter: u32 = 0; + + while start.elapsed() < timeout { + let mut all_probed = true; + + 'nodes: for mn in &self.masternodes { + let Some(status) = mn.try_rpc_call("quorum", &["dkgstatus".into()]) else { + all_probed = false; + break; + }; + let Some(conn_groups) = status.get("quorumConnections").and_then(|c| c.as_array()) + else { + all_probed = false; + break; + }; + + for conn_group in conn_groups { + let type_matches = + conn_group.get("llmqType").and_then(|t| t.as_str()) == Some(llmq_type); + let hash_matches = + conn_group.get("quorumHash").and_then(|h| h.as_str()) == Some(quorum_hash); + if !type_matches || !hash_matches { + continue; + } + + let Some(peers) = + conn_group.get("quorumConnections").and_then(|p| p.as_array()) + else { + all_probed = false; + break 'nodes; + }; + + for peer in peers { + if peer.get("outbound").and_then(|v| v.as_bool()) != Some(false) { + continue; + } + let Some(pro_tx_hash) = peer.get("proTxHash").and_then(|v| v.as_str()) + else { + all_probed = false; + break 'nodes; + }; + let Some(info) = + mn.try_rpc_call("protx", &["info".into(), pro_tx_hash.into()]) + else { + all_probed = false; + break 'nodes; + }; + let meta = info.get("metaInfo").unwrap_or(&info); + let last_success = meta + .get("lastOutboundSuccessElapsed") + .and_then(|v| v.as_u64()) + .unwrap_or(u64::MAX); + let last_attempt = 
meta
+                            .get("lastOutboundAttemptElapsed")
+                            .and_then(|v| v.as_u64())
+                            .unwrap_or(u64::MAX);
+                        let is_expected_mn = self
+                            .metadata
+                            .masternodes
+                            .iter()
+                            .any(|mn_info| mn_info.pro_tx_hash == pro_tx_hash);
+                        if is_expected_mn {
+                            if last_success > 55 * 60 {
+                                all_probed = false;
+                                break 'nodes;
+                            }
+                        } else if last_attempt > 55 * 60 && last_success > 55 * 60 {
+                            all_probed = false;
+                            break 'nodes;
+                        }
+                    }
+                }
+            }
+
+            if all_probed {
+                return true;
+            }
+            if poll_iter.is_multiple_of(5) {
+                self.bump_mocktime(1);
+            }
+            poll_iter += 1;
+            thread::sleep(Duration::from_millis(100));
+        }
+        false
+    }
+
+    /// Wait for at least `expected_count` entries of `llmq_type` to appear in
+    /// the controller's `quorum list` output.
+    fn wait_for_quorum_list_count(
+        &self,
+        llmq_type: &str,
+        expected_count: usize,
+        timeout_secs: u64,
+    ) -> bool {
+        let start = Instant::now();
+        let timeout = Duration::from_secs(timeout_secs);
+
+        while start.elapsed() < timeout {
+            if let Some(qlist) = self.controller.try_rpc_call("quorum", &["list".into(), 10.into()])
+            {
+                let count =
+                    qlist.get(llmq_type).and_then(|v| v.as_array()).map(|a| a.len()).unwrap_or(0);
+                if count >= expected_count {
+                    return true;
+                }
+            }
+            thread::sleep(Duration::from_millis(100));
+        }
+        false
+    }
+
+    /// Mine a single block and then call `on_block_mined` with the controller
+    /// reference and the resulting block height.
+    fn mine_block_then_notify<F: FnMut(&DashCoreNode, u32)>(&mut self, on_block_mined: &mut F) {
+        self.move_blocks(1);
+        let height = self
+            .controller
+            .try_rpc_call("getblockcount", &[])
+            .and_then(|v| v.as_u64())
+            .expect("getblockcount") as u32;
+        on_block_mined(&self.controller, height);
+    }
+
+    /// Mine a single block, wait for it to be ChainLocked, then call the hook.
+    ///
+    /// The ChainLock wait guarantees the block's CbTx `bestCLSignature` is
+    /// populated, which is required whenever the block is later referenced as
+    /// a QRInfo rotating-quorum lookup target (`cycleBlock - quorumIndex - 8`).
+    ///
+    /// A missed ChainLock within `cl_timeout_secs` is logged as a warning and
+    /// the method returns normally.
+    fn mine_block_with_cl_then_notify<F: FnMut(&DashCoreNode, u32)>(
+        &mut self,
+        on_block_mined: &mut F,
+        cl_timeout_secs: u64,
+    ) {
+        self.move_blocks(1);
+        let best_hash = self.controller.get_best_block_hash();
+        if !self.wait_for_chainlocked_block(&best_hash, cl_timeout_secs) {
+            warn!("ChainLock for {} not received within {}s", best_hash, cl_timeout_secs);
+        }
+        let height = self
+            .controller
+            .try_rpc_call("getblockcount", &[])
+            .and_then(|v| v.as_u64())
+            .expect("getblockcount") as u32;
+        on_block_mined(&self.controller, height);
+    }
+
+    /// Mine a complete DKG cycle and return the quorum hash if successful.
+    ///
+    /// Orchestrates all 6 DKG phases, mines the commitment block, and verifies
+    /// the quorum appears in `quorum list`.
+    ///
+    /// In regtest, `llmq_test` (type 100, 3 members) and `llmq_test_platform`
+    /// (type 106) reliably produce commitments. `llmq_test_dip0024` (type 103,
+    /// 4 members, minSize=4) requires all masternodes to succeed and is fragile
+    /// in live orchestration (pre-generated data has DIP0024 quorums from
+    /// controlled generation).
+    ///
+    /// ChainLocks use `llmq_test` in regtest, so new DKG cycles enable ChainLock
+    /// signing. QRInfo for rotated quorums references the pre-generated DIP0024
+    /// quorum history.
+    ///
+    /// Returns `None` if any phase times out.
+    /// Requires the full network to be running (controller + masternodes).
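+    ///
+    /// Sketch of a typical call site (hypothetical, not compiled):
+    ///
+    /// ```ignore
+    /// let quorum_hash = ctx.mine_dkg_cycle().expect("all DKG phases completed");
+    /// // The freshly mined llmq_test quorum can now sign ChainLocks.
+    /// ```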
+    pub fn mine_dkg_cycle(&mut self) -> Option<BlockHash> {
+        self.mine_dkg_cycle_with_hook(|_, _| {})
+    }
+
+    /// Mine a complete DKG cycle, invoking `on_block_mined` after every block
+    /// mined past the cycle alignment point.
+    ///
+    /// Blocks mined to align the chain to the next DKG cycle boundary do *not*
+    /// trigger the hook — alignment is an implementation detail. The hook is
+    /// called for every subsequent block that advances a DKG phase, the
+    /// commitment block, and every maturity block.
+    ///
+    /// The hook receives a shared reference to the controller node so it can
+    /// issue RPC calls (for example `send_to_address`) from inside the cycle.
+    pub fn mine_dkg_cycle_with_hook<F>(&mut self, mut on_block_mined: F) -> Option<BlockHash>
+    where
+        F: FnMut(&DashCoreNode, u32),
+    {
+        assert!(!self.masternodes.is_empty(), "mine_dkg_cycle requires masternodes to be running");
+
+        // Both llmq_test (100) and llmq_test_dip0024 (103) share the same DKG
+        // interval so their cycles run simultaneously. We track phases using
+        // llmq_test (3 members, easier to satisfy) and then verify that the
+        // rotated type also produced a quorum.
+        let dkg_interval = self.metadata.dkg_interval as u64;
+        let current_height = self
+            .controller
+            .try_rpc_call("getblockcount", &[])
+            .and_then(|v| v.as_u64())
+            .expect("getblockcount");
+
+        // Align to the next DKG cycle boundary. Each alignment block waits for
+        // ChainLock so its CbTx carries a valid `bestCLSignature` for later
+        // QRInfo rotating-quorum lookups.
+        let remainder = current_height % dkg_interval;
+        if remainder != 0 {
+            let skip = dkg_interval - remainder;
+            debug!("Aligning to DKG boundary: mining {} blocks with CL wait", skip);
+            for _ in 0..skip {
+                self.mine_block_with_cl_then_notify(&mut |_, _| {}, 15);
+            }
+        }
+
+        // The quorum hash is the best block hash at the cycle start
+        let quorum_hash = self.controller.get_best_block_hash();
+        let quorum_hash_str = quorum_hash.to_string();
+        info!("Starting DKG cycle, quorum_hash={}", quorum_hash_str);
+
+        // Do NOT activate SPORK_21_QUORUM_ALL_CONNECTED. Upstream Dash Core
+        // functional tests (`p2p_instantsend.py`, `feature_llmq_is_*.py`,
+        // `feature_llmq_rotation.py`) all run with SPORK_21 OFF, which keeps
+        // `GetQuorumConnections` on the default power-of-2-gap mesh in
+        // `GetQuorumRelayMembers`.
+        //
+        // With SPORK_21 ON, `GetQuorumConnections` instead uses
+        // `DeterministicOutboundConnection` to pick an asymmetric set. With
+        // only 4 MNs and our fixed pre-generated proTxHashes it happens that
+        // one MN (the `SelectMemberForRecovery` winner) is the outbound
+        // initiator for all three pairs, so the other three never put it into
+        // their `masternodeQuorumRelayMembers` set. They never send
+        // `QSENDRECSIGS` to it (`CMNAuth::ProcessMessage`,
+        // `SetMasternodeQuorumRelayMembers`), so its `peer->m_wants_recsigs`
+        // stays false for every peer. When that MN recovers the input-lock
+        // signature, `PeerManagerImpl::RelayRecoveredSig` iterates its peers
+        // and finds none with `m_wants_recsigs==true`, so the recovered
+        // signature never reaches the other three MNs, they never run
+        // `TrySignInstantSendLock`, and the ISLOCK session stays stuck at a
+        // single sigshare.
+        let sporks = self.controller.try_rpc_call("spork", &["show".into()])?;
+        let spork23_active = sporks
+            .get("SPORK_23_QUORUM_POSE")
+            .and_then(|v| v.as_i64())
+            .is_some_and(|value| value <= 1);
+
+        // Per-MN expected counts:
+        //   - `LLMQ_TEST` is non-rotating, size 3 on regtest, so 3 members.
+ // - `LLMQ_TEST_DIP0024` is rotating, size 4 on regtest, so 4 members + // for each of the 2 rotating quorum indices (q_0 and q_1). + // With SPORK_21 OFF, each member establishes the power-of-2-gap count + // of outbound quorum connections (2 per member). + let expected_connections = 2; + let expected_members_test = 3; + let expected_members_dip24 = 4; + + if !self.wait_for_quorum_connections( + "llmq_test", + &quorum_hash_str, + expected_connections, + expected_members_test, + 60, + ) { + warn!("llmq_test quorum connections timeout"); + return None; + } + if !self.wait_for_quorum_connections( + "llmq_test_dip0024", + &quorum_hash_str, + expected_connections, + expected_members_dip24, + 60, + ) { + warn!("llmq_test_dip0024 quorum connections timeout"); + return None; + } + if spork23_active { + if !self.wait_for_masternode_probes("llmq_test", &quorum_hash_str, 30) { + warn!("llmq_test masternode probe timeout"); + return None; + } + if !self.wait_for_masternode_probes("llmq_test_dip0024", &quorum_hash_str, 30) { + warn!("llmq_test_dip0024 masternode probe timeout"); + return None; + } + } + + let q0_hash_str = quorum_hash_str.clone(); + if !self.wait_for_quorum_phase( + "llmq_test", + &q0_hash_str, + 1, + expected_members_test, + None, + 0, + 30, + ) { + warn!("DKG phase 1 (llmq_test) timeout"); + return None; + } + if !self.wait_for_quorum_phase( + "llmq_test_dip0024", + &q0_hash_str, + 1, + expected_members_dip24, + None, + 0, + 30, + ) { + warn!("DKG phase 1 (llmq_test_dip0024 q_0) timeout"); + return None; + } + + self.mine_block_then_notify(&mut on_block_mined); + let q1_hash = self.controller.get_best_block_hash(); + let q1_hash_str = q1_hash.to_string(); + + if !self.wait_for_quorum_phase( + "llmq_test_dip0024", + &q1_hash_str, + 1, + expected_members_dip24, + None, + 0, + 30, + ) { + warn!("DKG phase 1 (llmq_test_dip0024 q_1) timeout"); + return None; + } + if !self.wait_for_quorum_connections( + "llmq_test_dip0024", + &q1_hash_str, + expected_connections, + expected_members_dip24, + 30, + ) { + warn!("llmq_test_dip0024 q_1 connections timeout"); + return None; + } + if spork23_active && !self.wait_for_masternode_probes("llmq_test_dip0024", &q1_hash_str, 30) + { + warn!("llmq_test_dip0024 q_1 masternode probe timeout"); + return None; + } + + self.mine_block_then_notify(&mut on_block_mined); + + // Phases 2-6: interleave q_0 and q_1 with one block between each wait. + // LLMQ_TEST is tracked alongside q_0 only (non-rotating, single session). 
+ let phase_checks: [(u64, Option<&str>, u64, u64); 5] = [ + (2, Some("receivedContributions"), 0, 30), + (3, Some("receivedComplaints"), 0, 30), + (4, Some("receivedJustifications"), 0, 30), + (5, Some("receivedPrematureCommitments"), 0, 30), + (6, None, 0, 45), + ]; + for (phase, field, _count, timeout_secs) in phase_checks { + let dip24_count = match phase { + 2 | 5 => expected_members_dip24 as u64, + _ => 0, + }; + let test_count = match phase { + 2 | 5 => expected_members_test as u64, + _ => 0, + }; + + if phase < 6 + && !self.wait_for_quorum_phase( + "llmq_test", + &q0_hash_str, + phase, + expected_members_test, + field, + test_count, + timeout_secs, + ) + { + warn!("DKG phase {} (llmq_test) timeout", phase); + return None; + } + if phase == 6 + && !self.wait_for_quorum_phase( + "llmq_test", + &q0_hash_str, + phase, + expected_members_test, + None, + 0, + timeout_secs, + ) + { + warn!("DKG phase 6 (llmq_test) timeout"); + return None; + } + if !self.wait_for_quorum_phase( + "llmq_test_dip0024", + &q0_hash_str, + phase, + expected_members_dip24, + field, + dip24_count, + timeout_secs, + ) { + warn!("DKG phase {} (llmq_test_dip0024 q_0) timeout", phase); + return None; + } + self.mine_block_then_notify(&mut on_block_mined); + + if !self.wait_for_quorum_phase( + "llmq_test_dip0024", + &q1_hash_str, + phase, + expected_members_dip24, + field, + dip24_count, + timeout_secs, + ) { + warn!("DKG phase {} (llmq_test_dip0024 q_1) timeout", phase); + return None; + } + self.mine_block_then_notify(&mut on_block_mined); + } + + // Commitments are mined into phase blocks once the mining window opens. Polling + // the minable-commitment queue would miss them, so mining one more block here + // forces any pending commitment on-chain. + self.bump_mocktime(1); + self.controller.get_block_template(); + self.mine_block_then_notify(&mut on_block_mined); + + if !self.wait_for_quorum_list_count("llmq_test", 1, 15) { + warn!("LLMQ_TEST quorum not in list after mining commitment"); + return None; + } + + if !self.wait_for_quorum_list_count("llmq_test_dip0024", 2, 15) { + warn!("LLMQ_TEST_DIP0024 rotating quorums not all mined (DKG null commitment)"); + return None; + } + + // Mine maturity blocks past this cycle's DKG mining window. Each block + // waits for its ChainLock so the CbTx carries a valid + // `bestCLSignature` for next-cycle rotating-quorum lookups. + let dkg_interval = self.metadata.dkg_interval; + let cycle_start = (self + .controller + .try_rpc_call("getblockcount", &[]) + .and_then(|v| v.as_u64()) + .unwrap_or(0) as u32 + / dkg_interval) + * dkg_interval; + // LLMQ_TEST_DIP0024 mining window end, sourced from the params struct + // so an `-llmqtestparams` regtest override would be honoured. + let mining_window_end = cycle_start + LLMQ_TEST_DIP00024.dkg_params.mining_window_end; + for _ in 0..8 { + self.mine_block_with_cl_then_notify(&mut on_block_mined, 15); + } + while let Some(h) = self + .controller + .try_rpc_call("getblockcount", &[]) + .and_then(|v| v.as_u64()) + .map(|v| v as u32) + { + if h > mining_window_end { + break; + } + self.mine_block_with_cl_then_notify(&mut on_block_mined, 15); + } + + info!("DKG cycle complete: quorum_hash={}", quorum_hash_str); + Some(quorum_hash) + } + + /// Wait for a specific block to become ChainLocked on the controller. 
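+    ///
+    /// Hypothetical usage (sketch, not compiled):
+    ///
+    /// ```ignore
+    /// ctx.move_blocks(1);
+    /// let tip = ctx.controller.get_best_block_hash();
+    /// assert!(ctx.wait_for_chainlocked_block(&tip, 30));
+    /// ```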
+    pub fn wait_for_chainlocked_block(
+        &mut self,
+        block_hash: &BlockHash,
+        timeout_secs: u64,
+    ) -> bool {
+        let start = Instant::now();
+        let timeout = Duration::from_secs(timeout_secs);
+        let block_hash_str = block_hash.to_string();
+
+        let mut poll_iter: u32 = 0;
+        while start.elapsed() < timeout {
+            if let Some(block) =
+                self.controller.try_rpc_call("getblock", &[block_hash_str.clone().into()])
+            {
+                let confirmed = block.get("confirmations").and_then(|v| v.as_i64()).unwrap_or(0);
+                let chainlocked = block.get("chainlock").and_then(|v| v.as_bool()).unwrap_or(false);
+                if confirmed > 0 && chainlocked {
+                    return true;
+                }
+            }
+            // Bump mocktime every 5 polls (~0.5s) to nudge the CL scheduler without
+            // over-advancing the signing-session clock.
+            if poll_iter.is_multiple_of(5) {
+                self.bump_mocktime(1);
+            }
+            poll_iter += 1;
+            thread::sleep(Duration::from_millis(100));
+        }
+        false
+    }
+
+    /// Mine blocks and wait for each newly mined block to become ChainLocked.
+    ///
+    /// Mining one block at a time matches Dash Core's own functional tests more
+    /// closely than mining a batch and polling `getbestchainlock`, and avoids
+    /// depending on an RPC that returns an error before the first ChainLock exists.
+    pub fn mine_blocks_and_wait_for_chainlock(
+        &mut self,
+        block_count: u64,
+        timeout_secs: u64,
+    ) -> Option<u32> {
+        let mut last_chainlocked_height = None;
+
+        for _ in 0..block_count {
+            self.bump_mocktime(1);
+            let addr = self.controller.get_new_address();
+            let block_hash = self
+                .controller
+                .generate_blocks(1, &addr)
+                .into_iter()
+                .next()
+                .expect("generated block hash");
+            self.wait_for_sync();
+
+            if self.wait_for_chainlocked_block(&block_hash, timeout_secs) {
+                let height = self
+                    .controller
+                    .try_rpc_call("getblock", &[block_hash.to_string().into()])
+                    .and_then(|b| b.get("height").and_then(|h| h.as_u64()))
+                    .map(|h| h as u32)
+                    .expect("getblock height");
+                info!("ChainLock found at height {}", height);
+                last_chainlocked_height = Some(height);
+            }
+        }
+
+        last_chainlocked_height
+    }
+}
+
+impl Drop for MasternodeTestContext {
+    fn drop(&mut self) {
+        retain_test_dir(self.controller.datadir(), "controller");
+        for (idx, masternode) in self.masternodes.iter().enumerate() {
+            let label = format!("masternode-{}-{}", idx, masternode.rpc_port());
+            retain_test_dir(masternode.datadir(), &label);
+        }
+    }
+}
+
+/// Connect each masternode to the controller only. Intra-quorum (MN↔MN)
+/// connections are established by dashd's own `EnsureQuorumConnections` logic
+/// after quorum formation, matching Dash Core's functional test framework
+/// (`test/functional/test_framework/test_framework.py`, comment: "masternodes
+/// should take care of intra-quorum connections themselves").
+///
+/// Pre-wiring MN↔MN via `addnode` here is harmful: those manual connections
+/// bypass the `EnsureQuorumConnections` → `SetMasternodeQuorumRelayMembers` →
+/// `QSENDRECSIGS` handshake, so `peer->m_wants_recsigs` stays false on the
+/// inbound side. `PeerManagerImpl::RelayRecoveredSig(proactive=true)` then
+/// skips those peers, and recovered input-lock signatures never propagate to
+/// every quorum member, which starves ISLOCK signing of shares.
+async fn connect_all_nodes(controller: &DashCoreNode, masternodes: &[DashCoreNode]) {
+    let controller_p2p: Value = format!("127.0.0.1:{}", controller.p2p_port()).into();
+
+    for mn in masternodes {
+        mn.try_rpc_call("addnode", &[controller_p2p.clone(), "add".into()]);
+    }
+
+    let expected_peers = masternodes.len();
+    for _ in 0..15 {
+        time::sleep(Duration::from_secs(2)).await;
+        let count = controller
+            .try_rpc_call("getpeerinfo", &[])
+            .and_then(|v| v.as_array().map(|a| a.len()))
+            .unwrap_or(0);
+        if count >= expected_peers {
+            info!("Controller has {} peers connected", count);
+            return;
+        }
+    }
+    let count = controller
+        .try_rpc_call("getpeerinfo", &[])
+        .and_then(|v| v.as_array().map(|a| a.len()))
+        .unwrap_or(0);
+    panic!(
+        "connect_all_nodes: controller has {} peer(s) after waiting, expected {}. \
+         Tests downstream assume the controller is connected to every masternode \
+         and would silently flake if we proceeded with a partial mesh.",
+        count, expected_peers
+    );
+}
+
+/// Update each masternode's registered service address to match its actual P2P port.
+///
+/// The proTx entries from generation reference the original ports. After restarting
+/// with different ports, we need to call `protx update_service` so that dashd can
+/// establish quorum connections between masternodes at the correct addresses.
+fn update_mn_service_addresses(
+    controller: &DashCoreNode,
+    masternodes: &[DashCoreNode],
+    metadata: &NetworkMetadata,
+) {
+    // Wallet-aware client needed because protx update_service sends a transaction
+    let url =
+        format!("http://127.0.0.1:{}/wallet/{}", controller.rpc_port(), metadata.controller.wallet);
+    let cookie_path = controller.datadir().join("regtest/.cookie");
+    let client = Client::new(&url, Auth::CookieFile(cookie_path)).expect("rpc client");
+
+    let mut failures = Vec::new();
+    for (mn, mn_info) in masternodes.iter().zip(metadata.masternodes.iter()) {
+        let new_addr = format!("127.0.0.1:{}", mn.p2p_port());
+        info!("Updating {} service address to {}", mn_info.datadir, new_addr);
+
+        let result: Result<Value, _> = client.call(
+            "protx",
+            &[
+                "update_service".into(),
+                mn_info.pro_tx_hash.clone().into(),
+                new_addr.into(),
+                mn_info.bls_private_key.clone().into(),
+            ],
+        );
+        if let Err(e) = result {
+            failures.push(format!("{}: {}", mn_info.datadir, e));
+        }
+    }
+    assert!(
+        failures.is_empty(),
+        "protx update_service failed for {} masternode(s): {:?}",
+        failures.len(),
+        failures
+    );
+
+    // Mine a block to confirm the update transactions
+    let addr = controller.get_new_address();
+    controller.generate_blocks(1, &addr);
+    info!("Service addresses updated and confirmed");
+}
diff --git a/dash-spv/src/test_utils/mod.rs b/dash-spv/src/test_utils/mod.rs
index 1ae7b8a89..61acedda0 100644
--- a/dash-spv/src/test_utils/mod.rs
+++ b/dash-spv/src/test_utils/mod.rs
@@ -5,9 +5,11 @@ mod context;
 mod event_handler;
 mod filter;
 mod fs_helpers;
+pub(crate) mod masternode_network;
 mod network;
 mod node;
 mod types;
+mod wallet;
 
 use std::time::Duration;
 
@@ -17,7 +19,12 @@ pub const SYNC_TIMEOUT: Duration = Duration::from_secs(180);
 
 pub use context::DashdTestContext;
 pub use event_handler::TestEventHandler;
 pub use fs_helpers::retain_test_dir;
+pub use masternode_network::MasternodeTestContext;
 pub use network::{test_socket_address, MockNetworkManager};
 pub use node::{DashCoreNode, TestChain, WalletFile};
+pub use wallet::{
+    create_test_wallet, default_test_account_options, init_test_logging,
+    next_unused_receive_address,
+};
 
 pub(crate) use node::DashCoreConfig;
diff --git a/dash-spv/src/test_utils/network.rs b/dash-spv/src/test_utils/network.rs
index 1eff5aecf..5ac883944 100644
--- a/dash-spv/src/test_utils/network.rs
+++ b/dash-spv/src/test_utils/network.rs
@@ -47,7 +47,7 @@ impl MockNetworkManager {
             sent_messages: Vec::new(),
             request_tx,
             request_rx: Some(request_rx),
-            network_event_sender: broadcast::Sender::new(100),
+            network_event_sender: broadcast::Sender::new(100000),
         }
     }
 
diff --git a/dash-spv/src/test_utils/node.rs b/dash-spv/src/test_utils/node.rs
index 829697c43..aaf46574b 100644
--- a/dash-spv/src/test_utils/node.rs
+++ b/dash-spv/src/test_utils/node.rs
@@ -6,6 +6,7 @@ use dashcore::{Address, Amount, BlockHash, Transaction, Txid};
 use dashcore_rpc::json as rpc_json;
 use dashcore_rpc::{Auth, Client, RpcApi};
 use serde::Deserialize;
+use serde_json::Value;
 use std::collections::HashMap;
 use std::fs;
 use std::net::SocketAddr;
@@ -22,7 +23,7 @@ static NEXT_PORT: AtomicU16 = AtomicU16::new(19400);
 const MAX_PORT_ATTEMPTS: usize = 100;
 
 /// Allocate a unique, available TCP port for test use.
-fn find_available_port() -> u16 {
+pub(super) fn find_available_port() -> u16 {
     for _ in 0..MAX_PORT_ATTEMPTS {
         let port = NEXT_PORT.fetch_add(1, Ordering::Relaxed);
         assert!(port >= 1024, "port counter overflowed");
@@ -63,6 +64,7 @@ pub struct DashCoreConfig {
     pub p2p_port: u16,
     /// RPC port for the node
     pub rpc_port: u16,
+    pub extra_args: Vec<String>,
 }
 
 impl DashCoreConfig {
@@ -97,8 +99,14 @@ impl DashCoreConfig {
             wallet: "default".to_string(),
             p2p_port: find_available_port(),
             rpc_port: find_available_port(),
+            extra_args: Vec::new(),
         })
     }
+
+    pub fn with_extra_args(mut self, args: Vec<String>) -> Self {
+        self.extra_args.extend(args);
+        self
+    }
 }
 
 /// Test infrastructure for managing a Dash Core node.
@@ -126,7 +134,7 @@ impl DashCoreNode {
 
         fs::create_dir_all(&self.config.datadir).expect("failed to create datadir");
 
-        let args_vec = vec![
+        let mut args_vec = vec![
            "-regtest".to_string(),
             format!("-datadir={}", self.config.datadir.display()),
             format!("-port={}", self.config.p2p_port),
@@ -147,8 +155,11 @@
             "-peerbloomfilters=1".to_string(),
             "-whitelist=127.0.0.1".to_string(),
             "-debug=all".to_string(),
-            format!("-wallet={}", self.config.wallet),
         ];
+        if !self.config.wallet.is_empty() {
+            args_vec.push(format!("-wallet={}", self.config.wallet));
+        }
+        args_vec.extend(self.config.extra_args.iter().cloned());
 
         let mut cmd = tokio::process::Command::new(&self.config.dashd_path);
         cmd.args(&args_vec)
@@ -257,6 +268,10 @@
         }
     }
 
+    pub fn get_new_address(&self) -> Address {
+        self.get_new_address_from_wallet(&self.config.wallet)
+    }
+
     /// Get a new address from a specific dashd wallet.
     pub fn get_new_address_from_wallet(&self, wallet_name: &str) -> Address {
         let client = self.rpc_client_for_wallet(wallet_name);
@@ -435,6 +450,23 @@
         tracing::info!("Set network active={} on dashd", active);
     }
 
+    /// Set mock time on this node.
+    pub fn set_mocktime(&self, time: u64) {
+        let client = self.rpc_client();
+        let _: Value = client.call("setmocktime", &[time.into()]).expect("setmocktime failed");
+    }
+
+    pub fn get_best_block_hash(&self) -> BlockHash {
+        let client = self.rpc_client();
+        client.get_best_block_hash().expect("getbestblockhash failed")
+    }
+
+    /// Call getblocktemplate to trigger CreateNewBlock (includes quorum commitments).
+    pub fn get_block_template(&self) {
+        let client = self.rpc_client();
+        let _: Result<Value, _> = client.call("getblocktemplate", &[]);
+    }
+
     /// Disconnect all currently connected peers.
     pub fn disconnect_all_peers(&self) {
         let client = self.rpc_client();
@@ -446,6 +478,33 @@
         }
         tracing::info!("Disconnected {} peers", peers.len());
     }
+
+    /// Execute an RPC call, returning None on failure instead of panicking.
+    ///
+    /// Uses the base URL (no wallet path) which works for all non-wallet RPCs.
+    /// Useful during DKG orchestration where transient failures are expected.
+    pub fn try_rpc_call(&self, method: &str, params: &[serde_json::Value]) -> Option<Value> {
+        let url = format!("http://127.0.0.1:{}", self.config.rpc_port);
+        let cookie_path = self.config.datadir.join("regtest/.cookie");
+        if !cookie_path.exists() {
+            return None;
+        }
+        let auth = Auth::CookieFile(cookie_path);
+        let client = Client::new(&url, auth).ok()?;
+        client.call(method, params).ok()
+    }
+
+    pub fn datadir(&self) -> &Path {
+        &self.config.datadir
+    }
+
+    pub fn p2p_port(&self) -> u16 {
+        self.config.p2p_port
+    }
+
+    pub fn rpc_port(&self) -> u16 {
+        self.config.rpc_port
+    }
 }
 
 impl Drop for DashCoreNode {
diff --git a/dash-spv/src/test_utils/wallet.rs b/dash-spv/src/test_utils/wallet.rs
new file mode 100644
index 000000000..17ef5df59
--- /dev/null
+++ b/dash-spv/src/test_utils/wallet.rs
@@ -0,0 +1,81 @@
+//! Shared wallet/logging helpers for integration tests.
+
+use std::collections::BTreeSet;
+use std::env;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use key_wallet::managed_account::managed_account_trait::ManagedAccountTrait;
+use key_wallet::managed_account::managed_account_type::ManagedAccountType;
+use key_wallet::wallet::initialization::WalletAccountCreationOptions;
+use key_wallet::wallet::managed_wallet_info::wallet_info_interface::WalletInfoInterface;
+use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo;
+use key_wallet_manager::{WalletId, WalletManager};
+use tokio::sync::RwLock;
+use tracing::level_filters::LevelFilter;
+
+use crate::logging::{init_logging, LogFileConfig, LoggingConfig, LoggingGuard};
+use crate::Network;
+
+/// Account creation options for tests: a single standard BIP44 account 0.
+pub fn default_test_account_options() -> WalletAccountCreationOptions {
+    WalletAccountCreationOptions::SpecificAccounts(
+        BTreeSet::from([0]),
+        BTreeSet::new(),
+        BTreeSet::new(),
+        BTreeSet::new(),
+        BTreeSet::new(),
+        None,
+    )
+}
+
+/// Create a test wallet from a BIP39 mnemonic.
+pub fn create_test_wallet(
+    mnemonic: &str,
+    network: Network,
+) -> (Arc<RwLock<WalletManager<ManagedWalletInfo>>>, WalletId) {
+    let mut wallet_manager = WalletManager::<ManagedWalletInfo>::new(network);
+    let wallet_id = wallet_manager
+        .create_wallet_from_mnemonic(mnemonic, 0, default_test_account_options())
+        .expect("Failed to create wallet from mnemonic");
+    (Arc::new(RwLock::new(wallet_manager)), wallet_id)
+}
+
+/// Return the next unused BIP44 account-0 external address from the wallet.
+pub async fn next_unused_receive_address(
+    wallet: &Arc<RwLock<WalletManager<ManagedWalletInfo>>>,
+    wallet_id: &WalletId,
+) -> dashcore::Address {
+    let wallet_read = wallet.read().await;
+    let wallet_info = wallet_read.get_wallet_info(wallet_id).expect("Wallet info not found");
+    let account =
+        wallet_info.accounts().standard_bip44_accounts.get(&0).expect("BIP44 account 0 not found");
+    let ManagedAccountType::Standard {
+        external_addresses,
+        ..
+    } = account.managed_account_type()
+    else {
+        panic!("Account 0 is not a Standard account type");
+    };
+    external_addresses
+        .unused_addresses()
+        .into_iter()
+        .next()
+        .expect("No unused receive address available")
+}
+
+/// Initialize per-test thread-local logging into the given directory.
+///
+/// Honors `DASHD_TEST_LOG` to additionally enable console output.
+pub fn init_test_logging(log_dir: PathBuf) -> LoggingGuard {
+    init_logging(LoggingConfig {
+        level: Some(LevelFilter::DEBUG),
+        console: env::var("DASHD_TEST_LOG").is_ok(),
+        file: Some(LogFileConfig {
+            log_dir,
+            max_files: 1,
+        }),
+        thread_local: true,
+    })
+    .expect("Failed to initialize test logging")
+}
diff --git a/dash-spv/tests/dashd_masternode/helpers.rs b/dash-spv/tests/dashd_masternode/helpers.rs
new file mode 100644
index 000000000..c3cdb5cfb
--- /dev/null
+++ b/dash-spv/tests/dashd_masternode/helpers.rs
@@ -0,0 +1,398 @@
+use dash_spv::sync::{MasternodesProgress, SyncEvent, SyncProgress, SyncState};
+use dashcore::ephemerealdata::instant_lock::InstantLock;
+use dashcore::sml::llmq_entry_verification::LLMQEntryVerificationStatus;
+use dashcore::sml::masternode_list_engine::MasternodeListEngine;
+use dashcore::Txid;
+use key_wallet::transaction_checking::TransactionContext;
+use key_wallet_manager::WalletEvent;
+use std::time::Duration;
+use tokio::sync::{broadcast, watch};
+use tokio::time;
+
+use super::setup::{TestContext, SYNC_TIMEOUT};
+
+/// Mine a DKG cycle and wait for the SPV to surface a `MasternodeStateUpdated`
+/// event above `baseline_height`.
+pub(super) async fn mine_dkg_cycle_and_wait(
+    ctx: &mut TestContext,
+    sync_event_receiver: &mut broadcast::Receiver<SyncEvent>,
+    baseline_height: u32,
+) -> u32 {
+    ctx.mn_ctx.mine_dkg_cycle().expect("DKG cycle should succeed");
+    wait_for_mn_state_event_above(sync_event_receiver, baseline_height, SYNC_TIMEOUT).await
+}
+
+/// Assert every rotated quorum across all stored cycles is `Verified`.
+pub(super) fn assert_all_rotated_quorums_verified(engine: &MasternodeListEngine) {
+    for (cycle_key, cycle_quorums) in &engine.rotated_quorums_per_cycle {
+        for (idx, entry) in cycle_quorums {
+            assert!(
+                matches!(entry.verified, LLMQEntryVerificationStatus::Verified),
+                "Rotated quorum (cycle_key={}, idx={}, hash={}) should be Verified, got {}",
+                cycle_key,
+                idx,
+                entry.quorum_entry.quorum_hash,
+                entry.verified
+            );
+        }
+    }
+}
+
+/// Wait for masternode sync to reach Synced state.
+pub(super) async fn wait_for_masternode_sync(
+    progress_receiver: &mut watch::Receiver<SyncProgress>,
+    timeout_secs: u64,
+) -> MasternodesProgress {
+    {
+        let progress = progress_receiver.borrow_and_update();
+        if let Ok(mn_progress) = progress.masternodes() {
+            if mn_progress.state() == SyncState::Synced {
+                tracing::info!(
+                    "Masternode sync already complete at height {}",
+                    mn_progress.current_height()
+                );
+                return mn_progress.clone();
+            }
+        }
+    }
+
+    let timeout = time::sleep(Duration::from_secs(timeout_secs));
+    tokio::pin!(timeout);
+
+    loop {
+        tokio::select! {
+            _ = &mut timeout => {
+                let progress = progress_receiver.borrow();
+                panic!(
+                    "Timeout waiting for masternode sync. Current progress: {:?}",
+                    progress
+                );
+            }
+            result = progress_receiver.changed() => {
+                if result.is_err() {
+                    panic!("Progress channel closed");
+                }
+                let progress = progress_receiver.borrow_and_update().clone();
+
+                if let Ok(mn_progress) = progress.masternodes() {
+                    if mn_progress.state() == SyncState::Synced {
+                        tracing::info!(
+                            "Masternode sync complete at height {}",
+                            mn_progress.current_height()
+                        );
+                        return mn_progress.clone();
+                    }
+                }
+            }
+        }
+    }
+}
+
+/// Wait for the MasternodeStateUpdated sync event.
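+///
+/// Sketch (not compiled): the receiver comes from the test event handler, and
+/// `SYNC_TIMEOUT` is the shared per-test timeout from `setup.rs`.
+///
+/// ```ignore
+/// let height = wait_for_mn_state_event(&mut sync_event_receiver, SYNC_TIMEOUT).await;
+/// assert!(height > 0);
+/// ```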
+pub(super) async fn wait_for_mn_state_event(
+    event_receiver: &mut broadcast::Receiver<SyncEvent>,
+    timeout_secs: u64,
+) -> u32 {
+    wait_for_mn_state_event_above(event_receiver, 0, timeout_secs).await
+}
+
+/// Wait for a MasternodeStateUpdated event at a height strictly above `min_height`.
+pub(super) async fn wait_for_mn_state_event_above(
+    event_receiver: &mut broadcast::Receiver<SyncEvent>,
+    min_height: u32,
+    timeout_secs: u64,
+) -> u32 {
+    let timeout = time::sleep(Duration::from_secs(timeout_secs));
+    tokio::pin!(timeout);
+
+    loop {
+        tokio::select! {
+            _ = &mut timeout => {
+                panic!(
+                    "Timeout waiting for MasternodeStateUpdated above height {}",
+                    min_height
+                );
+            }
+            result = event_receiver.recv() => {
+                match result {
+                    Ok(SyncEvent::MasternodeStateUpdated { height, .. }) if height > min_height => {
+                        tracing::info!("MasternodeStateUpdated at height {} (above {})", height, min_height);
+                        return height;
+                    }
+                    Ok(SyncEvent::MasternodeStateUpdated { height, .. }) => {
+                        tracing::debug!("MasternodeStateUpdated at height {} (waiting for > {})", height, min_height);
+                        continue;
+                    }
+                    Ok(_) => continue,
+                    Err(broadcast::error::RecvError::Lagged(n)) => {
+                        tracing::warn!("Event receiver lagged by {} messages", n);
+                        continue;
+                    }
+                    Err(broadcast::error::RecvError::Closed) => {
+                        panic!("Sync event channel closed");
+                    }
+                }
+            }
+        }
+    }
+}
+
+/// Wait for a `MasternodeStateUpdated` event that carries a `QRInfoFeedResult`
+/// whose `stored_cycle_height` is above `min_cycle_height` and which reports
+/// `all_fully_verified()` (i.e. every rotated quorum settled as `Verified` and
+/// the rotation cycle was actually stored in `rotated_quorums_per_cycle`). Use
+/// this when a test needs the SPV to have a fully-verified rotation cycle
+/// before proceeding, for example when preparing to verify a post-rotation
+/// InstantSend lock.
+pub(super) async fn wait_for_mn_state_with_stored_cycle_above(
+    event_receiver: &mut broadcast::Receiver<SyncEvent>,
+    min_cycle_height: u32,
+    timeout_secs: u64,
+) -> u32 {
+    let timeout = time::sleep(Duration::from_secs(timeout_secs));
+    tokio::pin!(timeout);
+
+    loop {
+        tokio::select! {
+            _ = &mut timeout => {
+                panic!(
+                    "Timeout waiting for MasternodeStateUpdated carrying a fully-verified rotation cycle above height {}",
+                    min_cycle_height
+                );
+            }
+            result = event_receiver.recv() => {
+                match result {
+                    Ok(SyncEvent::MasternodeStateUpdated {
+                        height,
+                        qr_info_result: Some(ref s),
+                    }) if s.all_fully_verified()
+                        && matches!(s.stored_cycle_height, Some(h) if h > min_cycle_height) =>
+                    {
+                        tracing::info!(
+                            "MasternodeStateUpdated at height {} with fully-verified stored_cycle_height={:?}",
+                            height,
+                            s.stored_cycle_height,
+                        );
+                        return height;
+                    }
+                    Ok(SyncEvent::MasternodeStateUpdated { height, qr_info_result }) => {
+                        tracing::debug!(
+                            "MasternodeStateUpdated at height {} (waiting for stored cycle > {}, got {:?})",
+                            height,
+                            min_cycle_height,
+                            qr_info_result.as_ref().map(|r| (r.stored_cycle_height, r.fully_verified_count, r.rotated_quorum_count)),
+                        );
+                        continue;
+                    }
+                    Ok(_) => continue,
+                    Err(broadcast::error::RecvError::Lagged(n)) => {
+                        tracing::warn!("Event receiver lagged by {} messages", n);
+                        continue;
+                    }
+                    Err(broadcast::error::RecvError::Closed) => {
+                        panic!("Sync event channel closed");
+                    }
+                }
+            }
+        }
+    }
+}
+
+/// Wait for an `InstantLockReceived` sync event for `txid` with the desired validation state.
+///
+/// Returns the received `InstantLock`. Ignores events for unrelated txids and events
+/// whose `validated` flag does not match `want_validated`.
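+///
+/// Hypothetical flow (sketch, not compiled), assuming `txid` is a transaction
+/// the test just broadcast:
+///
+/// ```ignore
+/// let islock =
+///     wait_for_instant_lock_received(&mut sync_event_receiver, txid, true, SYNC_TIMEOUT).await;
+/// assert_eq!(islock.txid, txid);
+/// ```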
+pub(super) async fn wait_for_instant_lock_received(
+    event_receiver: &mut broadcast::Receiver<SyncEvent>,
+    txid: Txid,
+    want_validated: bool,
+    timeout_secs: u64,
+) -> InstantLock {
+    let timeout = time::sleep(Duration::from_secs(timeout_secs));
+    tokio::pin!(timeout);
+
+    loop {
+        tokio::select! {
+            _ = &mut timeout => {
+                panic!(
+                    "Timeout waiting for InstantLockReceived(txid={}, validated={})",
+                    txid, want_validated
+                );
+            }
+            result = event_receiver.recv() => {
+                match result {
+                    Ok(SyncEvent::InstantLockReceived { instant_lock, validated })
+                        if instant_lock.txid == txid && validated == want_validated =>
+                    {
+                        tracing::info!(
+                            "InstantLockReceived(txid={}, validated={})",
+                            txid, validated
+                        );
+                        return instant_lock;
+                    }
+                    Ok(SyncEvent::InstantLockReceived { instant_lock, validated }) => {
+                        tracing::debug!(
+                            "Ignoring InstantLockReceived(txid={}, validated={}) — waiting for txid={} validated={}",
+                            instant_lock.txid, validated, txid, want_validated
+                        );
+                        continue;
+                    }
+                    Ok(_) => continue,
+                    Err(broadcast::error::RecvError::Lagged(n)) => {
+                        tracing::warn!("Sync event receiver lagged by {} messages", n);
+                        continue;
+                    }
+                    Err(broadcast::error::RecvError::Closed) => {
+                        panic!("Sync event channel closed");
+                    }
+                }
+            }
+        }
+    }
+}
+
+/// Wait for a wallet event about `txid` whose `TransactionContext` matches `pred`.
+///
+/// Returns the matching context. Matches both `TransactionDetected` (first-time
+/// seen, predicate runs against `record.context`) and `TransactionInstantLocked`
+/// (subsequent IS-lock application, synthesized as `InstantSend(lock)` so the
+/// same predicate works).
+pub(super) async fn wait_for_wallet_tx_status<F>(
+    event_receiver: &mut broadcast::Receiver<WalletEvent>,
+    txid: Txid,
+    pred: F,
+    timeout_secs: u64,
+) -> TransactionContext
+where
+    F: Fn(&TransactionContext) -> bool,
+{
+    let timeout = time::sleep(Duration::from_secs(timeout_secs));
+    tokio::pin!(timeout);
+
+    loop {
+        tokio::select! {
+            _ = &mut timeout => {
+                panic!("Timeout waiting for wallet event for txid {}", txid);
+            }
+            result = event_receiver.recv() => {
+                match result {
+                    Ok(WalletEvent::TransactionDetected { record, .. })
+                        if record.txid == txid && pred(&record.context) =>
+                    {
+                        tracing::info!(
+                            "Wallet TransactionDetected(txid={}, context={})",
+                            txid, record.context
+                        );
+                        return record.context.clone();
+                    }
+                    Ok(WalletEvent::TransactionInstantLocked { txid: event_txid, instant_lock, .. })
+                        if event_txid == txid =>
+                    {
+                        let status = TransactionContext::InstantSend(instant_lock);
+                        if pred(&status) {
+                            tracing::info!(
+                                "Wallet TransactionInstantLocked(txid={}, status={})",
+                                txid, status
+                            );
+                            return status;
+                        }
+                        continue;
+                    }
+                    Ok(other) => {
+                        tracing::debug!("Ignoring wallet event: {}", other.description());
+                        continue;
+                    }
+                    Err(broadcast::error::RecvError::Lagged(n)) => {
+                        tracing::warn!("Wallet event receiver lagged by {} messages", n);
+                        continue;
+                    }
+                    Err(broadcast::error::RecvError::Closed) => {
+                        panic!("Wallet event channel closed");
+                    }
+                }
+            }
+        }
+    }
+}
+
+/// Wait for `InstantSendProgress::valid()` to reach at least `min_valid`.
+pub(super) async fn wait_for_instantsend_valid_at_least(
+    progress_receiver: &mut watch::Receiver<SyncProgress>,
+    min_valid: u32,
+    timeout_secs: u64,
+) {
+    {
+        let progress = progress_receiver.borrow();
+        if let Ok(is_progress) = progress.instantsend() {
+            if is_progress.valid() >= min_valid {
+                return;
+            }
+        }
+    }
+
+    let timeout = time::sleep(Duration::from_secs(timeout_secs));
+    tokio::pin!(timeout);
+
+    loop {
+        tokio::select! {
+            _ = &mut timeout => {
+                let progress = progress_receiver.borrow();
+                panic!(
+                    "Timeout waiting for InstantSendProgress.valid >= {}. Current: {:?}",
+                    min_valid, progress.instantsend().ok()
+                );
+            }
+            result = progress_receiver.changed() => {
+                if result.is_err() {
+                    panic!("Progress channel closed");
+                }
+                let progress = progress_receiver.borrow_and_update().clone();
+                if let Ok(is_progress) = progress.instantsend() {
+                    if is_progress.valid() >= min_valid {
+                        return;
+                    }
+                }
+            }
+        }
+    }
+}
+
+/// Wait for validated ChainLock progress to reach at least `min_height`.
+pub(super) async fn wait_for_chainlock_height_at_least(
+    progress_receiver: &mut watch::Receiver<SyncProgress>,
+    min_height: u32,
+    timeout_secs: u64,
+) -> u32 {
+    {
+        let progress = progress_receiver.borrow();
+        if let Ok(chainlock_progress) = progress.chainlocks() {
+            let height = chainlock_progress.best_validated_height();
+            if height >= min_height {
+                return height;
+            }
+        }
+    }
+
+    let timeout = time::sleep(Duration::from_secs(timeout_secs));
+    tokio::pin!(timeout);
+
+    loop {
+        tokio::select! {
+            _ = &mut timeout => {
+                panic!("Timeout waiting for ChainLock height >= {}", min_height);
+            }
+            result = progress_receiver.changed() => {
+                if result.is_err() {
+                    panic!("Progress channel closed");
+                }
+                let progress = progress_receiver.borrow_and_update().clone();
+                if let Ok(chainlock_progress) = progress.chainlocks() {
+                    let height = chainlock_progress.best_validated_height();
+                    if height >= min_height {
+                        return height;
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/dash-spv/tests/dashd_masternode/main.rs b/dash-spv/tests/dashd_masternode/main.rs
new file mode 100644
index 000000000..214b32964
--- /dev/null
+++ b/dash-spv/tests/dashd_masternode/main.rs
@@ -0,0 +1,14 @@
+//! Masternode network tests using dashd.
+//!
+//! These tests verify SPV behavior against a pre-generated regtest masternode
+//! network (1 controller + 4 masternodes with DKG cycles).
+//!
+//! Required environment variables:
+//! - `DASHD_PATH`: path to dashd binary
+//! - `DASHD_MN_DATADIR`: path to pre-generated masternode blockchain data
+//! - `SKIP_DASHD_TESTS=1`: set to skip these tests
+
+mod helpers;
+mod setup;
+mod tests_instantsend;
+mod tests_sync;
diff --git a/dash-spv/tests/dashd_masternode/setup.rs b/dash-spv/tests/dashd_masternode/setup.rs
new file mode 100644
index 000000000..0ac2ca457
--- /dev/null
+++ b/dash-spv/tests/dashd_masternode/setup.rs
@@ -0,0 +1,204 @@
+use std::net::SocketAddr;
+use std::time::{Duration, Instant};
+
+use dash_spv::error::Result as SpvResult;
+use dash_spv::network::NetworkEvent;
+use dash_spv::test_utils::{
+    create_test_wallet, init_test_logging, next_unused_receive_address, retain_test_dir,
+    MasternodeTestContext, TestEventHandler,
+};
+use dash_spv::{
+    client::{ClientConfig, DashSpvClient},
+    network::PeerNetworkManager,
+    storage::DiskStorageManager,
+    sync::{SyncEvent, SyncProgress},
+    LoggingGuard, Network,
+};
+use dashcore::sml::masternode_list_engine::MasternodeListEngine;
+use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo;
+use key_wallet_manager::{WalletEvent, WalletId, WalletManager};
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use tempfile::TempDir;
+use tokio::sync::{broadcast, watch, RwLock};
+use tokio::task::JoinHandle;
+use tokio::time;
+use tokio_util::sync::CancellationToken;
+
+/// Timeout for masternode sync tests (masternode sync takes longer than wallet sync).
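+///
+/// Every `wait_for_*` helper in this harness takes its timeout in seconds, so
+/// tests thread this constant straight through, e.g. (a sketch, not a verbatim
+/// excerpt):
+///
+/// ```ignore
+/// let progress =
+///     wait_for_masternode_sync(&mut client_handle.progress_receiver, SYNC_TIMEOUT).await;
+/// ```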
+pub(super) const SYNC_TIMEOUT: u64 = 60;
+
+pub(super) type TestClient =
+    DashSpvClient<WalletManager<ManagedWalletInfo>, PeerNetworkManager, DiskStorageManager>;
+
+pub(super) struct ClientHandle {
+    pub(super) client: TestClient,
+    pub(super) run_handle: Option<JoinHandle<SpvResult<()>>>,
+    pub(super) progress_receiver: watch::Receiver<SyncProgress>,
+    pub(super) sync_event_receiver: broadcast::Receiver<SyncEvent>,
+    pub(super) wallet_event_receiver: broadcast::Receiver<WalletEvent>,
+    pub(super) _network_event_receiver: broadcast::Receiver<NetworkEvent>,
+    pub(super) cancel_token: CancellationToken,
+    pub(super) engine: Arc<RwLock<MasternodeListEngine>>,
+}
+
+impl ClientHandle {
+    pub(super) async fn stop(&mut self) {
+        tracing::info!("Cancelling client run loop...");
+        self.cancel_token.cancel();
+        if let Some(handle) = self.run_handle.take() {
+            handle.await.expect("Run task panicked").expect("Run task returned error");
+        }
+    }
+}
+
+/// SPV-specific test context wrapping the masternode network infrastructure.
+///
+/// Storage and blockchain directories are cleaned up on drop.
+/// Set `DASHD_TEST_RETAIN_DIR` to a directory path to retain logs and storage for failed tests.
+pub(super) struct TestContext {
+    pub(super) mn_ctx: MasternodeTestContext,
+    pub(super) storage_path: PathBuf,
+    _storage_dir: TempDir,
+    _log_guard: LoggingGuard,
+}
+
+impl TestContext {
+    pub(super) async fn new(controller_only: bool) -> Option<Self> {
+        let storage_dir = TempDir::new().expect("Failed to create temp directory");
+        let _log_guard = init_test_logging(storage_dir.path().join("logs"));
+
+        let mn_ctx = MasternodeTestContext::new(controller_only).await?;
+        let storage_path = storage_dir.path().to_path_buf();
+
+        eprintln!(
+            "TestContext: addr={}, blocks={}, data={}",
+            mn_ctx.controller_addr,
+            mn_ctx.expected_height,
+            storage_path.display(),
+        );
+
+        Some(TestContext {
+            mn_ctx,
+            storage_path,
+            _storage_dir: storage_dir,
+            _log_guard,
+        })
+    }
+
+    pub(super) fn storage_path(&self) -> &Path {
+        &self.storage_path
+    }
+}
+
+impl Drop for TestContext {
+    fn drop(&mut self) {
+        retain_test_dir(&self.storage_path, "spv");
+    }
+}
+
+/// Wait for the controller to confirm an IS lock for `txid`. Matches Dash
+/// Core's `wait_for_instantlock` pattern from `test_framework.py`: one initial
+/// mocktime bump to kick the MN scheduler, then poll `getrawtransaction` on
+/// real time only.
+///
+/// We must NOT advance mocktime inside the poll loop. `CSigSharesManager` runs
+/// `SendMessages` and `Cleanup` on a dedicated 100ms real-time thread
+/// (`HousekeepingThreadMain` in `llmq/signing_shares.cpp`), but the session
+/// timeout inside `Cleanup` reads `GetTime()`, which returns mocktime when set.
+/// Bumping mocktime in the poll loop expires the
+/// `SESSION_NEW_SHARES_TIMEOUT` (60s mocktime) in a few real seconds, long
+/// before real-time p2p can propagate enough sigshares between the 4 MNs to
+/// reach threshold.
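+///
+/// Illustrative call, a sketch (assumes `ctx.mn_ctx` and `txid` come from the
+/// calling test):
+///
+/// ```ignore
+/// // One mocktime bump happens inside; the poll loop itself runs on real time.
+/// wait_for_controller_islock(&mut ctx.mn_ctx, &txid, 60).await;
+/// ```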
+pub(super) async fn wait_for_controller_islock(
+    mn_ctx: &mut MasternodeTestContext,
+    txid: &dashcore::Txid,
+    timeout_secs: u64,
+) {
+    mn_ctx.bump_mocktime(30);
+    let deadline = Instant::now() + Duration::from_secs(timeout_secs);
+    loop {
+        if let Some(raw) = mn_ctx
+            .controller
+            .try_rpc_call("getrawtransaction", &[txid.to_string().into(), 1.into()])
+        {
+            if raw.get("instantlock").and_then(|v| v.as_bool()).unwrap_or(false) {
+                return;
+            }
+        }
+        assert!(Instant::now() < deadline, "Controller never IS-locked txid {}", txid);
+        time::sleep(Duration::from_millis(200)).await;
+    }
+}
+
+pub(super) fn create_mn_test_config(storage_path: PathBuf, peer_addr: SocketAddr) -> ClientConfig {
+    let mut config = ClientConfig::regtest().with_storage_path(storage_path);
+    config.peers.clear();
+    config.add_peer(peer_addr);
+    config
+}
+
+/// Create a dummy wallet (masternode sync doesn't need real wallet data).
+pub(super) fn create_dummy_wallet() -> Arc<RwLock<WalletManager<ManagedWalletInfo>>> {
+    let mnemonic = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about";
+    let (wallet, _) = create_test_wallet(mnemonic, Network::Regtest);
+    wallet
+}
+
+/// Create a wallet from the pre-generated controller wallet's mnemonic.
+///
+/// Using the same mnemonic as the dashd controller wallet means the SPV wallet
+/// derives the same addresses, so any `send_to_address` call routed through the
+/// controller lands in the SPV wallet as well.
+pub(super) fn create_wallet_from_controller(
+    mn_ctx: &MasternodeTestContext,
+) -> (Arc<RwLock<WalletManager<ManagedWalletInfo>>>, WalletId) {
+    create_test_wallet(&mn_ctx.wallet.mnemonic, Network::Regtest)
+}
+
+pub(super) async fn receive_address(
+    wallet: &Arc<RwLock<WalletManager<ManagedWalletInfo>>>,
+    wallet_id: &WalletId,
+) -> dashcore::Address {
+    next_unused_receive_address(wallet, wallet_id).await
+}
+
+pub(super) async fn create_and_start_client(
+    config: &ClientConfig,
+    wallet: Arc<RwLock<WalletManager<ManagedWalletInfo>>>,
+) -> ClientHandle {
+    let network_manager =
+        PeerNetworkManager::new(config).await.expect("Failed to create network manager");
+    let storage_manager =
+        DiskStorageManager::new(config).await.expect("Failed to create storage manager");
+
+    let handler = Arc::new(TestEventHandler::new());
+    let progress_receiver = handler.subscribe_progress();
+    let sync_event_receiver = handler.subscribe_sync_events();
+    let wallet_event_receiver = handler.subscribe_wallet_events();
+    let _network_event_receiver = handler.subscribe_network_events();
+
+    let client =
+        DashSpvClient::new(config.clone(), network_manager, storage_manager, wallet, vec![handler])
+            .await
+            .expect("Failed to create client");
+
+    let engine =
+        client.masternode_list_engine().expect("Engine should be initialized after creation");
+    let cancel_token = CancellationToken::new();
+    let run_token = cancel_token.clone();
+    let run_client = client.clone();
+
+    let run_handle = tokio::task::spawn(async move { run_client.run(run_token).await });
+
+    ClientHandle {
+        client,
+        run_handle: Some(run_handle),
+        progress_receiver,
+        sync_event_receiver,
+        wallet_event_receiver,
+        _network_event_receiver,
+        cancel_token,
+        engine,
+    }
+}
diff --git a/dash-spv/tests/dashd_masternode/tests_instantsend.rs b/dash-spv/tests/dashd_masternode/tests_instantsend.rs
new file mode 100644
index 000000000..315678131
--- /dev/null
+++ b/dash-spv/tests/dashd_masternode/tests_instantsend.rs
@@ -0,0 +1,306 @@
+//! InstantSend integration tests using the masternode network harness.
+//!
+//! These tests exercise the SPV client's InstantSend plumbing end-to-end against
+//! a real dashd masternode network: `InstantSendManager` validation, the mempool
+//! manager's InstantSend status propagation to the wallet, and the transition from
+//! an InstantSend-locked transaction to a ChainLocked block.
+
+use std::sync::Arc;
+
+use dash_spv::sync::SyncState;
+use dashcore::Amount;
+use key_wallet::transaction_checking::TransactionContext;
+
+use super::helpers::{
+    mine_dkg_cycle_and_wait, wait_for_chainlock_height_at_least, wait_for_instant_lock_received,
+    wait_for_instantsend_valid_at_least, wait_for_masternode_sync,
+    wait_for_mn_state_with_stored_cycle_above, wait_for_wallet_tx_status,
+};
+use super::setup::{
+    create_and_start_client, create_mn_test_config, create_wallet_from_controller, receive_address,
+    wait_for_controller_islock, TestContext, SYNC_TIMEOUT,
+};
+
+/// Full InstantSend lifecycle: send -> validated islock -> wallet IS status ->
+/// chainlocked block -> wallet InChainLockedBlock status.
+///
+/// Starts the full masternode network, drives a DKG cycle to form a live
+/// `llmq_test` signing quorum, then sends three sequential transactions from
+/// the controller wallet to the SPV wallet (same mnemonic, so addresses line
+/// up). The sends are sequential rather than concurrent because regtest's
+/// 4-MN quorum has trouble keeping multiple concurrent IS signing sessions
+/// alive past the session timeout. For each transaction the test asserts:
+/// 1. `SyncEvent::InstantLockReceived { validated: true }` fires.
+/// 2. A wallet event reports `TransactionContext::InstantSend(_)` for the txid.
+/// Then it mines blocks until a ChainLock is produced and asserts each tx
+/// transitions to `TransactionContext::InChainLockedBlock(_)`.
+#[tokio::test]
+async fn test_instantsend_full_lifecycle() {
+    let Some(mut ctx) = TestContext::new(false).await else {
+        return;
+    };
+
+    let (wallet, wallet_id) = create_wallet_from_controller(&ctx.mn_ctx);
+    let config =
+        create_mn_test_config(ctx.storage_path().to_path_buf(), ctx.mn_ctx.controller_addr);
+
+    let mut client_handle = create_and_start_client(&config, Arc::clone(&wallet)).await;
+
+    // Initial masternode sync so the engine knows about the pre-generated quorums.
+    let mn_progress =
+        wait_for_masternode_sync(&mut client_handle.progress_receiver, SYNC_TIMEOUT).await;
+    assert_eq!(mn_progress.state(), SyncState::Synced);
+    let initial_height = mn_progress.current_height();
+    tracing::info!("Initial masternode sync complete at height {}", initial_height);
+
+    // Drive a DKG cycle so the newly formed llmq_test quorum can sign islocks and
+    // chainlocks for subsequent transactions. Pre-generated data alone isn't enough
+    // because its quorums are fixed in the past and won't produce fresh signatures.
+    tracing::info!("Mining DKG cycle to form a live signing quorum...");
+    let post_dkg_height =
+        mine_dkg_cycle_and_wait(&mut ctx, &mut client_handle.sync_event_receiver, initial_height)
+            .await;
+    tracing::info!("SPV caught up to post-DKG masternode state at height {}", post_dkg_height);
+
+    // Send transactions sequentially from the controller wallet, waiting for
+    // each IS lock before sending the next. Sequential sends avoid concurrent
+    // signing sessions on regtest's small quorum (4 MNs) where UTXO
+    // dependencies between txs can delay sigShare collection past the session
+    // timeout.
+    const NUM_TXS: usize = 3;
+    const SEND_AMOUNT: Amount = Amount::from_sat(50_000_000);
+    let mut txids = Vec::with_capacity(NUM_TXS);
+    for i in 0..NUM_TXS {
+        let addr = receive_address(&wallet, &wallet_id).await;
+        let txid = ctx.mn_ctx.controller.send_to_address(&addr, SEND_AMOUNT);
+        tracing::info!("Sent tx {}/{}: txid={} to {}", i + 1, NUM_TXS, txid, addr);
+
+        wait_for_controller_islock(&mut ctx.mn_ctx, &txid, 60).await;
+        let lock = wait_for_instant_lock_received(
+            &mut client_handle.sync_event_receiver,
+            txid,
+            true,
+            SYNC_TIMEOUT,
+        )
+        .await;
+        assert_eq!(lock.txid, txid);
+        tracing::info!("Tx {}/{} islocked (txid={})", i + 1, NUM_TXS, txid);
+        txids.push(txid);
+    }
+
+    // Progress counter must reflect all validated locks.
+    wait_for_instantsend_valid_at_least(
+        &mut client_handle.progress_receiver,
+        NUM_TXS as u32,
+        SYNC_TIMEOUT,
+    )
+    .await;
+
+    // Each tx must surface in the wallet with an InstantSend context. The wallet
+    // may report it via `TransactionDetected` (first-seen) or a subsequent
+    // `TransactionInstantLocked`; the helper accepts either.
+    for (i, txid) in txids.iter().enumerate() {
+        let status = wait_for_wallet_tx_status(
+            &mut client_handle.wallet_event_receiver,
+            *txid,
+            |ctx| matches!(ctx, TransactionContext::InstantSend(_)),
+            SYNC_TIMEOUT,
+        )
+        .await;
+        assert!(matches!(status, TransactionContext::InstantSend(_)));
+        tracing::info!("Tx {}/{} wallet-observed with InstantSend context", i + 1, NUM_TXS);
+    }
+
+    // Mine blocks until a ChainLock is produced and propagated. After the DKG
+    // cycle above, the llmq_test quorum is eligible to sign chainlocks.
+    tracing::info!("Mining blocks and waiting for ChainLock...");
+    let cl_height = ctx
+        .mn_ctx
+        .mine_blocks_and_wait_for_chainlock(3, 60)
+        .expect("ChainLock should be produced after DKG cycle completion");
+
+    // SPV client must catch up to that ChainLock.
+    let cl_sync_height = wait_for_chainlock_height_at_least(
+        &mut client_handle.progress_receiver,
+        cl_height,
+        SYNC_TIMEOUT,
+    )
+    .await;
+    assert!(cl_sync_height >= cl_height);
+    tracing::info!("SPV synced to ChainLocked height {}", cl_sync_height);
+
+    client_handle.stop().await;
+}
+
+/// InstantSend works before and after a DIP-0024 quorum rotation cycle.
+///
+/// Drives two DKG cycles: the first forms a signing quorum and verifies an
+/// IS lock, the second performs the rotation and verifies a post-rotation
+/// IS lock. This proves signing quorum availability survives the rotation.
+#[tokio::test]
+async fn test_instantsend_across_quorum_rotation() {
+    let Some(mut ctx) = TestContext::new(false).await else {
+        return;
+    };
+
+    let (wallet, wallet_id) = create_wallet_from_controller(&ctx.mn_ctx);
+    let config =
+        create_mn_test_config(ctx.storage_path().to_path_buf(), ctx.mn_ctx.controller_addr);
+
+    let mut client_handle = create_and_start_client(&config, Arc::clone(&wallet)).await;
+
+    let mn_progress =
+        wait_for_masternode_sync(&mut client_handle.progress_receiver, SYNC_TIMEOUT).await;
+    assert_eq!(mn_progress.state(), SyncState::Synced);
+    let initial_height = mn_progress.current_height();
+
+    // First DKG cycle: forms a signing quorum.
+    tracing::info!("Mining first DKG cycle...");
+    let post_first_height =
+        mine_dkg_cycle_and_wait(&mut ctx, &mut client_handle.sync_event_receiver, initial_height)
+            .await;
+    tracing::info!("Post-first-DKG height {}", post_first_height);
+
+    // Pre-rotation IS lock.
+ let send_addr = receive_address(&wallet, &wallet_id).await; + let send_amount = Amount::from_sat(50_000_000); + let pre_txid = ctx.mn_ctx.controller.send_to_address(&send_addr, send_amount); + tracing::info!("Pre-rotation IS tx: {}", pre_txid); + wait_for_controller_islock(&mut ctx.mn_ctx, &pre_txid, 60).await; + let _ = wait_for_instant_lock_received( + &mut client_handle.sync_event_receiver, + pre_txid, + true, + SYNC_TIMEOUT, + ) + .await; + tracing::info!("Pre-rotation IS lock verified"); + + // Second DKG cycle: rotation. The post-rotation IS lock below needs the new + // rotation cycle stored in `rotated_quorums_per_cycle`, which only happens + // when a QRInfo completes with every rotated quorum freshly validated. + // Depending on the timing race between the tip MnListDiff and the mining + // window QRInfos, the first `MasternodeStateUpdated` after rotation may come + // from either pipeline: the Incremental path (MnListDiff-only, cycle not + // yet stored) or the QuorumValidation path (cycle already stored). Bump + // mocktime to nudge the tick handler if a catch-up QRInfo is still needed, + // and wait specifically for an update carrying a freshly-validated + // post-rotation cycle. + tracing::info!("Mining second DKG cycle (rotation)..."); + ctx.mn_ctx.mine_dkg_cycle().expect("Second DKG cycle should succeed"); + ctx.mn_ctx.bump_mocktime(30); + let post_rotation_height = wait_for_mn_state_with_stored_cycle_above( + &mut client_handle.sync_event_receiver, + post_first_height, + SYNC_TIMEOUT, + ) + .await; + tracing::info!("Post-rotation height {}", post_rotation_height); + + // Post-rotation IS lock. + let post_txid = ctx.mn_ctx.controller.send_to_address(&send_addr, send_amount); + tracing::info!("Post-rotation IS tx: {}", post_txid); + wait_for_controller_islock(&mut ctx.mn_ctx, &post_txid, 60).await; + let _ = wait_for_instant_lock_received( + &mut client_handle.sync_event_receiver, + post_txid, + true, + SYNC_TIMEOUT, + ) + .await; + tracing::info!("Post-rotation IS lock verified"); + + wait_for_instantsend_valid_at_least(&mut client_handle.progress_receiver, 2, SYNC_TIMEOUT) + .await; + + client_handle.stop().await; +} + +/// InstantSend lock arrives before the SPV sees the transaction. +/// +/// Drives a DKG cycle, stops the SPV client, sends a transaction from the +/// controller, and waits for the controller to record an islock for that tx. +/// A fresh SPV client is then started against the same storage: when it +/// reconnects, the controller relays both the tx inv and the islock inv and +/// there is no ordering guarantee about which message the SPV processes first. +/// In particular, the islock may reach `InstantSendManager` before the +/// transaction reaches `MempoolManager`, exercising the `pending_is_locks` +/// path where an islock is held until the matching mempool entry arrives. +/// +/// Regardless of exact ordering, the test asserts the end-to-end outcome: a +/// validated `InstantLockReceived` event plus a wallet event reporting the +/// transaction in an `InstantSend` context. +#[tokio::test] +async fn test_instantsend_islock_arrives_before_tx() { + let Some(mut ctx) = TestContext::new(false).await else { + return; + }; + + let (wallet, wallet_id) = create_wallet_from_controller(&ctx.mn_ctx); + let config = + create_mn_test_config(ctx.storage_path().to_path_buf(), ctx.mn_ctx.controller_addr); + + // Initial run: sync MN list and form a live signing quorum, then shut down. 
+ let mut client_handle = create_and_start_client(&config, Arc::clone(&wallet)).await; + let initial_mn = + wait_for_masternode_sync(&mut client_handle.progress_receiver, SYNC_TIMEOUT).await; + let initial_height = initial_mn.current_height(); + tracing::info!("Initial masternode sync at height {}", initial_height); + + tracing::info!("Mining DKG cycle to form a live signing quorum..."); + mine_dkg_cycle_and_wait(&mut ctx, &mut client_handle.sync_event_receiver, initial_height).await; + + tracing::info!("Stopping SPV client before sending the transaction"); + client_handle.stop().await; + drop(client_handle); + + // Send the tx and wait for the controller to record an islock for it. This + // ensures the islock exists on the network before the fresh SPV client + // reconnects, so both the tx and the islock are relayed in quick succession + // on reconnect with no guaranteed ordering. + let addr = receive_address(&wallet, &wallet_id).await; + let txid = ctx.mn_ctx.controller.send_to_address(&addr, Amount::from_sat(50_000_000)); + tracing::info!("Sent tx {} while SPV is down, waiting for controller islock...", txid); + wait_for_controller_islock(&mut ctx.mn_ctx, &txid, 60).await; + tracing::info!("Controller reports islock for tx {}", txid); + + // Reconnect the SPV client against the same storage. Reusing the storage + // keeps the previously synced masternode list so signature verification + // for the islock succeeds on first processing. + tracing::info!("Starting fresh SPV client to receive tx and islock together"); + // Bump mocktime before starting the fresh client so the MN scheduler + // relays the islock to the new peer once it connects. + ctx.mn_ctx.bump_mocktime(30); + let mut client_handle = create_and_start_client(&config, Arc::clone(&wallet)).await; + + // Wait for initial MN sync to complete on the fresh client. This populates + // rotated_quorums_per_cycle so the pending islock can be validated. + wait_for_masternode_sync(&mut client_handle.progress_receiver, SYNC_TIMEOUT).await; + + // The isdlock may have arrived before MN sync (validated=false, queued as + // pending). MasternodeStateUpdated triggers validate_pending which re-emits + // with validated=true. Bump mocktime again to ensure the scheduler fires. + ctx.mn_ctx.bump_mocktime(30); + + let lock = wait_for_instant_lock_received( + &mut client_handle.sync_event_receiver, + txid, + true, + SYNC_TIMEOUT, + ) + .await; + assert_eq!(lock.txid, txid); + + // And the wallet must observe it with an InstantSend context regardless of + // the internal ordering. + let status = wait_for_wallet_tx_status( + &mut client_handle.wallet_event_receiver, + txid, + |ctx| matches!(ctx, TransactionContext::InstantSend(_)), + SYNC_TIMEOUT, + ) + .await; + assert!(matches!(status, TransactionContext::InstantSend(_))); + + client_handle.stop().await; +} diff --git a/dash-spv/tests/dashd_masternode/tests_sync.rs b/dash-spv/tests/dashd_masternode/tests_sync.rs new file mode 100644 index 000000000..805e469ad --- /dev/null +++ b/dash-spv/tests/dashd_masternode/tests_sync.rs @@ -0,0 +1,563 @@ +//! Masternode list sync tests using dashd. +//! +//! These tests verify SPV masternode list synchronization against a pre-generated +//! regtest masternode network (1 controller + 4 masternodes with DKG cycles). 
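+//!
+//! Most tests here follow the same skeleton (a sketch; all names come from
+//! this module's `setup` and `helpers`):
+//!
+//! ```ignore
+//! let Some(ctx) = TestContext::new(true).await else {
+//!     return; // dashd unavailable or tests skipped
+//! };
+//! let wallet = create_dummy_wallet();
+//! let config =
+//!     create_mn_test_config(ctx.storage_path().to_path_buf(), ctx.mn_ctx.controller_addr);
+//! let mut client_handle = create_and_start_client(&config, Arc::clone(&wallet)).await;
+//! let progress =
+//!     wait_for_masternode_sync(&mut client_handle.progress_receiver, SYNC_TIMEOUT).await;
+//! assert_eq!(progress.state(), SyncState::Synced);
+//! client_handle.stop().await;
+//! ```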
+ +use std::sync::Arc; + +use dash_spv::sync::{ProgressPercentage, SyncState}; +use dashcore::sml::llmq_entry_verification::LLMQEntryVerificationStatus; +use dashcore::sml::llmq_type::LLMQType; + +use super::helpers::{ + assert_all_rotated_quorums_verified, wait_for_chainlock_height_at_least, + wait_for_masternode_sync, wait_for_mn_state_event, wait_for_mn_state_event_above, + wait_for_mn_state_with_stored_cycle_above, +}; +use super::setup::{ + create_and_start_client, create_dummy_wallet, create_mn_test_config, TestContext, SYNC_TIMEOUT, +}; + +/// Sync masternode list against a pre-generated regtest controller node. +/// +/// Verifies that the SPV client can complete masternode list sync (QRInfo + MnListDiff) +/// and that the MasternodeStateUpdated event fires. +#[tokio::test] +async fn test_masternode_list_sync() { + let Some(ctx) = TestContext::new(true).await else { + return; + }; + + let expected_masternodes = ctx.mn_ctx.metadata.masternodes.len(); + let wallet = create_dummy_wallet(); + let config = + create_mn_test_config(ctx.storage_path().to_path_buf(), ctx.mn_ctx.controller_addr); + + let mut client_handle = create_and_start_client(&config, Arc::clone(&wallet)).await; + + // Wait for the MasternodeStateUpdated event + let mn_height = + wait_for_mn_state_event(&mut client_handle.sync_event_receiver, SYNC_TIMEOUT).await; + assert!(mn_height > 0, "Masternode state height should be positive"); + + // Wait for full masternode sync + let mn_progress = + wait_for_masternode_sync(&mut client_handle.progress_receiver, SYNC_TIMEOUT).await; + + assert_eq!(mn_progress.state(), SyncState::Synced, "Masternode sync should reach Synced state"); + assert!(mn_progress.current_height() > 0, "Masternode sync height should be positive"); + tracing::info!( + "Masternode sync verified: state={:?}, height={}, diffs={}", + mn_progress.state(), + mn_progress.current_height(), + mn_progress.diffs_processed() + ); + + // The engine must hold a masternode list whose entry count matches the + // pre-generated network metadata. A successful sync that ends with an + // empty engine indicates the MnListDiff/QRInfo plumbing dropped data. + { + let engine = client_handle.engine.read().await; + let latest_list = engine.latest_masternode_list().expect("Should have a masternode list"); + assert!( + !latest_list.masternodes.is_empty(), + "Engine should have at least one masternode after sync" + ); + assert_eq!( + latest_list.masternodes.len(), + expected_masternodes, + "Engine masternode count {} should match pre-generated metadata count {}", + latest_list.masternodes.len(), + expected_masternodes, + ); + } + + client_handle.stop().await; + + let final_progress = client_handle.client.sync_progress().await; + + // Headers should also be synced + let header_height = final_progress.headers().unwrap().current_height(); + assert!( + header_height >= ctx.mn_ctx.expected_height, + "Headers should sync to at least expected height: got {}, expected {}", + header_height, + ctx.mn_ctx.expected_height + ); +} + +/// Sync masternode list, stop, restart with same storage, verify incremental sync. 
+#[tokio::test] +async fn test_masternode_list_sync_with_restart() { + let Some(ctx) = TestContext::new(true).await else { + return; + }; + + let wallet = create_dummy_wallet(); + let config = + create_mn_test_config(ctx.storage_path().to_path_buf(), ctx.mn_ctx.controller_addr); + + // First sync + tracing::info!("=== Starting first masternode sync ==="); + let mut client_handle = create_and_start_client(&config, Arc::clone(&wallet)).await; + let first_mn_progress = + wait_for_masternode_sync(&mut client_handle.progress_receiver, SYNC_TIMEOUT).await; + let first_height = first_mn_progress.current_height(); + client_handle.stop().await; + drop(client_handle); + + // Restart with same storage + tracing::info!("=== Restarting with same storage ==="); + let mut client_handle = create_and_start_client(&config, Arc::clone(&wallet)).await; + let second_mn_progress = + wait_for_masternode_sync(&mut client_handle.progress_receiver, SYNC_TIMEOUT).await; + let second_height = second_mn_progress.current_height(); + + assert_eq!( + second_height, first_height, + "Masternode sync height should be identical after restart" + ); + assert_eq!( + second_mn_progress.state(), + SyncState::Synced, + "Should reach Synced state after restart" + ); + + tracing::info!( + "Restart verified: first_height={}, second_height={}", + first_height, + second_height + ); + + client_handle.stop().await; +} + +/// Sync to pre-generated height, generate new blocks, verify incremental update. +/// +/// Exercises the SPV's incremental masternode-list update path when new headers +/// arrive. Only needs the controller as a peer since this path does not depend +/// on live masternodes, ChainLocks, or IS signing. +#[tokio::test] +async fn test_masternode_list_sync_with_new_blocks() { + let Some(ctx) = TestContext::new(true).await else { + return; + }; + + let initial_height = ctx.mn_ctx.expected_height; + let wallet = create_dummy_wallet(); + let config = + create_mn_test_config(ctx.storage_path().to_path_buf(), ctx.mn_ctx.controller_addr); + + let mut client_handle = create_and_start_client(&config, Arc::clone(&wallet)).await; + + // Wait for initial masternode sync + let mn_progress = + wait_for_masternode_sync(&mut client_handle.progress_receiver, SYNC_TIMEOUT).await; + assert_eq!(mn_progress.state(), SyncState::Synced); + tracing::info!( + "Initial sync complete at height {}, generating new blocks...", + mn_progress.current_height() + ); + + // Generate new blocks on the controller + let blocks_to_generate = 10; + let addr = ctx.mn_ctx.controller.get_new_address(); + ctx.mn_ctx.controller.generate_blocks(blocks_to_generate, &addr); + + let expected_new_height = initial_height + blocks_to_generate as u32; + tracing::info!( + "Generated {} blocks, waiting for SPV update to height {}", + blocks_to_generate, + expected_new_height + ); + + // Wait for the SPV client to sync to the expected height + let updated_height = wait_for_mn_state_event_above( + &mut client_handle.sync_event_receiver, + expected_new_height - 1, + SYNC_TIMEOUT, + ) + .await; + + assert!( + updated_height >= expected_new_height, + "Updated height {} should be >= expected {}", + updated_height, + expected_new_height + ); + + tracing::info!( + "Incremental update verified: initial={}, updated={}", + initial_height, + updated_height + ); + + client_handle.stop().await; +} + +/// Mine multiple DKG cycles while the SPV client is connected and verify it keeps up. 
+/// +/// Starts the full masternode network, syncs to the pre-generated height, then +/// orchestrates 3 complete DKG cycles (6 phases + commitment each). After each +/// cycle, verifies the SPV client receives a MasternodeStateUpdated event at +/// the new height. +#[tokio::test] +async fn test_masternode_list_sync_with_quorum_rotation() { + let Some(mut ctx) = TestContext::new(false).await else { + return; + }; + + let wallet = create_dummy_wallet(); + let config = + create_mn_test_config(ctx.storage_path().to_path_buf(), ctx.mn_ctx.controller_addr); + + let mut client_handle = create_and_start_client(&config, Arc::clone(&wallet)).await; + + // Wait for initial masternode sync + let mn_progress = + wait_for_masternode_sync(&mut client_handle.progress_receiver, SYNC_TIMEOUT).await; + assert_eq!(mn_progress.state(), SyncState::Synced); + let mut last_height = mn_progress.current_height(); + tracing::info!("Initial sync complete at height {}", last_height); + + // Mine 3 DKG cycles and verify the SPV client keeps up after each + let num_cycles = 3; + let mut prev_stored_cycles: usize = { + let engine = client_handle.engine.read().await; + engine.rotated_quorums_per_cycle.len() + }; + for cycle in 1..=num_cycles { + tracing::info!("Starting DKG cycle {}/{}...", cycle, num_cycles); + + // Snapshot the highest stored-cycle boundary height before mining. + // The new DKG cycle's stored_cycle_height is always pre_dkg_max_cycle + + // dkg_interval, so waiting for stored_cycle_height > pre_dkg_max_cycle + // synchronizes precisely on the new cycle rather than on an early + // Incremental event that fires before the QRInfo window opens. + let pre_dkg_max_cycle: u32 = { + let engine = client_handle.engine.read().await; + engine + .rotated_quorums_per_cycle + .keys() + .filter_map(|h| engine.block_container.get_height(h)) + .max() + .unwrap_or(0) + }; + + let quorum_hash = + ctx.mn_ctx.mine_dkg_cycle().unwrap_or_else(|| panic!("DKG cycle {} failed", cycle)); + + // Wait for the SPV client to sync the new masternode state. + // Using wait_for_mn_state_with_stored_cycle_above instead of + // wait_for_mn_state_event_above to avoid returning on Incremental events + // that fire before the QRInfo window for the new cycle opens. + let updated_height = wait_for_mn_state_with_stored_cycle_above( + &mut client_handle.sync_event_receiver, + pre_dkg_max_cycle, + SYNC_TIMEOUT, + ) + .await; + + assert!( + updated_height > last_height, + "Cycle {}: updated height {} should be greater than previous {}", + cycle, + updated_height, + last_height + ); + + // After each successful DKG cycle, the stored rotation cycle count + // must not shrink and must reach at least `cycle` distinct entries. + // Then verify every stored rotated quorum is `Verified`. 
+ let stored_cycles = { + let engine = client_handle.engine.read().await; + let stored = engine.rotated_quorums_per_cycle.len(); + assert!( + stored >= prev_stored_cycles, + "Cycle {}: rotated_quorums_per_cycle shrank from {} to {}", + cycle, + prev_stored_cycles, + stored + ); + assert!( + stored >= cycle as usize, + "Cycle {}: expected at least {} stored rotation cycles, got {}", + cycle, + cycle, + stored + ); + assert_all_rotated_quorums_verified(&engine); + stored + }; + prev_stored_cycles = stored_cycles; + + tracing::info!( + "Cycle {}/{} verified: height {} -> {}, quorum={}, stored_cycles={}", + cycle, + num_cycles, + last_height, + updated_height, + quorum_hash, + stored_cycles + ); + last_height = updated_height; + } + + client_handle.stop().await; +} + +/// Starting the SPV client *after* a freshly-mined DIP0024 cycle forces the +/// initial QRInfo to carry `tip_diff_has_rotating_quorums=true`, and a fresh +/// engine has no prior entries in `rotated_quorums_per_cycle`. Under those +/// conditions `feed_qr_info` exercises both storage paths against cycles +/// never before stored: +/// +/// - Current-cycle path stores the tip cycle (the freshly-mined one). +/// - `validate_and_store_previous_cycle_quorums` stores the cycle whose +/// quorums live on `masternode_lists[h]` (post-fix) or on +/// `masternode_lists[h-c]` (pre-fix). +/// +/// Regtest DIP0024 ships mn_lists at `[h-4c, h-3c, h-2c, h-c, h]` in the +/// QRInfo. Post-fix the previous-cycle target's reconstruction lands inside +/// that window and succeeds. Pre-fix it lands one cycle deeper than shipped +/// (`h-5c`), `find_rotated_masternodes_for_quorums` returns +/// `RequiredMasternodeListNotPresent`, and `from_validation_error` maps it to +/// `Skipped` — the outer match at `mod.rs:596` silently returns `Ok(())` +/// without storing anything. `rotated_quorums_per_cycle.len()` ends at 1 +/// (tip cycle only), not 2. +/// +/// This test catches the work-block-pick regression that +/// `test_masternode_list_sync_with_quorum_rotation` can't see because its +/// initial sync runs with tip *before* a mining window (current-cycle path +/// stores the most recent cycle, previous-cycle path lands on the same +/// already-stored cycle → both fix variants converge to `len == 1`). +#[tokio::test] +async fn test_rotated_quorums_stored_when_sync_starts_post_dkg() { + let Some(mut ctx) = TestContext::new(false).await else { + return; + }; + + // Mine one full DIP0024 cycle before the SPV client starts. The initial + // QRInfo will then have tip past the mining window but inside the same + // DKG cycle, so `mn_list_diff_tip` carries the freshly-mined cycle's + // rotating commitments. + let fresh_cycle_quorum_hash = ctx.mn_ctx.mine_dkg_cycle().expect("DKG cycle should succeed"); + tracing::info!( + "Pre-SPV DKG cycle mined, quorum_hash={}, starting SPV client…", + fresh_cycle_quorum_hash + ); + + let wallet = create_dummy_wallet(); + let config = + create_mn_test_config(ctx.storage_path().to_path_buf(), ctx.mn_ctx.controller_addr); + let mut client_handle = create_and_start_client(&config, Arc::clone(&wallet)).await; + + let mn_progress = + wait_for_masternode_sync(&mut client_handle.progress_receiver, SYNC_TIMEOUT).await; + assert_eq!(mn_progress.state(), SyncState::Synced); + + { + let engine = client_handle.engine.read().await; + let stored = engine.rotated_quorums_per_cycle.len(); + assert!( + stored >= 2, + "Initial QRInfo should store both the freshly-mined tip cycle and \ + the previous cycle from `mn_list[h]`; got {} entries. 
Pre-fix this \
+             silently skips the previous cycle because it targets `mn_list[h-c]` \
+             whose cycle needs a quarter mn_list deeper than the QRInfo ships.",
+            stored
+        );
+        assert_all_rotated_quorums_verified(&engine);
+        let mut heights: Vec<u32> = engine
+            .rotated_quorums_per_cycle
+            .keys()
+            .filter_map(|h| engine.block_container.get_height(h))
+            .collect();
+        heights.sort_unstable();
+        heights.dedup();
+        assert!(
+            heights.len() >= 2,
+            "Stored cycles should map to at least 2 distinct block heights, got {:?}",
+            heights
+        );
+    }
+
+    client_handle.stop().await;
+}
+
+/// End-to-end masternode sync test: initial sync, DKG cycle, and ChainLock.
+///
+/// Starts the full masternode network with rotated quorum verification enabled.
+/// After initial sync, validates the masternode list against the pre-generated
+/// metadata, mines a new DKG cycle, verifies the SPV client picks up the update,
+/// then mines blocks and waits for a ChainLock to propagate.
+#[tokio::test]
+async fn test_masternode_list_sync_end_to_end() {
+    let Some(mut ctx) = TestContext::new(false).await else {
+        return;
+    };
+
+    let expected_masternodes = ctx.mn_ctx.metadata.masternodes.len();
+    let wallet = create_dummy_wallet();
+    let config =
+        create_mn_test_config(ctx.storage_path().to_path_buf(), ctx.mn_ctx.controller_addr);
+
+    let mut client_handle = create_and_start_client(&config, Arc::clone(&wallet)).await;
+
+    // Wait for initial masternode sync
+    let mn_progress =
+        wait_for_masternode_sync(&mut client_handle.progress_receiver, SYNC_TIMEOUT).await;
+    assert_eq!(mn_progress.state(), SyncState::Synced);
+    let initial_height = mn_progress.current_height();
+    tracing::info!("Initial sync complete at height {}", initial_height);
+
+    // Validate MN list matches pre-generated metadata
+    {
+        let engine = client_handle.engine.read().await;
+
+        let latest_list = engine.latest_masternode_list().expect("Should have a masternode list");
+        assert_eq!(
+            latest_list.masternodes.len(),
+            expected_masternodes,
+            "Should have {} masternodes, got {}",
+            expected_masternodes,
+            latest_list.masternodes.len()
+        );
+
+        // Verify each pro_tx_hash from metadata is present
+        for mn_info in &ctx.mn_ctx.metadata.masternodes {
+            let pro_tx_hash: dashcore::ProTxHash =
+                mn_info.pro_tx_hash.parse().unwrap_or_else(|e| {
+                    panic!("Failed to parse pro_tx_hash {}: {}", mn_info.pro_tx_hash, e)
+                });
+            assert!(
+                latest_list.masternodes.contains_key(&pro_tx_hash),
+                "Masternode {} not found in engine's list",
+                mn_info.pro_tx_hash
+            );
+        }
+
+        // Non-rotating quorums (llmq_test, type 100)
+        let non_rotating_quorums =
+            latest_list.quorums.get(&LLMQType::LlmqtypeTest).map(|q| q.len()).unwrap_or(0);
+        assert!(non_rotating_quorums > 0, "Should have llmq_test (type 100) quorums");
+
+        let rotated_quorum_cycles = engine.rotated_quorums_per_cycle.len();
+        assert!(rotated_quorum_cycles > 0, "Should have rotated quorum cycles from initial QRInfo");
+
+        // Every quorum in `rotated_quorums_per_cycle` must be Verified.
+        // That structure is the authoritative map of validated rotating
+        // quorums used for IS lock verification.
+        assert_all_rotated_quorums_verified(&engine);
+        tracing::info!(
+            "All rotated quorums across {} cycles verified",
+            engine.rotated_quorums_per_cycle.len()
+        );
+
+        // Non-rotating quorums in the latest MN list must be Verified.
+        // Older historical quorums (from previous cycles) may remain
+        // Unknown in `quorum_statuses` because validation only runs on
+        // the latest MN list; that's fine — they're no longer active.
+ if let Some(latest_quorums) = latest_list.quorums.get(&LLMQType::LlmqtypeTest) { + for (quorum_hash, entry) in latest_quorums { + assert!( + matches!(entry.verified, LLMQEntryVerificationStatus::Verified), + "Non-rotating quorum {} in latest MN list should be Verified, got {}", + quorum_hash, + entry.verified + ); + } + } + + tracing::info!( + "Validated: {} masternodes, {} non-rotating quorums, {} rotated quorum cycles", + latest_list.masternodes.len(), + non_rotating_quorums, + rotated_quorum_cycles, + ); + } + + // Snapshot rotated_quorums_per_cycle before the DKG cycle so we can + // assert the count grew once the SPV has fully validated the new cycle. + let prev_stored_cycles = { + let engine = client_handle.engine.read().await; + engine.rotated_quorums_per_cycle.len() + }; + + // Mine a DKG cycle and verify the SPV client picks up the update + tracing::info!("Mining DKG cycle..."); + let _quorum_hash = ctx.mn_ctx.mine_dkg_cycle().expect("DKG cycle should succeed"); + + // Wait for the newly mined DKG cycle to be fully stored and verified. + // The QRInfo for this cycle emits stored_cycle_height == initial_height (408 in + // regtest), so we gate on >= initial_height by passing initial_height - 1 as the + // lower bound. Using wait_for_mn_state_event_above here would consume this event + // before the stored-cycle check could see it, so we combine both waits into one. + // Bump mocktime to nudge the tick handler in case a catch-up QRInfo is pending. + ctx.mn_ctx.bump_mocktime(30); + let updated_height = wait_for_mn_state_with_stored_cycle_above( + &mut client_handle.sync_event_receiver, + initial_height.saturating_sub(1), + SYNC_TIMEOUT, + ) + .await; + assert!( + updated_height > initial_height, + "Post-DKG height {} should be greater than initial {}", + updated_height, + initial_height + ); + + // Verify engine has masternode list at the new height with new cycle stored + { + let engine = client_handle.engine.read().await; + let latest_list = engine.latest_masternode_list().expect("Should have a masternode list"); + assert_eq!( + latest_list.masternodes.len(), + expected_masternodes, + "MN count should remain {} after DKG", + expected_masternodes + ); + let stored = engine.rotated_quorums_per_cycle.len(); + assert!( + stored > prev_stored_cycles, + "Expected rotated_quorums_per_cycle to grow by at least 1 after DKG: \ + prev={}, got={}", + prev_stored_cycles, + stored + ); + tracing::info!( + "Post-DKG rotated quorum cycles: prev={}, now={}", + prev_stored_cycles, + stored + ); + } + tracing::info!("Post-DKG verified at height {}", updated_height); + + // Mine blocks and wait for ChainLock — required, not optional. + // After a completed DKG cycle, the llmq_test quorum should be signing ChainLocks. + tracing::info!("Mining blocks and waiting for ChainLock..."); + let cl_height = ctx + .mn_ctx + .mine_blocks_and_wait_for_chainlock(3, 60) + .expect("ChainLock should be produced after DKG cycle completion"); + + tracing::info!("ChainLock received at height {}", cl_height); + + // Wait for the SPV ChainLock manager to validate the new ChainLock. 
+ let cl_sync_height = wait_for_chainlock_height_at_least( + &mut client_handle.progress_receiver, + cl_height, + SYNC_TIMEOUT, + ) + .await; + assert!( + cl_sync_height >= cl_height, + "SPV should sync to at least ChainLock height {}, got {}", + cl_height, + cl_sync_height + ); + tracing::info!("SPV synced to ChainLocked height {}", cl_sync_height); + + client_handle.stop().await; +} diff --git a/dash-spv/tests/dashd_sync/setup.rs b/dash-spv/tests/dashd_sync/setup.rs index 00d31c15d..16c229bb7 100644 --- a/dash-spv/tests/dashd_sync/setup.rs +++ b/dash-spv/tests/dashd_sync/setup.rs @@ -1,25 +1,25 @@ use dash_spv::client::config::MempoolStrategy; use dash_spv::network::NetworkEvent; use dash_spv::storage::{PeerStorage, PersistentPeerStorage, PersistentStorage}; -use dash_spv::test_utils::{retain_test_dir, DashdTestContext, TestChain, TestEventHandler}; +use dash_spv::test_utils::{ + create_test_wallet, init_test_logging, next_unused_receive_address, retain_test_dir, + DashdTestContext, TestChain, TestEventHandler, +}; use dash_spv::{ client::{ClientConfig, DashSpvClient}, network::PeerNetworkManager, storage::DiskStorageManager, sync::{ProgressPercentage, SyncEvent, SyncProgress}, - LevelFilter, LoggingGuard, Network, + LoggingGuard, Network, }; use dashcore::network::address::AddrV2Message; use dashcore::network::constants::ServiceFlags; use dashcore::Txid; -use key_wallet::managed_account::managed_account_trait::ManagedAccountTrait; -use key_wallet::managed_account::managed_account_type::ManagedAccountType; -use key_wallet::wallet::initialization::WalletAccountCreationOptions; use key_wallet::wallet::managed_wallet_info::wallet_info_interface::WalletInfoInterface; use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo; use key_wallet_manager::WalletEvent; use key_wallet_manager::{WalletId, WalletManager}; -use std::collections::{BTreeSet, HashSet}; +use std::collections::HashSet; use std::path::PathBuf; use std::sync::Arc; use tempfile::TempDir; @@ -68,17 +68,7 @@ impl TestContext { fn create(dashd: DashdTestContext) -> Self { let storage_dir = TempDir::new().expect("Failed to create temporary directory"); - let log_dir = storage_dir.path().join("logs"); - let _log_guard = dash_spv::init_logging(dash_spv::LoggingConfig { - level: Some(LevelFilter::DEBUG), - console: std::env::var("DASHD_TEST_LOG").is_ok(), - file: Some(dash_spv::LogFileConfig { - log_dir: log_dir.clone(), - max_files: 1, - }), - thread_local: true, - }) - .expect("Failed to initialize test logging"); + let _log_guard = init_test_logging(storage_dir.path().join("logs")); let client_config = create_test_config(storage_dir.path().to_path_buf(), dashd.addr); @@ -134,29 +124,7 @@ impl TestContext { } /// Retrieves an unused receiving address from the wallet. pub(super) async fn receive_address(&self) -> dashcore::Address { - let wallet_read = self.wallet.read().await; - let wallet_info = - wallet_read.get_wallet_info(&self.wallet_id).expect("Wallet info not found"); - - let account = wallet_info - .accounts() - .standard_bip44_accounts - .get(&0) - .expect("BIP44 account 0 not found"); - - let ManagedAccountType::Standard { - external_addresses, - .. 
-    } = account.managed_account_type()
-    else {
-        panic!("Account 0 is not a Standard account type");
-    };
-
-    external_addresses
-        .unused_addresses()
-        .into_iter()
-        .next()
-        .expect("No unused receive address available")
+    next_unused_receive_address(&self.wallet, &self.wallet_id).await
 }
 /// Checks if a transaction with the specified transaction ID (`txid`) exists in the wallet.
 pub(super) async fn has_transaction(&self, txid: &dashcore::Txid) -> bool {
@@ -353,30 +321,6 @@ pub(super) async fn create_and_start_client(
 }
-/// Account creation options for tests: just a standard BIP44 account 0.
-pub(super) fn test_account_options() -> WalletAccountCreationOptions {
-    WalletAccountCreationOptions::SpecificAccounts(
-        BTreeSet::from([0]),
-        BTreeSet::new(),
-        BTreeSet::new(),
-        BTreeSet::new(),
-        BTreeSet::new(),
-        None,
-    )
-}
-
-/// Create a test wallet from mnemonic.
-pub(super) fn create_test_wallet(
-    mnemonic: &str,
-    network: Network,
-) -> (Arc<RwLock<WalletManager<ManagedWalletInfo>>>, WalletId) {
-    let mut wallet_manager = WalletManager::<ManagedWalletInfo>::new(network);
-    let wallet_id = wallet_manager
-        .create_wallet_from_mnemonic(mnemonic, 0, test_account_options())
-        .expect("Failed to create wallet from mnemonic");
-    (Arc::new(RwLock::new(wallet_manager)), wallet_id)
-}
-
 /// Create test client config pointing to a specific peer (exclusive mode).
 fn create_test_config(storage_path: PathBuf, peer_addr: std::net::SocketAddr) -> ClientConfig {
     let mut config = ClientConfig::regtest().with_storage_path(storage_path).without_masternodes();
diff --git a/dash-spv/tests/dashd_sync/tests_basic.rs b/dash-spv/tests/dashd_sync/tests_basic.rs
index 8126aa6a3..09ca0a8bb 100644
--- a/dash-spv/tests/dashd_sync/tests_basic.rs
+++ b/dash-spv/tests/dashd_sync/tests_basic.rs
@@ -11,8 +11,8 @@ use tokio::sync::RwLock;
 use super::helpers::{
     count_wallet_transactions, get_spendable_balance, wait_for_sync, EMPTY_MNEMONIC,
 };
-use super::setup::{create_and_start_client, create_test_wallet, TestContext};
-use dash_spv::test_utils::TestChain;
+use super::setup::{create_and_start_client, TestContext};
+use dash_spv::test_utils::{create_test_wallet, TestChain};

 #[tokio::test]
 async fn test_wallet_sync() {
diff --git a/dash-spv/tests/dashd_sync/tests_mempool.rs b/dash-spv/tests/dashd_sync/tests_mempool.rs
index 14e8156d7..29c407dd5 100644
--- a/dash-spv/tests/dashd_sync/tests_mempool.rs
+++ b/dash-spv/tests/dashd_sync/tests_mempool.rs
@@ -3,7 +3,7 @@ use std::time::Duration;

 use dash_spv::client::config::MempoolStrategy;
 use dash_spv::network::NetworkEvent;
-use dash_spv::test_utils::{DashdTestContext, TestChain};
+use dash_spv::test_utils::{create_test_wallet, DashdTestContext, TestChain};
 use dashcore::Amount;

 use super::helpers::{
@@ -11,9 +11,7 @@ use super::helpers::{
     wait_for_mempool_txs_both, wait_for_network_event, wait_for_network_event_both,
     wait_for_sync_both,
 };
-use super::setup::{
-    client_has_transaction, create_and_start_client, create_test_wallet, TestContext,
-};
+use super::setup::{client_has_transaction, create_and_start_client, TestContext};

 const MEMPOOL_TIMEOUT: Duration = Duration::from_secs(30);
diff --git a/dash-spv/tests/dashd_sync/tests_multi_wallet.rs b/dash-spv/tests/dashd_sync/tests_multi_wallet.rs
index 47ea9f483..276f43cbf 100644
--- a/dash-spv/tests/dashd_sync/tests_multi_wallet.rs
+++ b/dash-spv/tests/dashd_sync/tests_multi_wallet.rs
@@ -14,10 +14,8 @@ use super::helpers::{
     count_wallet_transactions, get_spendable_balance, wait_for_sync, wait_for_wallet_synced,
     EMPTY_MNEMONIC, SECONDARY_MNEMONIC,
 };
-use super::setup::{
-
create_and_start_client, create_test_wallet, test_account_options, TestContext, -}; -use dash_spv::test_utils::TestChain; +use super::setup::{create_and_start_client, TestContext}; +use dash_spv::test_utils::{create_test_wallet, default_test_account_options, TestChain}; use key_wallet::account::ManagedAccountTrait; /// Derive a fresh BIP44 external receive address for `mnemonic` without @@ -84,7 +82,11 @@ async fn test_wallet_added_at_runtime_catches_up() { let w1_id = { let mut wallet_guard = client_handle.client.wallet().write().await; wallet_guard - .create_wallet_from_mnemonic(&ctx.dashd.wallet.mnemonic, 0, test_account_options()) + .create_wallet_from_mnemonic( + &ctx.dashd.wallet.mnemonic, + 0, + default_test_account_options(), + ) .expect("add pre-funded W1 at runtime") }; wait_for_wallet_synced(client_handle.client.wallet(), &w1_id, initial_height).await; @@ -115,7 +117,11 @@ async fn test_wallet_added_at_runtime_catches_up() { let w2_id = { let mut wallet_guard = client_handle.client.wallet().write().await; wallet_guard - .create_wallet_from_mnemonic(EMPTY_MNEMONIC, initial_height, test_account_options()) + .create_wallet_from_mnemonic( + EMPTY_MNEMONIC, + initial_height, + default_test_account_options(), + ) .expect("add W2 at runtime") }; @@ -167,7 +173,11 @@ async fn test_wallet_added_at_runtime_catches_up() { let w3_id = { let mut wallet_guard = client_handle.client.wallet().write().await; wallet_guard - .create_wallet_from_mnemonic(SECONDARY_MNEMONIC, future_height, test_account_options()) + .create_wallet_from_mnemonic( + SECONDARY_MNEMONIC, + future_height, + default_test_account_options(), + ) .expect("add W3 at runtime") }; @@ -273,13 +283,21 @@ async fn test_runtime_add_shared_block_two_wallets() { let w1_id = { let mut wallet_guard = client_handle.client.wallet().write().await; wallet_guard - .create_wallet_from_mnemonic(EMPTY_MNEMONIC, initial_height, test_account_options()) + .create_wallet_from_mnemonic( + EMPTY_MNEMONIC, + initial_height, + default_test_account_options(), + ) .expect("add W1 at runtime") }; let w2_id = { let mut wallet_guard = client_handle.client.wallet().write().await; wallet_guard - .create_wallet_from_mnemonic(SECONDARY_MNEMONIC, initial_height, test_account_options()) + .create_wallet_from_mnemonic( + SECONDARY_MNEMONIC, + initial_height, + default_test_account_options(), + ) .expect("add W2 at runtime") }; @@ -361,7 +379,11 @@ async fn test_runtime_add_during_initial_sync() { let w1_id = { let mut wallet_guard = wallet.write().await; wallet_guard - .create_wallet_from_mnemonic(&ctx.dashd.wallet.mnemonic, 0, test_account_options()) + .create_wallet_from_mnemonic( + &ctx.dashd.wallet.mnemonic, + 0, + default_test_account_options(), + ) .expect("add W1 before start") }; @@ -413,7 +435,7 @@ async fn test_runtime_add_during_initial_sync() { let w2_id = { let mut wallet_guard = client_handle.client.wallet().write().await; wallet_guard - .create_wallet_from_mnemonic(EMPTY_MNEMONIC, 0, test_account_options()) + .create_wallet_from_mnemonic(EMPTY_MNEMONIC, 0, default_test_account_options()) .expect("add W2 mid-flight") }; @@ -496,7 +518,7 @@ async fn test_runtime_add_with_tip_advance_during_rescan() { let w2_id = { let mut wallet_guard = client_handle.client.wallet().write().await; wallet_guard - .create_wallet_from_mnemonic(EMPTY_MNEMONIC, 0, test_account_options()) + .create_wallet_from_mnemonic(EMPTY_MNEMONIC, 0, default_test_account_options()) .expect("add W2 at runtime") }; diff --git a/dash-spv/tests/dashd_sync/tests_restart.rs 
b/dash-spv/tests/dashd_sync/tests_restart.rs index a00924241..4c043fd58 100644 --- a/dash-spv/tests/dashd_sync/tests_restart.rs +++ b/dash-spv/tests/dashd_sync/tests_restart.rs @@ -9,8 +9,8 @@ use dash_spv::Network; use super::helpers::{get_spendable_balance, is_progress_event, wait_for_sync}; use dash_spv::test_utils::SYNC_TIMEOUT; -use super::setup::{create_and_start_client, create_test_wallet, TestContext}; -use dash_spv::test_utils::TestChain; +use super::setup::{create_and_start_client, TestContext}; +use dash_spv::test_utils::{create_test_wallet, TestChain}; /// Verify sync state is identical after stopping and restarting with same storage. #[tokio::test]