Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cli/src/utils/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ export const SOLANA_VALIDATOR_PROCESS_NAME = "solana-test-validator";
export const LIGHT_PROVER_PROCESS_NAME = "light-prover";
export const INDEXER_PROCESS_NAME = "photon";

export const PHOTON_VERSION = "0.51.0";
export const PHOTON_VERSION = "0.51.2";

// Set these to override Photon requirements with a specific git commit:
export const USE_PHOTON_FROM_GIT = true; // If true, will show git install command instead of crates.io.
export const PHOTON_GIT_REPO = "https://github.com/lightprotocol/photon.git";
export const PHOTON_GIT_COMMIT = "1a785036de52896b68d06413e3b0231122d6aa4a"; // If empty, will use main branch.
export const PHOTON_GIT_COMMIT = "711c47b20330c6bb78feb0a2c15e8292fcd0a7b0"; // If empty, will use main branch.

export const LIGHT_PROTOCOL_PROGRAMS_DIR_ENV = "LIGHT_PROTOCOL_PROGRAMS_DIR";
export const BASE_PATH = "../../bin/";
Expand Down
140 changes: 74 additions & 66 deletions forester-utils/src/instructions/address_batch_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use futures::stream::Stream;
use light_batched_merkle_tree::{
constants::DEFAULT_BATCH_ADDRESS_TREE_HEIGHT, merkle_tree::InstructionDataAddressAppendInputs,
};
use light_client::{indexer::Indexer, rpc::Rpc};
use light_client::{
indexer::{AddressQueueData, Indexer, QueueElementsV2Options},
rpc::Rpc,
};
use light_compressed_account::{
hash_chain::create_hash_chain_from_slice, instruction_data::compressed_proof::CompressedProof,
};
Expand Down Expand Up @@ -71,40 +74,49 @@ async fn stream_instruction_data<'a, R: Rpc>(
}
}

