diff --git a/forester/src/forester_status.rs b/forester/src/forester_status.rs index a1f94803ca..c6f745a526 100644 --- a/forester/src/forester_status.rs +++ b/forester/src/forester_status.rs @@ -179,22 +179,28 @@ pub async fn fetch_forester_status(args: &StatusArgs) -> crate::Result<()> { fetch_active_tree: false, }) .await?; - let trees = fetch_trees(&rpc).await?; + let trees = fetch_trees(&rpc) + .await? + .iter() + .sorted_by_key(|t| t.merkle_tree.to_string()) + .cloned() + .collect::>(); + if trees.is_empty() { warn!("No trees found. Exiting."); } - run_queue_info(config.clone(), trees.clone(), TreeType::StateV1).await?; - run_queue_info(config.clone(), trees.clone(), TreeType::AddressV1).await?; + run_queue_info(config.clone(), &trees, TreeType::StateV1).await?; + run_queue_info(config.clone(), &trees, TreeType::AddressV1).await?; - run_queue_info(config.clone(), trees.clone(), TreeType::StateV2).await?; - run_queue_info(config.clone(), trees.clone(), TreeType::AddressV2).await?; + run_queue_info(config.clone(), &trees, TreeType::StateV2).await?; + run_queue_info(config.clone(), &trees, TreeType::AddressV2).await?; for tree in &trees { - let tree_type = format!("[{}]", tree.tree_type); + let tree_type = format!("{}", tree.tree_type); let tree_info = get_tree_fullness(&mut rpc, tree.merkle_tree, tree.tree_type).await?; let fullness_percentage = tree_info.fullness * 100.0; println!( - "{} Tree {}: Fullness: {:.4}% | Next Index: {} | Threshold: {}", + "{} {}: Fullness: {:.4}% | Next Index: {} | Threshold: {}", tree_type, &tree.merkle_tree, format!("{:.2}%", fullness_percentage), @@ -323,9 +329,7 @@ fn print_current_forester_assignments( ); println!("Queue processors for the current light slot:"); - println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"); - println!("│ Tree Type │ Tree Address │ Forester │"); - println!("┼───────────┼──────────────────────────────────────────┼──────────────────────────────────────────┤"); + println!("Tree Type\t\tTree Address\tForester"); for tree in trees { let eligible_forester_slot_index = match ForesterEpochPda::get_eligible_forester_index( @@ -337,7 +341,7 @@ fn print_current_forester_assignments( Ok(idx) => idx, Err(e) => { println!( - "│ {:9} │ {} │ ERROR: {:?} │", + "{:12}\t\t{}\tERROR: {:?}", tree.tree_type, tree.merkle_tree, e ); continue; @@ -350,17 +354,16 @@ fn print_current_forester_assignments( if let Some(forester_pda) = assigned_forester { println!( - "│ {:9} │ {} │ {} │", + "{:12}\t\t{}\t{}", tree.tree_type, tree.merkle_tree, forester_pda.authority ); } else { println!( - "│ {:9} │ {} │ UNASSIGNED (Eligible Index: {}) │", + "{:12}\t\t{}\tUNASSIGNED (Eligible Index: {})", tree.tree_type, tree.merkle_tree, eligible_forester_slot_index ); } } - println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"); } else { println!( "ERROR: Could not find EpochPda for active epoch {}. Cannot determine forester assignments.", diff --git a/forester/src/lib.rs b/forester/src/lib.rs index 55cb5abef8..794a60d7f6 100644 --- a/forester/src/lib.rs +++ b/forester/src/lib.rs @@ -26,6 +26,7 @@ pub use config::{ForesterConfig, ForesterEpochInfo}; use forester_utils::{ forester_epoch::TreeAccounts, rate_limiter::RateLimiter, rpc_pool::SolanaRpcPoolBuilder, }; +use itertools::Itertools; use light_client::rpc::{LightClient, LightClientConfig, Rpc}; use light_compressed_account::TreeType; use solana_sdk::commitment_config::CommitmentConfig; @@ -37,7 +38,8 @@ use crate::{ metrics::QUEUE_LENGTH, processor::tx_cache::ProcessedHashCache, queue_helpers::{ - fetch_address_v2_queue_length, fetch_queue_item_data, fetch_state_v2_queue_length, + fetch_queue_item_data, print_address_v2_queue_info, print_state_v2_input_queue_info, + print_state_v2_output_queue_info, }, slot_tracker::SlotTracker, utils::get_protocol_config, @@ -45,7 +47,7 @@ use crate::{ pub async fn run_queue_info( config: Arc, - trees: Vec, + trees: &[TreeAccounts], queue_type: TreeType, ) -> Result<()> { let mut rpc = LightClient::new(LightClientConfig { @@ -60,49 +62,87 @@ pub async fn run_queue_info( let trees: Vec<_> = trees .iter() .filter(|t| t.tree_type == queue_type) + .sorted_by_key(|t| t.merkle_tree.to_string()) .cloned() .collect(); for tree_data in trees { - let queue_length = match tree_data.tree_type { - TreeType::StateV1 => fetch_queue_item_data( - &mut rpc, - &tree_data.queue, - 0, - STATE_NULLIFIER_QUEUE_VALUES, - STATE_NULLIFIER_QUEUE_VALUES, - ) - .await? - .len(), - TreeType::AddressV1 => fetch_queue_item_data( - &mut rpc, - &tree_data.queue, - 0, - ADDRESS_QUEUE_VALUES, - ADDRESS_QUEUE_VALUES, - ) - .await? - .len(), - TreeType::StateV2 => fetch_state_v2_queue_length(&mut rpc, &tree_data.queue).await?, - TreeType::AddressV2 => { - fetch_address_v2_queue_length(&mut rpc, &tree_data.merkle_tree).await? + match tree_data.tree_type { + TreeType::StateV1 => { + let queue_length = fetch_queue_item_data( + &mut rpc, + &tree_data.queue, + 0, + STATE_NULLIFIER_QUEUE_VALUES, + STATE_NULLIFIER_QUEUE_VALUES, + ) + .await? + .len(); + QUEUE_LENGTH + .with_label_values(&[ + &*queue_type.to_string(), + &tree_data.merkle_tree.to_string(), + ]) + .set(queue_length as i64); + + println!( + "{:?} queue {} length: {}", + queue_type, tree_data.queue, queue_length + ); } - }; + TreeType::AddressV1 => { + let queue_length = fetch_queue_item_data( + &mut rpc, + &tree_data.queue, + 0, + ADDRESS_QUEUE_VALUES, + ADDRESS_QUEUE_VALUES, + ) + .await? + .len(); + QUEUE_LENGTH + .with_label_values(&[ + &*queue_type.to_string(), + &tree_data.merkle_tree.to_string(), + ]) + .set(queue_length as i64); + + println!( + "{:?} queue {} length: {}", + queue_type, tree_data.queue, queue_length + ); + } + TreeType::StateV2 => { + println!("\n=== StateV2 {} ===", tree_data.merkle_tree); - QUEUE_LENGTH - .with_label_values(&[&*queue_type.to_string(), &tree_data.merkle_tree.to_string()]) - .set(queue_length as i64); + println!("\n1. APPEND OPERATIONS:"); + let append_unprocessed = + print_state_v2_output_queue_info(&mut rpc, &tree_data.queue).await?; - let queue_identifier = if tree_data.tree_type == TreeType::AddressV2 { - tree_data.merkle_tree.to_string() - } else { - tree_data.queue.to_string() - }; + println!("\n2. NULLIFY OPERATIONS:"); + let nullify_unprocessed = + print_state_v2_input_queue_info(&mut rpc, &tree_data.merkle_tree).await?; + + println!("===========================================\n"); - println!( - "{:?} queue {} length: {}", - queue_type, queue_identifier, queue_length - ); + QUEUE_LENGTH + .with_label_values(&["StateV2.Append", &tree_data.queue.to_string()]) + .set(append_unprocessed as i64); + + QUEUE_LENGTH + .with_label_values(&["StateV2.Nullify", &tree_data.merkle_tree.to_string()]) + .set(nullify_unprocessed as i64); + } + TreeType::AddressV2 => { + println!("\n=== AddressV2 {} ===", tree_data.merkle_tree); + let queue_length = + print_address_v2_queue_info(&mut rpc, &tree_data.merkle_tree).await?; + println!("===========================================\n"); + QUEUE_LENGTH + .with_label_values(&["AddressV2", &tree_data.merkle_tree.to_string()]) + .set(queue_length as i64); + } + }; } Ok(()) } diff --git a/forester/src/queue_helpers.rs b/forester/src/queue_helpers.rs index 8282ec120d..f6b4d05f12 100644 --- a/forester/src/queue_helpers.rs +++ b/forester/src/queue_helpers.rs @@ -1,6 +1,8 @@ use account_compression::QueueAccount; use light_batched_merkle_tree::{ - merkle_tree::BatchedMerkleTreeAccount, queue::BatchedQueueAccount, + constants::{DEFAULT_ADDRESS_ZKP_BATCH_SIZE, DEFAULT_ZKP_BATCH_SIZE}, + merkle_tree::BatchedMerkleTreeAccount, + queue::BatchedQueueAccount, }; use light_client::rpc::Rpc; use light_hash_set::HashSet; @@ -43,36 +45,200 @@ pub async fn fetch_queue_item_data( Ok(filtered_queue) } -pub async fn fetch_state_v2_queue_length( +pub async fn print_state_v2_output_queue_info( rpc: &mut R, output_queue_pubkey: &Pubkey, ) -> Result { - trace!( - "Fetching StateV2 queue length for {:?}", - output_queue_pubkey - ); if let Some(mut account) = rpc.get_account(*output_queue_pubkey).await? { let output_queue = BatchedQueueAccount::output_from_bytes(account.data.as_mut_slice())?; - Ok(output_queue.get_metadata().batch_metadata.next_index as usize) + let metadata = output_queue.get_metadata(); + let next_index = metadata.batch_metadata.next_index; + + let mut zkp_batch_size = DEFAULT_ZKP_BATCH_SIZE; + let mut total_unprocessed = 0; + let mut batch_details = Vec::new(); + let mut total_completed_operations = 0; + + for (batch_idx, batch) in metadata.batch_metadata.batches.iter().enumerate() { + zkp_batch_size = batch.zkp_batch_size; + let num_inserted = batch.get_num_inserted_zkps(); + let current_index = batch.get_current_zkp_batch_index(); + let pending_in_batch = current_index.saturating_sub(num_inserted); + + let completed_operations_in_batch = + num_inserted * metadata.batch_metadata.zkp_batch_size; + total_completed_operations += completed_operations_in_batch; + + let pending_operations_in_batch = + pending_in_batch * metadata.batch_metadata.zkp_batch_size; + + batch_details.push(format!( + "batch_{}: state={:?}, zkp_inserted={}, zkp_current={}, zkp_pending={}, items_completed={}, items_pending={}", + batch_idx, + batch.get_state(), + num_inserted, + current_index, + pending_in_batch, + completed_operations_in_batch, + pending_operations_in_batch + )); + + total_unprocessed += pending_operations_in_batch; + } + + println!("StateV2 {} APPEND:", output_queue_pubkey); + println!(" next_index (total ever added): {}", next_index); + println!( + " total_completed_operations: {}", + total_completed_operations + ); + println!(" total_unprocessed_items: {}", total_unprocessed); + println!( + " pending_batch_index: {}", + metadata.batch_metadata.pending_batch_index + ); + println!( + " zkp_batch_size: {}", + metadata.batch_metadata.zkp_batch_size + ); + println!( + " SUMMARY: {} items added, {} items processed, {} items pending", + next_index, total_completed_operations, total_unprocessed + ); + for detail in batch_details { + println!(" {}", detail); + } + println!( + " Total pending APPEND operations: {}", + total_unprocessed / zkp_batch_size + ); + + Ok(total_unprocessed as usize) + } else { + Err(anyhow::anyhow!("account not found")) + } +} + +pub async fn print_state_v2_input_queue_info( + rpc: &mut R, + merkle_tree_pubkey: &Pubkey, +) -> Result { + if let Some(mut account) = rpc.get_account(*merkle_tree_pubkey).await? { + let merkle_tree = BatchedMerkleTreeAccount::state_from_bytes( + account.data.as_mut_slice(), + &(*merkle_tree_pubkey).into(), + )?; + let next_index = merkle_tree.queue_batches.next_index; + + let mut total_unprocessed = 0; + let mut batch_details = Vec::new(); + let mut total_completed_operations = 0; + + let mut zkp_batch_size = DEFAULT_ZKP_BATCH_SIZE; + + for (batch_idx, batch) in merkle_tree.queue_batches.batches.iter().enumerate() { + zkp_batch_size = batch.zkp_batch_size; + let num_inserted = batch.get_num_inserted_zkps(); + let current_index = batch.get_current_zkp_batch_index(); + let pending_in_batch = current_index.saturating_sub(num_inserted); + + let completed_operations_in_batch = num_inserted * batch.zkp_batch_size; + total_completed_operations += completed_operations_in_batch; + + let pending_operations_in_batch = pending_in_batch * batch.zkp_batch_size; + + batch_details.push(format!( + "batch_{}: state={:?}, zkp_inserted={}, zkp_current={}, zkp_pending={}, items_completed={}, items_pending={}", + batch_idx, + batch.get_state(), + num_inserted, + current_index, + pending_in_batch, + completed_operations_in_batch, + pending_operations_in_batch + )); + + total_unprocessed += pending_operations_in_batch; + } + + println!("StateV2 {} NULLIFY:", merkle_tree_pubkey); + println!(" next_index (total ever added): {}", next_index); + println!( + " total_completed_operations: {}", + total_completed_operations + ); + println!(" total_unprocessed_items: {}", total_unprocessed); + println!( + " pending_batch_index: {}", + merkle_tree.queue_batches.pending_batch_index + ); + println!(" zkp_batch_size: {}", zkp_batch_size); + println!( + " SUMMARY: {} items added, {} items processed, {} items pending", + next_index, total_completed_operations, total_unprocessed + ); + for detail in batch_details { + println!(" {}", detail); + } + println!( + " Total pending NULLIFY operations: {}", + total_unprocessed / zkp_batch_size + ); + + Ok(total_unprocessed as usize) } else { Err(anyhow::anyhow!("account not found")) } } -pub async fn fetch_address_v2_queue_length( +pub async fn print_address_v2_queue_info( rpc: &mut R, merkle_tree_pubkey: &Pubkey, ) -> Result { - trace!( - "Fetching AddressV2 queue length for {:?}", - merkle_tree_pubkey - ); if let Some(mut account) = rpc.get_account(*merkle_tree_pubkey).await? { let merkle_tree = BatchedMerkleTreeAccount::address_from_bytes( account.data.as_mut_slice(), &(*merkle_tree_pubkey).into(), )?; - Ok(merkle_tree.queue_batches.next_index as usize) + let next_index = merkle_tree.queue_batches.next_index; + + let mut zkp_batch_size = DEFAULT_ADDRESS_ZKP_BATCH_SIZE; + let mut total_unprocessed = 0; + let mut batch_details = Vec::new(); + + for (batch_idx, batch) in merkle_tree.queue_batches.batches.iter().enumerate() { + zkp_batch_size = batch.zkp_batch_size; + let num_inserted = batch.get_num_inserted_zkps(); + let current_index = batch.get_current_zkp_batch_index(); + let pending_in_batch = current_index.saturating_sub(num_inserted); + + batch_details.push(format!( + "batch_{}: state={:?}, inserted={}, current={}, pending={}", + batch_idx, + batch.get_state(), + num_inserted, + current_index, + pending_in_batch + )); + + total_unprocessed += pending_in_batch; + } + + println!("AddressV2 {}:", merkle_tree_pubkey); + println!(" next_index (total ever added): {}", next_index); + println!(" total_unprocessed_items: {}", total_unprocessed); + println!( + " pending_batch_index: {}", + merkle_tree.queue_batches.pending_batch_index + ); + println!(" zkp_batch_size: {}", zkp_batch_size); + for detail in batch_details { + println!(" {}", detail); + } + + println!(" Total pending ADDRESS operations: {}", total_unprocessed); + + Ok(total_unprocessed as usize) } else { Err(anyhow::anyhow!("account not found")) } diff --git a/forester/src/rollover/operations.rs b/forester/src/rollover/operations.rs index 3e4ea2a9bc..f4fa554c40 100644 --- a/forester/src/rollover/operations.rs +++ b/forester/src/rollover/operations.rs @@ -109,39 +109,6 @@ pub async fn get_tree_fullness( let merkle_tree = BatchedMerkleTreeAccount::state_from_bytes(&mut account.data, &tree_pubkey.into()) .unwrap(); - println!( - "merkle_tree.get_account().queue.batch_size: {:?}", - merkle_tree.queue_batches.batch_size - ); - - println!( - "queue currently_processing_batch_index: {:?}", - merkle_tree.queue_batches.currently_processing_batch_index as usize - ); - - println!( - "queue batch_size: {:?}", - merkle_tree.queue_batches.batch_size - ); - println!( - "queue zkp_batch_size: {:?}", - merkle_tree.queue_batches.zkp_batch_size - ); - println!( - "queue pending_batch_index: {:?}", - merkle_tree.queue_batches.pending_batch_index - ); - println!( - "queue bloom_filter_capacity: {:?}", - merkle_tree.queue_batches.bloom_filter_capacity - ); - println!( - "queue num_batches: {:?}", - merkle_tree.queue_batches.num_batches - ); - - println!("tree next_index: {:?}", merkle_tree.next_index); - println!("tree height: {:?}", merkle_tree.height); let height = merkle_tree.height as u64; let capacity = 1u64 << height; @@ -165,39 +132,6 @@ pub async fn get_tree_fullness( &tree_pubkey.into(), ) .unwrap(); - println!( - "merkle_tree.get_account().queue.batch_size: {:?}", - merkle_tree.queue_batches.batch_size - ); - - println!( - "queue currently_processing_batch_index: {:?}", - merkle_tree.queue_batches.currently_processing_batch_index as usize - ); - - println!( - "queue batch_size: {:?}", - merkle_tree.queue_batches.batch_size - ); - println!( - "queue zkp_batch_size: {:?}", - merkle_tree.queue_batches.zkp_batch_size - ); - println!( - "queue pending_batch_index: {:?}", - merkle_tree.queue_batches.pending_batch_index - ); - println!( - "queue bloom_filter_capacity: {:?}", - merkle_tree.queue_batches.bloom_filter_capacity - ); - println!( - "queue num_batches: {:?}", - merkle_tree.queue_batches.num_batches - ); - - println!("tree next_index: {:?}", merkle_tree.next_index); - println!("tree height: {:?}", merkle_tree.height); let height = merkle_tree.height as u64; let capacity = 1u64 << height;