diff --git a/forester-utils/src/instructions/address_batch_update.rs b/forester-utils/src/instructions/address_batch_update.rs index fc1d790626..1a148a1997 100644 --- a/forester-utils/src/instructions/address_batch_update.rs +++ b/forester-utils/src/instructions/address_batch_update.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use account_compression::processor::initialize_address_merkle_tree::Pubkey; use futures::future; use light_batched_merkle_tree::{ @@ -27,6 +29,9 @@ pub async fn create_batch_update_address_tree_instruction_data( rpc: &mut R, indexer: &mut I, merkle_tree_pubkey: &Pubkey, + prover_url: String, + polling_interval: Duration, + max_wait_time: Duration, ) -> Result<(Vec<InstructionDataBatchNullifyInputs>, u16), ForesterUtilsError> where R: Rpc, @@ -253,7 +258,7 @@ where } info!("Generating {} ZK proofs asynchronously", all_inputs.len()); - let proof_client = ProofClient::local(); + let proof_client = ProofClient::with_config(prover_url, polling_interval, max_wait_time); let proof_futures = all_inputs .into_iter() .map(|inputs| proof_client.generate_batch_address_append_proof(inputs)); diff --git a/forester-utils/src/instructions/state_batch_append.rs b/forester-utils/src/instructions/state_batch_append.rs index 07ff0bb09e..9929739300 100644 --- a/forester-utils/src/instructions/state_batch_append.rs +++ b/forester-utils/src/instructions/state_batch_append.rs @@ -1,3 +1,5 @@ +use std::{sync::Arc, time::Duration}; + use account_compression::processor::initialize_address_merkle_tree::Pubkey; use light_batched_merkle_tree::{ constants::DEFAULT_BATCH_STATE_TREE_HEIGHT, @@ -22,6 +24,9 @@ pub async fn create_append_batch_ix_data( indexer: &mut I, merkle_tree_pubkey: Pubkey, output_queue_pubkey: Pubkey, + prover_url: String, + polling_interval: Duration, + max_wait_time: Duration, ) -> Result<Vec<InstructionDataBatchAppendInputs>, ForesterUtilsError> { trace!("Creating append batch instruction data"); @@ -84,6 +89,11 @@ pub async fn create_append_batch_ix_data( let mut all_changelogs: Vec<ChangelogEntry<{ DEFAULT_BATCH_STATE_TREE_HEIGHT as usize }>> = Vec::new(); let mut proof_futures
= Vec::new(); + let proof_client = Arc::new(ProofClient::with_config( + prover_url.clone(), + polling_interval, + max_wait_time, + )); for (batch_idx, leaves_hash_chain) in leaves_hash_chains.iter().enumerate() { let start_idx = batch_idx * zkp_batch_size as usize; @@ -134,7 +144,8 @@ pub async fn create_append_batch_ix_data( bigint_to_be_bytes_array::<32>(&circuit_inputs.new_root.to_biguint().unwrap()).unwrap(); all_changelogs.extend(batch_changelogs); - let proof_future = generate_zkp_proof(circuit_inputs); + let client = Arc::clone(&proof_client); + let proof_future = generate_zkp_proof(circuit_inputs, client); proof_futures.push(proof_future); } @@ -162,8 +173,8 @@ pub async fn create_append_batch_ix_data( } async fn generate_zkp_proof( circuit_inputs: BatchAppendsCircuitInputs, + proof_client: Arc<ProofClient>, ) -> Result<(CompressedProof, [u8; 32]), ForesterUtilsError> { - let proof_client = ProofClient::local(); let (proof, new_root) = proof_client .generate_batch_append_proof(circuit_inputs) .await diff --git a/forester-utils/src/instructions/state_batch_nullify.rs b/forester-utils/src/instructions/state_batch_nullify.rs index f179650136..4bfe0004d3 100644 --- a/forester-utils/src/instructions/state_batch_nullify.rs +++ b/forester-utils/src/instructions/state_batch_nullify.rs @@ -1,3 +1,5 @@ +use std::{sync::Arc, time::Duration}; + use account_compression::processor::initialize_address_merkle_tree::Pubkey; use light_batched_merkle_tree::{ constants::DEFAULT_BATCH_STATE_TREE_HEIGHT, @@ -19,6 +21,9 @@ pub async fn create_nullify_batch_ix_data( rpc: &mut R, indexer: &mut I, merkle_tree_pubkey: Pubkey, + prover_url: String, + polling_interval: Duration, + max_wait_time: Duration, ) -> Result<Vec<InstructionDataBatchNullifyInputs>, ForesterUtilsError> { trace!("create_multiple_nullify_batch_ix_data"); // Get the tree information and find out how many ZKP batches need processing @@ -124,6 +129,11 @@ pub async fn create_nullify_batch_ix_data( let mut all_changelogs = Vec::new(); let mut proof_futures = Vec::new();
+ let proof_client = Arc::new(ProofClient::with_config( + prover_url.clone(), + polling_interval, + max_wait_time, + )); let mut current_root = old_root; @@ -205,48 +215,36 @@ pub async fn create_nullify_batch_ix_data( ForesterUtilsError::Prover("Failed to convert new root to bytes".into()) })?; - let proof_future = tokio::spawn(generate_nullify_zkp_proof(circuit_inputs)); + let client = Arc::clone(&proof_client); + let proof_future = generate_nullify_zkp_proof(circuit_inputs, client); proof_futures.push(proof_future); } - // Wait for all proof generation to complete - let mut results = Vec::new(); - - for (i, future) in futures::future::join_all(proof_futures) - .await - .into_iter() - .enumerate() - { - match future { - Ok(result) => match result { - Ok((proof, new_root)) => { - results.push(InstructionDataBatchNullifyInputs { - new_root, - compressed_proof: proof, - }); - trace!("Successfully generated proof for batch {}", i); - } - Err(e) => { - error!("Error generating proof for batch {}: {:?}", i, e); - return Err(e); - } - }, + let proof_results = futures::future::join_all(proof_futures).await; + let mut instruction_data_vec = Vec::new(); + + for (i, proof_result) in proof_results.into_iter().enumerate() { + match proof_result { + Ok((proof, new_root)) => { + trace!("Successfully generated proof for batch {}", i); + instruction_data_vec.push(InstructionDataBatchNullifyInputs { + new_root, + compressed_proof: proof, + }); + } Err(e) => { - error!("Task error for batch {}: {:?}", i, e); - return Err(ForesterUtilsError::Prover(format!( - "Task error for batch {}: {:?}", - i, e - ))); + error!("Failed to generate proof for batch {}: {:?}", i, e); + return Err(e); } } } - Ok(results) + Ok(instruction_data_vec) } async fn generate_nullify_zkp_proof( inputs: BatchUpdateCircuitInputs, + proof_client: Arc<ProofClient>, ) -> Result<(CompressedProof, [u8; 32]), ForesterUtilsError> { - let proof_client = ProofClient::local(); let (proof, new_root) = proof_client
.generate_batch_update_proof(inputs) .await diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index 4402a74794..98c331fa71 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -1130,6 +1130,14 @@ impl + 'static> EpochManager { merkle_tree: tree_accounts.merkle_tree, output_queue: tree_accounts.queue, ixs_per_tx: self.config.transaction_config.batch_ixs_per_tx, + prover_url: self + .config + .external_services + .prover_url + .clone() + .unwrap_or_else(|| "http://127.0.0.1:3001".to_string()), + prover_polling_interval: Duration::from_secs(1), + prover_max_wait_time: Duration::from_secs(120), }; process_batched_operations(batch_context, tree_accounts.tree_type) diff --git a/forester/src/processor/v2/address.rs b/forester/src/processor/v2/address.rs index d602606c79..48ca8e51c6 100644 --- a/forester/src/processor/v2/address.rs +++ b/forester/src/processor/v2/address.rs @@ -23,6 +23,9 @@ pub(crate) async fn process_batch>( &mut *rpc, &mut *context.indexer.lock().await, &context.merkle_tree, + context.prover_url.clone(), + context.prover_polling_interval, + context.prover_max_wait_time, ) .await .map_err(|e| { diff --git a/forester/src/processor/v2/common.rs b/forester/src/processor/v2/common.rs index 99efc9dd51..1a3e9225bc 100644 --- a/forester/src/processor/v2/common.rs +++ b/forester/src/processor/v2/common.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use forester_utils::rpc_pool::SolanaRpcPool; use light_batched_merkle_tree::{ @@ -26,6 +26,9 @@ pub struct BatchContext { pub merkle_tree: Pubkey, pub output_queue: Pubkey, pub ixs_per_tx: usize, + pub prover_url: String, + pub prover_polling_interval: Duration, + pub prover_max_wait_time: Duration, } #[derive(Debug)] diff --git a/forester/src/processor/v2/state.rs b/forester/src/processor/v2/state.rs index e2ffd403e7..6adf514b2e 100644 --- a/forester/src/processor/v2/state.rs +++ b/forester/src/processor/v2/state.rs @@ -36,6 +36,9 
@@ pub(crate) async fn perform_append>( &mut *context.indexer.lock().await, context.merkle_tree, context.output_queue, + context.prover_url.clone(), + context.prover_polling_interval, + context.prover_max_wait_time, ) .await .map_err(|e| { @@ -141,13 +144,19 @@ pub(crate) async fn perform_nullify>( rpc: &mut R, ) -> Result<()> { let batch_index = get_batch_index(context, rpc).await?; - let instruction_data_vec = - create_nullify_batch_ix_data(rpc, &mut *context.indexer.lock().await, context.merkle_tree) - .await - .map_err(|e| { - error!("Failed to create nullify batch instruction data: {}", e); - BatchProcessError::InstructionData(e.to_string()) - })?; + let instruction_data_vec = create_nullify_batch_ix_data( + rpc, + &mut *context.indexer.lock().await, + context.merkle_tree, + context.prover_url.clone(), + context.prover_polling_interval, + context.prover_max_wait_time, + ) + .await + .map_err(|e| { + error!("Failed to create nullify batch instruction data: {}", e); + BatchProcessError::InstructionData(e.to_string()) + })?; if instruction_data_vec.is_empty() { debug!("No zkp batches to nullify");