diff --git a/Cargo.lock b/Cargo.lock index dda4686d71..36bf7ff968 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2137,6 +2137,7 @@ dependencies = [ "light-prover-client", "light-registry", "light-sdk", + "light-sparse-merkle-tree", "light-system-program-anchor", "light-test-utils", "num-bigint 0.4.6", @@ -2179,19 +2180,13 @@ dependencies = [ "light-concurrent-merkle-tree", "light-hash-set", "light-hasher", - "light-indexed-array", "light-indexed-merkle-tree", "light-merkle-tree-metadata", "light-prover-client", "light-registry", "light-sdk", "light-sparse-merkle-tree", - "num-bigint 0.4.6", "num-traits", - "rand 0.8.5", - "reqwest 0.12.23", - "serde", - "serde_json", "solana-sdk", "thiserror 2.0.14", "tokio", diff --git a/forester-utils/Cargo.toml b/forester-utils/Cargo.toml index cc7e1af82c..4d5960379c 100644 --- a/forester-utils/Cargo.toml +++ b/forester-utils/Cargo.toml @@ -17,7 +17,6 @@ light-hash-set = { workspace = true } light-hasher = { workspace = true } light-concurrent-merkle-tree = { workspace = true } light-indexed-merkle-tree = { workspace = true } -light-indexed-array = { workspace = true } light-compressed-account = { workspace = true } light-batched-merkle-tree = { workspace = true } light-merkle-tree-metadata = { workspace = true } @@ -25,13 +24,11 @@ light-sparse-merkle-tree = { workspace = true } light-account-checks = { workspace = true } light-sdk = { workspace = true } -# unrelased light-client = { workspace = true } light-prover-client = { workspace = true } light-registry = { workspace = true, features = ["cpi"] } account-compression = { workspace = true, features = ["cpi"] } - tokio = { workspace = true } futures = { workspace = true } async-stream = "0.3" @@ -45,14 +42,7 @@ thiserror = { workspace = true } tracing = { workspace = true } num-traits = { workspace = true } -num-bigint = { workspace = true } - -rand = { workspace = true } -reqwest = { workspace = true } bb8 = { workspace = true } async-trait = { workspace = true } governor = { workspace = true } - -serde = { version = "1.0.219", features = ["derive"] } -serde_json = "1.0.140" diff --git a/forester-utils/src/instructions/address_batch_update.rs b/forester-utils/src/instructions/address_batch_update.rs index 73c247f983..bbd35ceb12 100644 --- a/forester-utils/src/instructions/address_batch_update.rs +++ b/forester-utils/src/instructions/address_batch_update.rs @@ -21,9 +21,10 @@ use light_prover_client::{ use light_sparse_merkle_tree::SparseMerkleTree; use tracing::{debug, error, info, warn}; -use crate::{error::ForesterUtilsError, rpc_pool::SolanaRpcPool}; +use crate::{error::ForesterUtilsError, rpc_pool::SolanaRpcPool, utils::wait_for_indexer}; const MAX_PHOTON_ELEMENTS_PER_CALL: usize = 500; +const MAX_PROOFS_PER_TX: usize = 3; pub struct AddressUpdateConfig { pub rpc_pool: Arc>, @@ -32,7 +33,6 @@ pub struct AddressUpdateConfig { pub prover_api_key: Option, pub polling_interval: Duration, pub max_wait_time: Duration, - pub ixs_per_tx: usize, } #[allow(clippy::too_many_arguments)] @@ -47,7 +47,6 @@ async fn stream_instruction_data<'a, R: Rpc>( start_index: u64, zkp_batch_size: u16, mut current_root: [u8; 32], - yield_batch_size: usize, ) -> impl Stream, ForesterUtilsError>> + Send + 'a { stream! { @@ -63,6 +62,17 @@ async fn stream_instruction_data<'a, R: Rpc>( let elements_for_chunk = chunk_hash_chains.len() * zkp_batch_size as usize; let processed_items_offset = chunk_start * zkp_batch_size as usize; + { + if chunk_idx > 0 { + debug!("Waiting for indexer to sync before fetching chunk {} data", chunk_idx); + } + let connection = rpc_pool.get_connection().await?; + wait_for_indexer(&*connection).await?; + if chunk_idx > 0 { + debug!("Indexer synced, proceeding with chunk {} fetch", chunk_idx); + } + } + let indexer_update_info = { let mut connection = rpc_pool.get_connection().await?; let indexer = connection.indexer_mut()?; @@ -125,8 +135,8 @@ async fn stream_instruction_data<'a, R: Rpc>( }); pending_count += 1; - if pending_count >= yield_batch_size { - for _ in 0..yield_batch_size.min(pending_count) { + if pending_count >= MAX_PROOFS_PER_TX { + for _ in 0..MAX_PROOFS_PER_TX.min(pending_count) { if let Some((idx, result)) = futures_ordered.next().await { match result { Ok((compressed_proof, new_root)) => { @@ -173,7 +183,7 @@ async fn stream_instruction_data<'a, R: Rpc>( }; proof_buffer.push(instruction_data); - if proof_buffer.len() >= yield_batch_size { + if proof_buffer.len() >= MAX_PROOFS_PER_TX { yield Ok(proof_buffer.clone()); proof_buffer.clear(); } @@ -333,7 +343,6 @@ pub async fn get_address_update_instruction_stream<'a, R: Rpc>( start_index, zkp_batch_size, current_root, - config.ixs_per_tx, ) .await; diff --git a/forester-utils/src/instructions/mod.rs b/forester-utils/src/instructions/mod.rs index 61ea236271..b8d5dd6f2f 100644 --- a/forester-utils/src/instructions/mod.rs +++ b/forester-utils/src/instructions/mod.rs @@ -1,6 +1,4 @@ pub mod address_batch_update; pub mod create_account; -pub mod state_batch_append; -pub mod state_batch_nullify; pub use create_account::create_account_instruction; diff --git a/forester-utils/src/instructions/state_batch_append.rs b/forester-utils/src/instructions/state_batch_append.rs deleted file mode 100644 index 44c81e4d75..0000000000 --- a/forester-utils/src/instructions/state_batch_append.rs +++ /dev/null @@ -1,208 +0,0 @@ -use std::{pin::Pin, sync::Arc, time::Duration}; - -use account_compression::processor::initialize_address_merkle_tree::Pubkey; -use async_stream::stream; -use futures::{ - stream::{FuturesOrdered, Stream}, - StreamExt, -}; -use light_batched_merkle_tree::{ - constants::DEFAULT_BATCH_STATE_TREE_HEIGHT, merkle_tree::InstructionDataBatchAppendInputs, -}; -use light_client::{indexer::Indexer, rpc::Rpc}; -use light_compressed_account::instruction_data::compressed_proof::CompressedProof; -use light_hasher::bigint::bigint_to_be_bytes_array; -use light_merkle_tree_metadata::QueueType; -use light_prover_client::{ - proof_client::ProofClient, - proof_types::batch_append::{get_batch_append_inputs, BatchAppendsCircuitInputs}, -}; -use light_sparse_merkle_tree::changelog::ChangelogEntry; -use tracing::trace; - -use crate::{ - error::ForesterUtilsError, rpc_pool::SolanaRpcPool, utils::wait_for_indexer, - ParsedMerkleTreeData, ParsedQueueData, -}; - -async fn generate_zkp_proof( - circuit_inputs: BatchAppendsCircuitInputs, - proof_client: Arc, -) -> Result { - let (proof, new_root) = proof_client - .generate_batch_append_proof(circuit_inputs) - .await - .map_err(|e| ForesterUtilsError::Prover(e.to_string()))?; - Ok(InstructionDataBatchAppendInputs { - new_root, - compressed_proof: CompressedProof { - a: proof.a, - b: proof.b, - c: proof.c, - }, - }) -} - -#[allow(clippy::too_many_arguments)] -pub async fn get_append_instruction_stream<'a, R: Rpc>( - rpc_pool: Arc>, - merkle_tree_pubkey: Pubkey, - prover_url: String, - prover_api_key: Option, - polling_interval: Duration, - max_wait_time: Duration, - merkle_tree_data: ParsedMerkleTreeData, - output_queue_data: ParsedQueueData, - yield_batch_size: usize, -) -> Result< - ( - Pin< - Box< - dyn Stream, ForesterUtilsError>> - + Send - + 'a, - >, - >, - u16, - ), - ForesterUtilsError, -> { - trace!("Initializing append batch instruction stream with parsed data"); - let (merkle_tree_next_index, mut current_root, _) = ( - merkle_tree_data.next_index, - merkle_tree_data.current_root, - merkle_tree_data.root_history, - ); - let (zkp_batch_size, leaves_hash_chains) = ( - output_queue_data.zkp_batch_size, - output_queue_data.leaves_hash_chains, - ); - - if leaves_hash_chains.is_empty() { - trace!("No hash chains to process, returning empty stream."); - return Ok((Box::pin(futures::stream::empty()), zkp_batch_size)); - } - let rpc = rpc_pool.get_connection().await?; - wait_for_indexer(&*rpc).await?; - drop(rpc); - - let stream = stream! { - let total_elements = zkp_batch_size as usize * leaves_hash_chains.len(); - let offset = merkle_tree_next_index; - - let queue_elements = { - let mut connection = rpc_pool.get_connection().await?; - let indexer = connection.indexer_mut()?; - match indexer - .get_queue_elements( - merkle_tree_pubkey.to_bytes(), - QueueType::OutputStateV2, - total_elements as u16, - Some(offset), - None, - ) - .await { - Ok(res) => res.value.items, - Err(e) => { - yield Err(ForesterUtilsError::Indexer(format!("Failed to get queue elements: {}", e))); - return; - } - } - }; - - if queue_elements.len() != total_elements { - yield Err(ForesterUtilsError::Indexer(format!( - "Expected {} elements, got {}", - total_elements, - queue_elements.len() - ))); - return; - } - - if let Some(first_element) = queue_elements.first() { - if first_element.root != current_root { - yield Err(ForesterUtilsError::Indexer("Root mismatch between indexer and on-chain state".into())); - return; - } - } - - let mut all_changelogs: Vec> = Vec::new(); - let proof_client = Arc::new(ProofClient::with_config(prover_url.clone(), polling_interval, max_wait_time, prover_api_key.clone())); - let mut futures_ordered = FuturesOrdered::new(); - let mut pending_count = 0; - - let mut proof_buffer = Vec::new(); - - for (batch_idx, leaves_hash_chain) in leaves_hash_chains.iter().enumerate() { - let start_idx = batch_idx * zkp_batch_size as usize; - let end_idx = start_idx + zkp_batch_size as usize; - let batch_elements = &queue_elements[start_idx..end_idx]; - - let old_leaves: Vec<[u8; 32]> = batch_elements.iter().map(|x| x.leaf).collect(); - let leaves: Vec<[u8; 32]> = batch_elements.iter().map(|x| x.account_hash).collect(); - let merkle_proofs: Vec> = batch_elements.iter().map(|x| x.proof.clone()).collect(); - let adjusted_start_index = merkle_tree_next_index as u32 + (batch_idx * zkp_batch_size as usize) as u32; - - let (circuit_inputs, batch_changelogs) = match get_batch_append_inputs::<32>( - current_root, adjusted_start_index, leaves, *leaves_hash_chain, old_leaves, merkle_proofs, zkp_batch_size as u32, &all_changelogs, - ) { - Ok(inputs) => inputs, - Err(e) => { - yield Err(ForesterUtilsError::Prover(format!("Failed to get circuit inputs: {}", e))); - return; - } - }; - - current_root = bigint_to_be_bytes_array::<32>(&circuit_inputs.new_root.to_biguint().unwrap()).unwrap(); - all_changelogs.extend(batch_changelogs); - - let client = Arc::clone(&proof_client); - futures_ordered.push_back(generate_zkp_proof(circuit_inputs, client)); - pending_count += 1; - - while pending_count >= yield_batch_size { - for _ in 0..yield_batch_size.min(pending_count) { - if let Some(result) = futures_ordered.next().await { - match result { - Ok(proof) => proof_buffer.push(proof), - Err(e) => { - yield Err(e); - return; - } - } - pending_count -= 1; - } - } - - if !proof_buffer.is_empty() { - yield Ok(proof_buffer.clone()); - proof_buffer.clear(); - } - } - } - - while let Some(result) = futures_ordered.next().await { - match result { - Ok(proof) => { - proof_buffer.push(proof); - - if proof_buffer.len() >= yield_batch_size { - yield Ok(proof_buffer.clone()); - proof_buffer.clear(); - } - }, - Err(e) => { - yield Err(e); - return; - } - } - } - - // Yield any remaining proofs - if !proof_buffer.is_empty() { - yield Ok(proof_buffer); - } - }; - - Ok((Box::pin(stream), zkp_batch_size)) -} diff --git a/forester-utils/src/instructions/state_batch_nullify.rs b/forester-utils/src/instructions/state_batch_nullify.rs deleted file mode 100644 index 42174a4c0a..0000000000 --- a/forester-utils/src/instructions/state_batch_nullify.rs +++ /dev/null @@ -1,215 +0,0 @@ -use std::{pin::Pin, sync::Arc, time::Duration}; - -use account_compression::processor::initialize_address_merkle_tree::Pubkey; -use async_stream::stream; -use futures::{ - stream::{FuturesOrdered, Stream}, - StreamExt, -}; -use light_batched_merkle_tree::{ - constants::DEFAULT_BATCH_STATE_TREE_HEIGHT, merkle_tree::InstructionDataBatchNullifyInputs, -}; -use light_client::{indexer::Indexer, rpc::Rpc}; -use light_compressed_account::instruction_data::compressed_proof::CompressedProof; -use light_hasher::bigint::bigint_to_be_bytes_array; -use light_merkle_tree_metadata::QueueType; -use light_prover_client::{ - proof_client::ProofClient, - proof_types::batch_update::{get_batch_update_inputs, BatchUpdateCircuitInputs}, -}; -use tracing::{debug, trace}; - -use crate::{ - error::ForesterUtilsError, rpc_pool::SolanaRpcPool, utils::wait_for_indexer, - ParsedMerkleTreeData, -}; - -async fn generate_nullify_zkp_proof( - inputs: BatchUpdateCircuitInputs, - proof_client: Arc, -) -> Result { - let (proof, new_root) = proof_client - .generate_batch_update_proof(inputs) - .await - .map_err(|e| ForesterUtilsError::Prover(e.to_string()))?; - Ok(InstructionDataBatchNullifyInputs { - new_root, - compressed_proof: CompressedProof { - a: proof.a, - b: proof.b, - c: proof.c, - }, - }) -} - -#[allow(clippy::too_many_arguments)] -pub async fn get_nullify_instruction_stream<'a, R: Rpc>( - rpc_pool: Arc>, - merkle_tree_pubkey: Pubkey, - prover_url: String, - prover_api_key: Option, - polling_interval: Duration, - max_wait_time: Duration, - merkle_tree_data: ParsedMerkleTreeData, - yield_batch_size: usize, -) -> Result< - ( - Pin< - Box< - dyn Stream< - Item = Result, ForesterUtilsError>, - > + Send - + 'a, - >, - >, - u16, - ), - ForesterUtilsError, -> { - let (mut current_root, leaves_hash_chains, num_inserted_zkps, zkp_batch_size) = ( - merkle_tree_data.current_root, - merkle_tree_data.leaves_hash_chains, - merkle_tree_data.num_inserted_zkps, - merkle_tree_data.zkp_batch_size, - ); - - if leaves_hash_chains.is_empty() { - debug!("No hash chains to process for nullification, returning empty stream."); - return Ok((Box::pin(futures::stream::empty()), zkp_batch_size)); - } - - let rpc = rpc_pool.get_connection().await?; - wait_for_indexer(&*rpc).await?; - drop(rpc); - - let stream = stream! { - let total_elements = zkp_batch_size as usize * leaves_hash_chains.len(); - let offset = num_inserted_zkps * zkp_batch_size as u64; - - trace!("Requesting {} total elements with offset {}", total_elements, offset); - - let all_queue_elements = { - let mut connection = rpc_pool.get_connection().await?; - let indexer = connection.indexer_mut()?; - indexer.get_queue_elements( - merkle_tree_pubkey.to_bytes(), - QueueType::InputStateV2, - total_elements as u16, - Some(offset), - None, - ) - .await - }; - - let all_queue_elements = match all_queue_elements { - Ok(res) => res.value.items, - Err(e) => { - yield Err(ForesterUtilsError::Indexer(format!("Failed to get queue elements: {}", e))); - return; - } - }; - - trace!("Got {} queue elements in total", all_queue_elements.len()); - if all_queue_elements.len() != total_elements { - yield Err(ForesterUtilsError::Indexer(format!( - "Expected {} elements, got {}", - total_elements, all_queue_elements.len() - ))); - return; - } - - if let Some(first_element) = all_queue_elements.first() { - if first_element.root != current_root { - yield Err(ForesterUtilsError::Indexer("Root mismatch between indexer and on-chain state".into())); - return; - } - } - - let mut all_changelogs = Vec::new(); - let proof_client = Arc::new(ProofClient::with_config(prover_url.clone(), polling_interval, max_wait_time, prover_api_key.clone())); - let mut futures_ordered = FuturesOrdered::new(); - let mut pending_count = 0; - - let mut proof_buffer = Vec::new(); - - for (batch_offset, leaves_hash_chain) in leaves_hash_chains.iter().enumerate() { - let start_idx = batch_offset * zkp_batch_size as usize; - let end_idx = start_idx + zkp_batch_size as usize; - let batch_elements = &all_queue_elements[start_idx..end_idx]; - - let mut leaves = Vec::new(); - let mut tx_hashes = Vec::new(); - let mut old_leaves = Vec::new(); - let mut path_indices = Vec::new(); - let mut merkle_proofs = Vec::new(); - - for leaf_info in batch_elements.iter() { - path_indices.push(leaf_info.leaf_index as u32); - leaves.push(leaf_info.account_hash); - old_leaves.push(leaf_info.leaf); - merkle_proofs.push(leaf_info.proof.clone()); - tx_hashes.push(leaf_info.tx_hash.ok_or_else(|| ForesterUtilsError::Indexer(format!("Missing tx_hash for leaf index {}", leaf_info.leaf_index)))?); - } - - let (circuit_inputs, batch_changelog) = match get_batch_update_inputs::<{ DEFAULT_BATCH_STATE_TREE_HEIGHT as usize }>( - current_root, tx_hashes, leaves, *leaves_hash_chain, old_leaves, merkle_proofs, path_indices, zkp_batch_size as u32, &all_changelogs, - ) { - Ok(inputs) => inputs, - Err(e) => { - yield Err(ForesterUtilsError::Prover(format!("Failed to get batch update inputs: {}", e))); - return; - } - }; - - all_changelogs.extend(batch_changelog); - current_root = bigint_to_be_bytes_array::<32>(&circuit_inputs.new_root.to_biguint().unwrap()).unwrap(); - - let client = Arc::clone(&proof_client); - futures_ordered.push_back(generate_nullify_zkp_proof(circuit_inputs, client)); - pending_count += 1; - - while pending_count >= yield_batch_size { - for _ in 0..yield_batch_size.min(pending_count) { - if let Some(result) = futures_ordered.next().await { - match result { - Ok(proof) => proof_buffer.push(proof), - Err(e) => { - yield Err(e); - return; - } - } - pending_count -= 1; - } - } - - if !proof_buffer.is_empty() { - yield Ok(proof_buffer.clone()); - proof_buffer.clear(); - } - } - } - - while let Some(result) = futures_ordered.next().await { - match result { - Ok(proof) => { - proof_buffer.push(proof); - - if proof_buffer.len() >= yield_batch_size { - yield Ok(proof_buffer.clone()); - proof_buffer.clear(); - } - }, - Err(e) => { - yield Err(e); - return; - } - } - } - - if !proof_buffer.is_empty() { - yield Ok(proof_buffer); - } - }; - - Ok((Box::pin(stream), zkp_batch_size)) -} diff --git a/forester-utils/src/utils.rs b/forester-utils/src/utils.rs index a577cb3f83..b78cb88df6 100644 --- a/forester-utils/src/utils.rs +++ b/forester-utils/src/utils.rs @@ -6,7 +6,7 @@ use light_client::{ }; use solana_sdk::{signature::Signer, transaction::Transaction}; use tokio::time::sleep; -use tracing::{debug, error}; +use tracing::{error, warn}; use crate::error::ForesterUtilsError; @@ -43,7 +43,7 @@ pub async fn wait_for_indexer(rpc: &R) -> Result<(), ForesterUtilsError> } }; - let max_attempts = 20; + let max_attempts = 100; let mut attempts = 0; while rpc_slot > indexer_slot { @@ -53,13 +53,16 @@ pub async fn wait_for_indexer(rpc: &R) -> Result<(), ForesterUtilsError> )); } - debug!( - "waiting for indexer to catch up, rpc_slot: {}, indexer_slot: {}", - rpc_slot, indexer_slot - ); + if rpc_slot - indexer_slot > 50 { + warn!( + "indexer is behind {} slots (rpc_slot: {}, indexer_slot: {})", + rpc_slot - indexer_slot, + rpc_slot, + indexer_slot + ); + } - tokio::task::yield_now().await; - sleep(std::time::Duration::from_millis(500)).await; + sleep(std::time::Duration::from_millis(1000)).await; indexer_slot = rpc.indexer()?.get_indexer_slot(None).await.map_err(|e| { error!("failed to get indexer slot from indexer: {:?}", e); ForesterUtilsError::Indexer("Failed to get indexer slot".into()) diff --git a/forester/Cargo.toml b/forester/Cargo.toml index 79601efe11..f945f19c26 100644 --- a/forester/Cargo.toml +++ b/forester/Cargo.toml @@ -24,6 +24,8 @@ light-client = { workspace = true, features = ["v2"] } light-merkle-tree-metadata = { workspace = true } light-sdk = { workspace = true, features = ["anchor"] } light-program-test = { workspace = true } +light-sparse-merkle-tree = { workspace = true } +light-prover-client = { workspace = true, features = ["devenv"] } solana-transaction-status = { workspace = true } bb8 = { workspace = true } @@ -53,7 +55,6 @@ num-bigint = { workspace = true } [dev-dependencies] serial_test = { workspace = true } -light-prover-client = { workspace = true, features = ["devenv"] } light-test-utils = { workspace = true } light-program-test = { workspace = true, features = ["devenv"] } light-batched-merkle-tree = { workspace = true, features = ["test-only"] } diff --git a/forester/scripts/compare_performance.py b/forester/scripts/compare_performance.py deleted file mode 100755 index 35034aff17..0000000000 --- a/forester/scripts/compare_performance.py +++ /dev/null @@ -1,322 +0,0 @@ -#!/usr/bin/env python3 -""" -Performance Comparison Script for Forester Logs -Compares queue processing performance between old and new forester versions. -""" - -import re -import sys -import argparse -from datetime import datetime -from collections import defaultdict -from typing import Dict, List, Tuple, Optional -import statistics - -class PerformanceAnalyzer: - def __init__(self): - # ANSI color removal - self.ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') - - # Patterns - self.timestamp_pattern = re.compile(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z)') - self.queue_metric_pattern = re.compile(r'QUEUE_METRIC: (queue_empty|queue_has_elements) tree_type=(\S+) tree=(\S+)') - self.operation_start_pattern = re.compile(r'V2_TPS_METRIC: operation_start tree_type=(\w+)') - self.operation_complete_pattern = re.compile(r'V2_TPS_METRIC: operation_complete.*?duration_ms=(\d+).*?items_processed=(\d+)') - self.transaction_sent_pattern = re.compile(r'V2_TPS_METRIC: transaction_sent.*?tx_duration_ms=(\d+)') - - def clean_line(self, line: str) -> str: - return self.ansi_escape.sub('', line) - - def parse_timestamp(self, line: str) -> Optional[datetime]: - timestamp_match = self.timestamp_pattern.search(line) - if timestamp_match: - return datetime.fromisoformat(timestamp_match.group(1).replace('Z', '+00:00')) - return None - - def analyze_log(self, filename: str) -> Dict: - """Comprehensive analysis of a log file.""" - results = { - 'filename': filename, - 'queue_events': [], - 'operations': [], - 'transactions': [], - 'queue_emptying_times': [], - 'queue_response_times': [], - 'processing_rates': [], - 'transaction_durations': [] - } - - with open(filename, 'r') as f: - current_operation = None - - for line in f: - clean_line = self.clean_line(line) - timestamp = self.parse_timestamp(clean_line) - - if not timestamp: - continue - - # Parse queue metrics - if 'QUEUE_METRIC:' in clean_line: - queue_match = self.queue_metric_pattern.search(clean_line) - if queue_match: - state = queue_match.group(1) - results['queue_events'].append((timestamp, state)) - - # Parse operation start - elif 'operation_start' in clean_line: - start_match = self.operation_start_pattern.search(clean_line) - if start_match: - current_operation = { - 'start_time': timestamp, - 'tree_type': start_match.group(1) - } - - # Parse operation complete - elif 'operation_complete' in clean_line and current_operation: - complete_match = self.operation_complete_pattern.search(clean_line) - if complete_match: - duration_ms = int(complete_match.group(1)) - items_processed = int(complete_match.group(2)) - - operation = { - 'start_time': current_operation['start_time'], - 'end_time': timestamp, - 'duration_ms': duration_ms, - 'items_processed': items_processed, - 'tree_type': current_operation['tree_type'], - 'processing_rate': items_processed / (duration_ms / 1000) if duration_ms > 0 else 0 - } - - results['operations'].append(operation) - results['processing_rates'].append(operation['processing_rate']) - current_operation = None - - # Parse transaction sent - elif 'transaction_sent' in clean_line: - tx_match = self.transaction_sent_pattern.search(clean_line) - if tx_match: - tx_duration = int(tx_match.group(1)) - results['transactions'].append({ - 'timestamp': timestamp, - 'duration_ms': tx_duration - }) - results['transaction_durations'].append(tx_duration) - - # Calculate queue metrics - self._calculate_queue_metrics(results) - - return results - - def _calculate_queue_metrics(self, results: Dict): - """Calculate queue emptying and response times.""" - events = results['queue_events'] - - for i in range(1, len(events)): - prev_time, prev_state = events[i-1] - curr_time, curr_state = events[i] - - duration = (curr_time - prev_time).total_seconds() - - # Queue emptying time: has_elements -> empty - if prev_state == 'queue_has_elements' and curr_state == 'queue_empty': - results['queue_emptying_times'].append(duration) - - # Response time: empty -> has_elements (filter out immediate responses) - elif prev_state == 'queue_empty' and curr_state == 'queue_has_elements': - if duration > 0.01: # Filter immediate responses - results['queue_response_times'].append(duration) - - def generate_stats(self, data: List[float], name: str) -> Dict: - """Generate statistics for a dataset.""" - if not data: - return {'name': name, 'count': 0} - - return { - 'name': name, - 'count': len(data), - 'min': min(data), - 'max': max(data), - 'mean': statistics.mean(data), - 'median': statistics.median(data), - 'std_dev': statistics.stdev(data) if len(data) > 1 else 0 - } - - def print_stats(self, stats: Dict, unit: str = ""): - """Print statistics in a formatted way.""" - if stats['count'] == 0: - print(f" {stats['name']}: No data") - return - - print(f" {stats['name']}:") - print(f" Count: {stats['count']}") - print(f" Min: {stats['min']:.2f}{unit}") - print(f" Max: {stats['max']:.2f}{unit}") - print(f" Mean: {stats['mean']:.2f}{unit}") - print(f" Median: {stats['median']:.2f}{unit}") - if stats['count'] > 1: - print(f" Std Dev: {stats['std_dev']:.2f}{unit}") - - def compare_stats(self, old_stats: Dict, new_stats: Dict, unit: str = "") -> Dict: - """Compare two statistics and return improvement metrics.""" - if old_stats['count'] == 0 or new_stats['count'] == 0: - return {'valid': False} - - mean_improvement = ((old_stats['mean'] - new_stats['mean']) / old_stats['mean']) * 100 - median_improvement = ((old_stats['median'] - new_stats['median']) / old_stats['median']) * 100 - - return { - 'valid': True, - 'mean_improvement': mean_improvement, - 'median_improvement': median_improvement, - 'old_mean': old_stats['mean'], - 'new_mean': new_stats['mean'], - 'old_median': old_stats['median'], - 'new_median': new_stats['median'], - 'unit': unit - } - - def print_comparison(self, comparison: Dict, metric_name: str): - """Print comparison results.""" - if not comparison['valid']: - print(f" {metric_name}: Insufficient data for comparison") - return - - unit = comparison['unit'] - print(f" {metric_name}:") - print(f" Mean: {comparison['old_mean']:.2f}{unit} → {comparison['new_mean']:.2f}{unit} ({comparison['mean_improvement']:+.1f}%)") - print(f" Median: {comparison['old_median']:.2f}{unit} → {comparison['new_median']:.2f}{unit} ({comparison['median_improvement']:+.1f}%)") - - def analyze_and_compare(self, old_file: str, new_file: str): - """Main analysis and comparison function.""" - print("FORESTER PERFORMANCE COMPARISON") - print("=" * 60) - print() - - # Analyze both files - print("Analyzing log files...") - old_results = self.analyze_log(old_file) - new_results = self.analyze_log(new_file) - - print(f"Old version: {old_file}") - print(f"New version: {new_file}") - print() - - # Overall summary - print("OVERALL SUMMARY") - print("-" * 40) - print(f"Old version: {len(old_results['operations'])} operations, {len(old_results['transactions'])} transactions") - print(f"New version: {len(new_results['operations'])} operations, {len(new_results['transactions'])} transactions") - print() - - # Queue Performance Analysis - print("QUEUE PERFORMANCE ANALYSIS") - print("-" * 40) - - # Queue emptying times - old_emptying = self.generate_stats(old_results['queue_emptying_times'], "Queue Emptying Time") - new_emptying = self.generate_stats(new_results['queue_emptying_times'], "Queue Emptying Time") - - print("Old Version:") - self.print_stats(old_emptying, "s") - print() - print("New Version:") - self.print_stats(new_emptying, "s") - print() - - emptying_comparison = self.compare_stats(old_emptying, new_emptying, "s") - print("COMPARISON - Queue Emptying:") - self.print_comparison(emptying_comparison, "Queue Emptying Time") - print() - - # Response times - old_response = self.generate_stats(old_results['queue_response_times'], "Response Time") - new_response = self.generate_stats(new_results['queue_response_times'], "Response Time") - - response_comparison = self.compare_stats(old_response, new_response, "s") - print("COMPARISON - Response Time:") - self.print_comparison(response_comparison, "Response Time to New Work") - print() - - # Transaction Performance Analysis - print("TRANSACTION PERFORMANCE ANALYSIS") - print("-" * 40) - - old_tx = self.generate_stats(old_results['transaction_durations'], "Transaction Duration") - new_tx = self.generate_stats(new_results['transaction_durations'], "Transaction Duration") - - tx_comparison = self.compare_stats(old_tx, new_tx, "ms") - print("COMPARISON - Transaction Duration:") - self.print_comparison(tx_comparison, "Individual Transaction Time") - print() - - # Processing Rate Analysis - print("PROCESSING RATE ANALYSIS") - print("-" * 40) - - old_rate = self.generate_stats(old_results['processing_rates'], "Processing Rate") - new_rate = self.generate_stats(new_results['processing_rates'], "Processing Rate") - - rate_comparison = self.compare_stats(old_rate, new_rate, " items/sec") - print("COMPARISON - Processing Rate:") - self.print_comparison(rate_comparison, "Items Processing Rate") - print() - - # Key Insights - print("KEY INSIGHTS") - print("-" * 40) - - insights = [] - - if emptying_comparison['valid']: - if emptying_comparison['mean_improvement'] > 0: - insights.append(f"✅ Queue emptying is {emptying_comparison['mean_improvement']:.1f}% faster") - else: - insights.append(f"⚠️ Queue emptying is {abs(emptying_comparison['mean_improvement']):.1f}% slower") - - if response_comparison['valid']: - if response_comparison['mean_improvement'] > 0: - insights.append(f"✅ Response to new work is {response_comparison['mean_improvement']:.1f}% faster") - else: - insights.append(f"⚠️ Response to new work is {abs(response_comparison['mean_improvement']):.1f}% slower") - - if tx_comparison['valid']: - if tx_comparison['median_improvement'] > 0: - insights.append(f"✅ Individual transactions are {tx_comparison['median_improvement']:.1f}% faster") - else: - insights.append(f"⚠️ Individual transactions are {abs(tx_comparison['median_improvement']):.1f}% slower") - - if rate_comparison['valid']: - if rate_comparison['mean_improvement'] > 0: - insights.append(f"✅ Processing rate improved by {rate_comparison['mean_improvement']:.1f}%") - else: - insights.append(f"⚠️ Processing rate decreased by {abs(rate_comparison['mean_improvement']):.1f}%") - - for insight in insights: - print(f" {insight}") - - if not insights: - print(" No significant performance differences detected") - - print() - print("=" * 60) - -def main(): - parser = argparse.ArgumentParser(description='Compare forester performance between two log files') - parser.add_argument('old_log', help='Path to old version log file') - parser.add_argument('new_log', help='Path to new version log file') - - args = parser.parse_args() - - analyzer = PerformanceAnalyzer() - try: - analyzer.analyze_and_compare(args.old_log, args.new_log) - except FileNotFoundError as e: - print(f"Error: {e}") - sys.exit(1) - except Exception as e: - print(f"Error analyzing logs: {e}") - sys.exit(1) - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/forester/scripts/v2_stats.py b/forester/scripts/v2_stats.py deleted file mode 100755 index a0c8961b1a..0000000000 --- a/forester/scripts/v2_stats.py +++ /dev/null @@ -1,272 +0,0 @@ -#!/usr/bin/env python3 - -import re -import sys -from datetime import datetime, timedelta -from collections import defaultdict -from typing import Dict, List, Tuple, Optional -import argparse -import statistics -import json - -class V2TpsAnalyzer: - def __init__(self): - # ANSI color removal - self.ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') - - self.v1_operation_start_pattern = re.compile( - r'V1_TPS_METRIC: operation_start tree_type=(\w+) tree=(\S+) epoch=(\d+)' - ) - self.v1_operation_complete_pattern = re.compile( - r'V1_TPS_METRIC: operation_complete tree_type=(\w+) tree=(\S+) epoch=(\d+) transactions=(\d+) duration_ms=(\d+) tps=([\d.]+)' - ) - self.v2_operation_start_pattern = re.compile( - r'V2_TPS_METRIC: operation_start tree_type=(\w+) (?:operation=(\w+) )?tree=(\S+) epoch=(\d+)' - ) - self.v2_operation_complete_pattern = re.compile( - r'V2_TPS_METRIC: operation_complete tree_type=(\w+) (?:operation=(\w+) )?tree=(\S+) epoch=(\d+) zkp_batches=(\d+) transactions=(\d+) instructions=(\d+) duration_ms=(\d+) tps=([\d.]+) ips=([\d.]+)(?:\s+items_processed=(\d+))?' - ) - self.v2_transaction_sent_pattern = re.compile( - r'V2_TPS_METRIC: transaction_sent tree_type=(\w+) (?:operation=(\w+) )?tree=(\S+) tx_num=(\d+) signature=(\S+) instructions=(\d+) tx_duration_ms=(\d+)' - ) - - # Data storage - self.operations: List[Dict] = [] - self.transactions: List[Dict] = [] - self.operation_summaries: List[Dict] = [] - - def clean_line(self, line: str) -> str: - """Remove ANSI color codes.""" - return self.ansi_escape.sub('', line) - - def parse_timestamp(self, line: str) -> Optional[datetime]: - """Extract timestamp from log line.""" - timestamp_match = re.search(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z)', line) - if timestamp_match: - return datetime.fromisoformat(timestamp_match.group(1).replace('Z', '+00:00')) - return None - - def parse_log_line(self, line: str) -> None: - """Parse a single log line for V1/V2 TPS metrics.""" - clean_line = self.clean_line(line) - timestamp = self.parse_timestamp(clean_line) - - if not timestamp: - return - - # Parse V1 operation start - v1_start_match = self.v1_operation_start_pattern.search(clean_line) - if v1_start_match: - self.operations.append({ - 'type': 'start', - 'version': 'V1', - 'timestamp': timestamp, - 'tree_type': v1_start_match.group(1), - 'tree': v1_start_match.group(2), - 'epoch': int(v1_start_match.group(3)) - }) - return - - # Parse V1 operation complete - v1_complete_match = self.v1_operation_complete_pattern.search(clean_line) - if v1_complete_match: - self.operation_summaries.append({ - 'version': 'V1', - 'timestamp': timestamp, - 'tree_type': v1_complete_match.group(1), - 'tree': v1_complete_match.group(2), - 'epoch': int(v1_complete_match.group(3)), - 'transactions': int(v1_complete_match.group(4)), - 'duration_ms': int(v1_complete_match.group(5)), - 'tps': float(v1_complete_match.group(6)), - 'zkp_batches': 0, # V1 doesn't have zkp batches - 'instructions': int(v1_complete_match.group(4)), # For V1, instructions = transactions - 'ips': float(v1_complete_match.group(6)), # For V1, ips = tps - 'items_processed': 0 - }) - return - - # Parse V2 operation start - v2_start_match = self.v2_operation_start_pattern.search(clean_line) - if v2_start_match: - self.operations.append({ - 'type': 'start', - 'version': 'V2', - 'timestamp': timestamp, - 'tree_type': v2_start_match.group(1), - 'operation': v2_start_match.group(2) or 'batch', - 'tree': v2_start_match.group(3), - 'epoch': int(v2_start_match.group(4)) - }) - return - - # Parse V2 operation complete - v2_complete_match = self.v2_operation_complete_pattern.search(clean_line) - if v2_complete_match: - self.operation_summaries.append({ - 'version': 'V2', - 'timestamp': timestamp, - 'tree_type': v2_complete_match.group(1), - 'operation': v2_complete_match.group(2) or 'batch', - 'tree': v2_complete_match.group(3), - 'epoch': int(v2_complete_match.group(4)), - 'zkp_batches': int(v2_complete_match.group(5)), - 'transactions': int(v2_complete_match.group(6)), - 'instructions': int(v2_complete_match.group(7)), - 'duration_ms': int(v2_complete_match.group(8)), - 'tps': float(v2_complete_match.group(9)), - 'ips': float(v2_complete_match.group(10)), - 'items_processed': int(v2_complete_match.group(11)) if v2_complete_match.group(11) else 0 - }) - return - - # Parse V2 transaction sent - v2_tx_match = self.v2_transaction_sent_pattern.search(clean_line) - if v2_tx_match: - self.transactions.append({ - 'version': 'V2', - 'timestamp': timestamp, - 'tree_type': v2_tx_match.group(1), - 'operation': v2_tx_match.group(2) or 'batch', - 'tree': v2_tx_match.group(3), - 'tx_num': int(v2_tx_match.group(4)), - 'signature': v2_tx_match.group(5), - 'instructions': int(v2_tx_match.group(6)), - 'tx_duration_ms': int(v2_tx_match.group(7)) - }) - - def print_summary_stats(self) -> None: - """Print high-level summary statistics.""" - print("\n" + "="*80) - print("FORESTER PERFORMANCE ANALYSIS REPORT (V1 & V2)") - print("="*80) - - if not self.operation_summaries: - print("No TPS metrics found in logs") - return - - print(f"\nSUMMARY:") - print(f" Total operations analyzed: {len(self.operation_summaries)}") - - # Count total transactions from operation summaries - total_txs_from_ops = sum(op.get('transactions', 0) for op in self.operation_summaries) - print(f" Total transactions (from operations): {total_txs_from_ops}") - print(f" Total transaction events logged: {len(self.transactions)}") - - # Time span - if self.operation_summaries: - start_time = min(op['timestamp'] for op in self.operation_summaries) - end_time = max(op['timestamp'] for op in self.operation_summaries) - time_span = (end_time - start_time).total_seconds() - print(f" Analysis time span: {time_span:.1f}s ({time_span/60:.1f} minutes)") - - def print_tree_type_analysis(self) -> None: - """Analyze performance by tree type.""" - print("\n## PERFORMANCE BY TREE TYPE") - print("-" * 60) - print("\nNOTE: V1 and V2 use different transaction models:") - print(" V1: 1 tree update = 1 transaction (~1 slot/400ms latency)") - print(" V2: 10+ tree updates = 1 transaction (multi-slot batching + ZKP generation)") - print(" ") - print(" TPS comparison is misleading - V2 optimizes for cost efficiency, not transaction count.") - print(" Focus on 'Items Processed Per Second' and 'Total items processed' for V2.") - print(" V2's higher latency is architectural (batching) not a performance issue.") - print() - - tree_type_stats = defaultdict(lambda: { - 'operations': [], - 'total_transactions': 0, - 'total_instructions': 0, - 'total_zkp_batches': 0, - 'total_duration_ms': 0, - 'tps_values': [], - 'ips_values': [], - 'items_processed': 0 - }) - - for op in self.operation_summaries: - stats = tree_type_stats[op['tree_type']] - stats['operations'].append(op) - stats['total_transactions'] += op['transactions'] - stats['total_instructions'] += op['instructions'] - stats['total_zkp_batches'] += op['zkp_batches'] - stats['total_duration_ms'] += op['duration_ms'] - if op['tps'] > 0: - stats['tps_values'].append(op['tps']) - if op['ips'] > 0: - stats['ips_values'].append(op['ips']) - stats['items_processed'] += op['items_processed'] - - for tree_type, stats in sorted(tree_type_stats.items()): - print(f"\n{tree_type}:") - print(f" Operations: {len(stats['operations'])}") - print(f" Total transactions: {stats['total_transactions']}") - print(f" Total instructions: {stats['total_instructions']}") - print(f" Total ZKP batches: {stats['total_zkp_batches']}") - print(f" Total items processed: {stats['items_processed']}") - print(f" Total processing time: {stats['total_duration_ms']/1000:.2f}s") - - if stats['tps_values']: - print(f" TPS - Min: {min(stats['tps_values']):.2f}, Max: {max(stats['tps_values']):.2f}, Mean: {statistics.mean(stats['tps_values']):.2f}") - if stats['ips_values']: - print(f" IPS - Min: {min(stats['ips_values']):.2f}, Max: {max(stats['ips_values']):.2f}, Mean: {statistics.mean(stats['ips_values']):.2f}") - - # Calculate aggregate rates - if stats['total_duration_ms'] > 0: - aggregate_tps = stats['total_transactions'] / (stats['total_duration_ms'] / 1000) - aggregate_ips = stats['total_instructions'] / (stats['total_duration_ms'] / 1000) - print(f" Aggregate TPS: {aggregate_tps:.2f}") - print(f" Aggregate IPS: {aggregate_ips:.2f}") - - # For V2 trees, show Items Processed Per Second (more meaningful than TPS) - if 'V2' in tree_type and stats['items_processed'] > 0: - items_per_second = stats['items_processed'] / (stats['total_duration_ms'] / 1000) - print(f" *** Items Processed Per Second (IPPS): {items_per_second:.2f} ***") - print(f" ^ This is the meaningful throughput metric for V2 (actual tree updates/sec)") - - # Show batching efficiency - if stats['total_zkp_batches'] > 0: - avg_items_per_batch = stats['items_processed'] / stats['total_zkp_batches'] - print(f" Avg items per ZKP batch: {avg_items_per_batch:.1f}") - - def generate_report(self) -> None: - """Generate comprehensive TPS analysis report.""" - self.print_summary_stats() - self.print_tree_type_analysis() - print("\n" + "="*80) - -def main(): - parser = argparse.ArgumentParser(description='Analyze forester performance metrics (V1 & V2) - Focus on IPPS for V2') - parser.add_argument('logfile', nargs='?', default='-', help='Log file to analyze') - parser.add_argument('--tree-type', help='Filter to specific tree type') - - args = parser.parse_args() - - analyzer = V2TpsAnalyzer() - - # Read and parse log file - if args.logfile == '-': - log_file = sys.stdin - else: - log_file = open(args.logfile, 'r') - - try: - for line in log_file: - if 'TPS_METRIC' not in line: # Match both V1 and V2 - continue - - analyzer.parse_log_line(line) - finally: - if args.logfile != '-': - log_file.close() - - # Apply filters - if args.tree_type: - analyzer.operation_summaries = [op for op in analyzer.operation_summaries if op['tree_type'] == args.tree_type] - analyzer.transactions = [tx for tx in analyzer.transactions if tx['tree_type'] == args.tree_type] - - # Generate report - analyzer.generate_report() - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/forester/src/cli.rs b/forester/src/cli.rs index 00b8e039cb..1ef32d2b87 100644 --- a/forester/src/cli.rs +++ b/forester/src/cli.rs @@ -80,10 +80,6 @@ pub struct StartArgs { #[arg(long, env = "FORESTER_LEGACY_XS_PER_TX", default_value = "1")] pub legacy_ixs_per_tx: usize, - - #[arg(long, env = "FORESTER_BATCH_IXS_PER_TX", default_value = "4")] - pub batch_ixs_per_tx: usize, - #[arg( long, env = "FORESTER_TRANSACTION_MAX_CONCURRENT_BATCHES", @@ -206,6 +202,13 @@ pub struct StartArgs { help = "Processor mode: v1 (process only v1 trees), v2 (process only v2 trees), all (process all trees)" )] pub processor_mode: ProcessorMode, + + #[arg( + long, + env = "FORESTER_TREE_ID", + help = "Process only the specified tree (Pubkey). If specified, forester will process only this tree and ignore all others" + )] + pub tree_id: Option, } #[derive(Parser, Clone, Debug)] diff --git a/forester/src/config.rs b/forester/src/config.rs index 452ca58259..b757f51638 100644 --- a/forester/src/config.rs +++ b/forester/src/config.rs @@ -64,7 +64,6 @@ pub struct IndexerConfig { #[derive(Debug, Clone)] pub struct TransactionConfig { pub legacy_ixs_per_tx: usize, - pub batch_ixs_per_tx: usize, pub max_concurrent_batches: usize, pub cu_limit: u32, pub enable_priority_fees: bool, @@ -81,6 +80,7 @@ pub struct GeneralConfig { pub skip_v1_address_trees: bool, pub skip_v2_state_trees: bool, pub skip_v2_address_trees: bool, + pub tree_id: Option, } impl Default for GeneralConfig { @@ -93,6 +93,7 @@ impl Default for GeneralConfig { skip_v1_address_trees: false, skip_v2_state_trees: false, skip_v2_address_trees: false, + tree_id: None, } } } @@ -107,6 +108,7 @@ impl GeneralConfig { skip_v1_address_trees: true, skip_v2_state_trees: true, skip_v2_address_trees: false, + tree_id: None, } } @@ -119,6 +121,7 @@ impl GeneralConfig { skip_v1_address_trees: true, skip_v2_state_trees: false, skip_v2_address_trees: true, + tree_id: None, } } } @@ -157,7 +160,6 @@ impl Default for TransactionConfig { fn default() -> Self { Self { legacy_ixs_per_tx: 1, - batch_ixs_per_tx: 3, max_concurrent_batches: 20, cu_limit: 1_000_000, enable_priority_fees: false, @@ -251,7 +253,6 @@ impl ForesterConfig { }, transaction_config: TransactionConfig { legacy_ixs_per_tx: args.legacy_ixs_per_tx, - batch_ixs_per_tx: args.batch_ixs_per_tx, max_concurrent_batches: args.transaction_max_concurrent_batches, cu_limit: args.cu_limit, enable_priority_fees: args.enable_priority_fees, @@ -266,6 +267,10 @@ impl ForesterConfig { skip_v2_state_trees: args.processor_mode == ProcessorMode::V1, skip_v1_address_trees: args.processor_mode == ProcessorMode::V2, skip_v2_address_trees: args.processor_mode == ProcessorMode::V1, + tree_id: args + .tree_id + .as_ref() + .and_then(|id| Pubkey::from_str(id).ok()), }, rpc_pool_config: RpcPoolConfig { max_size: args.rpc_pool_size, @@ -320,6 +325,7 @@ impl ForesterConfig { skip_v2_state_trees: false, skip_v1_address_trees: false, skip_v2_address_trees: false, + tree_id: None, }, rpc_pool_config: RpcPoolConfig { max_size: 10, diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index 15fb1a53c8..e6ddeb0669 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -20,7 +20,7 @@ use light_client::{ }; use light_compressed_account::TreeType; use light_registry::{ - protocol_config::state::ProtocolConfig, + protocol_config::state::{EpochState, ProtocolConfig}, sdk::{create_finalize_registration_instruction, create_report_work_instruction}, utils::{get_epoch_pda_address, get_forester_epoch_pda_from_authority}, EpochPda, ForesterEpochPda, @@ -106,6 +106,7 @@ pub struct EpochManager { new_tree_sender: broadcast::Sender, tx_cache: Arc>, ops_cache: Arc>, + registration_cache: Arc>, } impl Clone for EpochManager { @@ -122,6 +123,7 @@ impl Clone for EpochManager { new_tree_sender: self.new_tree_sender.clone(), tx_cache: self.tx_cache.clone(), ops_cache: self.ops_cache.clone(), + registration_cache: self.registration_cache.clone(), } } } @@ -151,6 +153,7 @@ impl EpochManager { new_tree_sender, tx_cache, ops_cache, + registration_cache: Arc::new(DashMap::new()), }) } @@ -365,6 +368,7 @@ impl EpochManager { "Next epoch {} registration phase started, sending for processing", next_epoch ); + // Only send to the main processing channel, don't spawn a separate task if let Err(e) = tx.send(next_epoch).await { error!( "Failed to send next epoch {} for processing: {:?}", @@ -428,9 +432,38 @@ impl EpochManager { tx.send(previous_epoch).await?; } - // Process current epoch - debug!("Processing current epoch: {}", current_epoch); - tx.send(current_epoch).await?; + // Only process current epoch if we can still register or are already registered + // If registration has ended and we haven't registered, skip it to avoid errors + if slot < current_phases.registration.end { + debug!( + "Processing current epoch: {} (registration still open)", + current_epoch + ); + tx.send(current_epoch).await?; + } else { + // Check if we're already registered for this epoch + let forester_epoch_pda_pubkey = get_forester_epoch_pda_from_authority( + &self.config.derivation_pubkey, + current_epoch, + ) + .0; + let rpc = self.rpc_pool.get_connection().await?; + if let Ok(Some(_)) = rpc + .get_anchor_account::(&forester_epoch_pda_pubkey) + .await + { + debug!( + "Processing current epoch: {} (already registered)", + current_epoch + ); + tx.send(current_epoch).await?; + } else { + warn!( + "Skipping current epoch {} - registration ended at slot {} (current slot: {})", + current_epoch, current_phases.registration.end, slot + ); + } + } debug!("Finished processing current and previous epochs"); Ok(()) @@ -456,15 +489,72 @@ impl EpochManager { } let phases = get_epoch_phases(&self.protocol_config, epoch); - // Attempt to recover registration info - debug!("Recovering registration info for epoch {}", epoch); - let mut registration_info = match self.recover_registration_info(epoch).await { - Ok(info) => info, - Err(e) => { - warn!("Failed to recover registration info: {:?}", e); - // If recovery fails, attempt to register - self.register_for_epoch_with_retry(epoch, 100, Duration::from_millis(1000)) - .await? + // First check the cache for registration info + let mut registration_info = if let Some(cached_info) = self.registration_cache.get(&epoch) { + debug!("Using cached registration info for epoch {}", epoch); + cached_info.clone() + } else { + // Attempt to recover registration info + debug!("Recovering registration info for epoch {}", epoch); + match self.recover_registration_info(epoch).await { + Ok(info) => { + // Cache the recovered info + self.registration_cache.insert(epoch, info.clone()); + info + } + Err(e) => { + warn!("Failed to recover registration info: {:?}", e); + // Check if we're still in registration window + let current_slot = self.slot_tracker.estimated_current_slot(); + if current_slot >= phases.registration.end { + info!( + "Registration period ended for epoch {} (current slot: {}, registration ended at: {}). Will retry when next epoch registration opens.", + epoch, current_slot, phases.registration.end + ); + return Ok(()); + } + // If recovery fails and we're still in registration window, wait for parallel registration to complete + // or attempt registration if it hasn't been started + tokio::time::sleep(Duration::from_millis(500)).await; + + // Check cache again after waiting + if let Some(cached_info) = self.registration_cache.get(&epoch) { + debug!( + "Found cached registration info after waiting for epoch {}", + epoch + ); + cached_info.clone() + } else { + // Last resort: try to register + match self + .register_for_epoch(epoch, Some((10, Duration::from_millis(500)))) + .await + { + Ok(info) => { + self.registration_cache.insert(epoch, info.clone()); + info + } + Err(e) => { + // Check if this is a RegistrationPhaseEnded error by downcasting + if let Some(ForesterError::Registration( + RegistrationError::RegistrationPhaseEnded { + epoch: failed_epoch, + current_slot, + registration_end, + }, + )) = e.downcast_ref::() + { + info!( + "Registration period ended for epoch {} (current slot: {}, registration ended at: {}). Will retry when next epoch registration opens.", + failed_epoch, current_slot, registration_end + ); + return Ok(()); + } + return Err(e); + } + } + } + } } }; debug!("Recovered registration info for epoch {}", epoch); @@ -505,78 +595,81 @@ impl EpochManager { #[instrument(level = "debug", skip(self), fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch ))] - async fn register_for_epoch_with_retry( + async fn register_for_epoch( &self, epoch: u64, - max_retries: u32, - retry_delay: Duration, + retry_config: Option<(u32, Duration)>, ) -> Result { - let rpc = LightClient::new(LightClientConfig { - url: self.config.external_services.rpc_url.to_string(), - photon_url: self.config.external_services.indexer_url.clone(), - api_key: self.config.external_services.photon_api_key.clone(), - commitment_config: None, - fetch_active_tree: false, - }) - .await?; - let slot = rpc.get_slot().await?; - let phases = get_epoch_phases(&self.protocol_config, epoch); - - // Check if it's already too late to register - if slot >= phases.registration.end { - return Err(RegistrationError::RegistrationPhaseEnded { - epoch, - current_slot: slot, - registration_end: phases.registration.end, + // If retry config is provided, implement retry logic + if let Some((max_retries, retry_delay)) = retry_config { + let rpc = LightClient::new(LightClientConfig { + url: self.config.external_services.rpc_url.to_string(), + photon_url: self.config.external_services.indexer_url.clone(), + api_key: self.config.external_services.photon_api_key.clone(), + commitment_config: None, + fetch_active_tree: false, + }) + .await?; + let slot = rpc.get_slot().await?; + let phases = get_epoch_phases(&self.protocol_config, epoch); + + // Check if it's already too late to register + if slot >= phases.registration.end { + return Err(RegistrationError::RegistrationPhaseEnded { + epoch, + current_slot: slot, + registration_end: phases.registration.end, + } + .into()); } - .into()); - } - for attempt in 0..max_retries { - match self.register_for_epoch(epoch).await { - Ok(registration_info) => return Ok(registration_info), - Err(e) => { - warn!( - "Failed to register for epoch {} (attempt {}): {:?}", - epoch, - attempt + 1, - e - ); - if attempt < max_retries - 1 { - sleep(retry_delay).await; - } else { - if let Some(pagerduty_key) = - self.config.external_services.pagerduty_routing_key.clone() - { - if let Err(alert_err) = send_pagerduty_alert( - &pagerduty_key, - &format!( - "Forester failed to register for epoch {} after {} attempts", - epoch, max_retries - ), - "critical", - &format!("Forester {}", self.config.payer_keypair.pubkey()), - ) - .await + for attempt in 0..max_retries { + match self.register_for_epoch_internal(epoch).await { + Ok(registration_info) => return Ok(registration_info), + Err(e) => { + warn!( + "Failed to register for epoch {} (attempt {}): {:?}", + epoch, + attempt + 1, + e + ); + if attempt < max_retries - 1 { + sleep(retry_delay).await; + } else { + if let Some(pagerduty_key) = + self.config.external_services.pagerduty_routing_key.clone() { - error!("Failed to send PagerDuty alert: {:?}", alert_err); + if let Err(alert_err) = send_pagerduty_alert( + &pagerduty_key, + &format!( + "Forester failed to register for epoch {} after {} attempts", + epoch, max_retries + ), + "critical", + &format!("Forester {}", self.config.payer_keypair.pubkey()), + ) + .await + { + error!("Failed to send PagerDuty alert: {:?}", alert_err); + } } + return Err(e); } - return Err(e); } } } + Err(RegistrationError::MaxRetriesExceeded { + epoch, + attempts: max_retries, + } + .into()) + } else { + // No retry config, just call the internal function once + self.register_for_epoch_internal(epoch).await } - Err(RegistrationError::MaxRetriesExceeded { - epoch, - attempts: max_retries, - } - .into()) } - #[instrument(level = "debug", skip(self), fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch - ))] - async fn register_for_epoch(&self, epoch: u64) -> Result { + async fn register_for_epoch_internal(&self, epoch: u64) -> Result { info!("Registering for epoch: {}", epoch); let mut rpc = LightClient::new(LightClientConfig { url: self.config.external_services.rpc_url.to_string(), @@ -760,20 +853,47 @@ impl EpochManager { .await?; if let Some(registration) = existing_registration { - if registration.total_epoch_weight.is_none() { - // TODO: we can put this ix into every tx of the first batch of the current active phase + // Only finalize if: + // 1. We are actually registered (registration exists and has our authority) + // 2. The total_epoch_weight hasn't been set yet (not finalized) + if registration.total_epoch_weight.is_none() + && registration.authority == self.config.derivation_pubkey + { + debug!( + "Finalizing registration for epoch {}", + epoch_info.epoch.epoch + ); let ix = create_finalize_registration_instruction( &self.config.payer_keypair.pubkey(), &self.config.derivation_pubkey, epoch_info.epoch.epoch, ); - rpc.create_and_send_transaction( - &[ix], - &self.config.payer_keypair.pubkey(), - &[&self.config.payer_keypair], - ) - .await?; + match rpc + .create_and_send_transaction( + &[ix], + &self.config.payer_keypair.pubkey(), + &[&self.config.payer_keypair], + ) + .await + { + Ok(_) => { + info!( + "Successfully finalized registration for epoch {}", + epoch_info.epoch.epoch + ); + } + Err(e) => { + warn!("Failed to finalize registration for epoch {}: {:?}. This may be normal if not registered or already finalized.", epoch_info.epoch.epoch, e); + } + } + } else if registration.total_epoch_weight.is_none() { + debug!("Skipping finalization - ForesterEpochPda exists but not for our authority"); } + } else { + debug!( + "No ForesterEpochPda found for epoch {} - not registered", + epoch_info.epoch.epoch + ); } let mut epoch_info = (*epoch_info).clone(); @@ -906,7 +1026,6 @@ impl EpochManager { epoch_pda: &ForesterEpochPda, mut tree_schedule: TreeForesterSchedule, ) -> Result<()> { - info!("enter process_queue"); let mut current_slot = self.slot_tracker.estimated_current_slot(); 'outer_slot_loop: while current_slot < epoch_info.phases.active.end { let next_slot_to_process = tree_schedule @@ -1008,6 +1127,7 @@ impl EpochManager { current_light_slot, &tree_accounts.queue, epoch_info.epoch, + epoch_info, ) .await? { @@ -1063,7 +1183,20 @@ impl EpochManager { current_light_slot: u64, queue_pubkey: &Pubkey, current_epoch_num: u64, + epoch_info: &Epoch, ) -> Result { + let current_slot = self.slot_tracker.estimated_current_slot(); + let current_phase_state = epoch_info.phases.get_current_epoch_state(current_slot); + + if current_phase_state != EpochState::Active { + trace!( + "Skipping processing: not in active phase (current phase: {:?}, slot: {})", + current_phase_state, + current_slot + ); + return Ok(false); + } + let total_epoch_weight = epoch_pda.total_epoch_weight.ok_or_else(|| { anyhow::anyhow!( "Total epoch weight not available in ForesterEpochPda for epoch {}", @@ -1104,10 +1237,6 @@ impl EpochManager { ) -> Result { match tree_accounts.tree_type { TreeType::StateV1 | TreeType::AddressV1 => { - info!( - "Processing V1 tree: {} (type: {:?}, epoch: {})", - tree_accounts.merkle_tree, tree_accounts.tree_type, epoch_info.epoch - ); self.process_v1( epoch_info, epoch_pda, @@ -1118,10 +1247,6 @@ impl EpochManager { .await } TreeType::StateV2 | TreeType::AddressV2 => { - info!( - "Processing V2 tree: {} (type: {:?}, epoch: {})", - tree_accounts.merkle_tree, tree_accounts.tree_type, epoch_info.epoch - ); self.process_v2(epoch_info, tree_accounts).await } } @@ -1186,37 +1311,35 @@ impl EpochManager { async fn process_v2(&self, epoch_info: &Epoch, tree_accounts: &TreeAccounts) -> Result { let default_prover_url = "http://127.0.0.1:3001".to_string(); - let batch_context = BatchContext { - rpc_pool: self.rpc_pool.clone(), - authority: self.config.payer_keypair.insecure_clone(), - derivation: self.config.derivation_pubkey, - epoch: epoch_info.epoch, - merkle_tree: tree_accounts.merkle_tree, - output_queue: tree_accounts.queue, - ixs_per_tx: self.config.transaction_config.batch_ixs_per_tx, - prover_append_url: self - .config + let batch_context = BatchContext::from_params( + self.rpc_pool.clone(), + self.config.payer_keypair.insecure_clone(), + self.config.derivation_pubkey, + epoch_info.epoch, + tree_accounts.merkle_tree, + tree_accounts.queue, + self.config .external_services .prover_append_url .clone() .unwrap_or_else(|| default_prover_url.clone()), - prover_update_url: self - .config + self.config .external_services .prover_update_url .clone() .unwrap_or_else(|| default_prover_url.clone()), - prover_address_append_url: self - .config + self.config .external_services .prover_address_append_url .clone() .unwrap_or_else(|| default_prover_url.clone()), - prover_api_key: self.config.external_services.prover_api_key.clone(), - prover_polling_interval: Duration::from_secs(1), - prover_max_wait_time: Duration::from_secs(120), - ops_cache: self.ops_cache.clone(), - }; + self.config.external_services.prover_api_key.clone(), + Duration::from_secs(1), + Duration::from_secs(600), + self.ops_cache.clone(), + epoch_info.phases.clone(), + self.slot_tracker.clone(), + ); process_batched_operations(batch_context, tree_accounts.tree_type) .await @@ -1488,24 +1611,39 @@ pub async fn run_service( let trees = { let rpc = rpc_pool.get_connection().await?; - fetch_trees(&*rpc).await? + let mut fetched_trees = fetch_trees(&*rpc).await?; + if let Some(tree_id) = config.general_config.tree_id { + fetched_trees.retain(|tree| tree.merkle_tree == tree_id); + if fetched_trees.is_empty() { + error!("Specified tree {} not found", tree_id); + return Err(anyhow::anyhow!("Specified tree {} not found", tree_id)); + } + info!("Processing only tree: {}", tree_id); + } + fetched_trees }; trace!("Fetched initial trees: {:?}", trees); let (new_tree_sender, _) = broadcast::channel(100); - let mut tree_finder = TreeFinder::new( - rpc_pool.clone(), - trees.clone(), - new_tree_sender.clone(), - Duration::from_secs(config.general_config.tree_discovery_interval_seconds), - ); + // Only run tree finder if not filtering by specific tree + let _tree_finder_handle = if config.general_config.tree_id.is_none() { + let mut tree_finder = TreeFinder::new( + rpc_pool.clone(), + trees.clone(), + new_tree_sender.clone(), + Duration::from_secs(config.general_config.tree_discovery_interval_seconds), + ); - let _tree_finder_handle = tokio::spawn(async move { - if let Err(e) = tree_finder.run().await { - error!("Tree finder error: {:?}", e); - } - }); + Some(tokio::spawn(async move { + if let Err(e) = tree_finder.run().await { + error!("Tree finder error: {:?}", e); + } + })) + } else { + info!("Tree discovery disabled when processing single tree"); + None + }; while retry_count < config.retry_config.max_retries { debug!("Creating EpochManager (attempt {})", retry_count + 1); diff --git a/forester/src/errors.rs b/forester/src/errors.rs index b0388859e9..79c6cfcd45 100644 --- a/forester/src/errors.rs +++ b/forester/src/errors.rs @@ -56,6 +56,9 @@ pub enum ForesterError { #[error("Invalid tree type: {0}")] InvalidTreeType(TreeType), + #[error("Not in active phase")] + NotInActivePhase, + #[error("Forester error: {error}")] General { error: String }, diff --git a/forester/src/lib.rs b/forester/src/lib.rs index ff001da538..006441d63b 100644 --- a/forester/src/lib.rs +++ b/forester/src/lib.rs @@ -159,7 +159,7 @@ pub async fn run_pipeline( .url(config.external_services.rpc_url.to_string()) .photon_url(config.external_services.indexer_url.clone()) .api_key(config.external_services.photon_api_key.clone()) - .commitment(CommitmentConfig::confirmed()) + .commitment(CommitmentConfig::processed()) .max_size(config.rpc_pool_config.max_size) .connection_timeout_secs(config.rpc_pool_config.connection_timeout_secs) .idle_timeout_secs(config.rpc_pool_config.idle_timeout_secs) diff --git a/forester/src/processor/tx_cache.rs b/forester/src/processor/tx_cache.rs index 4b7c7fdb63..a2b8d5d120 100644 --- a/forester/src/processor/tx_cache.rs +++ b/forester/src/processor/tx_cache.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, time::Duration}; use tokio::time::Instant; -use tracing::warn; +use tracing::{trace, warn}; #[derive(Debug, Clone)] struct CacheEntry { @@ -54,9 +54,11 @@ impl ProcessedHashCache { if let Some(entry) = self.entries.get(hash) { let age = Instant::now().duration_since(entry.timestamp); if age > Duration::from_secs(60) && age < entry.timeout { - warn!( + trace!( "Cache entry {} has been processing for {:?} (timeout: {:?})", - hash, age, entry.timeout + hash, + age, + entry.timeout ); } true diff --git a/forester/src/processor/v1/send_transaction.rs b/forester/src/processor/v1/send_transaction.rs index bed289d84d..d3b106346f 100644 --- a/forester/src/processor/v1/send_transaction.rs +++ b/forester/src/processor/v1/send_transaction.rs @@ -22,7 +22,7 @@ use solana_sdk::{ transaction::Transaction, }; use tokio::time::Instant; -use tracing::{debug, error, info, trace, warn}; +use tracing::{error, trace, warn}; use crate::{ epoch_manager::WorkItem, @@ -63,13 +63,6 @@ pub async fn send_batched_transactions( ) -> Result { let function_start_time = Instant::now(); - info!( - "V1_TPS_METRIC: operation_start tree_type={} tree={} epoch={}", - tree_accounts.tree_type, - tree_accounts.merkle_tree, - transaction_builder.epoch() - ); - let num_sent_transactions = Arc::new(AtomicUsize::new(0)); let operation_cancel_signal = Arc::new(AtomicBool::new(false)); @@ -137,7 +130,7 @@ pub async fn send_batched_transactions( trace!(tree = %tree_accounts.merkle_tree, "Built {} transactions in {:?}", transactions_to_send.len(), build_start_time.elapsed()); if Instant::now() >= data.timeout_deadline { - warn!(tree = %tree_accounts.merkle_tree, "Reached global timeout deadline after building transactions, stopping."); + trace!(tree = %tree_accounts.merkle_tree, "Reached global timeout deadline after building transactions, stopping."); break; } @@ -160,15 +153,6 @@ pub async fn send_batched_transactions( let total_sent_successfully = num_sent_transactions.load(Ordering::SeqCst); trace!(tree = %tree_accounts.merkle_tree, "Transaction sending loop finished. Total transactions sent successfully: {}", total_sent_successfully); - let total_duration = function_start_time.elapsed(); - let tps = if total_duration.as_secs_f64() > 0.0 { - total_sent_successfully as f64 / total_duration.as_secs_f64() - } else { - 0.0 - }; - - info!("V1_TPS_METRIC: operation_complete tree_type={} tree={} epoch={} transactions={} duration_ms={} tps={:.2}", tree_accounts.tree_type, tree_accounts.merkle_tree, transaction_builder.epoch(), total_sent_successfully, total_duration.as_millis(), tps); - Ok(total_sent_successfully) } @@ -227,21 +211,10 @@ async fn prepare_batch_prerequisites( }; if queue_item_data.is_empty() { - info!( - "QUEUE_METRIC: queue_empty tree_type={} tree={}", - tree_accounts.tree_type, tree_accounts.merkle_tree - ); trace!(tree = %tree_id_str, "Queue is empty, no transactions to send."); return Ok(None); // Return None to indicate no work } - info!( - "QUEUE_METRIC: queue_has_elements tree_type={} tree={} count={}", - tree_accounts.tree_type, - tree_accounts.merkle_tree, - queue_item_data.len() - ); - let (recent_blockhash, last_valid_block_height) = { let mut rpc = pool.get_connection().await.map_err(|e| { error!(tree = %tree_id_str, "Failed to get RPC for blockhash: {:?}", e); @@ -338,7 +311,7 @@ async fn execute_transaction_chunk_sending( let send_time = Instant::now(); match rpc.send_transaction_with_config(&tx, rpc_send_config).await { Ok(signature) => { - if !cancel_signal_clone.load(Ordering::SeqCst) { // Re-check before incrementing + if !cancel_signal_clone.load(Ordering::SeqCst) { num_sent_transactions_clone.fetch_add(1, Ordering::SeqCst); trace!(tx.signature = %signature, elapsed = ?send_time.elapsed(), "Transaction sent successfully"); TransactionSendResult::Success(signature) @@ -366,36 +339,29 @@ async fn execute_transaction_chunk_sending( max_concurrent_sends ); let exec_start = Instant::now(); - let results = futures::stream::iter(transaction_send_futures) + let result = futures::stream::iter(transaction_send_futures) .buffer_unordered(max_concurrent_sends) // buffer_unordered for concurrency .collect::>() .await; - trace!("Finished executing batch in {:?}", exec_start.elapsed()); - - let mut successes = 0; - let mut failures = 0; - let mut cancelled_or_timeout = 0; - for outcome in results { - match outcome { + for res in result { + match res { TransactionSendResult::Success(sig) => { - trace!(tx.signature = %sig, outcome = "SuccessInChunkSummary"); - successes += 1; + trace!(tx.signature = %sig, "Transaction confirmed sent"); } - TransactionSendResult::Failure(err, opt_sig) => { - failures += 1; - if let Some(sig) = opt_sig { - trace!(tx.signature = %sig, error = ?err, outcome = "FailureInChunkSummary"); + TransactionSendResult::Failure(err, sig_opt) => { + if let Some(sig) = sig_opt { + warn!(tx.signature = %sig, error = ?err, "Transaction failed to send"); } else { - trace!(error = ?err, outcome = "FailureInChunkSummary (no signature)"); + error!(error = ?err, "Transaction failed to send, no signature available"); } } - TransactionSendResult::Cancelled | TransactionSendResult::Timeout => { - cancelled_or_timeout += 1; + TransactionSendResult::Cancelled => { + trace!("Transaction send cancelled due to global signal or timeout"); + } + TransactionSendResult::Timeout => { + warn!("Transaction send timed out due to global timeout"); } } } - debug!( - "Chunk send summary: {} successes, {} failures, {} cancelled/timeout", - successes, failures, cancelled_or_timeout - ); + trace!("Finished executing batch in {:?}", exec_start.elapsed()); } diff --git a/forester/src/processor/v2/account_parser.rs b/forester/src/processor/v2/account_parser.rs new file mode 100644 index 0000000000..3650ae20b8 --- /dev/null +++ b/forester/src/processor/v2/account_parser.rs @@ -0,0 +1,181 @@ +use anyhow::anyhow; +use forester_utils::{ParsedMerkleTreeData, ParsedQueueData}; +use light_batched_merkle_tree::{ + batch::BatchState, merkle_tree::BatchedMerkleTreeAccount, queue::BatchedQueueAccount, +}; +use light_compressed_account::TreeType; +use solana_sdk::pubkey::Pubkey; +use tracing::{debug, trace}; + +use super::types::BatchReadyState; +use crate::{errors::ForesterError, Result}; + +pub fn parse_merkle_tree_account( + tree_type: TreeType, + merkle_tree_pubkey: &Pubkey, + account: &mut solana_sdk::account::Account, +) -> Result<(ParsedMerkleTreeData, bool)> { + let merkle_tree = match tree_type { + TreeType::AddressV2 => BatchedMerkleTreeAccount::address_from_bytes( + account.data.as_mut_slice(), + &(*merkle_tree_pubkey).into(), + ), + TreeType::StateV2 => BatchedMerkleTreeAccount::state_from_bytes( + account.data.as_mut_slice(), + &(*merkle_tree_pubkey).into(), + ), + _ => return Err(ForesterError::InvalidTreeType(tree_type).into()), + }?; + + let batch_index = merkle_tree.queue_batches.pending_batch_index; + let batch = merkle_tree + .queue_batches + .batches + .get(batch_index as usize) + .ok_or_else(|| anyhow!("Batch not found"))?; + + let num_inserted_zkps = batch.get_num_inserted_zkps(); + let current_zkp_batch_index = batch.get_current_zkp_batch_index(); + + let mut leaves_hash_chains = Vec::new(); + for i in num_inserted_zkps..current_zkp_batch_index { + leaves_hash_chains.push(merkle_tree.hash_chain_stores[batch_index as usize][i as usize]); + } + + let parsed_data = ParsedMerkleTreeData { + next_index: merkle_tree.next_index, + current_root: *merkle_tree.root_history.last().unwrap(), + root_history: merkle_tree.root_history.to_vec(), + zkp_batch_size: batch.zkp_batch_size as u16, + pending_batch_index: batch_index as u32, + num_inserted_zkps, + current_zkp_batch_index, + leaves_hash_chains, + }; + + let is_ready = batch.get_state() != BatchState::Inserted + && batch.get_current_zkp_batch_index() > batch.get_num_inserted_zkps(); + + Ok((parsed_data, is_ready)) +} + +pub fn parse_output_queue_account( + account: &mut solana_sdk::account::Account, +) -> Result<(ParsedQueueData, bool)> { + let output_queue = BatchedQueueAccount::output_from_bytes(account.data.as_mut_slice())?; + + let batch_index = output_queue.batch_metadata.pending_batch_index; + let batch = output_queue + .batch_metadata + .batches + .get(batch_index as usize) + .ok_or_else(|| anyhow!("Batch not found"))?; + + let num_inserted_zkps = batch.get_num_inserted_zkps(); + let current_zkp_batch_index = batch.get_current_zkp_batch_index(); + + let mut leaves_hash_chains = Vec::new(); + for i in num_inserted_zkps..current_zkp_batch_index { + leaves_hash_chains.push(output_queue.hash_chain_stores[batch_index as usize][i as usize]); + } + + let parsed_data = ParsedQueueData { + zkp_batch_size: output_queue.batch_metadata.zkp_batch_size as u16, + pending_batch_index: batch_index as u32, + num_inserted_zkps, + current_zkp_batch_index, + leaves_hash_chains, + }; + + let is_ready = batch.get_state() != BatchState::Inserted + && batch.get_current_zkp_batch_index() > batch.get_num_inserted_zkps(); + + Ok((parsed_data, is_ready)) +} + +pub fn determine_batch_state( + tree_type: TreeType, + merkle_tree_pubkey: Pubkey, + merkle_tree_account: Option, + output_queue_account: Option, +) -> BatchReadyState { + let (merkle_tree_data, input_ready) = if let Some(mut account) = merkle_tree_account { + match parse_merkle_tree_account(tree_type, &merkle_tree_pubkey, &mut account) { + Ok((data, ready)) => (Some(data), ready), + Err(_) => (None, false), + } + } else { + (None, false) + }; + + let (output_queue_data, output_ready) = if tree_type == TreeType::StateV2 { + if let Some(mut account) = output_queue_account { + match parse_output_queue_account(&mut account) { + Ok((data, ready)) => (Some(data), ready), + Err(_) => (None, false), + } + } else { + (None, false) + } + } else { + (None, false) + }; + + trace!( + "tree_type: {}, input_ready: {}, output_ready: {}", + tree_type, + input_ready, + output_ready + ); + + if tree_type == TreeType::AddressV2 { + return if input_ready { + if let Some(mt_data) = merkle_tree_data { + BatchReadyState::AddressReadyForAppend { + merkle_tree_data: mt_data, + } + } else { + BatchReadyState::NotReady + } + } else { + BatchReadyState::NotReady + }; + } + + match (input_ready, output_ready) { + (true, true) => { + if let (Some(mt_data), Some(oq_data)) = (merkle_tree_data, output_queue_data) { + debug!( + "Both input and output queues ready for tree {}", + merkle_tree_pubkey + ); + BatchReadyState::BothReady { + merkle_tree_data: mt_data, + output_queue_data: oq_data, + } + } else { + BatchReadyState::NotReady + } + } + (true, false) => { + if let Some(mt_data) = merkle_tree_data { + BatchReadyState::StateReadyForNullify { + merkle_tree_data: mt_data, + } + } else { + BatchReadyState::NotReady + } + } + (false, true) => { + if let (Some(mt_data), Some(oq_data)) = (merkle_tree_data, output_queue_data) { + BatchReadyState::StateReadyForAppend { + merkle_tree_data: mt_data, + output_queue_data: oq_data, + } + } else { + BatchReadyState::NotReady + } + } + (false, false) => BatchReadyState::NotReady, + } +} diff --git a/forester/src/processor/v2/address.rs b/forester/src/processor/v2/address.rs index 52477bcca5..16cdde9a84 100644 --- a/forester/src/processor/v2/address.rs +++ b/forester/src/processor/v2/address.rs @@ -9,9 +9,9 @@ use light_client::rpc::Rpc; use light_registry::account_compression_cpi::sdk::create_batch_update_address_tree_instruction; use solana_program::instruction::Instruction; use solana_sdk::signer::Signer; -use tracing::{info, instrument}; +use tracing::instrument; -use super::common::{process_stream, BatchContext, ParsedMerkleTreeData}; +use super::{common::ParsedMerkleTreeData, context::BatchContext, utils::process_stream}; use crate::Result; async fn create_stream_future( @@ -27,11 +27,10 @@ where let config = AddressUpdateConfig { rpc_pool: ctx.rpc_pool.clone(), merkle_tree_pubkey: ctx.merkle_tree, - prover_url: ctx.prover_address_append_url.clone(), - prover_api_key: ctx.prover_api_key.clone(), - polling_interval: ctx.prover_polling_interval, - max_wait_time: ctx.prover_max_wait_time, - ixs_per_tx: ctx.ixs_per_tx, + prover_url: ctx.config.prover_address_append_url.clone(), + prover_api_key: ctx.config.prover_api_key.clone(), + polling_interval: ctx.config.prover_polling_interval, + max_wait_time: ctx.config.prover_max_wait_time, }; let (stream, size) = get_address_update_instruction_stream(config, merkle_tree_data) .await @@ -45,10 +44,6 @@ pub(crate) async fn process_batch( context: &BatchContext, merkle_tree_data: ParsedMerkleTreeData, ) -> Result { - info!( - "V2_TPS_METRIC: operation_start tree_type=AddressV2 tree={} epoch={}", - context.merkle_tree, context.epoch - ); let instruction_builder = |data: &InstructionDataAddressAppendInputs| -> Instruction { let serialized_data = data.try_to_vec().unwrap(); create_batch_update_address_tree_instruction( @@ -61,12 +56,5 @@ pub(crate) async fn process_batch( }; let stream_future = create_stream_future(context, merkle_tree_data); - process_stream( - context, - stream_future, - instruction_builder, - "AddressV2", - None, - ) - .await + process_stream(context, stream_future, instruction_builder).await } diff --git a/forester/src/processor/v2/changelog_cache.rs b/forester/src/processor/v2/changelog_cache.rs new file mode 100644 index 0000000000..66dd7cde69 --- /dev/null +++ b/forester/src/processor/v2/changelog_cache.rs @@ -0,0 +1,97 @@ +use std::{ + collections::HashMap, + sync::Arc, + time::{Duration, Instant}, +}; + +use anyhow::Result; +use light_sparse_merkle_tree::changelog::ChangelogEntry; +use solana_sdk::pubkey::Pubkey; +use tokio::sync::RwLock; +use tracing::{debug, warn}; + +pub static CHANGELOG_CACHE: tokio::sync::OnceCell = + tokio::sync::OnceCell::const_new(); + +pub async fn get_changelog_cache() -> &'static ChangelogCache { + CHANGELOG_CACHE + .get_or_init(|| async { ChangelogCache::new() }) + .await +} + +struct CacheEntry { + changelogs: Vec>, + last_accessed: Instant, +} + +pub struct ChangelogCache { + entries: Arc>>, + max_entries: usize, + ttl: Duration, +} + +impl ChangelogCache { + pub fn new() -> Self { + Self { + entries: Arc::new(RwLock::new(HashMap::new())), + max_entries: 100, // Default: cache up to 100 trees + ttl: Duration::from_secs(600), // Default: 10 minute TTL + } + } + + pub async fn get_changelogs(&self, merkle_tree: &Pubkey) -> Vec> { + let mut entries = self.entries.write().await; + + // Check if entry exists and is not expired + if let Some(entry) = entries.get_mut(merkle_tree) { + if entry.last_accessed.elapsed() < self.ttl { + entry.last_accessed = Instant::now(); + return entry.changelogs.clone(); + } else { + // Entry expired, remove it + debug!("Removing expired changelog cache for {:?}", merkle_tree); + entries.remove(merkle_tree); + } + } + + Vec::new() + } + + pub async fn append_changelogs( + &self, + merkle_tree: Pubkey, + new_changelogs: Vec>, + ) -> Result<()> { + let mut entries = self.entries.write().await; + + // Evict oldest entries if at capacity + if entries.len() >= self.max_entries && !entries.contains_key(&merkle_tree) { + // Find and remove the oldest entry + if let Some(oldest_key) = entries + .iter() + .min_by_key(|(_, entry)| entry.last_accessed) + .map(|(k, _)| *k) + { + warn!("Cache full, evicting oldest entry for {:?}", oldest_key); + entries.remove(&oldest_key); + } + } + + let entry = entries.entry(merkle_tree).or_insert_with(|| CacheEntry { + changelogs: Vec::new(), + last_accessed: Instant::now(), + }); + + let count = new_changelogs.len(); + entry.changelogs.extend(new_changelogs); + entry.last_accessed = Instant::now(); + + debug!( + "Appended {} changelogs for {:?}, total entries: {}", + count, + merkle_tree, + entry.changelogs.len() + ); + Ok(()) + } +} diff --git a/forester/src/processor/v2/common.rs b/forester/src/processor/v2/common.rs index 3632305682..994a174d9c 100644 --- a/forester/src/processor/v2/common.rs +++ b/forester/src/processor/v2/common.rs @@ -1,53 +1,10 @@ -use std::{future::Future, sync::Arc, time::Duration}; - -use borsh::BorshSerialize; -use forester_utils::rpc_pool::SolanaRpcPool; pub use forester_utils::{ParsedMerkleTreeData, ParsedQueueData}; -use futures::{pin_mut, stream::StreamExt, Stream}; -use light_batched_merkle_tree::{ - batch::BatchState, merkle_tree::BatchedMerkleTreeAccount, queue::BatchedQueueAccount, -}; use light_client::rpc::Rpc; use light_compressed_account::TreeType; -use solana_sdk::{instruction::Instruction, pubkey::Pubkey, signature::Keypair, signer::Signer}; -use tokio::sync::Mutex; use tracing::{debug, error, info, trace}; -use super::{address, state}; -use crate::{errors::ForesterError, processor::tx_cache::ProcessedHashCache, Result}; - -#[derive(Debug)] -pub enum BatchReadyState { - NotReady, - AddressReadyForAppend { - merkle_tree_data: ParsedMerkleTreeData, - }, - StateReadyForAppend { - merkle_tree_data: ParsedMerkleTreeData, - output_queue_data: ParsedQueueData, - }, - StateReadyForNullify { - merkle_tree_data: ParsedMerkleTreeData, - }, -} - -#[derive(Debug)] -pub struct BatchContext { - pub rpc_pool: Arc>, - pub authority: Keypair, - pub derivation: Pubkey, - pub epoch: u64, - pub merkle_tree: Pubkey, - pub output_queue: Pubkey, - pub ixs_per_tx: usize, - pub prover_append_url: String, - pub prover_update_url: String, - pub prover_address_append_url: String, - pub prover_api_key: Option, - pub prover_polling_interval: Duration, - pub prover_max_wait_time: Duration, - pub ops_cache: Arc>, -} +use super::{account_parser, address, context::BatchContext, state, types::BatchReadyState}; +use crate::Result; #[derive(Debug)] pub struct BatchProcessor { @@ -55,113 +12,6 @@ pub struct BatchProcessor { tree_type: TreeType, } -/// Processes a stream of batched instruction data into transactions. -pub(crate) async fn process_stream( - context: &BatchContext, - stream_creator_future: FutC, - instruction_builder: impl Fn(&D) -> Instruction, - tree_type_str: &str, - operation: Option<&str>, -) -> Result -where - R: Rpc, - S: Stream>> + Send, - D: BorshSerialize, - FutC: Future> + Send, -{ - let start_time = std::time::Instant::now(); - trace!("Executing batched stream processor (hybrid)"); - - let (batch_stream, zkp_batch_size) = stream_creator_future.await?; - - if zkp_batch_size == 0 { - trace!("ZKP batch size is 0, no work to do."); - return Ok(0); - } - - pin_mut!(batch_stream); - let mut total_instructions_processed = 0; - let mut transactions_sent = 0; - - while let Some(batch_result) = batch_stream.next().await { - let instruction_batch = batch_result?; - - if instruction_batch.is_empty() { - continue; - } - - let instructions: Vec = - instruction_batch.iter().map(&instruction_builder).collect(); - - let tx_start = std::time::Instant::now(); - let signature = send_transaction_batch(context, instructions).await?; - transactions_sent += 1; - total_instructions_processed += instruction_batch.len(); - let tx_duration = tx_start.elapsed(); - - let operation_suffix = operation - .map(|op| format!(" operation={}", op)) - .unwrap_or_default(); - info!( - "V2_TPS_METRIC: transaction_sent tree_type={}{} tree={} tx_num={} signature={} instructions={} tx_duration_ms={} (hybrid)", - tree_type_str, operation_suffix, context.merkle_tree, transactions_sent, signature, instruction_batch.len(), tx_duration.as_millis() - ); - } - - if total_instructions_processed == 0 { - trace!("No instructions were processed from the stream."); - return Ok(0); - } - - let total_duration = start_time.elapsed(); - let total_items_processed = total_instructions_processed * zkp_batch_size as usize; - let tps = if total_duration.as_secs_f64() > 0.0 { - transactions_sent as f64 / total_duration.as_secs_f64() - } else { - 0.0 - }; - let ips = if total_duration.as_secs_f64() > 0.0 { - total_instructions_processed as f64 / total_duration.as_secs_f64() - } else { - 0.0 - }; - - let operation_suffix = operation - .map(|op| format!(" operation={}", op)) - .unwrap_or_default(); - info!( - "V2_TPS_METRIC: operation_complete tree_type={}{} tree={} epoch={} zkp_batches={} transactions={} instructions={} duration_ms={} tps={:.2} ips={:.2} items_processed={} (hybrid)", - tree_type_str, operation_suffix, context.merkle_tree, context.epoch, total_instructions_processed, transactions_sent, total_instructions_processed, - total_duration.as_millis(), tps, ips, total_items_processed - ); - - info!( - "Batched stream processing complete. Processed {} total items.", - total_items_processed - ); - - Ok(total_items_processed) -} - -pub(crate) async fn send_transaction_batch( - context: &BatchContext, - instructions: Vec, -) -> Result { - info!( - "Sending transaction with {} instructions...", - instructions.len() - ); - let mut rpc = context.rpc_pool.get_connection().await?; - let signature = rpc - .create_and_send_transaction( - &instructions, - &context.authority.pubkey(), - &[&context.authority], - ) - .await?; - Ok(signature.to_string()) -} - impl BatchProcessor { pub fn new(context: BatchContext, tree_type: TreeType) -> Self { Self { context, tree_type } @@ -218,7 +68,7 @@ impl BatchProcessor { self.context.merkle_tree ); let result = self - .process_state_append_hybrid(merkle_tree_data, output_queue_data) + .process_state_append(merkle_tree_data, output_queue_data) .await; if let Err(ref e) = result { error!( @@ -233,7 +83,7 @@ impl BatchProcessor { "Processing batch for nullify, tree: {}", self.context.merkle_tree ); - let result = self.process_state_nullify_hybrid(merkle_tree_data).await; + let result = self.process_state_nullify(merkle_tree_data).await; if let Err(ref e) = result { error!( "State nullify failed for tree {}: {:?}", @@ -242,6 +92,17 @@ impl BatchProcessor { } result } + BatchReadyState::BothReady { + merkle_tree_data, + output_queue_data, + } => { + info!( + "Processing both nullify and append in parallel for tree: {}", + self.context.merkle_tree + ); + self.process_state(merkle_tree_data, output_queue_data) + .await + } BatchReadyState::NotReady => { trace!( "Batch not ready for processing, tree: {}", @@ -263,6 +124,7 @@ impl BatchProcessor { .await .ok() .flatten(); + let output_queue_account = if self.tree_type == TreeType::StateV2 { rpc.get_account(self.context.output_queue) .await @@ -272,122 +134,15 @@ impl BatchProcessor { None }; - let (merkle_tree_data, input_ready) = if let Some(mut account) = merkle_tree_account { - match self.parse_merkle_tree_account(&mut account) { - Ok((data, ready)) => (Some(data), ready), - Err(_) => (None, false), - } - } else { - (None, false) - }; - - let (output_queue_data, output_ready) = if self.tree_type == TreeType::StateV2 { - if let Some(mut account) = output_queue_account { - match self.parse_output_queue_account(&mut account) { - Ok((data, ready)) => (Some(data), ready), - Err(_) => (None, false), - } - } else { - (None, false) - } - } else { - (None, false) - }; - - trace!( - "self.tree_type: {}, input_ready: {}, output_ready: {}", + account_parser::determine_batch_state( self.tree_type, - input_ready, - output_ready - ); - - if !input_ready && !output_ready { - info!( - "QUEUE_METRIC: queue_empty tree_type={} tree={}", - self.tree_type, self.context.merkle_tree - ); - } else { - info!("QUEUE_METRIC: queue_has_elements tree_type={} tree={} input_ready={} output_ready={}", - self.tree_type, self.context.merkle_tree, input_ready, output_ready); - } - - if self.tree_type == TreeType::AddressV2 { - return if input_ready { - if let Some(mt_data) = merkle_tree_data { - BatchReadyState::AddressReadyForAppend { - merkle_tree_data: mt_data, - } - } else { - BatchReadyState::NotReady - } - } else { - BatchReadyState::NotReady - }; - } - - // For State tree type, balance appends and nullifies operations - // based on the queue states - match (input_ready, output_ready) { - (true, true) => { - if let (Some(mt_data), Some(oq_data)) = (merkle_tree_data, output_queue_data) { - // If both queues are ready, check their fill levels - let input_fill = Self::calculate_completion_from_parsed( - mt_data.num_inserted_zkps, - mt_data.current_zkp_batch_index, - ); - let output_fill = Self::calculate_completion_from_parsed( - oq_data.num_inserted_zkps, - oq_data.current_zkp_batch_index, - ); - - trace!( - "Input queue fill: {:.2}, Output queue fill: {:.2}", - input_fill, - output_fill - ); - if input_fill > output_fill { - BatchReadyState::StateReadyForNullify { - merkle_tree_data: mt_data, - } - } else { - BatchReadyState::StateReadyForAppend { - merkle_tree_data: mt_data, - output_queue_data: oq_data, - } - } - } else { - BatchReadyState::NotReady - } - } - (true, false) => { - if let Some(mt_data) = merkle_tree_data { - BatchReadyState::StateReadyForNullify { - merkle_tree_data: mt_data, - } - } else { - BatchReadyState::NotReady - } - } - (false, true) => { - if let (Some(mt_data), Some(oq_data)) = (merkle_tree_data, output_queue_data) { - BatchReadyState::StateReadyForAppend { - merkle_tree_data: mt_data, - output_queue_data: oq_data, - } - } else { - BatchReadyState::NotReady - } - } - (false, false) => BatchReadyState::NotReady, - } + self.context.merkle_tree, + merkle_tree_account, + output_queue_account, + ) } - async fn process_state_nullify_hybrid( - &self, - merkle_tree_data: ParsedMerkleTreeData, - ) -> Result { - let zkp_batch_size = merkle_tree_data.zkp_batch_size as usize; - + async fn process_state_nullify(&self, merkle_tree_data: ParsedMerkleTreeData) -> Result { let batch_hash = format!( "state_nullify_hybrid_{}_{}", self.context.merkle_tree, self.context.epoch @@ -405,7 +160,17 @@ impl BatchProcessor { cache.add(&batch_hash); } - state::perform_nullify(&self.context, merkle_tree_data).await?; + let empty_append_data = ParsedQueueData { + zkp_batch_size: 0, + pending_batch_index: 0, + num_inserted_zkps: 0, + current_zkp_batch_index: 0, + leaves_hash_chains: Vec::new(), // No append operations + }; + + let result = self + .process_state(merkle_tree_data, empty_append_data) + .await; trace!( "State nullify operation (hybrid) completed for tree: {}", @@ -415,16 +180,14 @@ impl BatchProcessor { cache.cleanup_by_key(&batch_hash); trace!("Cache cleaned up for batch: {}", batch_hash); - Ok(zkp_batch_size) + result } - async fn process_state_append_hybrid( + async fn process_state_append( &self, merkle_tree_data: ParsedMerkleTreeData, output_queue_data: ParsedQueueData, ) -> Result { - let zkp_batch_size = output_queue_data.zkp_batch_size as usize; - let batch_hash = format!( "state_append_hybrid_{}_{}", self.context.merkle_tree, self.context.epoch @@ -440,7 +203,21 @@ impl BatchProcessor { } cache.add(&batch_hash); } - state::perform_append(&self.context, merkle_tree_data, output_queue_data).await?; + let empty_nullify_data = ParsedMerkleTreeData { + next_index: merkle_tree_data.next_index, + current_root: merkle_tree_data.current_root, + root_history: merkle_tree_data.root_history.clone(), + zkp_batch_size: merkle_tree_data.zkp_batch_size, + pending_batch_index: merkle_tree_data.pending_batch_index, + num_inserted_zkps: merkle_tree_data.num_inserted_zkps, + current_zkp_batch_index: merkle_tree_data.current_zkp_batch_index, + leaves_hash_chains: Vec::new(), // No nullify operations + }; + + let result = self + .process_state(empty_nullify_data, output_queue_data) + .await; + trace!( "State append operation (hybrid) completed for tree: {}", self.context.merkle_tree @@ -449,106 +226,47 @@ impl BatchProcessor { let mut cache = self.context.ops_cache.lock().await; cache.cleanup_by_key(&batch_hash); - Ok(zkp_batch_size) + result } - /// Parse merkle tree account and check if batch is ready - fn parse_merkle_tree_account( + async fn process_state( &self, - account: &mut solana_sdk::account::Account, - ) -> Result<(ParsedMerkleTreeData, bool)> { - let merkle_tree = match self.tree_type { - TreeType::AddressV2 => BatchedMerkleTreeAccount::address_from_bytes( - account.data.as_mut_slice(), - &self.context.merkle_tree.into(), - ), - TreeType::StateV2 => BatchedMerkleTreeAccount::state_from_bytes( - account.data.as_mut_slice(), - &self.context.merkle_tree.into(), - ), - _ => return Err(ForesterError::InvalidTreeType(self.tree_type).into()), - }?; - - let batch_index = merkle_tree.queue_batches.pending_batch_index; - let batch = merkle_tree - .queue_batches - .batches - .get(batch_index as usize) - .ok_or_else(|| anyhow::anyhow!("Batch not found"))?; - - let num_inserted_zkps = batch.get_num_inserted_zkps(); - let current_zkp_batch_index = batch.get_current_zkp_batch_index(); - - let mut leaves_hash_chains = Vec::new(); - for i in num_inserted_zkps..current_zkp_batch_index { - leaves_hash_chains - .push(merkle_tree.hash_chain_stores[batch_index as usize][i as usize]); - } + merkle_tree_data: ParsedMerkleTreeData, + output_queue_data: ParsedQueueData, + ) -> Result { + info!("Processing state operations with hybrid approach: sequential changelogs, parallel proofs"); - let parsed_data = ParsedMerkleTreeData { - next_index: merkle_tree.next_index, - current_root: *merkle_tree.root_history.last().unwrap(), - root_history: merkle_tree.root_history.to_vec(), - zkp_batch_size: batch.zkp_batch_size as u16, - pending_batch_index: batch_index as u32, - num_inserted_zkps, - current_zkp_batch_index, - leaves_hash_chains, - }; + let _ = super::changelog_cache::get_changelog_cache().await; - let is_ready = batch.get_state() != BatchState::Inserted - && batch.get_current_zkp_batch_index() > batch.get_num_inserted_zkps(); + let (nullify_proofs, append_proofs) = + state::generate_state_inputs(&self.context, merkle_tree_data, output_queue_data) + .await?; - Ok((parsed_data, is_ready)) - } + let mut success_count = 0; - /// Parse output queue account and check if batch is ready - fn parse_output_queue_account( - &self, - account: &mut solana_sdk::account::Account, - ) -> Result<(ParsedQueueData, bool)> { - let output_queue = BatchedQueueAccount::output_from_bytes(account.data.as_mut_slice())?; - - let batch_index = output_queue.batch_metadata.pending_batch_index; - let batch = output_queue - .batch_metadata - .batches - .get(batch_index as usize) - .ok_or_else(|| anyhow::anyhow!("Batch not found"))?; - - let num_inserted_zkps = batch.get_num_inserted_zkps(); - let current_zkp_batch_index = batch.get_current_zkp_batch_index(); - - let mut leaves_hash_chains = Vec::new(); - for i in num_inserted_zkps..current_zkp_batch_index { - leaves_hash_chains - .push(output_queue.hash_chain_stores[batch_index as usize][i as usize]); + if let Err(e) = state::submit_nullify_transaction(&self.context, nullify_proofs).await { + error!("Nullify transaction failed: {:?}", e); + return Err(anyhow::anyhow!( + "Cannot proceed with append after nullify failure: {:?}", + e + )); + } else { + success_count += 1; + debug!("Nullify transaction completed successfully"); } - let parsed_data = ParsedQueueData { - zkp_batch_size: output_queue.batch_metadata.zkp_batch_size as u16, - pending_batch_index: batch_index as u32, - num_inserted_zkps, - current_zkp_batch_index, - leaves_hash_chains, - }; - - let is_ready = batch.get_state() != BatchState::Inserted - && batch.get_current_zkp_batch_index() > batch.get_num_inserted_zkps(); + if let Err(e) = state::submit_append_transaction(&self.context, append_proofs).await { + error!("Append transaction failed: {:?}", e); + } else { + success_count += 1; + debug!("Append transaction completed successfully"); + } - Ok((parsed_data, is_ready)) - } + info!( + "Processing completed for tree {}, {} operations succeeded", + self.context.merkle_tree, success_count + ); - /// Calculate completion percentage from parsed data - fn calculate_completion_from_parsed( - num_inserted_zkps: u64, - current_zkp_batch_index: u64, - ) -> f64 { - let total = current_zkp_batch_index; - if total == 0 { - return 0.0; - } - let remaining = total - num_inserted_zkps; - remaining as f64 / total as f64 + Ok(success_count) } } diff --git a/forester/src/processor/v2/context.rs b/forester/src/processor/v2/context.rs new file mode 100644 index 0000000000..9fc591b581 --- /dev/null +++ b/forester/src/processor/v2/context.rs @@ -0,0 +1,94 @@ +use std::sync::Arc; + +use forester_utils::{forester_epoch::EpochPhases, rpc_pool::SolanaRpcPool}; +use light_client::rpc::Rpc; +use solana_sdk::{pubkey::Pubkey, signature::Keypair}; +use tokio::sync::Mutex; + +use super::types::BatchProcessorConfig; +use crate::{processor::tx_cache::ProcessedHashCache, slot_tracker::SlotTracker}; + +#[derive(Debug)] +pub struct BatchContext { + pub rpc_pool: Arc>, + pub authority: Keypair, + pub derivation: Pubkey, + pub epoch: u64, + pub merkle_tree: Pubkey, + pub output_queue: Pubkey, + pub config: BatchProcessorConfig, + pub ops_cache: Arc>, + pub epoch_phases: EpochPhases, + pub slot_tracker: Arc, +} + +impl BatchContext { + #[allow(clippy::too_many_arguments)] + pub fn new( + rpc_pool: Arc>, + authority: Keypair, + derivation: Pubkey, + epoch: u64, + merkle_tree: Pubkey, + output_queue: Pubkey, + config: BatchProcessorConfig, + ops_cache: Arc>, + epoch_phases: EpochPhases, + slot_tracker: Arc, + ) -> Self { + Self { + rpc_pool, + authority, + derivation, + epoch, + merkle_tree, + output_queue, + config, + ops_cache, + epoch_phases, + slot_tracker, + } + } + + /// Create a new BatchContext with individual prover parameters (for backward compatibility) + #[allow(clippy::too_many_arguments)] + pub fn from_params( + rpc_pool: Arc>, + authority: Keypair, + derivation: Pubkey, + epoch: u64, + merkle_tree: Pubkey, + output_queue: Pubkey, + prover_append_url: String, + prover_update_url: String, + prover_address_append_url: String, + prover_api_key: Option, + prover_polling_interval: std::time::Duration, + prover_max_wait_time: std::time::Duration, + ops_cache: Arc>, + epoch_phases: EpochPhases, + slot_tracker: Arc, + ) -> Self { + let config = BatchProcessorConfig { + prover_append_url, + prover_update_url, + prover_address_append_url, + prover_api_key, + prover_polling_interval, + prover_max_wait_time, + }; + + Self::new( + rpc_pool, + authority, + derivation, + epoch, + merkle_tree, + output_queue, + config, + ops_cache, + epoch_phases, + slot_tracker, + ) + } +} diff --git a/forester/src/processor/v2/mod.rs b/forester/src/processor/v2/mod.rs index 6e660ba4e4..d072b46c7b 100644 --- a/forester/src/processor/v2/mod.rs +++ b/forester/src/processor/v2/mod.rs @@ -1,6 +1,11 @@ +mod account_parser; mod address; +mod changelog_cache; mod common; +mod context; mod state; +mod types; +mod utils; use common::BatchProcessor; use light_client::rpc::Rpc; @@ -26,5 +31,5 @@ pub async fn process_batched_operations( processor.process().await } -pub use common::BatchContext; +pub use context::BatchContext; use light_compressed_account::TreeType; diff --git a/forester/src/processor/v2/state.rs b/forester/src/processor/v2/state.rs index 9f767a3124..76ec4879af 100644 --- a/forester/src/processor/v2/state.rs +++ b/forester/src/processor/v2/state.rs @@ -1,148 +1,464 @@ -use anyhow::{Error, Ok}; +use std::sync::Arc; + +use anyhow::anyhow; use borsh::BorshSerialize; -use forester_utils::instructions::{ - state_batch_append::get_append_instruction_stream, - state_batch_nullify::get_nullify_instruction_stream, +use forester_utils::{error::ForesterUtilsError, ParsedMerkleTreeData, ParsedQueueData}; +use futures::future::join_all; +use light_batched_merkle_tree::{ + constants::DEFAULT_BATCH_STATE_TREE_HEIGHT, + merkle_tree::{InstructionDataBatchAppendInputs, InstructionDataBatchNullifyInputs}, }; -use futures::stream::{Stream, StreamExt}; -use light_batched_merkle_tree::merkle_tree::{ - InstructionDataBatchAppendInputs, InstructionDataBatchNullifyInputs, +use light_client::{indexer::Indexer, rpc::Rpc}; +use light_compressed_account::instruction_data::compressed_proof::CompressedProof; +use light_merkle_tree_metadata::QueueType; +use light_prover_client::{ + proof_client::ProofClient, + proof_types::{ + batch_append::{get_batch_append_inputs, BatchAppendsCircuitInputs}, + batch_update::{get_batch_update_inputs, BatchUpdateCircuitInputs}, + }, }; -use light_client::rpc::Rpc; use light_registry::account_compression_cpi::sdk::{ create_batch_append_instruction, create_batch_nullify_instruction, }; -use solana_program::instruction::Instruction; +use light_sparse_merkle_tree::changelog::ChangelogEntry; use solana_sdk::signer::Signer; -use tracing::{info, instrument}; +use tracing::{debug, info, instrument}; -use super::common::{process_stream, BatchContext, ParsedMerkleTreeData, ParsedQueueData}; +use super::{ + changelog_cache, context::BatchContext, types::StateConfig, utils::send_transaction_batch, +}; use crate::Result; -async fn create_nullify_stream_future( - ctx: &BatchContext, - merkle_tree_data: ParsedMerkleTreeData, -) -> Result<( - impl Stream>> + Send, - u16, -)> -where - R: Rpc, -{ - let (stream, size) = get_nullify_instruction_stream( - ctx.rpc_pool.clone(), - ctx.merkle_tree, - ctx.prover_update_url.clone(), - ctx.prover_api_key.clone(), - ctx.prover_polling_interval, - ctx.prover_max_wait_time, - merkle_tree_data, - ctx.ixs_per_tx, - ) - .await - .map_err(Error::from)?; - let stream = stream.map(|item| item.map_err(Error::from)); - Ok((stream, size)) -} - -async fn create_append_stream_future( - ctx: &BatchContext, +#[instrument( + level = "debug", + skip(context, merkle_tree_data, output_queue_data), + fields(merkle_tree = ?context.merkle_tree) +)] +pub(crate) async fn generate_state_inputs( + context: &BatchContext, merkle_tree_data: ParsedMerkleTreeData, output_queue_data: ParsedQueueData, ) -> Result<( - impl Stream>> + Send, - u16, -)> -where - R: Rpc, -{ - let (stream, size) = get_append_instruction_stream( - ctx.rpc_pool.clone(), - ctx.merkle_tree, - ctx.prover_append_url.clone(), - ctx.prover_api_key.clone(), - ctx.prover_polling_interval, - ctx.prover_max_wait_time, - merkle_tree_data, - output_queue_data, - ctx.ixs_per_tx, - ) - .await - .map_err(Error::from)?; - let stream = stream.map(|item| item.map_err(Error::from)); - Ok((stream, size)) + Vec, + Vec, +)> { + info!("Preparing proofs with sequential changelog calculation and parallel proof generation"); + + let state_config = StateConfig { + rpc_pool: context.rpc_pool.clone(), + merkle_tree_pubkey: context.merkle_tree, + output_queue_pubkey: context.output_queue, + nullify_prover_url: context.config.prover_update_url.clone(), + append_prover_url: context.config.prover_append_url.clone(), + prover_api_key: context.config.prover_api_key.clone(), + polling_interval: context.config.prover_polling_interval, + max_wait_time: context.config.prover_max_wait_time, + }; + + generate_proofs_with_changelogs(state_config, merkle_tree_data, output_queue_data).await } +/// Submit nullify transactions with pre-generated proofs +/// Each proof is sent in a separate transaction to handle root updates properly #[instrument( level = "debug", - skip(context, merkle_tree_data), + skip(context, proofs), fields(merkle_tree = ?context.merkle_tree) )] -pub(crate) async fn perform_nullify( +pub(crate) async fn submit_nullify_transaction( context: &BatchContext, - merkle_tree_data: ParsedMerkleTreeData, + proofs: Vec, ) -> Result<()> { - info!( - "V2_TPS_METRIC: operation_start tree_type=StateV2 operation=nullify tree={} epoch={} (hybrid)", - context.merkle_tree, context.epoch - ); + if proofs.is_empty() { + return Ok(()); + } + + // Send each proof in a separate transaction + for (i, data) in proofs.iter().enumerate() { + debug!("Submitting nullify proof {}/{}", i + 1, proofs.len()); - let instruction_builder = |data: &InstructionDataBatchNullifyInputs| -> Instruction { - create_batch_nullify_instruction( + let instruction = create_batch_nullify_instruction( context.authority.pubkey(), context.derivation, context.merkle_tree, context.epoch, - data.try_to_vec().unwrap(), - ) - }; + data.try_to_vec()?, + ); - let stream_future = create_nullify_stream_future(context, merkle_tree_data); + send_transaction_batch(context, vec![instruction]).await?; + + // Wait for indexer to catch up before sending next transaction + if i < proofs.len() - 1 { + let rpc = context.rpc_pool.get_connection().await?; + forester_utils::utils::wait_for_indexer(&*rpc) + .await + .map_err(|e| anyhow!("Indexer wait error: {:?}", e))?; + } + } - process_stream( - context, - stream_future, - instruction_builder, - "StateV2", - Some("nullify"), - ) - .await?; Ok(()) } +/// Submit append transactions with pre-generated proofs +/// Each proof is sent in a separate transaction to handle root updates properly #[instrument( level = "debug", - skip(context, merkle_tree_data, output_queue_data), + skip(context, proofs), fields(merkle_tree = ?context.merkle_tree) )] -pub(crate) async fn perform_append( +pub(crate) async fn submit_append_transaction( context: &BatchContext, - merkle_tree_data: ParsedMerkleTreeData, - output_queue_data: ParsedQueueData, + proofs: Vec, ) -> Result<()> { - info!( - "V2_TPS_METRIC: operation_start tree_type=StateV2 operation=append tree={} epoch={} (hybrid)", - context.merkle_tree, context.epoch - ); - let instruction_builder = |data: &InstructionDataBatchAppendInputs| -> Instruction { - create_batch_append_instruction( + if proofs.is_empty() { + return Ok(()); + } + + // Send each proof in a separate transaction + for (i, data) in proofs.iter().enumerate() { + debug!("Submitting append proof {}/{}", i + 1, proofs.len()); + + let instruction = create_batch_append_instruction( context.authority.pubkey(), context.derivation, context.merkle_tree, context.output_queue, context.epoch, - data.try_to_vec().unwrap(), - ) - }; + data.try_to_vec()?, + ); + + send_transaction_batch(context, vec![instruction]).await?; + + // Wait for indexer to catch up before sending next transaction + if i < proofs.len() - 1 { + let rpc = context.rpc_pool.get_connection().await?; + forester_utils::utils::wait_for_indexer(&*rpc) + .await + .map_err(|e| anyhow!("Indexer wait error: {:?}", e))?; + } + } - let stream_future = create_append_stream_future(context, merkle_tree_data, output_queue_data); - process_stream( - context, - stream_future, - instruction_builder, - "StateV2", - Some("append"), - ) - .await?; Ok(()) } + +async fn generate_proofs_with_changelogs( + config: StateConfig, + merkle_tree_data: ParsedMerkleTreeData, + output_queue_data: ParsedQueueData, +) -> Result<( + Vec, + Vec, +)> { + info!("Preparing proofs with optimized parallel generation"); + + let nullify_zkp_batch_size = merkle_tree_data.zkp_batch_size; + let append_zkp_batch_size = output_queue_data.zkp_batch_size; + let nullify_leaves_hash_chains = merkle_tree_data.leaves_hash_chains.clone(); + let append_leaves_hash_chains = output_queue_data.leaves_hash_chains.clone(); + + // Early return if nothing to process + if nullify_leaves_hash_chains.is_empty() && append_leaves_hash_chains.is_empty() { + return Ok((Vec::new(), Vec::new())); + } + + // Step 1: Fetch queue elements in parallel for both operations + let (nullify_elements, append_elements) = { + let nullify_future = async { + if nullify_leaves_hash_chains.is_empty() { + return Ok(Vec::new()); + } + let mut connection = config.rpc_pool.get_connection().await?; + let indexer = connection.indexer_mut()?; + let total_elements = nullify_zkp_batch_size as usize * nullify_leaves_hash_chains.len(); + let offset = merkle_tree_data.num_inserted_zkps * nullify_zkp_batch_size as u64; + + let res = indexer + .get_queue_elements( + config.merkle_tree_pubkey.to_bytes(), + QueueType::InputStateV2, + total_elements as u16, + Some(offset), + None, + ) + .await?; + Ok::<_, anyhow::Error>(res.value.0) + }; + + let append_future = async { + if append_leaves_hash_chains.is_empty() { + return Ok(Vec::new()); + } + let mut connection = config.rpc_pool.get_connection().await?; + let indexer = connection.indexer_mut()?; + let total_elements = append_zkp_batch_size as usize * append_leaves_hash_chains.len(); + let offset = merkle_tree_data.next_index; + + let res = indexer + .get_queue_elements( + config.merkle_tree_pubkey.to_bytes(), + QueueType::OutputStateV2, + total_elements as u16, + Some(offset), + None, + ) + .await?; + Ok::<_, anyhow::Error>(res.value.0) + }; + + futures::join!(nullify_future, append_future) + }; + + let nullify_queue_elements = nullify_elements?; + let append_queue_elements = append_elements?; + + // Step 2: Get cached changelogs + let changelog_cache = changelog_cache::get_changelog_cache().await; + let previous_changelogs = changelog_cache + .get_changelogs(&config.merkle_tree_pubkey) + .await; + info!( + "Starting with {} cached changelogs", + previous_changelogs.len() + ); + + // Step 3: Calculate nullify changelogs first (sequential) + let mut all_changelogs: Vec> = + previous_changelogs.clone(); + let mut nullify_circuit_inputs = Vec::new(); + let mut current_root = merkle_tree_data.current_root; + + for (batch_offset, leaves_hash_chain) in nullify_leaves_hash_chains.iter().enumerate() { + let start_idx = batch_offset * nullify_zkp_batch_size as usize; + let end_idx = start_idx + nullify_zkp_batch_size as usize; + let batch_elements = &nullify_queue_elements[start_idx..end_idx]; + + let mut leaves = Vec::new(); + let mut tx_hashes = Vec::new(); + let mut old_leaves = Vec::new(); + let mut path_indices = Vec::new(); + let mut merkle_proofs = Vec::new(); + + for leaf_info in batch_elements.iter() { + path_indices.push(leaf_info.leaf_index as u32); + leaves.push(leaf_info.account_hash); + old_leaves.push(leaf_info.leaf); + merkle_proofs.push(leaf_info.proof.clone()); + tx_hashes.push(leaf_info.tx_hash.ok_or_else(|| { + anyhow!("Missing tx_hash for leaf index {}", leaf_info.leaf_index) + })?); + } + + let (circuit_inputs, batch_changelog) = + get_batch_update_inputs::<{ DEFAULT_BATCH_STATE_TREE_HEIGHT as usize }>( + current_root, // Use the current root, which gets updated after each batch + tx_hashes, + leaves, + *leaves_hash_chain, + old_leaves, + merkle_proofs, + path_indices, + nullify_zkp_batch_size as u32, + &all_changelogs, // Use accumulated changelogs + )?; + + // Update current_root to the new root from this batch for the next iteration + let new_root_bytes = circuit_inputs.new_root.to_bytes_be().1; + if new_root_bytes.len() == 32 { + current_root.copy_from_slice(&new_root_bytes); + debug!( + "Updated root after nullify batch {}: {:?}", + batch_offset, current_root + ); + } else { + // Pad or truncate to 32 bytes if necessary + current_root = [0u8; 32]; + let offset = 32usize.saturating_sub(new_root_bytes.len()); + current_root[offset..].copy_from_slice(&new_root_bytes[..new_root_bytes.len().min(32)]); + debug!( + "Updated root after nullify batch {} (padded): {:?}", + batch_offset, current_root + ); + } + + all_changelogs.extend(batch_changelog); + nullify_circuit_inputs.push(circuit_inputs); + } + + info!( + "Calculated {} nullify changelogs", + all_changelogs.len() - previous_changelogs.len() + ); + + // Step 4: Calculate append inputs with nullifies changelogs + // Continue using the current_root from where nullify left off + let mut append_circuit_inputs = Vec::new(); + + for (batch_idx, leaves_hash_chain) in append_leaves_hash_chains.iter().enumerate() { + let start_idx = batch_idx * append_zkp_batch_size as usize; + let end_idx = start_idx + append_zkp_batch_size as usize; + let batch_elements = &append_queue_elements[start_idx..end_idx]; + + let new_leaves: Vec<[u8; 32]> = batch_elements.iter().map(|x| x.account_hash).collect(); + let merkle_proofs: Vec> = + batch_elements.iter().map(|x| x.proof.clone()).collect(); + let adjusted_start_index = merkle_tree_data.next_index as u32 + + (batch_idx * append_zkp_batch_size as usize) as u32; + let old_leaves: Vec<[u8; 32]> = batch_elements.iter().map(|x| x.leaf).collect(); + + let (circuit_inputs, batch_changelog) = + get_batch_append_inputs::<{ DEFAULT_BATCH_STATE_TREE_HEIGHT as usize }>( + current_root, // Use the current root, which was updated by nullify operations + adjusted_start_index, + new_leaves, + *leaves_hash_chain, + old_leaves, + merkle_proofs, + append_zkp_batch_size as u32, + &all_changelogs, // Use changelogs including nullify's + )?; + + // Update current_root for the next append batch + let new_root_bytes = circuit_inputs.new_root.to_bytes_be().1; + if new_root_bytes.len() == 32 { + current_root.copy_from_slice(&new_root_bytes); + debug!( + "Updated root after append batch {}: {:?}", + batch_idx, current_root + ); + } else { + // Pad or truncate to 32 bytes if necessary + current_root = [0u8; 32]; + let offset = 32usize.saturating_sub(new_root_bytes.len()); + current_root[offset..].copy_from_slice(&new_root_bytes[..new_root_bytes.len().min(32)]); + debug!( + "Updated root after append batch {} (padded): {:?}", + batch_idx, current_root + ); + } + + all_changelogs.extend(batch_changelog); + append_circuit_inputs.push(circuit_inputs); + } + + info!( + "Calculated {} append changelogs", + all_changelogs.len() - previous_changelogs.len() - nullify_circuit_inputs.len() + ); + + // Step 5: Generate all proofs in parallel + let nullify_proof_client = Arc::new(ProofClient::with_config( + config.nullify_prover_url, + config.polling_interval, + config.max_wait_time, + config.prover_api_key.clone(), + )); + + let append_proof_client = Arc::new(ProofClient::with_config( + config.append_prover_url, + config.polling_interval, + config.max_wait_time, + config.prover_api_key, + )); + + // Generate nullify proofs + let mut nullify_futures = Vec::new(); + for inputs in nullify_circuit_inputs { + let client = nullify_proof_client.clone(); + nullify_futures.push(generate_nullify_zkp_proof(inputs, client)); + } + + // Generate append proofs + let mut append_futures = Vec::new(); + for inputs in append_circuit_inputs { + let client = append_proof_client.clone(); + append_futures.push(generate_append_zkp_proof(inputs, client)); + } + + info!( + "Generating {} proofs in parallel ({} nullify, {} append)", + nullify_futures.len() + append_futures.len(), + nullify_futures.len(), + append_futures.len() + ); + + // Execute all proof generation + let (nullify_results, append_results) = + futures::join!(join_all(nullify_futures), join_all(append_futures)); + + // Collect nullify proofs + let mut nullify_proofs = Vec::new(); + for result in nullify_results { + match result { + Ok(proof) => nullify_proofs.push(proof), + Err(e) => return Err(e.into()), + } + } + + // Collect append proofs + let mut append_proofs = Vec::new(); + for result in append_results { + match result { + Ok(proof) => append_proofs.push(proof), + Err(e) => return Err(e.into()), + } + } + + // Step 6: Cache the new changelogs for future use + let new_changelogs = all_changelogs + .into_iter() + .skip(previous_changelogs.len()) + .collect::>(); + if !new_changelogs.is_empty() { + changelog_cache + .append_changelogs(config.merkle_tree_pubkey, new_changelogs.clone()) + .await?; + info!( + "Cached {} new changelogs for future operations", + new_changelogs.len() + ); + } + + info!( + "Generated {} nullify and {} append proofs", + nullify_proofs.len(), + append_proofs.len() + ); + Ok((nullify_proofs, append_proofs)) +} + +async fn generate_nullify_zkp_proof( + inputs: BatchUpdateCircuitInputs, + proof_client: Arc, +) -> std::result::Result { + let (proof, new_root) = proof_client + .generate_batch_update_proof(inputs) + .await + .map_err(|e| ForesterUtilsError::Prover(e.to_string()))?; + Ok(InstructionDataBatchNullifyInputs { + new_root, + compressed_proof: CompressedProof { + a: proof.a, + b: proof.b, + c: proof.c, + }, + }) +} + +async fn generate_append_zkp_proof( + circuit_inputs: BatchAppendsCircuitInputs, + proof_client: Arc, +) -> std::result::Result { + let (proof, new_root) = proof_client + .generate_batch_append_proof(circuit_inputs) + .await + .map_err(|e| ForesterUtilsError::Prover(e.to_string()))?; + Ok(InstructionDataBatchAppendInputs { + new_root, + compressed_proof: CompressedProof { + a: proof.a, + b: proof.b, + c: proof.c, + }, + }) +} diff --git a/forester/src/processor/v2/types.rs b/forester/src/processor/v2/types.rs new file mode 100644 index 0000000000..82c678e39e --- /dev/null +++ b/forester/src/processor/v2/types.rs @@ -0,0 +1,47 @@ +use std::{sync::Arc, time::Duration}; + +use forester_utils::{rpc_pool::SolanaRpcPool, ParsedMerkleTreeData, ParsedQueueData}; +use light_client::rpc::Rpc; +use solana_sdk::pubkey::Pubkey; + +#[derive(Debug)] +pub enum BatchReadyState { + NotReady, + AddressReadyForAppend { + merkle_tree_data: ParsedMerkleTreeData, + }, + StateReadyForAppend { + merkle_tree_data: ParsedMerkleTreeData, + output_queue_data: ParsedQueueData, + }, + StateReadyForNullify { + merkle_tree_data: ParsedMerkleTreeData, + }, + BothReady { + merkle_tree_data: ParsedMerkleTreeData, + output_queue_data: ParsedQueueData, + }, +} + +#[derive(Debug, Clone)] +pub struct StateConfig { + pub rpc_pool: Arc>, + pub merkle_tree_pubkey: Pubkey, + #[allow(dead_code)] + pub output_queue_pubkey: Pubkey, + pub nullify_prover_url: String, + pub append_prover_url: String, + pub prover_api_key: Option, + pub polling_interval: Duration, + pub max_wait_time: Duration, +} + +#[derive(Debug, Clone)] +pub struct BatchProcessorConfig { + pub prover_append_url: String, + pub prover_update_url: String, + pub prover_address_append_url: String, + pub prover_api_key: Option, + pub prover_polling_interval: Duration, + pub prover_max_wait_time: Duration, +} diff --git a/forester/src/processor/v2/utils.rs b/forester/src/processor/v2/utils.rs new file mode 100644 index 0000000000..8f6d2ef5bf --- /dev/null +++ b/forester/src/processor/v2/utils.rs @@ -0,0 +1,128 @@ +use std::future::Future; + +use borsh::BorshSerialize; +use forester_utils::utils::wait_for_indexer; +use futures::{pin_mut, stream::StreamExt, Stream}; +use light_client::rpc::Rpc; +use light_registry::protocol_config::state::EpochState; +use solana_sdk::{instruction::Instruction, signer::Signer}; +use tracing::{info, trace}; + +use super::context::BatchContext; +use crate::{errors::ForesterError, Result}; + +/// Processes a stream of batched instruction data into transactions. +pub(crate) async fn process_stream( + context: &BatchContext, + stream_creator_future: FutC, + instruction_builder: impl Fn(&D) -> Instruction, +) -> Result +where + R: Rpc, + S: Stream>> + Send, + D: BorshSerialize, + FutC: Future> + Send, +{ + trace!("Executing batched stream processor (hybrid)"); + + let (batch_stream, zkp_batch_size) = stream_creator_future.await?; + + if zkp_batch_size == 0 { + trace!("ZKP batch size is 0, no work to do."); + return Ok(0); + } + + pin_mut!(batch_stream); + let mut total_instructions_processed = 0; + + while let Some(batch_result) = batch_stream.next().await { + let instruction_batch = batch_result?; + + if instruction_batch.is_empty() { + continue; + } + + let current_slot = context.slot_tracker.estimated_current_slot(); + let phase_end_slot = context.epoch_phases.active.end; + let slots_remaining = phase_end_slot.saturating_sub(current_slot); + + const MIN_SLOTS_FOR_TRANSACTION: u64 = 30; + if slots_remaining < MIN_SLOTS_FOR_TRANSACTION { + info!( + "Only {} slots remaining in active phase (need at least {}), stopping batch processing", + slots_remaining, MIN_SLOTS_FOR_TRANSACTION + ); + if !instruction_batch.is_empty() { + let instructions: Vec = + instruction_batch.iter().map(&instruction_builder).collect(); + let _ = send_transaction_batch(context, instructions).await; + } + break; + } + + let instructions: Vec = + instruction_batch.iter().map(&instruction_builder).collect(); + + match send_transaction_batch(context, instructions).await { + Ok(_) => { + total_instructions_processed += instruction_batch.len(); + { + let rpc = context.rpc_pool.get_connection().await?; + wait_for_indexer(&*rpc) + .await + .map_err(|e| anyhow::anyhow!("Error: {:?}", e))?; + } + } + Err(e) => { + if let Some(ForesterError::NotInActivePhase) = e.downcast_ref::() { + info!("Active phase ended while processing batches, stopping gracefully"); + break; + } else { + return Err(e); + } + } + } + } + + if total_instructions_processed == 0 { + trace!("No instructions were processed from the stream."); + return Ok(0); + } + + let total_items_processed = total_instructions_processed * zkp_batch_size as usize; + Ok(total_items_processed) +} + +pub(crate) async fn send_transaction_batch( + context: &BatchContext, + instructions: Vec, +) -> Result { + // Check if we're still in the active phase before sending the transaction + let current_slot = context.slot_tracker.estimated_current_slot(); + let current_phase_state = context.epoch_phases.get_current_epoch_state(current_slot); + + if current_phase_state != EpochState::Active { + trace!( + "Skipping transaction send: not in active phase (current phase: {:?}, slot: {})", + current_phase_state, + current_slot + ); + return Err(ForesterError::NotInActivePhase.into()); + } + + info!( + "Sending transaction with {} instructions...", + instructions.len() + ); + + let mut rpc = context.rpc_pool.get_connection().await?; + let signature = rpc + .create_and_send_transaction( + &instructions, + &context.authority.pubkey(), + &[&context.authority], + ) + .await?; + + Ok(signature.to_string()) +} diff --git a/forester/tests/e2e_test.rs b/forester/tests/e2e_test.rs index 57bad425d9..2db6b6abde 100644 --- a/forester/tests/e2e_test.rs +++ b/forester/tests/e2e_test.rs @@ -211,7 +211,6 @@ async fn e2e_test() { queue_config: Default::default(), indexer_config: Default::default(), transaction_config: TransactionConfig { - batch_ixs_per_tx: 4, ..Default::default() }, general_config: GeneralConfig { diff --git a/forester/tests/legacy/address_v2_test.rs b/forester/tests/legacy/address_v2_test.rs index 96f4edd960..96ed4e2c58 100644 --- a/forester/tests/legacy/address_v2_test.rs +++ b/forester/tests/legacy/address_v2_test.rs @@ -66,7 +66,6 @@ async fn test_create_v2_address() { let env = TestAccounts::get_local_test_validator_accounts(); let mut config = forester_config(); - config.transaction_config.batch_ixs_per_tx = 1; config.payer_keypair = env.protocol.forester.insecure_clone(); config.derivation_pubkey = env.protocol.forester.pubkey(); config.general_config = GeneralConfig::test_address_v2(); diff --git a/forester/tests/legacy/batched_address_test.rs b/forester/tests/legacy/batched_address_test.rs index a9ba71b6ab..fe53f8c82e 100644 --- a/forester/tests/legacy/batched_address_test.rs +++ b/forester/tests/legacy/batched_address_test.rs @@ -1,10 +1,7 @@ use std::{sync::Arc, time::Duration}; use forester::run_pipeline; -use forester_utils::{ - registry::update_test_forester, - rpc_pool::SolanaRpcPoolBuilder, -}; +use forester_utils::{registry::update_test_forester, rpc_pool::SolanaRpcPoolBuilder}; use light_batched_merkle_tree::{ batch::BatchState, initialize_address_tree::InitAddressTreeAccountsInstructionData, merkle_tree::BatchedMerkleTreeAccount, @@ -52,7 +49,6 @@ async fn test_address_batched() { test_accounts.protocol.forester = forester_keypair.insecure_clone(); let mut config = forester_config(); - config.transaction_config.batch_ixs_per_tx = 1; config.payer_keypair = forester_keypair.insecure_clone(); let pool = SolanaRpcPoolBuilder::::default() diff --git a/forester/tests/legacy/batched_state_async_indexer_test.rs b/forester/tests/legacy/batched_state_async_indexer_test.rs index af65022065..f6fc8d6ef1 100644 --- a/forester/tests/legacy/batched_state_async_indexer_test.rs +++ b/forester/tests/legacy/batched_state_async_indexer_test.rs @@ -88,7 +88,6 @@ async fn test_state_indexer_async_batched() { let env = TestAccounts::get_local_test_validator_accounts(); let mut config = forester_config(); - config.transaction_config.batch_ixs_per_tx = 3; config.payer_keypair = env.protocol.forester.insecure_clone(); config.derivation_pubkey = env.protocol.forester.pubkey(); diff --git a/forester/tests/legacy/batched_state_indexer_test.rs b/forester/tests/legacy/batched_state_indexer_test.rs index 81ee4a4c1f..89d3b1faf9 100644 --- a/forester/tests/legacy/batched_state_indexer_test.rs +++ b/forester/tests/legacy/batched_state_indexer_test.rs @@ -51,7 +51,6 @@ async fn test_state_indexer_batched() { env.protocol.forester = forester_keypair.insecure_clone(); let mut config = forester_config(); - config.transaction_config.batch_ixs_per_tx = 1; config.payer_keypair = forester_keypair.insecure_clone(); let pool = SolanaRpcPoolBuilder::::default() diff --git a/forester/tests/legacy/batched_state_test.rs b/forester/tests/legacy/batched_state_test.rs index 9d439ec622..89fb391f11 100644 --- a/forester/tests/legacy/batched_state_test.rs +++ b/forester/tests/legacy/batched_state_test.rs @@ -55,7 +55,6 @@ async fn test_state_batched() { env.protocol.forester = forester_keypair.insecure_clone(); let mut config = forester_config(); - config.transaction_config.batch_ixs_per_tx = 1; config.payer_keypair = forester_keypair.insecure_clone(); config.general_config = GeneralConfig::test_state_v2(); diff --git a/forester/tests/legacy/priority_fee_test.rs b/forester/tests/legacy/priority_fee_test.rs index 5b17b90660..cf79a4e84b 100644 --- a/forester/tests/legacy/priority_fee_test.rs +++ b/forester/tests/legacy/priority_fee_test.rs @@ -51,7 +51,6 @@ async fn test_priority_fee_request() { indexer_batch_size: 50, indexer_max_concurrent_batches: 10, legacy_ixs_per_tx: 1, - batch_ixs_per_tx: 4, transaction_max_concurrent_batches: 20, tx_cache_ttl_seconds: 15, ops_cache_ttl_seconds: 180, diff --git a/forester/tests/priority_fee_test.rs b/forester/tests/priority_fee_test.rs index 10d050c855..9eac615b26 100644 --- a/forester/tests/priority_fee_test.rs +++ b/forester/tests/priority_fee_test.rs @@ -55,7 +55,6 @@ async fn test_priority_fee_request() { indexer_batch_size: 50, indexer_max_concurrent_batches: 10, legacy_ixs_per_tx: 1, - batch_ixs_per_tx: 4, transaction_max_concurrent_batches: 20, tx_cache_ttl_seconds: 15, ops_cache_ttl_seconds: 180, diff --git a/program-tests/utils/src/e2e_test_env.rs b/program-tests/utils/src/e2e_test_env.rs index 5c966bd087..f408547428 100644 --- a/program-tests/utils/src/e2e_test_env.rs +++ b/program-tests/utils/src/e2e_test_env.rs @@ -756,7 +756,7 @@ where .await .unwrap(); let addresses = - addresses.value.items.iter().map(|x| x.account_hash).collect::>(); + addresses.value.0.iter().map(|x| x.account_hash).collect::>(); // // local_leaves_hash_chain is only used for a test assertion. // let local_nullifier_hash_chain = create_hash_chain_from_array(&addresses); // assert_eq!(leaves_hash_chain, local_nullifier_hash_chain); diff --git a/program-tests/utils/src/test_batch_forester.rs b/program-tests/utils/src/test_batch_forester.rs index 44122fefd6..2fc29c63ea 100644 --- a/program-tests/utils/src/test_batch_forester.rs +++ b/program-tests/utils/src/test_batch_forester.rs @@ -74,6 +74,7 @@ pub async fn perform_batch_append( .create_and_send_transaction(&[instruction], &forester.pubkey(), &[forester]) .await?; bundle.merkle_tree.num_root_updates += 1; + Ok(res) } @@ -662,7 +663,7 @@ pub async fn create_batch_update_address_tree_instruction_data_with_proof>(); diff --git a/prover/client/src/proof_types/batch_append/proof_inputs.rs b/prover/client/src/proof_types/batch_append/proof_inputs.rs index 8ac7b999e3..c85681d719 100644 --- a/prover/client/src/proof_types/batch_append/proof_inputs.rs +++ b/prover/client/src/proof_types/batch_append/proof_inputs.rs @@ -56,7 +56,13 @@ pub fn get_batch_append_inputs( .enumerate() { let current_index = start_index as usize + i; - info!("Updating root with leaf index: {}", current_index); + info!( + leaf_index = current_index, + batch_position = i, + batch_size = batch_size, + tree_height = HEIGHT, + "Processing leaf for batch append" + ); for change_log_entry in previous_changelogs.iter() { match change_log_entry.update_proof(current_index, &mut merkle_proof) { diff --git a/scripts/install.sh b/scripts/install.sh index 4780e7cb28..4f4b002321 100755 --- a/scripts/install.sh +++ b/scripts/install.sh @@ -210,7 +210,7 @@ install_photon() { if [ "$photon_installed" = false ] || [ "$photon_correct_version" = false ]; then echo "Installing Photon indexer (version $expected_version)..." # Use git commit for now as specified in constants.ts - cargo install --git https://github.com/helius-labs/photon.git --rev b0ad386858384c22b4bb6a3bbbcd6a65911dac68 --locked --force + cargo install --git https://github.com/helius-labs/photon.git --rev 9641911ad4c21275b5679f040dc809edf5072da6 --locked --force log "photon" else echo "Photon already installed with correct version, skipping..." diff --git a/sdk-libs/client/src/indexer/indexer_trait.rs b/sdk-libs/client/src/indexer/indexer_trait.rs index 577372b6ed..e082f0c328 100644 --- a/sdk-libs/client/src/indexer/indexer_trait.rs +++ b/sdk-libs/client/src/indexer/indexer_trait.rs @@ -185,8 +185,7 @@ pub trait Indexer: std::marker::Send + std::marker::Sync { ) -> Result, IndexerError>; // TODO: in different pr: - // replace num_elements & start_offset with PaginatedOptions - // - startoffset is not robust, we should use a queue index as cursor instead + // replace num_elements & start_queue_index with PaginatedOptions // - return type should be ItemsWithCursor /// Returns queue elements from the queue with the given merkle tree pubkey. For input /// queues account compression program does not store queue elements in the @@ -197,9 +196,9 @@ pub trait Indexer: std::marker::Send + std::marker::Sync { merkle_tree_pubkey: [u8; 32], queue_type: QueueType, num_elements: u16, - start_offset: Option, + start_queue_index: Option, config: Option, - ) -> Result>, IndexerError>; + ) -> Result, Option)>, IndexerError>; async fn get_subtrees( &self, diff --git a/sdk-libs/client/src/indexer/photon_indexer.rs b/sdk-libs/client/src/indexer/photon_indexer.rs index 3409a5df7d..7ef1a5ab56 100644 --- a/sdk-libs/client/src/indexer/photon_indexer.rs +++ b/sdk-libs/client/src/indexer/photon_indexer.rs @@ -8,7 +8,7 @@ use photon_api::{ models::GetCompressedAccountsByOwnerPostRequestParams, }; use solana_pubkey::Pubkey; -use tracing::{debug, error, warn}; +use tracing::{error, trace, warn}; use super::{ types::{CompressedAccount, OwnerBalance, SignatureWithMetadata, TokenAccount, TokenBalance}, @@ -52,17 +52,18 @@ impl PhotonIndexer { loop { attempts += 1; - debug!( + trace!( "Attempt {}/{}: No rate limiter configured", - attempts, max_retries + attempts, + max_retries ); - debug!("Attempt {}/{}: Executing operation", attempts, max_retries); + trace!("Attempt {}/{}: Executing operation", attempts, max_retries); let result = operation().await; match result { Ok(value) => { - debug!("Attempt {}/{}: Operation succeeded.", attempts, max_retries); + trace!("Attempt {}/{}: Operation succeeded.", attempts, max_retries); return Ok(value); } Err(e) => { @@ -1282,11 +1283,6 @@ impl Indexer for PhotonIndexer { ) .await; - match &result { - Ok(response) => debug!("Raw API response: {:?}", response), - Err(e) => error!("API request failed: {:?}", e), - } - let result = result?; let api_response = match Self::extract_result_with_error_check( @@ -1579,9 +1575,9 @@ impl Indexer for PhotonIndexer { _pubkey: [u8; 32], _queue_type: QueueType, _num_elements: u16, - _start_offset: Option, + _start_queue_index: Option, _config: Option, - ) -> Result>, IndexerError> { + ) -> Result, Option)>, IndexerError> { #[cfg(not(feature = "v2"))] unimplemented!("get_queue_elements"); #[cfg(feature = "v2")] @@ -1589,7 +1585,7 @@ impl Indexer for PhotonIndexer { let pubkey = _pubkey; let queue_type = _queue_type; let limit = _num_elements; - let start_queue_index = _start_offset; + let start_queue_index = _start_queue_index; let config = _config.unwrap_or_default(); self.retry(config.retry_config, || async { let request: photon_api::models::GetQueueElementsPostRequest = @@ -1602,79 +1598,79 @@ impl Indexer for PhotonIndexer { }), ..Default::default() }; + let result = photon_api::apis::default_api::get_queue_elements_post( &self.configuration, request, ) .await; - - let result: Result>, IndexerError> = - match result { - Ok(api_response) => match api_response.result { - Some(api_result) => { - if api_result.context.slot < config.slot { - return Err(IndexerError::IndexerNotSyncedToSlot); - } - let response = api_result.value; - let proofs: Vec = response - .iter() - .map(|x| { - let proof = x - .proof - .iter() - .map(|x| Hash::from_base58(x).unwrap()) - .collect(); - let root = Hash::from_base58(&x.root).unwrap(); - let leaf = Hash::from_base58(&x.leaf).unwrap(); - let merkle_tree = Hash::from_base58(&x.tree).unwrap(); - let tx_hash = x - .tx_hash - .as_ref() - .map(|x| Hash::from_base58(x).unwrap()); - let account_hash = - Hash::from_base58(&x.account_hash).unwrap(); - - MerkleProofWithContext { - proof, - root, - leaf_index: x.leaf_index, - leaf, - merkle_tree, - root_seq: x.root_seq, - tx_hash, - account_hash, - } - }) - .collect(); - - Ok(Response { - context: Context { - slot: api_result.context.slot, - }, - value: Items { items: proofs }, - }) + let result: Result< + Response<(Vec, Option)>, + IndexerError, + > = match result { + Ok(api_response) => match api_response.result { + Some(api_result) => { + if api_result.context.slot < config.slot { + return Err(IndexerError::IndexerNotSyncedToSlot); } - None => { - let error = api_response.error.ok_or_else(|| { - IndexerError::PhotonError { + let response = api_result.value; + let proofs: Vec = response + .iter() + .map(|x| { + let proof = x + .proof + .iter() + .map(|x| Hash::from_base58(x).unwrap()) + .collect(); + let root = Hash::from_base58(&x.root).unwrap(); + let leaf = Hash::from_base58(&x.leaf).unwrap(); + let merkle_tree = Hash::from_base58(&x.tree).unwrap(); + let tx_hash = + x.tx_hash.as_ref().map(|x| Hash::from_base58(x).unwrap()); + let account_hash = Hash::from_base58(&x.account_hash).unwrap(); + + MerkleProofWithContext { + proof, + root, + leaf_index: x.leaf_index, + leaf, + merkle_tree, + root_seq: x.root_seq, + tx_hash, + account_hash, + } + }) + .collect(); + + Ok(Response { + context: Context { + slot: api_result.context.slot, + }, + value: (proofs, Some(api_result.first_value_queue_index as u64)), + }) + } + None => { + let error = + api_response + .error + .ok_or_else(|| IndexerError::PhotonError { context: "get_queue_elements".to_string(), message: "No error details provided".to_string(), - } - })?; + })?; - Err(IndexerError::PhotonError { - context: "get_queue_elements".to_string(), - message: error - .message - .unwrap_or_else(|| "Unknown error".to_string()), - }) - } - }, - Err(e) => Err(IndexerError::PhotonError { - context: "get_queue_elements".to_string(), - message: e.to_string(), - }), - }; + Err(IndexerError::PhotonError { + context: "get_queue_elements".to_string(), + message: error + .message + .unwrap_or_else(|| "Unknown error".to_string()), + }) + } + }, + Err(e) => Err(IndexerError::PhotonError { + context: "get_queue_elements".to_string(), + message: e.to_string(), + }), + }; result }) diff --git a/sdk-libs/client/src/rpc/indexer.rs b/sdk-libs/client/src/rpc/indexer.rs index 56963ed64c..0b41c417b7 100644 --- a/sdk-libs/client/src/rpc/indexer.rs +++ b/sdk-libs/client/src/rpc/indexer.rs @@ -206,9 +206,9 @@ impl Indexer for LightClient { merkle_tree_pubkey: [u8; 32], queue_type: QueueType, num_elements: u16, - start_offset: Option, + start_queue_index: Option, config: Option, - ) -> Result>, IndexerError> { + ) -> Result, Option)>, IndexerError> { Ok(self .indexer .as_mut() @@ -217,7 +217,7 @@ impl Indexer for LightClient { merkle_tree_pubkey, queue_type, num_elements, - start_offset, + start_queue_index, config, ) .await?) diff --git a/sdk-libs/program-test/src/indexer/test_indexer.rs b/sdk-libs/program-test/src/indexer/test_indexer.rs index 13803284b3..d6fcfa1715 100644 --- a/sdk-libs/program-test/src/indexer/test_indexer.rs +++ b/sdk-libs/program-test/src/indexer/test_indexer.rs @@ -601,7 +601,7 @@ impl Indexer for TestIndexer { _num_elements: u16, _start_offset: Option, _config: Option, - ) -> Result>, IndexerError> { + ) -> Result, Option)>, IndexerError> { #[cfg(not(feature = "v2"))] unimplemented!("get_queue_elements"); #[cfg(feature = "v2")] @@ -638,9 +638,7 @@ impl Indexer for TestIndexer { context: Context { slot: self.get_current_slot(), }, - value: Items { - items: merkle_proofs_with_context, - }, + value: (merkle_proofs_with_context, None), }); } @@ -711,9 +709,7 @@ impl Indexer for TestIndexer { context: Context { slot: self.get_current_slot(), }, - value: Items { - items: merkle_proofs_with_context, - }, + value: (merkle_proofs_with_context, None), }); } } @@ -784,9 +780,14 @@ impl Indexer for TestIndexer { context: Context { slot: self.get_current_slot(), }, - value: Items { - items: merkle_proofs_with_context, - }, + value: ( + merkle_proofs_with_context, + if queue_elements.is_empty() { + None + } else { + Some(queue_elements[0].1) + }, + ), }); } } @@ -882,8 +883,8 @@ impl Indexer for TestIndexer { .map_err(|_| IndexerError::Unknown("Failed to get queue elements".into()))? .value; - let addresses: Vec = address_proofs - .items + let (address_proof_items, _) = address_proofs; + let addresses: Vec = address_proof_items .iter() .enumerate() .map(|(i, proof)| AddressQueueIndex { @@ -894,11 +895,7 @@ impl Indexer for TestIndexer { let non_inclusion_proofs = self .get_multiple_new_address_proofs( merkle_tree_pubkey.to_bytes(), - address_proofs - .items - .iter() - .map(|x| x.account_hash) - .collect(), + address_proof_items.iter().map(|x| x.account_hash).collect(), None, ) .await @@ -990,6 +987,10 @@ impl Indexer for TestIndexer { #[async_trait] impl TestIndexerExtensions for TestIndexer { + fn get_address_merkle_trees(&self) -> &Vec { + &self.address_merkle_trees + } + fn get_address_merkle_tree( &self, merkle_tree_pubkey: Pubkey, @@ -1082,10 +1083,6 @@ impl TestIndexerExtensions for TestIndexer { &mut self.state_merkle_trees } - fn get_address_merkle_trees(&self) -> &Vec { - &self.address_merkle_trees - } - fn get_address_merkle_trees_mut(&mut self) -> &mut Vec { &mut self.address_merkle_trees } diff --git a/sdk-libs/program-test/src/program_test/indexer.rs b/sdk-libs/program-test/src/program_test/indexer.rs index 744148bf66..6cd63db5dd 100644 --- a/sdk-libs/program-test/src/program_test/indexer.rs +++ b/sdk-libs/program-test/src/program_test/indexer.rs @@ -202,9 +202,9 @@ impl Indexer for LightProgramTest { merkle_tree_pubkey: [u8; 32], queue_type: QueueType, num_elements: u16, - start_offset: Option, + start_queue_index: Option, config: Option, - ) -> Result>, IndexerError> { + ) -> Result, Option)>, IndexerError> { Ok(self .indexer .as_mut() @@ -213,7 +213,7 @@ impl Indexer for LightProgramTest { merkle_tree_pubkey, queue_type, num_elements, - start_offset, + start_queue_index, config, ) .await?)