diff --git a/Cargo.lock b/Cargo.lock index a1c44cb394..47ba5b6c93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2379,6 +2379,7 @@ dependencies = [ "solana-account-decoder", "solana-client", "solana-program", + "solana-pubkey 2.4.0", "solana-rpc-client-api", "solana-sdk", "solana-transaction-status", diff --git a/forester/Cargo.toml b/forester/Cargo.toml index 18ef1ab14c..1aa74e6168 100644 --- a/forester/Cargo.toml +++ b/forester/Cargo.toml @@ -62,6 +62,7 @@ num-bigint = { workspace = true } kameo = "0.19" once_cell = "1.21.3" async-channel = "2.3" +solana-pubkey = { workspace = true } [dev-dependencies] serial_test = { workspace = true } diff --git a/forester/src/compressible/compressor.rs b/forester/src/compressible/compressor.rs index 2c75b65765..7801158485 100644 --- a/forester/src/compressible/compressor.rs +++ b/forester/src/compressible/compressor.rs @@ -2,7 +2,8 @@ use std::{str::FromStr, sync::Arc}; use anchor_lang::{InstructionData, ToAccountMetas}; use forester_utils::rpc_pool::SolanaRpcPool; -use light_client::rpc::Rpc; +use light_client::{indexer::TreeInfo, rpc::Rpc}; +use light_compressed_account::TreeType; use light_compressible::config::CompressibleConfig; use light_ctoken_interface::CTOKEN_PROGRAM_ID; use light_ctoken_sdk::compressed_token::compress_and_close::CompressAndCloseAccounts as CTokenAccounts; @@ -11,13 +12,14 @@ use light_registry::{ instruction::CompressAndClose, }; use light_sdk::instruction::PackedAccounts; +use solana_pubkey::pubkey; use solana_sdk::{ instruction::Instruction, pubkey::Pubkey, signature::{Keypair, Signature}, signer::Signer, }; -use tracing::{debug, warn}; +use tracing::{debug, info}; use super::{state::CompressibleAccountTracker, types::CompressibleAccountState}; use crate::Result; @@ -83,13 +85,23 @@ impl Compressor { // Get output tree from RPC let mut rpc = self.rpc_pool.get_connection().await?; - rpc.get_latest_active_state_trees() - .await - .map_err(|e| anyhow::anyhow!("Failed to get state tree info: {}", e))?; - let output_tree_info = rpc - .get_random_state_tree_info() - .map_err(|e| anyhow::anyhow!("Failed to get state tree info: {}", e))?; + // FIXME: Use latest active state tree after updating lookup tables + // rpc.get_latest_active_state_trees() + // .await + // .map_err(|e| anyhow::anyhow!("Failed to get state tree info: {}", e))?; + // let output_tree_info = rpc + // .get_random_state_tree_info() + // .map_err(|e| anyhow::anyhow!("Failed to get state tree info: {}", e))?; + + let output_tree_info = TreeInfo { + tree: pubkey!("bmt1LryLZUMmF7ZtqESaw7wifBXLfXHQYoE4GAmrahU"), + queue: pubkey!("oq1na8gojfdUhsfCpyjNt6h4JaDWtHf1yQj4koBWfto"), + cpi_context: Some(pubkey!("cpi15BoVPKgEPw5o8wc2T816GE7b378nMXnhH3Xbq4y")), + tree_type: TreeType::StateV2, + next_tree_info: None, + }; + let output_queue = output_tree_info .get_output_pubkey() .map_err(|e| anyhow::anyhow!("Failed to get output queue: {}", e))?; @@ -196,9 +208,6 @@ impl Compressor { accounts.len() ); - // Collect pubkeys for sync before creating instruction - let pubkeys: Vec<_> = account_states.iter().map(|state| state.pubkey).collect(); - let ix = Instruction { program_id: registry_program_id, accounts, @@ -206,6 +215,8 @@ impl Compressor { }; // Send transaction + // Note: Account removal from tracker is handled by LogSubscriber which parses + // the "compress_and_close:" logs emitted by the registry program let signature = rpc .create_and_send_transaction( &[ix], @@ -215,10 +226,11 @@ impl Compressor { .await .map_err(|e| anyhow::anyhow!("Failed to send transaction: {}", e))?; - // Sync accounts to verify they're closed - if let Err(e) = self.tracker.sync_accounts(&*rpc, &pubkeys).await { - warn!("Failed to sync accounts after compression: {:?}. Tracker will update via subscriptions.", e); - } + info!( + "compress_and_close tx with ({:?}) accounts sent {}", + account_states.iter().map(|a| a.pubkey.to_string()), + signature + ); Ok(signature) } diff --git a/forester/src/compressible/config.rs b/forester/src/compressible/config.rs index 2b7c9b77d9..ac10229a86 100644 --- a/forester/src/compressible/config.rs +++ b/forester/src/compressible/config.rs @@ -13,7 +13,7 @@ pub struct CompressibleConfig { } fn default_batch_size() -> usize { - 10 + 5 } fn default_max_concurrent_batches() -> usize { diff --git a/forester/src/compressible/mod.rs b/forester/src/compressible/mod.rs index ee22882838..2cd8d30b5b 100644 --- a/forester/src/compressible/mod.rs +++ b/forester/src/compressible/mod.rs @@ -9,5 +9,5 @@ pub use bootstrap::bootstrap_compressible_accounts; pub use compressor::Compressor; pub use config::CompressibleConfig; pub use state::CompressibleAccountTracker; -pub use subscriber::AccountSubscriber; +pub use subscriber::{AccountSubscriber, LogSubscriber}; pub use types::CompressibleAccountState; diff --git a/forester/src/compressible/state.rs b/forester/src/compressible/state.rs index 82a047a57d..accda1359c 100644 --- a/forester/src/compressible/state.rs +++ b/forester/src/compressible/state.rs @@ -156,86 +156,6 @@ impl CompressibleAccountTracker { Ok(()) } - - /// Query accounts and update tracker: remove non-existent accounts, update lamports for existing ones - pub async fn sync_accounts( - &self, - rpc: &R, - pubkeys: &[Pubkey], - ) -> Result<()> { - // Query all accounts at once using get_multiple_accounts - let accounts = rpc.get_multiple_accounts(pubkeys).await?; - - for (pubkey, account_opt) in pubkeys.iter().zip(accounts.iter()) { - match account_opt { - Some(account) => { - // Check if account is closed (lamports == 0) - if account.lamports == 0 { - self.remove(pubkey); - debug!("Removed closed account {} (lamports == 0)", pubkey); - continue; - } - - // Re-deserialize account data to verify it's still valid - let ctoken = match CToken::try_from_slice(&account.data) { - Ok(ct) => ct, - Err(e) => { - self.remove(pubkey); - debug!( - "Removed account {} (deserialization failed: {:?})", - pubkey, e - ); - continue; - } - }; - - // Verify Compressible extension still exists - let has_compressible_ext = ctoken.extensions.as_ref().is_some_and(|exts| { - exts.iter() - .any(|ext| matches!(ext, ExtensionStruct::Compressible(_))) - }); - - if !has_compressible_ext { - self.remove(pubkey); - debug!( - "Removed account {} (missing Compressible extension)", - pubkey - ); - continue; - } - - // Account is valid - update state - if let Some(mut state) = self.accounts.get_mut(pubkey) { - match calculate_compressible_slot(&ctoken, account.lamports) { - Ok(compressible_slot) => { - state.account = ctoken; - state.lamports = account.lamports; - state.compressible_slot = compressible_slot; - debug!( - "Updated account {}: lamports={}, compressible_slot={}", - pubkey, account.lamports, compressible_slot - ); - } - Err(e) => { - warn!( - "Failed to calculate compressible slot for account {}: {}. Removing from tracker.", - pubkey, e - ); - drop(state); - self.remove(pubkey); - } - } - } - } - None => { - // Account doesn't exist - remove from tracker - self.remove(pubkey); - debug!("Removed non-existent account {}", pubkey); - } - } - } - Ok(()) - } } impl Default for CompressibleAccountTracker { diff --git a/forester/src/compressible/subscriber.rs b/forester/src/compressible/subscriber.rs index ccfd57b657..c2d857747a 100644 --- a/forester/src/compressible/subscriber.rs +++ b/forester/src/compressible/subscriber.rs @@ -5,29 +5,38 @@ use light_ctoken_interface::{COMPRESSIBLE_TOKEN_ACCOUNT_SIZE, CTOKEN_PROGRAM_ID} use solana_account_decoder::UiAccountEncoding; use solana_client::{ nonblocking::pubsub_client::PubsubClient, - rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, - rpc_response::{Response as RpcResponse, RpcKeyedAccount}, + rpc_config::{ + RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionLogsConfig, + RpcTransactionLogsFilter, + }, + rpc_response::{Response as RpcResponse, RpcKeyedAccount, RpcLogsResponse}, }; use solana_rpc_client_api::filter::RpcFilterType; use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey}; -use tokio::sync::oneshot; +use tokio::sync::broadcast; use tracing::{debug, error, info}; use super::state::CompressibleAccountTracker; use crate::Result; +/// Registry program ID for subscribing to compress_and_close logs +const REGISTRY_PROGRAM_ID_STR: &str = "Lighton6oQpVkeewmo2mcPTQQp7kYHr4fWpAgJyEmDX"; + +/// Log prefix emitted by registry program when closing accounts +const COMPRESS_AND_CLOSE_LOG_PREFIX: &str = "compress_and_close:"; + /// Subscribes to account changes for all compressible CToken accounts pub struct AccountSubscriber { ws_url: String, tracker: Arc, - shutdown_rx: oneshot::Receiver<()>, + shutdown_rx: broadcast::Receiver<()>, } impl AccountSubscriber { pub fn new( ws_url: String, tracker: Arc, - shutdown_rx: oneshot::Receiver<()>, + shutdown_rx: broadcast::Receiver<()>, ) -> Self { Self { ws_url, @@ -45,7 +54,6 @@ impl AccountSubscriber { .map_err(|e| anyhow::anyhow!("Failed to connect to WebSocket: {}", e))?; let program_id = Pubkey::new_from_array(CTOKEN_PROGRAM_ID); - // Subscribe to compressed token program accounts with filter for compressible account size let (mut subscription, unsubscribe) = pubsub_client .program_subscribe( @@ -87,7 +95,7 @@ impl AccountSubscriber { } } } - _ = &mut self.shutdown_rx => { + _ = self.shutdown_rx.recv() => { info!("Shutdown signal received"); unsubscribe().await; break; @@ -154,3 +162,125 @@ impl AccountSubscriber { } } } + +/// Subscribes to registry program logs to detect compress_and_close operations +/// and remove closed accounts from the tracker by parsing log messages directly +pub struct LogSubscriber { + ws_url: String, + tracker: Arc, + shutdown_rx: broadcast::Receiver<()>, +} + +impl LogSubscriber { + pub fn new( + ws_url: String, + tracker: Arc, + shutdown_rx: broadcast::Receiver<()>, + ) -> Self { + Self { + ws_url, + tracker, + shutdown_rx, + } + } + + pub async fn run(&mut self) -> Result<()> { + info!("Starting log subscriber at {}", self.ws_url); + + // Connect to WebSocket + let pubsub_client = PubsubClient::new(&self.ws_url) + .await + .map_err(|e| anyhow::anyhow!("Failed to connect to WebSocket: {}", e))?; + + let registry_program_id = Pubkey::from_str(REGISTRY_PROGRAM_ID_STR) + .map_err(|e| anyhow::anyhow!("Invalid registry program ID: {}", e))?; + + // Subscribe to logs mentioning the registry program + let filter = RpcTransactionLogsFilter::Mentions(vec![registry_program_id.to_string()]); + let config = RpcTransactionLogsConfig { + commitment: Some(CommitmentConfig::confirmed()), + }; + + let (mut subscription, unsubscribe) = pubsub_client + .logs_subscribe(filter, config) + .await + .map_err(|e| anyhow::anyhow!("Failed to subscribe to logs: {}", e))?; + + info!( + "Log subscription established for registry program {}", + registry_program_id + ); + + // Process subscription messages + loop { + tokio::select! { + result = subscription.next() => { + match result { + Some(response) => { + self.handle_log_notification(response); + } + None => { + error!("Log subscription stream closed unexpectedly"); + unsubscribe().await; + return Err(anyhow::anyhow!("Log subscription stream closed")); + } + } + } + _ = self.shutdown_rx.recv() => { + info!("Shutdown signal received for log subscriber"); + unsubscribe().await; + break; + } + } + } + + info!("Log subscriber stopped"); + Ok(()) + } + + fn handle_log_notification(&self, response: RpcResponse) { + let logs_response = response.value; + + // Skip failed transactions + if logs_response.err.is_some() { + debug!("Skipping failed transaction {}", logs_response.signature); + return; + } + + // Parse logs looking for compress_and_close entries + let mut removed_count = 0; + for log in &logs_response.logs { + // Look for our log prefix: "Program log: compress_and_close:" + // The actual log format is "Program log: compress_and_close:" + if let Some(pubkey_str) = log + .strip_prefix("Program log: ") + .and_then(|s| s.strip_prefix(COMPRESS_AND_CLOSE_LOG_PREFIX)) + { + match Pubkey::from_str(pubkey_str) { + Ok(pubkey) => { + if self.tracker.remove(&pubkey).is_some() { + debug!( + "Removed closed account {} from tracker (compress_and_close log)", + pubkey + ); + removed_count += 1; + } + } + Err(e) => { + error!( + "Invalid pubkey in compress_and_close log '{}': {}", + pubkey_str, e + ); + } + } + } + } + + if removed_count > 0 { + info!( + "Removed {} closed accounts from transaction {}", + removed_count, logs_response.signature + ); + } + } +} diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index d0dc4300f5..e5aa9140d4 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -194,18 +194,6 @@ impl EpochManager { } pub async fn run(self: Arc) -> Result<()> { - // Add synthetic compression tree if enabled - if self.compressible_tracker.is_some() && self.config.compressible_config.is_some() { - let compression_tree_accounts = TreeAccounts { - merkle_tree: solana_sdk::pubkey::Pubkey::default(), - queue: solana_sdk::pubkey::Pubkey::default(), - tree_type: TreeType::Unknown, - is_rolledover: false, - }; - self.add_new_tree(compression_tree_accounts).await?; - info!("Added compression tree"); - } - let (tx, mut rx) = mpsc::channel(100); let tx = Arc::new(tx); @@ -939,6 +927,24 @@ impl EpochManager { let trees = self.trees.lock().await; trace!("Adding schedule for trees: {:?}", *trees); epoch_info.add_trees_with_schedule(&trees, slot)?; + + if self.compressible_tracker.is_some() && self.config.compressible_config.is_some() { + let compression_tree_accounts = TreeAccounts { + merkle_tree: solana_sdk::pubkey::Pubkey::default(), + queue: solana_sdk::pubkey::Pubkey::default(), + tree_type: TreeType::Unknown, + is_rolledover: false, + }; + let tree_schedule = TreeForesterSchedule::new_with_schedule( + &compression_tree_accounts, + slot, + &epoch_info.forester_epoch_pda, + &epoch_info.epoch_pda, + )?; + epoch_info.trees.insert(0, tree_schedule); + debug!("Added compression tree to epoch {}", epoch_info.epoch.epoch); + } + info!("Finished waiting for active phase"); Ok(epoch_info) } @@ -963,66 +969,85 @@ impl EpochManager { self.sync_slot().await?; - let (_, v2_trees): (Vec<_>, Vec<_>) = epoch_info + let queue_poller = self.queue_poller.clone(); + + let self_arc = Arc::new(self.clone()); + let epoch_info_arc = Arc::new(epoch_info.clone()); + let mut handles: Vec>> = Vec::new(); + + let trees_to_process: Vec<_> = epoch_info .trees .iter() .filter(|tree| !should_skip_tree(&self.config, &tree.tree_accounts.tree_type)) - .partition(|tree| { + .cloned() + .collect(); + + let v2_trees: Vec<_> = trees_to_process + .iter() + .filter(|tree| { matches!( tree.tree_accounts.tree_type, - TreeType::StateV1 | TreeType::AddressV1 + TreeType::StateV2 | TreeType::AddressV2 ) - }); - - let queue_poller = self.queue_poller.clone(); + }) + .collect(); if queue_poller.is_some() { info!("Using QueueInfoPoller for {} V2 trees", v2_trees.len()); } - let self_arc = Arc::new(self.clone()); - let epoch_info_arc = Arc::new(epoch_info.clone()); - let mut handles: Vec>> = Vec::new(); + let mut v2_receivers: std::collections::HashMap< + Pubkey, + mpsc::Receiver, + > = std::collections::HashMap::new(); + + if !v2_trees.is_empty() { + if let Some(ref poller) = queue_poller { + let registration_futures: Vec<_> = v2_trees + .iter() + .map(|tree| { + let poller = poller.clone(); + let tree_pubkey = tree.tree_accounts.merkle_tree; + async move { + let result = poller.ask(RegisterTree { tree_pubkey }).send().await; + (tree_pubkey, result) + } + }) + .collect(); - for tree in epoch_info.trees.iter() { - if should_skip_tree(&self.config, &tree.tree_accounts.tree_type) { - continue; - } + let results = join_all(registration_futures).await; - let queue_update_rx = if matches!( - tree.tree_accounts.tree_type, - TreeType::StateV2 | TreeType::AddressV2 - ) { - if let Some(ref poller) = queue_poller { - match poller - .ask(RegisterTree { - tree_pubkey: tree.tree_accounts.merkle_tree, - }) - .send() - .await - { - Ok(rx) => Some(rx), + for (tree_pubkey, result) in results { + match result { + Ok(rx) => { + v2_receivers.insert(tree_pubkey, rx); + } Err(e) => { error!( "Failed to register V2 tree {} with queue poller: {:?}.", - tree.tree_accounts.merkle_tree, e + tree_pubkey, e ); return Err(anyhow::anyhow!( "Failed to register V2 tree {} with queue poller: {}. Cannot process without queue updates.", - tree.tree_accounts.merkle_tree, e + tree_pubkey, e )); } } - } else { - error!( - "No queue poller available for V2 tree {}.", - tree.tree_accounts.merkle_tree - ); - return Err(anyhow::anyhow!( - "No queue poller available for V2 tree {}. Cannot process without queue updates.", - tree.tree_accounts.merkle_tree - )); } + } else { + error!("No queue poller available for V2 trees."); + return Err(anyhow::anyhow!( + "No queue poller available for V2 trees. Cannot process without queue updates." + )); + } + } + + for tree in trees_to_process { + let queue_update_rx = if matches!( + tree.tree_accounts.tree_type, + TreeType::StateV2 | TreeType::AddressV2 + ) { + v2_receivers.remove(&tree.tree_accounts.merkle_tree) } else { None }; @@ -1035,7 +1060,6 @@ impl EpochManager { let self_clone = self_arc.clone(); let epoch_info_clone = epoch_info_arc.clone(); - let tree = tree.clone(); let handle = tokio::spawn(async move { self_clone @@ -1173,7 +1197,7 @@ impl EpochManager { if let Some((slot_idx, light_slot_details)) = next_slot_to_process { let result = match tree_type { - TreeType::StateV1 | TreeType::AddressV1 => { + TreeType::StateV1 | TreeType::AddressV1 | TreeType::Unknown => { self.process_light_slot( epoch_info, epoch_pda, @@ -1207,13 +1231,6 @@ impl EpochManager { )) } } - TreeType::Unknown => { - warn!( - "TreeType::Unknown not supported for light slot processing. \ - Compression is handled separately via dispatch_compression()" - ); - Ok(()) - } }; match result { @@ -1563,7 +1580,14 @@ impl EpochManager { queue_update: Option<&QueueUpdateMessage>, ) -> Result { match tree_accounts.tree_type { - TreeType::Unknown => self.dispatch_compression(epoch_info.epoch).await, + TreeType::Unknown => { + self.dispatch_compression( + epoch_info, + forester_slot_details, + consecutive_eligibility_end, + ) + .await + } TreeType::StateV1 | TreeType::AddressV1 => { self.process_v1( epoch_info, @@ -1586,8 +1610,30 @@ impl EpochManager { } } - async fn dispatch_compression(&self, current_epoch: u64) -> Result { - trace!("Dispatching compression for epoch {}", current_epoch); + async fn dispatch_compression( + &self, + epoch_info: &Epoch, + forester_slot_details: &ForesterSlot, + consecutive_eligibility_end: u64, + ) -> Result { + let current_slot = self.slot_tracker.estimated_current_slot(); + if current_slot >= consecutive_eligibility_end { + debug!( + "Skipping compression: forester no longer eligible (current_slot={}, eligibility_end={})", + current_slot, consecutive_eligibility_end + ); + return Ok(0); + } + + if current_slot >= forester_slot_details.end_solana_slot { + debug!( + "Skipping compression: forester slot ended (current_slot={}, slot_end={})", + current_slot, forester_slot_details.end_solana_slot + ); + return Ok(0); + } + + debug!("Dispatching compression for epoch {}", epoch_info.epoch); let tracker = self .compressible_tracker @@ -1599,8 +1645,6 @@ impl EpochManager { .compressible_config .as_ref() .ok_or_else(|| anyhow!("Compressible config not set"))?; - - let current_slot = self.slot_tracker.estimated_current_slot(); let accounts = tracker.get_ready_to_compress(current_slot); if accounts.is_empty() { @@ -1626,7 +1670,7 @@ impl EpochManager { let (registered_forester_pda, _) = light_registry::utils::get_forester_epoch_pda_from_authority( &self.config.derivation_pubkey, - current_epoch, + epoch_info.epoch, ); // Create parallel compression futures @@ -1639,9 +1683,38 @@ impl EpochManager { .map(|(idx, chunk)| (idx, chunk.to_vec())) .collect(); + let slot_tracker = self.slot_tracker.clone(); + // Shared cancellation flag - when set, all pending futures should skip processing + let cancelled = Arc::new(AtomicBool::new(false)); + let compression_futures = batches.into_iter().map(|(batch_idx, batch)| { let compressor = compressor.clone(); + let slot_tracker = slot_tracker.clone(); + let cancelled = cancelled.clone(); async move { + // Check if already cancelled by another future + if cancelled.load(Ordering::Relaxed) { + debug!( + "Skipping compression batch {}/{}: cancelled", + batch_idx + 1, + num_batches + ); + return Err((batch_idx, batch.len(), anyhow!("Cancelled"))); + } + + // Check forester is still eligible before processing this batch + let current_slot = slot_tracker.estimated_current_slot(); + if current_slot >= consecutive_eligibility_end { + // Signal cancellation to all other futures + cancelled.store(true, Ordering::Relaxed); + warn!( + "Cancelling compression: forester no longer eligible (current_slot={}, eligibility_end={})", + current_slot, + consecutive_eligibility_end + ); + return Err((batch_idx, batch.len(), anyhow!("Forester no longer eligible"))); + } + debug!( "Processing compression batch {}/{} with {} accounts", batch_idx + 1, @@ -1653,8 +1726,24 @@ impl EpochManager { .compress_batch(&batch, registered_forester_pda) .await { - Ok(sig) => Ok((batch_idx, batch.len(), sig)), - Err(e) => Err((batch_idx, batch.len(), e)), + Ok(sig) => { + debug!( + "Compression batch {}/{} succeeded: {}", + batch_idx + 1, + num_batches, + sig + ); + Ok((batch_idx, batch.len(), sig)) + } + Err(e) => { + error!( + "Compression batch {}/{} failed: {:?}", + batch_idx + 1, + num_batches, + e + ); + Err((batch_idx, batch.len(), e)) + } } } }); @@ -1693,7 +1782,7 @@ impl EpochManager { info!( "Completed compression for epoch {}: compressed {} accounts", - current_epoch, total_compressed + epoch_info.epoch, total_compressed ); Ok(total_compressed) } diff --git a/forester/src/lib.rs b/forester/src/lib.rs index ade1fb161b..de5254d0bb 100644 --- a/forester/src/lib.rs +++ b/forester/src/lib.rs @@ -158,7 +158,7 @@ pub async fn run_pipeline( rpc_rate_limiter: Option, send_tx_rate_limiter: Option, shutdown_service: oneshot::Receiver<()>, - shutdown_compressible: Option>, + shutdown_compressible: Option>, shutdown_bootstrap: Option>, work_report_sender: mpsc::Sender, ) -> Result<()> { @@ -225,7 +225,10 @@ pub async fn run_pipeline( let tracker_clone = tracker.clone(); let ws_url = compressible_config.ws_url.clone(); - // Spawn subscriber + // Create a second receiver for the log subscriber + let shutdown_rx_log = shutdown_rx.resubscribe(); + + // Spawn account subscriber tokio::spawn(async move { let mut subscriber = compressible::AccountSubscriber::new(ws_url, tracker_clone, shutdown_rx); @@ -234,6 +237,21 @@ pub async fn run_pipeline( } }); + // Spawn log subscriber to detect compress_and_close operations + let tracker_clone_log = tracker.clone(); + let ws_url_log = compressible_config.ws_url.clone(); + + tokio::spawn(async move { + let mut log_subscriber = compressible::LogSubscriber::new( + ws_url_log, + tracker_clone_log, + shutdown_rx_log, + ); + if let Err(e) = log_subscriber.run().await { + tracing::error!("Log subscriber error: {:?}", e); + } + }); + // Spawn bootstrap task if let Some(shutdown_bootstrap_rx) = shutdown_bootstrap { let tracker_clone = tracker.clone(); diff --git a/forester/src/main.rs b/forester/src/main.rs index d56cafc143..c157ac54c5 100644 --- a/forester/src/main.rs +++ b/forester/src/main.rs @@ -41,7 +41,7 @@ async fn main() -> Result<(), ForesterError> { let (shutdown_receiver_compressible, shutdown_receiver_bootstrap) = if config.compressible_config.is_some() { let (shutdown_sender_compressible, shutdown_receiver_compressible) = - oneshot::channel(); + tokio::sync::broadcast::channel(1); let (shutdown_sender_bootstrap, shutdown_receiver_bootstrap) = oneshot::channel(); tokio::spawn(async move { diff --git a/forester/tests/e2e_test.rs b/forester/tests/e2e_test.rs index abb43eb646..c1312ba6a8 100644 --- a/forester/tests/e2e_test.rs +++ b/forester/tests/e2e_test.rs @@ -759,12 +759,13 @@ async fn setup_forester_pipeline( ) -> ( tokio::task::JoinHandle>, oneshot::Sender<()>, - oneshot::Sender<()>, + tokio::sync::broadcast::Sender<()>, oneshot::Sender<()>, mpsc::Receiver, ) { let (shutdown_sender, shutdown_receiver) = oneshot::channel(); - let (shutdown_compressible_sender, shutdown_compressible_receiver) = oneshot::channel(); + let (shutdown_compressible_sender, shutdown_compressible_receiver) = + tokio::sync::broadcast::channel(1); let (shutdown_bootstrap_sender, shutdown_bootstrap_receiver) = oneshot::channel(); let (work_report_sender, work_report_receiver) = mpsc::channel(100); diff --git a/forester/tests/test_batch_append_spent.rs b/forester/tests/test_batch_append_spent.rs index a0b189e191..9df0c0ee47 100644 --- a/forester/tests/test_batch_append_spent.rs +++ b/forester/tests/test_batch_append_spent.rs @@ -322,7 +322,8 @@ async fn test_batch_sequence() { async fn run_forester(config: &ForesterConfig, duration: Duration) { let (shutdown_sender, shutdown_receiver) = oneshot::channel(); - let (shutdown_compressible_sender, shutdown_compressible_receiver) = oneshot::channel(); + let (shutdown_compressible_sender, shutdown_compressible_receiver) = + tokio::sync::broadcast::channel(1); let (work_report_sender, _) = mpsc::channel(100); let service_handle = tokio::spawn(run_pipeline::( diff --git a/forester/tests/test_compressible_ctoken.rs b/forester/tests/test_compressible_ctoken.rs index c2a9fb2d7a..77d5fd0f60 100644 --- a/forester/tests/test_compressible_ctoken.rs +++ b/forester/tests/test_compressible_ctoken.rs @@ -1,6 +1,8 @@ use std::{sync::Arc, time::Duration}; -use forester::compressible::{AccountSubscriber, CompressibleAccountTracker, Compressor}; +use forester::compressible::{ + AccountSubscriber, CompressibleAccountTracker, Compressor, LogSubscriber, +}; use forester_utils::{ forester_epoch::get_epoch_phases, rpc_pool::{SolanaRpcPool, SolanaRpcPoolBuilder}, @@ -203,16 +205,35 @@ async fn test_compressible_ctoken_compression() { rpc.airdrop_lamports(&payer.pubkey(), 10_000_000_000) .await .expect("Failed to airdrop lamports"); - // Setup tracker and subscriber + // Setup tracker and subscribers let tracker = Arc::new(CompressibleAccountTracker::new()); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let mut subscriber = AccountSubscriber::new( + let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1); + let shutdown_rx_log = shutdown_tx.subscribe(); + + // Spawn account subscriber to track new/updated accounts + let mut account_subscriber = AccountSubscriber::new( "ws://localhost:8900".to_string(), tracker.clone(), shutdown_rx, ); - let subscriber_handle = tokio::spawn(async move { - subscriber.run().await.expect("Subscriber failed to run"); + let account_subscriber_handle = tokio::spawn(async move { + account_subscriber + .run() + .await + .expect("Account subscriber failed to run"); + }); + + // Spawn log subscriber to detect compress_and_close operations + let mut log_subscriber = LogSubscriber::new( + "ws://localhost:8900".to_string(), + tracker.clone(), + shutdown_rx_log, + ); + let log_subscriber_handle = tokio::spawn(async move { + log_subscriber + .run() + .await + .expect("Log subscriber failed to run"); }); sleep(Duration::from_secs(2)).await; // Create mint @@ -332,7 +353,12 @@ async fn test_compressible_ctoken_compression() { shutdown_tx .send(()) .expect("Failed to send shutdown signal"); - subscriber_handle.await.expect("Subscriber task panicked"); + account_subscriber_handle + .await + .expect("Account subscriber task panicked"); + log_subscriber_handle + .await + .expect("Log subscriber task panicked"); } /// Test that bootstrap process picks up existing compressible token accounts diff --git a/programs/registry/src/compressible/compress_and_close.rs b/programs/registry/src/compressible/compress_and_close.rs index 534865160b..8a51a1c30d 100644 --- a/programs/registry/src/compressible/compress_and_close.rs +++ b/programs/registry/src/compressible/compress_and_close.rs @@ -53,6 +53,16 @@ pub fn process_compress_and_close<'c: 'info, 'info>( Transfer2CpiAccounts::try_from_account_infos(fee_payer, ctx.remaining_accounts) .map_err(ProgramError::from)?; + // Emit logs for closed accounts (used by forester to track closures) + for idx in &indices { + if let Ok(source_account) = transfer2_accounts + .packed_accounts + .get_u8(idx.source_index, "source_account") + { + msg!("compress_and_close:{}", source_account.key); + } + } + let instruction = compress_and_close_ctoken_accounts_with_indices( ctx.accounts.authority.key(), authority_index, diff --git a/xtask/src/create_ctoken_account.rs b/xtask/src/create_ctoken_account.rs index 5d1fa00f7a..be9097d43c 100644 --- a/xtask/src/create_ctoken_account.rs +++ b/xtask/src/create_ctoken_account.rs @@ -4,7 +4,10 @@ use clap::Parser; use dirs::home_dir; use light_client::rpc::{LightClient, LightClientConfig, Rpc}; use light_ctoken_sdk::ctoken::{CompressibleParams, CreateCTokenAccount}; -use solana_sdk::signature::{read_keypair_file, Keypair, Signer}; +use solana_sdk::{ + signature::{read_keypair_file, Keypair, Signer}, + transaction::Transaction, +}; #[derive(Debug, Parser)] pub struct Options { @@ -82,9 +85,14 @@ pub async fn create_ctoken_account(options: Options) -> anyhow::Result<()> { .with_compressible(compressible_params) .instruction()?; - let signature = rpc - .create_and_send_transaction(&[create_ix], &payer.pubkey(), &[&payer, &account_keypair]) - .await?; + let transaction = Transaction::new_signed_with_payer( + &[create_ix], + Some(&payer.pubkey()), + &[&payer, &account_keypair], + rpc.get_latest_blockhash().await?.0, + ); + + let signature = rpc.send_transaction(&transaction).await?; println!( "[{}/{}] Account: {} | Mint: {} | Sig: {:?}",