Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion forester-utils/src/instructions/address_batch_update.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use account_compression::processor::initialize_address_merkle_tree::Pubkey;
use futures::future;
use light_batched_merkle_tree::{
Expand Down Expand Up @@ -27,6 +29,9 @@ pub async fn create_batch_update_address_tree_instruction_data<R, I>(
rpc: &mut R,
indexer: &mut I,
merkle_tree_pubkey: &Pubkey,
prover_url: String,
polling_interval: Duration,
max_wait_time: Duration,
) -> Result<(Vec<InstructionDataBatchNullifyInputs>, u16), ForesterUtilsError>
where
R: Rpc,
Expand Down Expand Up @@ -253,7 +258,7 @@ where
}

info!("Generating {} ZK proofs asynchronously", all_inputs.len());
let proof_client = ProofClient::local();
let proof_client = ProofClient::with_config(prover_url, polling_interval, max_wait_time);
let proof_futures = all_inputs
.into_iter()
.map(|inputs| proof_client.generate_batch_address_append_proof(inputs));
Expand Down
15 changes: 13 additions & 2 deletions forester-utils/src/instructions/state_batch_append.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::{sync::Arc, time::Duration};

use account_compression::processor::initialize_address_merkle_tree::Pubkey;
use light_batched_merkle_tree::{
constants::DEFAULT_BATCH_STATE_TREE_HEIGHT,
Expand All @@ -22,6 +24,9 @@ pub async fn create_append_batch_ix_data<R: Rpc, I: Indexer>(
indexer: &mut I,
merkle_tree_pubkey: Pubkey,
output_queue_pubkey: Pubkey,
prover_url: String,
polling_interval: Duration,
max_wait_time: Duration,
) -> Result<Vec<InstructionDataBatchAppendInputs>, ForesterUtilsError> {
trace!("Creating append batch instruction data");

Expand Down Expand Up @@ -84,6 +89,11 @@ pub async fn create_append_batch_ix_data<R: Rpc, I: Indexer>(
let mut all_changelogs: Vec<ChangelogEntry<{ DEFAULT_BATCH_STATE_TREE_HEIGHT as usize }>> =
Vec::new();
let mut proof_futures = Vec::new();
let proof_client = Arc::new(ProofClient::with_config(
prover_url.clone(),
polling_interval,
max_wait_time,
));

for (batch_idx, leaves_hash_chain) in leaves_hash_chains.iter().enumerate() {
let start_idx = batch_idx * zkp_batch_size as usize;
Expand Down Expand Up @@ -134,7 +144,8 @@ pub async fn create_append_batch_ix_data<R: Rpc, I: Indexer>(
bigint_to_be_bytes_array::<32>(&circuit_inputs.new_root.to_biguint().unwrap()).unwrap();
all_changelogs.extend(batch_changelogs);

let proof_future = generate_zkp_proof(circuit_inputs);
let client = Arc::clone(&proof_client);
let proof_future = generate_zkp_proof(circuit_inputs, client);

proof_futures.push(proof_future);
}
Expand Down Expand Up @@ -162,8 +173,8 @@ pub async fn create_append_batch_ix_data<R: Rpc, I: Indexer>(
}
async fn generate_zkp_proof(
circuit_inputs: BatchAppendsCircuitInputs,
proof_client: Arc<ProofClient>,
) -> Result<(CompressedProof, [u8; 32]), ForesterUtilsError> {
let proof_client = ProofClient::local();
let (proof, new_root) = proof_client
.generate_batch_append_proof(circuit_inputs)
.await
Expand Down
58 changes: 28 additions & 30 deletions forester-utils/src/instructions/state_batch_nullify.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::{sync::Arc, time::Duration};

use account_compression::processor::initialize_address_merkle_tree::Pubkey;
use light_batched_merkle_tree::{
constants::DEFAULT_BATCH_STATE_TREE_HEIGHT,
Expand All @@ -19,6 +21,9 @@ pub async fn create_nullify_batch_ix_data<R: Rpc, I: Indexer>(
rpc: &mut R,
indexer: &mut I,
merkle_tree_pubkey: Pubkey,
prover_url: String,
polling_interval: Duration,
max_wait_time: Duration,
) -> Result<Vec<InstructionDataBatchNullifyInputs>, ForesterUtilsError> {
trace!("create_multiple_nullify_batch_ix_data");
// Get the tree information and find out how many ZKP batches need processing
Expand Down Expand Up @@ -124,6 +129,11 @@ pub async fn create_nullify_batch_ix_data<R: Rpc, I: Indexer>(

let mut all_changelogs = Vec::new();
let mut proof_futures = Vec::new();
let proof_client = Arc::new(ProofClient::with_config(
prover_url.clone(),
polling_interval,
max_wait_time,
));

let mut current_root = old_root;

Expand Down Expand Up @@ -205,48 +215,36 @@ pub async fn create_nullify_batch_ix_data<R: Rpc, I: Indexer>(
ForesterUtilsError::Prover("Failed to convert new root to bytes".into())
})?;

let proof_future = tokio::spawn(generate_nullify_zkp_proof(circuit_inputs));
let client = Arc::clone(&proof_client);
let proof_future = generate_nullify_zkp_proof(circuit_inputs, client);
proof_futures.push(proof_future);
}

// Wait for all proof generation to complete
let mut results = Vec::new();

for (i, future) in futures::future::join_all(proof_futures)
.await
.into_iter()
.enumerate()
{
match future {
Ok(result) => match result {
Ok((proof, new_root)) => {
results.push(InstructionDataBatchNullifyInputs {
new_root,
compressed_proof: proof,
});
trace!("Successfully generated proof for batch {}", i);
}
Err(e) => {
error!("Error generating proof for batch {}: {:?}", i, e);
return Err(e);
}
},
let proof_results = futures::future::join_all(proof_futures).await;
let mut instruction_data_vec = Vec::new();

for (i, proof_result) in proof_results.into_iter().enumerate() {
match proof_result {
Ok((proof, new_root)) => {
trace!("Successfully generated proof for batch {}", i);
instruction_data_vec.push(InstructionDataBatchNullifyInputs {
new_root,
compressed_proof: proof,
});
}
Err(e) => {
error!("Task error for batch {}: {:?}", i, e);
return Err(ForesterUtilsError::Prover(format!(
"Task error for batch {}: {:?}",
i, e
)));
error!("Failed to generate proof for batch {}: {:?}", i, e);
return Err(e);
}
}
}

Ok(results)
Ok(instruction_data_vec)
}
async fn generate_nullify_zkp_proof(
inputs: BatchUpdateCircuitInputs,
proof_client: Arc<ProofClient>,
) -> Result<(CompressedProof, [u8; 32]), ForesterUtilsError> {
let proof_client = ProofClient::local();
let (proof, new_root) = proof_client
.generate_batch_update_proof(inputs)
.await
Expand Down
8 changes: 8 additions & 0 deletions forester/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,14 @@ impl<R: Rpc, I: Indexer + IndexerType<R> + 'static> EpochManager<R, I> {
merkle_tree: tree_accounts.merkle_tree,
output_queue: tree_accounts.queue,
ixs_per_tx: self.config.transaction_config.batch_ixs_per_tx,
prover_url: self
.config
.external_services
.prover_url
.clone()
.unwrap_or_else(|| "http://127.0.0.1:3001".to_string()),
prover_polling_interval: Duration::from_secs(1),
prover_max_wait_time: Duration::from_secs(120),
};

process_batched_operations(batch_context, tree_accounts.tree_type)
Expand Down
3 changes: 3 additions & 0 deletions forester/src/processor/v2/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub(crate) async fn process_batch<R: Rpc, I: Indexer + IndexerType<R>>(
&mut *rpc,
&mut *context.indexer.lock().await,
&context.merkle_tree,
context.prover_url.clone(),
context.prover_polling_interval,
context.prover_max_wait_time,
)
.await
.map_err(|e| {
Expand Down
5 changes: 4 additions & 1 deletion forester/src/processor/v2/common.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use forester_utils::rpc_pool::SolanaRpcPool;
use light_batched_merkle_tree::{
Expand Down Expand Up @@ -26,6 +26,9 @@ pub struct BatchContext<R: Rpc, I: Indexer> {
pub merkle_tree: Pubkey,
pub output_queue: Pubkey,
pub ixs_per_tx: usize,
pub prover_url: String,
pub prover_polling_interval: Duration,
pub prover_max_wait_time: Duration,
}

#[derive(Debug)]
Expand Down
23 changes: 16 additions & 7 deletions forester/src/processor/v2/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ pub(crate) async fn perform_append<R: Rpc, I: Indexer + IndexerType<R>>(
&mut *context.indexer.lock().await,
context.merkle_tree,
context.output_queue,
context.prover_url.clone(),
context.prover_polling_interval,
context.prover_max_wait_time,
)
.await
.map_err(|e| {
Expand Down Expand Up @@ -141,13 +144,19 @@ pub(crate) async fn perform_nullify<R: Rpc, I: Indexer + IndexerType<R>>(
rpc: &mut R,
) -> Result<()> {
let batch_index = get_batch_index(context, rpc).await?;
let instruction_data_vec =
create_nullify_batch_ix_data(rpc, &mut *context.indexer.lock().await, context.merkle_tree)
.await
.map_err(|e| {
error!("Failed to create nullify batch instruction data: {}", e);
BatchProcessError::InstructionData(e.to_string())
})?;
let instruction_data_vec = create_nullify_batch_ix_data(
rpc,
&mut *context.indexer.lock().await,
context.merkle_tree,
context.prover_url.clone(),
context.prover_polling_interval,
context.prover_max_wait_time,
)
.await
.map_err(|e| {
error!("Failed to create nullify batch instruction data: {}", e);
BatchProcessError::InstructionData(e.to_string())
})?;

if instruction_data_vec.is_empty() {
debug!("No zkp batches to nullify");
Expand Down
Loading