let indexer_update_info = {
let address_queue = {
let mut connection = rpc_pool.get_connection().await?;
let indexer = connection.indexer_mut()?;
debug!(
"Requesting {} addresses from Photon for chunk {} with start_queue_index={:?}",
elements_for_chunk, chunk_idx, next_queue_index
);
let options = QueueElementsV2Options::default()
.with_address_queue(next_queue_index, Some(elements_for_chunk as u16));
match indexer
.get_address_queue_with_proofs(
&merkle_tree_pubkey,
elements_for_chunk as u16,
next_queue_index,
None,
)
.await {
Ok(info) => info,
.get_queue_elements(merkle_tree_pubkey.to_bytes(), options, None)
.await
{
Ok(response) => match response.value.address_queue {
Some(queue) => queue,
None => {
yield Err(ForesterUtilsError::Indexer(
"No address queue data in response".into(),
));
return;
}
},
Err(e) => {
yield Err(ForesterUtilsError::Indexer(format!("Failed to get address queue with proofs: {}", e)));
yield Err(ForesterUtilsError::Indexer(format!(
"Failed to get queue elements: {}",
e
)));
return;
}
}
};

debug!(
"Photon response for chunk {}: received {} addresses, batch_start_index={}, first_queue_index={:?}, last_queue_index={:?}",
"Photon response for chunk {}: received {} addresses, start_index={}, first_queue_index={:?}, last_queue_index={:?}",
chunk_idx,
indexer_update_info.value.addresses.len(),
indexer_update_info.value.batch_start_index,
indexer_update_info.value.addresses.first().map(|a| a.queue_index),
indexer_update_info.value.addresses.last().map(|a| a.queue_index)
address_queue.addresses.len(),
address_queue.start_index,
address_queue.queue_indices.first(),
address_queue.queue_indices.last()
);

if let Some(last_address) = indexer_update_info.value.addresses.last() {
next_queue_index = Some(last_address.queue_index + 1);
if let Some(last_queue_index) = address_queue.queue_indices.last() {
next_queue_index = Some(last_queue_index + 1);
debug!(
"Setting next_queue_index={} for chunk {}",
next_queue_index.unwrap(),
Expand All @@ -113,21 +125,24 @@ async fn stream_instruction_data<'a, R: Rpc>(
}

if chunk_idx == 0 {
if let Some(first_proof) = indexer_update_info.value.non_inclusion_proofs.first() {
if first_proof.root != current_root {
warn!("Indexer root does not match on-chain root");
yield Err(ForesterUtilsError::Indexer("Indexer root does not match on-chain root".into()));
return;
}
} else {
yield Err(ForesterUtilsError::Indexer("No non-inclusion proofs found in indexer response".into()));
if address_queue.addresses.is_empty() {
yield Err(ForesterUtilsError::Indexer(
"No addresses found in indexer response".into(),
));
return;
}
if address_queue.initial_root != current_root {
warn!("Indexer root does not match on-chain root");
yield Err(ForesterUtilsError::Indexer(
"Indexer root does not match on-chain root".into(),
));
return;
}
}

let (all_inputs, new_current_root) = match get_all_circuit_inputs_for_chunk(
chunk_hash_chains,
&indexer_update_info,
&address_queue,
zkp_batch_size,
chunk_start,
start_index,
Expand Down Expand Up @@ -197,9 +212,7 @@ fn calculate_max_zkp_batches_per_call(batch_size: u16) -> usize {

fn get_all_circuit_inputs_for_chunk(
chunk_hash_chains: &[[u8; 32]],
indexer_update_info: &light_client::indexer::Response<
light_client::indexer::BatchAddressUpdateIndexerResponse,
>,
address_queue: &AddressQueueData,
batch_size: u16,
chunk_start_idx: usize,
global_start_index: u64,
Expand All @@ -212,14 +225,9 @@ fn get_all_circuit_inputs_for_chunk(
ForesterUtilsError,
> {
let subtrees_array: [[u8; 32]; DEFAULT_BATCH_ADDRESS_TREE_HEIGHT as usize] =
indexer_update_info
.value
.subtrees
.clone()
.try_into()
.map_err(|_| {
ForesterUtilsError::Prover("Failed to convert subtrees to array".into())
})?;
address_queue.subtrees.clone().try_into().map_err(|_| {
ForesterUtilsError::Prover("Failed to convert subtrees to array".into())
})?;

let mut sparse_merkle_tree =
SparseMerkleTree::<Poseidon, { DEFAULT_BATCH_ADDRESS_TREE_HEIGHT as usize }>::new(
Expand All @@ -235,7 +243,7 @@ fn get_all_circuit_inputs_for_chunk(
let start_idx = batch_idx * batch_size as usize;
let end_idx = start_idx + batch_size as usize;

let addresses_len = indexer_update_info.value.addresses.len();
let addresses_len = address_queue.addresses.len();
if start_idx >= addresses_len {
return Err(ForesterUtilsError::Indexer(format!(
"Insufficient addresses: batch {} requires start_idx {} but only {} addresses available",
Expand All @@ -250,41 +258,41 @@ fn get_all_circuit_inputs_for_chunk(
)));
}

let batch_addresses: Vec<[u8; 32]> = indexer_update_info.value.addresses
[start_idx..safe_end_idx]
.iter()
.map(|x| x.address)
.collect();
let batch_addresses: Vec<[u8; 32]> =
address_queue.addresses[start_idx..safe_end_idx].to_vec();

let proofs_len = indexer_update_info.value.non_inclusion_proofs.len();
if start_idx >= proofs_len {
// Check that we have enough low element data
let low_elements_len = address_queue.low_element_values.len();
if start_idx >= low_elements_len {
return Err(ForesterUtilsError::Indexer(format!(
"Insufficient non-inclusion proofs: batch {} requires start_idx {} but only {} proofs available",
batch_idx, start_idx, proofs_len
"Insufficient low element data: batch {} requires start_idx {} but only {} elements available",
batch_idx, start_idx, low_elements_len
)));
}
let safe_proofs_end_idx = std::cmp::min(end_idx, proofs_len);
if safe_proofs_end_idx - start_idx != batch_size as usize {
let safe_low_end_idx = std::cmp::min(end_idx, low_elements_len);
if safe_low_end_idx - start_idx != batch_size as usize {
return Err(ForesterUtilsError::Indexer(format!(
"Insufficient non-inclusion proofs: batch {} requires {} proofs (indices {}..{}) but only {} available",
batch_idx, batch_size, start_idx, end_idx, safe_proofs_end_idx - start_idx
"Insufficient low element data: batch {} requires {} elements (indices {}..{}) but only {} available",
batch_idx, batch_size, start_idx, end_idx, safe_low_end_idx - start_idx
)));
}

let mut low_element_values = Vec::new();
let mut low_element_next_values = Vec::new();
let mut low_element_indices = Vec::new();
let mut low_element_next_indices = Vec::new();
let mut low_element_proofs = Vec::new();

for proof in &indexer_update_info.value.non_inclusion_proofs[start_idx..safe_proofs_end_idx]
{
low_element_values.push(proof.low_address_value);
low_element_indices.push(proof.low_address_index as usize);
low_element_next_indices.push(proof.low_address_next_index as usize);
low_element_next_values.push(proof.low_address_next_value);
low_element_proofs.push(proof.low_address_proof.to_vec());
}
let low_element_values: Vec<[u8; 32]> =
address_queue.low_element_values[start_idx..safe_low_end_idx].to_vec();
let low_element_next_values: Vec<[u8; 32]> =
address_queue.low_element_next_values[start_idx..safe_low_end_idx].to_vec();
let low_element_indices: Vec<usize> = address_queue.low_element_indices
[start_idx..safe_low_end_idx]
.iter()
.map(|&x| x as usize)
.collect();
let low_element_next_indices: Vec<usize> = address_queue.low_element_next_indices
[start_idx..safe_low_end_idx]
.iter()
.map(|&x| x as usize)
.collect();
let low_element_proofs: Vec<Vec<[u8; 32]>> =
address_queue.low_element_proofs[start_idx..safe_low_end_idx].to_vec();

let computed_hash_chain = create_hash_chain_from_slice(&batch_addresses)?;
if computed_hash_chain != *leaves_hash_chain {
Expand Down
4 changes: 2 additions & 2 deletions forester/src/processor/v2/state/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub async fn fetch_batches<R: Rpc>(
input_start_index: Option<u64>,
fetch_len: u64,
zkp_batch_size: u64,
) -> crate::Result<Option<light_client::indexer::StateQueueDataV2>> {
) -> crate::Result<Option<light_client::indexer::StateQueueData>> {
let fetch_len_u16: u16 = match fetch_len.try_into() {
Ok(v) => v,
Err(_) => {
Expand Down Expand Up @@ -70,7 +70,7 @@ pub async fn fetch_batches<R: Rpc>(
.with_input_queue_batch_size(Some(zkp_batch_size_u16));

let res = indexer
.get_queue_elements_v2(context.merkle_tree.to_bytes(), options, None)
.get_queue_elements(context.merkle_tree.to_bytes(), options, None)
.await?;

Ok(res.value.state_queue)
Expand Down
4 changes: 2 additions & 2 deletions forester/src/processor/v2/state/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ impl<R: Rpc> StateSupervisor<R> {
async fn build_append_job(
&mut self,
batch_idx: usize,
state_queue: &light_client::indexer::StateQueueDataV2,
state_queue: &light_client::indexer::StateQueueData,
start: usize,
result_tx: mpsc::Sender<ProofResult>,
) -> crate::Result<Option<ProofJob>> {
Expand Down Expand Up @@ -483,7 +483,7 @@ impl<R: Rpc> StateSupervisor<R> {
async fn build_nullify_job(
&mut self,
batch_idx: usize,
state_queue: &light_client::indexer::StateQueueDataV2,
state_queue: &light_client::indexer::StateQueueData,
start: usize,
result_tx: mpsc::Sender<ProofResult>,
) -> crate::Result<Option<ProofJob>> {
Expand Down
29 changes: 16 additions & 13 deletions forester/tests/legacy/batched_address_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use light_batched_merkle_tree::{
merkle_tree::BatchedMerkleTreeAccount,
};
use light_client::{
indexer::{photon_indexer::PhotonIndexer, AddressMerkleTreeAccounts, Indexer},
indexer::{
photon_indexer::PhotonIndexer, AddressMerkleTreeAccounts, Indexer, QueueElementsV2Options,
},
local_test_validator::LightValidatorConfig,
rpc::{client::RpcUrl, LightClient, LightClientConfig, Rpc},
};
Expand Down Expand Up @@ -167,24 +169,25 @@ async fn test_address_batched() {
)
.unwrap();

let photon_address_queue_with_proofs = photon_indexer
.get_address_queue_with_proofs(&address_merkle_tree_pubkey, 10, None, None)
let options = QueueElementsV2Options::default().with_address_queue(None, Some(10));
let photon_address_queue = photon_indexer
.get_queue_elements(address_merkle_tree_pubkey.to_bytes(), options.clone(), None)
.await
.unwrap();

let test_indexer_address_queue_with_proofs = env
let test_indexer_address_queue = env
.indexer
.get_address_queue_with_proofs(&address_merkle_tree_pubkey, 10, None, None)
.get_queue_elements(address_merkle_tree_pubkey.to_bytes(), options.clone(), None)
.await
.unwrap();

println!(
"photon_indexer_update_info {}: {:#?}",
0, photon_address_queue_with_proofs
0, photon_address_queue
);
println!(
"test_indexer_update_info {}: {:#?}",
0, test_indexer_address_queue_with_proofs
0, test_indexer_address_queue
);

for i in 0..merkle_tree.queue_batches.batch_size {
Expand All @@ -202,26 +205,26 @@ async fn test_address_batched() {
sleep(Duration::from_millis(100)).await;

if (i + 1) % 10 == 0 {
let photon_address_queue_with_proofs = photon_indexer
.get_address_queue_with_proofs(&address_merkle_tree_pubkey, 10, None, None)
let photon_address_queue = photon_indexer
.get_queue_elements(address_merkle_tree_pubkey.to_bytes(), options.clone(), None)
.await
.unwrap();

let test_indexer_address_queue_with_proofs = env
let test_indexer_address_queue = env
.indexer
.get_address_queue_with_proofs(&address_merkle_tree_pubkey, 10, None, None)
.get_queue_elements(address_merkle_tree_pubkey.to_bytes(), options.clone(), None)
.await
.unwrap();

println!(
"photon_indexer_update_info {}: {:#?}",
i + 1,
photon_address_queue_with_proofs
photon_address_queue
);
println!(
"test_indexer_update_info {}: {:#?}",
i + 1,
test_indexer_address_queue_with_proofs
test_indexer_address_queue
);
}
}
Expand Down
25 changes: 13 additions & 12 deletions program-tests/utils/src/e2e_test_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ use light_batched_merkle_tree::{
};
use light_client::{
fee::{FeeConfig, TransactionParams},
indexer::{AddressMerkleTreeAccounts, AddressWithTree, Indexer, StateMerkleTreeAccounts},
indexer::{
AddressMerkleTreeAccounts, AddressWithTree, Indexer, QueueElementsV2Options,
StateMerkleTreeAccounts,
},
rpc::{errors::RpcError, merkle_tree::MerkleTreeExt, Rpc},
};
// TODO: implement traits for context object and indexer that we can implement with an rpc as well
Expand Down Expand Up @@ -741,20 +744,18 @@ where
[full_batch_index as usize]
[zkp_batch_index as usize];

let addresses = self
let options = QueueElementsV2Options::default()
.with_address_queue(None, Some(batch.batch_size as u16));
let result = self
.indexer
.get_queue_elements(
merkle_tree_pubkey.to_bytes(),
None,
Some(batch.batch_size as u16),
None,
None,
None,
)
.get_queue_elements(merkle_tree_pubkey.to_bytes(), options, None)
.await
.unwrap();
let addresses =
addresses.value.output_queue_elements.unwrap_or_default().iter().map(|x| x.account_hash).collect::<Vec<_>>();
let addresses = result
.value
.address_queue
.map(|aq| aq.addresses)
.unwrap_or_default();
// // local_leaves_hash_chain is only used for a test assertion.
// let local_nullifier_hash_chain = create_hash_chain_from_array(&addresses);
// assert_eq!(leaves_hash_chain, local_nullifier_hash_chain);
Expand Down
Loading