Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/ingester/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ pub enum IngesterError {
ParserError(String),
#[error("Empty batch event.")]
EmptyBatchEvent,
#[error("Invalid event.")]
InvalidEvent,
}

impl From<sea_orm::error::DbErr> for IngesterError {
Expand Down
30 changes: 24 additions & 6 deletions src/ingester/parser/indexer_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,27 @@ pub struct CompressedAccountData {
/// [`StateMerkleTree`](light_merkle_tree_program::state::StateMerkleTree)
/// change. Indexers can use this type of events to re-build a non-sparse
/// version of state Merkle tree.
#[derive(AnchorDeserialize, AnchorSerialize, Debug)]
#[derive(AnchorDeserialize, AnchorSerialize, Clone, Eq, PartialEq, Debug)]
#[repr(C)]
pub enum MerkleTreeEvent {
V1(ChangelogEvent),
V2(NullifierEvent),
V3(IndexedMerkleTreeEvent),
BatchAppend(BatchEvent),
BatchNullify(BatchEvent),
BatchAddressAppend(BatchEvent),
}

/// Node of the Merkle path with an index representing the position in a
/// non-sparse Merkle tree.
#[derive(AnchorDeserialize, AnchorSerialize, Debug, Eq, PartialEq)]
#[derive(AnchorDeserialize, AnchorSerialize, Clone, Debug, Eq, PartialEq)]
pub struct PathNode {
pub node: [u8; 32],
pub index: u32,
}

/// Version 1 of the [`ChangelogEvent`](light_merkle_tree_program::state::ChangelogEvent).
#[derive(AnchorDeserialize, AnchorSerialize, Debug)]
#[derive(AnchorDeserialize, AnchorSerialize, PartialEq, Eq, Clone, Debug)]
pub struct ChangelogEvent {
/// Public key of the tree.
pub id: [u8; 32],
Expand All @@ -88,7 +91,7 @@ pub struct ChangelogEvent {
pub index: u32,
}

#[derive(AnchorSerialize, AnchorDeserialize, Debug)]
#[derive(AnchorSerialize, AnchorDeserialize, PartialEq, Eq, Clone, Debug)]
pub struct NullifierEvent {
/// Public key of the tree.
pub id: [u8; 32],
Expand All @@ -109,7 +112,7 @@ pub struct RawIndexedElement {
pub index: usize,
}

#[derive(AnchorDeserialize, AnchorSerialize, Debug, Clone)]
#[derive(AnchorDeserialize, AnchorSerialize, PartialEq, Eq, Debug, Clone)]
pub struct IndexedMerkleTreeUpdate {
pub new_low_element: RawIndexedElement,
/// Leaf hash in new_low_element.index.
Expand All @@ -120,7 +123,7 @@ pub struct IndexedMerkleTreeUpdate {
pub new_high_element_hash: [u8; 32],
}

#[derive(AnchorDeserialize, AnchorSerialize, Debug)]
#[derive(AnchorDeserialize, AnchorSerialize, Clone, PartialEq, Eq, Debug)]
pub struct IndexedMerkleTreeEvent {
/// Public key of the tree.
pub id: [u8; 32],
Expand All @@ -130,3 +133,18 @@ pub struct IndexedMerkleTreeEvent {
/// seq + 1 corresponds to leaves[1].
pub seq: u64,
}

#[repr(C)]
#[derive(AnchorDeserialize, AnchorSerialize, Debug, PartialEq, Clone, Eq)]
pub struct BatchEvent {
pub merkle_tree_pubkey: [u8; 32],
pub batch_index: u64,
pub zkp_batch_index: u64,
pub zkp_batch_size: u64,
pub old_next_index: u64,
pub new_next_index: u64,
pub new_root: [u8; 32],
pub root_index: u32,
pub sequence_number: u64,
pub output_queue_pubkey: Option<[u8; 32]>,
}
Original file line number Diff line number Diff line change
@@ -1,67 +1,69 @@
use std::collections::HashMap;

use crate::ingester::error::IngesterError;
use crate::ingester::parser::indexer_events::{
IndexedMerkleTreeEvent, MerkleTreeEvent, NullifierEvent, PublicTransactionEvent,
IndexedMerkleTreeEvent, MerkleTreeEvent, NullifierEvent,
};
use crate::ingester::parser::state_update::{
IndexedTreeLeafUpdate, LeafNullification, StateUpdate,
};
use crate::ingester::parser::tx_event_parser::parse_public_transaction_event;
use crate::ingester::parser::{ACCOUNT_COMPRESSION_PROGRAM_ID, NOOP_PROGRAM_ID, SYSTEM_PROGRAM};
use crate::ingester::parser::{ACCOUNT_COMPRESSION_PROGRAM_ID, NOOP_PROGRAM_ID};
use crate::ingester::typedefs::block_info::{Instruction, TransactionInfo};
use borsh::BorshDeserialize;
use log::info;
use solana_program::pubkey::Pubkey;
use solana_sdk::signature::Signature;

pub fn parse_legacy_merkle_tree_events(
signature: Signature,
instruction: &Instruction,
) -> Result<StateUpdate, IngesterError> {
let merkle_tree_event = MerkleTreeEvent::deserialize(&mut instruction.data.as_slice())
.map_err(|e| {
IngesterError::ParserError(format!("Failed to deserialize MerkleTreeEvent: {}", e))
})?;

match merkle_tree_event {
MerkleTreeEvent::V2(nullifier_event) => {
parse_legacy_nullifier_event(signature, nullifier_event)
}
MerkleTreeEvent::V3(indexed_merkle_tree_event) => {
parse_indexed_merkle_tree_update(indexed_merkle_tree_event)
}
_ => Err(IngesterError::ParserError(
"Expected nullifier event or merkle tree update".to_string(),
)),
}
}
pub type IndexedBatchEvents = HashMap<[u8; 32], Vec<(u64, MerkleTreeEvent)>>;

pub fn parse_legacy_public_transaction_event(
tx: &TransactionInfo,
slot: u64,
pub fn parse_merkle_tree_event(
instruction: &Instruction,
next_instruction: &Instruction,
next_next_instruction: &Instruction,
tx: &TransactionInfo,
) -> Result<Option<StateUpdate>, IngesterError> {
if ACCOUNT_COMPRESSION_PROGRAM_ID == instruction.program_id
&& next_instruction.program_id == SYSTEM_PROGRAM
&& next_next_instruction.program_id == NOOP_PROGRAM_ID
&& next_instruction.program_id == NOOP_PROGRAM_ID
&& tx.error.is_none()
{
info!(
"Indexing transaction with slot {} and id {}",
slot, tx.signature
);

let public_transaction_event =
PublicTransactionEvent::deserialize(&mut next_next_instruction.data.as_slice())
.map_err(|e| {
IngesterError::ParserError(format!(
"Failed to deserialize PublicTransactionEvent: {}",
e
))
})?;

parse_public_transaction_event(tx.signature, slot, public_transaction_event).map(Some)
let merkle_tree_event = MerkleTreeEvent::deserialize(&mut next_instruction.data.as_slice());
if let Ok(merkle_tree_event) = merkle_tree_event {
let mut state_update = StateUpdate::new();
let event = match merkle_tree_event {
MerkleTreeEvent::V2(nullifier_event) => {
parse_legacy_nullifier_event(tx.signature, nullifier_event)?
}
MerkleTreeEvent::V3(indexed_merkle_tree_event) => {
parse_indexed_merkle_tree_update(indexed_merkle_tree_event)?
}
MerkleTreeEvent::BatchAppend(batch_event) => {
state_update
.batch_events
.entry(batch_event.merkle_tree_pubkey)
.or_default()
.push((
batch_event.sequence_number,
MerkleTreeEvent::BatchAppend(batch_event),
));
state_update
}
MerkleTreeEvent::BatchNullify(batch_event) => {
state_update
.batch_events
.entry(batch_event.merkle_tree_pubkey)
.or_default()
.push((
batch_event.sequence_number,
MerkleTreeEvent::BatchNullify(batch_event),
));
state_update
}
_ => Err(IngesterError::ParserError(
"Expected nullifier event or merkle tree update".to_string(),
))?,
};
Ok(Some(event))
} else {
Ok(None)
}
} else {
Ok(None)
}
Expand Down
11 changes: 6 additions & 5 deletions src/ingester/parser/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use batch_event_parser::{create_state_update, parse_merkle_tree_event};
use legacy::parse_legacy_public_transaction_event;
use merkle_tree_events_parser::parse_merkle_tree_event;
use solana_sdk::pubkey::Pubkey;
use tx_event_parser::parse_legacy_public_transaction_event;
use tx_event_parser_v2::create_state_update;

use super::{error::IngesterError, typedefs::block_info::TransactionInfo};

use self::state_update::{StateUpdate, Transaction};

pub mod batch_event_parser;
pub mod indexer_events;
mod legacy;
pub mod merkle_tree_events_parser;
pub mod state_update;
mod tx_event_parser;
pub mod tx_event_parser_v2;

use crate::ingester::parser::batch_event_parser::parse_public_transaction_event_v2;
use crate::ingester::parser::tx_event_parser_v2::parse_public_transaction_event_v2;
use solana_program::pubkey;
pub use tx_event_parser::map_tree_and_queue_accounts;

Expand Down
3 changes: 1 addition & 2 deletions src/ingester/parser/state_update.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
batch_event_parser::IndexedBatchEvents,
indexer_events::{MerkleTreeSequenceNumber, RawIndexedElement},
merkle_tree_events_parser::IndexedBatchEvents,
};
use crate::common::typedefs::account::AccountWithContext;
use crate::common::typedefs::hash::Hash;
Expand Down Expand Up @@ -81,7 +81,6 @@ pub struct StateUpdate {
pub transactions: HashSet<Transaction>,
pub leaf_nullifications: HashSet<LeafNullification>,
pub indexed_merkle_tree_updates: HashMap<(Pubkey, u64), IndexedTreeLeafUpdate>,

pub batch_events: IndexedBatchEvents,
pub input_context: Vec<BatchNullifyContext>,
}
Expand Down
37 changes: 37 additions & 0 deletions src/ingester/parser/tx_event_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@ use crate::common::typedefs::account::AccountWithContext;
use crate::ingester::error::IngesterError;
use crate::ingester::parser::indexer_events::PublicTransactionEvent;
use crate::ingester::parser::state_update::{AccountTransaction, StateUpdate};
use crate::ingester::typedefs::block_info::{Instruction, TransactionInfo};
use anchor_lang::AnchorDeserialize;
use lazy_static::lazy_static;
use light_merkle_tree_metadata::merkle_tree::TreeType;
use log::info;
use solana_program::pubkey::Pubkey;
use solana_sdk::pubkey;
use solana_sdk::signature::Signature;
use std::collections::HashMap;

use super::{ACCOUNT_COMPRESSION_PROGRAM_ID, NOOP_PROGRAM_ID, SYSTEM_PROGRAM};

pub struct TreeAndQueue {
tree: Pubkey,
queue: Pubkey,
Expand Down Expand Up @@ -89,6 +94,38 @@ pub fn map_tree_and_queue_accounts<'a>(pubkey: String) -> Option<&'a TreeAndQueu
QUEUE_TREE_MAPPING.get(pubkey.as_str())
}

pub fn parse_legacy_public_transaction_event(
tx: &TransactionInfo,
slot: u64,
instruction: &Instruction,
next_instruction: &Instruction,
next_next_instruction: &Instruction,
) -> Result<Option<StateUpdate>, IngesterError> {
if ACCOUNT_COMPRESSION_PROGRAM_ID == instruction.program_id
&& next_instruction.program_id == SYSTEM_PROGRAM
&& next_next_instruction.program_id == NOOP_PROGRAM_ID
&& tx.error.is_none()
{
info!(
"Indexing transaction with slot {} and id {}",
slot, tx.signature
);

let public_transaction_event =
PublicTransactionEvent::deserialize(&mut next_next_instruction.data.as_slice())
.map_err(|e| {
IngesterError::ParserError(format!(
"Failed to deserialize PublicTransactionEvent: {}",
e
))
})?;

parse_public_transaction_event(tx.signature, slot, public_transaction_event).map(Some)
} else {
Ok(None)
}
}

pub fn parse_public_transaction_event(
tx: Signature,
slot: u64,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,88 +1,15 @@
use std::collections::HashMap;

use crate::ingester::error::IngesterError;
use crate::ingester::parser::indexer_events::{
BatchPublicTransactionEvent, CompressedAccount, CompressedAccountData,
MerkleTreeSequenceNumber, OutputCompressedAccountWithPackedContext, PublicTransactionEvent,
};
use crate::ingester::parser::legacy::parse_legacy_merkle_tree_events;
use crate::ingester::parser::state_update::StateUpdate;
use crate::ingester::parser::tx_event_parser::parse_public_transaction_event;
use crate::ingester::parser::{ACCOUNT_COMPRESSION_PROGRAM_ID, NOOP_PROGRAM_ID};
use crate::ingester::typedefs::block_info::{Instruction, TransactionInfo};
use borsh::BorshDeserialize;
use light_batched_merkle_tree::event::{
BatchAppendEvent, BatchNullifyEvent, BATCH_ADDRESS_APPEND_EVENT_DISCRIMINATOR,
BATCH_APPEND_EVENT_DISCRIMINATOR, BATCH_NULLIFY_EVENT_DISCRIMINATOR,
};

use light_compressed_account::indexer_event::parse::event_from_light_transaction;
use solana_program::pubkey::Pubkey;
use solana_sdk::signature::Signature;

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum BatchEvent {
BatchAppend(BatchAppendEvent),
BatchNullify(BatchNullifyEvent),
}

pub type IndexedBatchEvents = HashMap<[u8; 32], Vec<(u64, BatchEvent)>>;

pub fn parse_merkle_tree_event(
instruction: &Instruction,
next_instruction: &Instruction,
tx: &TransactionInfo,
) -> Result<Option<StateUpdate>, IngesterError> {
if ACCOUNT_COMPRESSION_PROGRAM_ID == instruction.program_id
&& next_instruction.program_id == NOOP_PROGRAM_ID
&& tx.error.is_none()
{
// Try to parse as batch append/nullify event first
if let Ok(batch_event) =
BatchAppendEvent::deserialize(&mut next_instruction.data.as_slice())
{
let mut state_update = StateUpdate::new();

match batch_event.discriminator {
BATCH_APPEND_EVENT_DISCRIMINATOR => {
state_update
.batch_events
.entry(batch_event.merkle_tree_pubkey)
.or_default()
.push((
batch_event.sequence_number,
BatchEvent::BatchAppend(batch_event),
));
}
BATCH_NULLIFY_EVENT_DISCRIMINATOR => {
state_update
.batch_events
.entry(batch_event.merkle_tree_pubkey)
.or_default()
.push((
batch_event.sequence_number,
BatchEvent::BatchNullify(batch_event),
));
}
// TODO: implement address append (in different PR)
_ => {
log::warn!(
"Unsupported batch event discriminator: {} batch address discriminator: {}",
batch_event.discriminator,
BATCH_ADDRESS_APPEND_EVENT_DISCRIMINATOR
);
}
}

return Ok(Some(state_update));
}

// If not batch event, try legacy events
parse_legacy_merkle_tree_events(tx.signature, next_instruction).map(Some)
} else {
Ok(None)
}
}

pub fn parse_public_transaction_event_v2(
program_ids: &[Pubkey],
instructions: &[Vec<u8>],
Expand Down
Loading