Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions forester/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ num-bigint = { workspace = true }
kameo = "0.19"
once_cell = "1.21.3"
async-channel = "2.3"
solana-pubkey = { workspace = true }

[dev-dependencies]
serial_test = { workspace = true }
Expand Down
42 changes: 27 additions & 15 deletions forester/src/compressible/compressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::{str::FromStr, sync::Arc};

use anchor_lang::{InstructionData, ToAccountMetas};
use forester_utils::rpc_pool::SolanaRpcPool;
use light_client::rpc::Rpc;
use light_client::{indexer::TreeInfo, rpc::Rpc};
use light_compressed_account::TreeType;
use light_compressible::config::CompressibleConfig;
use light_ctoken_interface::CTOKEN_PROGRAM_ID;
use light_ctoken_sdk::compressed_token::compress_and_close::CompressAndCloseAccounts as CTokenAccounts;
Expand All @@ -11,13 +12,14 @@ use light_registry::{
instruction::CompressAndClose,
};
use light_sdk::instruction::PackedAccounts;
use solana_pubkey::pubkey;
use solana_sdk::{
instruction::Instruction,
pubkey::Pubkey,
signature::{Keypair, Signature},
signer::Signer,
};
use tracing::{debug, warn};
use tracing::{debug, info};

use super::{state::CompressibleAccountTracker, types::CompressibleAccountState};
use crate::Result;
Expand Down Expand Up @@ -83,13 +85,23 @@ impl<R: Rpc> Compressor<R> {

// Get output tree from RPC
let mut rpc = self.rpc_pool.get_connection().await?;
rpc.get_latest_active_state_trees()
.await
.map_err(|e| anyhow::anyhow!("Failed to get state tree info: {}", e))?;

let output_tree_info = rpc
.get_random_state_tree_info()
.map_err(|e| anyhow::anyhow!("Failed to get state tree info: {}", e))?;
// FIXME: Use latest active state tree after updating lookup tables
// rpc.get_latest_active_state_trees()
// .await
// .map_err(|e| anyhow::anyhow!("Failed to get state tree info: {}", e))?;
// let output_tree_info = rpc
// .get_random_state_tree_info()
// .map_err(|e| anyhow::anyhow!("Failed to get state tree info: {}", e))?;

let output_tree_info = TreeInfo {
tree: pubkey!("bmt1LryLZUMmF7ZtqESaw7wifBXLfXHQYoE4GAmrahU"),
queue: pubkey!("oq1na8gojfdUhsfCpyjNt6h4JaDWtHf1yQj4koBWfto"),
cpi_context: Some(pubkey!("cpi15BoVPKgEPw5o8wc2T816GE7b378nMXnhH3Xbq4y")),
tree_type: TreeType::StateV2,
next_tree_info: None,
};
Comment on lines +89 to +103
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# First, let's examine the file and surrounding context
head -n 120 forester/src/compressible/compressor.rs | tail -n 40

Repository: Lightprotocol/light-protocol

Length of output: 1768


🏁 Script executed:

# Search for TreeInfo struct definition and usage patterns
rg "TreeInfo" --type rust -B 2 -A 2 | head -n 100

Repository: Lightprotocol/light-protocol

Length of output: 6846


🏁 Script executed:

# Look for get_latest_active_state_trees and get_random_state_tree_info
rg "get_latest_active_state_trees|get_random_state_tree_info" --type rust

Repository: Lightprotocol/light-protocol

Length of output: 2948


🏁 Script executed:

# Check for open issues or tracking related to dynamic tree lookup
git log --oneline --all | grep -i "tree\|lookup\|dynamic" | head -n 20

Repository: Lightprotocol/light-protocol

Length of output: 1500


🏁 Script executed:

# Search for environment configs or test-specific markers
fd "env" -t f | grep -i config
rg "test\|TEST\|dev\|DEV" forester/src/compressible/compressor.rs | head -n 20

Repository: Lightprotocol/light-protocol

Length of output: 54


🏁 Script executed:

# Search for issues or PRs related to dynamic tree lookup or lookup tables
rg "lookup table|dynamic.*tree" --type rust -i | head -n 30

Repository: Lightprotocol/light-protocol

Length of output: 2632


🏁 Script executed:

# Check recent git history for this specific file
git log --oneline -20 -- forester/src/compressible/compressor.rs

Repository: Lightprotocol/light-protocol

Length of output: 87


🏁 Script executed:

# Look for issue tracking or comments about this FIXME
rg "FIXME.*tree|lookup table" --type rust -B 2 -A 2

Repository: Lightprotocol/light-protocol

Length of output: 4813


🏁 Script executed:

# Check if forester is test-only or production code
head -n 50 forester/Cargo.toml | tail -n 20

Repository: Lightprotocol/light-protocol

Length of output: 770


🏁 Script executed:

# Check if there's a GitHub issues file or tracking
find . -name "*.md" -type f | xargs grep -l "lookup table\|forester" | head -5

Repository: Lightprotocol/light-protocol

Length of output: 300


🏁 Script executed:

# Examine the full context around the FIXME to understand the blocking reason
sed -n '80,110p' forester/src/compressible/compressor.rs

Repository: Lightprotocol/light-protocol

Length of output: 1368


🏁 Script executed:

# Check if get_latest_active_state_trees is actually implemented or just a stub
rg -A 10 "async fn get_latest_active_state_trees" --type rust

Repository: Lightprotocol/light-protocol

Length of output: 2632


🏁 Script executed:

# Check if forester is built/deployed as production or test-only
grep -E "^\[.*test.*\]|^\[.*dev.*\]" forester/Cargo.toml

Repository: Lightprotocol/light-protocol

Length of output: 90


Hardcoded TreeInfo blocks production deployment pending lookup table infrastructure updates.

The hardcoded pubkeys will only work with this specific environment's state trees. While get_latest_active_state_trees() and get_random_state_tree_info() are implemented in the RPC client, this code remains on hardcoded values due to incomplete lookup table integration (as noted in the FIXME).

This must be resolved before deploying to any environment with different state tree addresses. Ensure there's a tracking issue for completing the lookup table infrastructure work.


let output_queue = output_tree_info
.get_output_pubkey()
.map_err(|e| anyhow::anyhow!("Failed to get output queue: {}", e))?;
Expand Down Expand Up @@ -196,16 +208,15 @@ impl<R: Rpc> Compressor<R> {
accounts.len()
);

// Collect pubkeys for sync before creating instruction
let pubkeys: Vec<_> = account_states.iter().map(|state| state.pubkey).collect();

let ix = Instruction {
program_id: registry_program_id,
accounts,
data: instruction.data(),
};

// Send transaction
// Note: Account removal from tracker is handled by LogSubscriber which parses
// the "compress_and_close:<pubkey>" logs emitted by the registry program
let signature = rpc
.create_and_send_transaction(
&[ix],
Expand All @@ -215,10 +226,11 @@ impl<R: Rpc> Compressor<R> {
.await
.map_err(|e| anyhow::anyhow!("Failed to send transaction: {}", e))?;

// Sync accounts to verify they're closed
if let Err(e) = self.tracker.sync_accounts(&*rpc, &pubkeys).await {
warn!("Failed to sync accounts after compression: {:?}. Tracker will update via subscriptions.", e);
}
info!(
"compress_and_close tx with ({:?}) accounts sent {}",
account_states.iter().map(|a| a.pubkey.to_string()),
signature
);

Ok(signature)
}
Expand Down
2 changes: 1 addition & 1 deletion forester/src/compressible/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct CompressibleConfig {
}

fn default_batch_size() -> usize {
10
5
}

fn default_max_concurrent_batches() -> usize {
Expand Down
2 changes: 1 addition & 1 deletion forester/src/compressible/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ pub use bootstrap::bootstrap_compressible_accounts;
pub use compressor::Compressor;
pub use config::CompressibleConfig;
pub use state::CompressibleAccountTracker;
pub use subscriber::AccountSubscriber;
pub use subscriber::{AccountSubscriber, LogSubscriber};
pub use types::CompressibleAccountState;
80 changes: 0 additions & 80 deletions forester/src/compressible/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,86 +156,6 @@ impl CompressibleAccountTracker {

Ok(())
}

/// Query accounts and update tracker: remove non-existent accounts, update lamports for existing ones
pub async fn sync_accounts<R: light_client::rpc::Rpc>(
&self,
rpc: &R,
pubkeys: &[Pubkey],
) -> Result<()> {
// Query all accounts at once using get_multiple_accounts
let accounts = rpc.get_multiple_accounts(pubkeys).await?;

for (pubkey, account_opt) in pubkeys.iter().zip(accounts.iter()) {
match account_opt {
Some(account) => {
// Check if account is closed (lamports == 0)
if account.lamports == 0 {
self.remove(pubkey);
debug!("Removed closed account {} (lamports == 0)", pubkey);
continue;
}

// Re-deserialize account data to verify it's still valid
let ctoken = match CToken::try_from_slice(&account.data) {
Ok(ct) => ct,
Err(e) => {
self.remove(pubkey);
debug!(
"Removed account {} (deserialization failed: {:?})",
pubkey, e
);
continue;
}
};

// Verify Compressible extension still exists
let has_compressible_ext = ctoken.extensions.as_ref().is_some_and(|exts| {
exts.iter()
.any(|ext| matches!(ext, ExtensionStruct::Compressible(_)))
});

if !has_compressible_ext {
self.remove(pubkey);
debug!(
"Removed account {} (missing Compressible extension)",
pubkey
);
continue;
}

// Account is valid - update state
if let Some(mut state) = self.accounts.get_mut(pubkey) {
match calculate_compressible_slot(&ctoken, account.lamports) {
Ok(compressible_slot) => {
state.account = ctoken;
state.lamports = account.lamports;
state.compressible_slot = compressible_slot;
debug!(
"Updated account {}: lamports={}, compressible_slot={}",
pubkey, account.lamports, compressible_slot
);
}
Err(e) => {
warn!(
"Failed to calculate compressible slot for account {}: {}. Removing from tracker.",
pubkey, e
);
drop(state);
self.remove(pubkey);
}
}
}
}
None => {
// Account doesn't exist - remove from tracker
self.remove(pubkey);
debug!("Removed non-existent account {}", pubkey);
}
}
}
Ok(())
}
}

impl Default for CompressibleAccountTracker {
Expand Down
144 changes: 137 additions & 7 deletions forester/src/compressible/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,38 @@ use light_ctoken_interface::{COMPRESSIBLE_TOKEN_ACCOUNT_SIZE, CTOKEN_PROGRAM_ID}
use solana_account_decoder::UiAccountEncoding;
use solana_client::{
nonblocking::pubsub_client::PubsubClient,
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_response::{Response as RpcResponse, RpcKeyedAccount},
rpc_config::{
RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionLogsConfig,
RpcTransactionLogsFilter,
},
rpc_response::{Response as RpcResponse, RpcKeyedAccount, RpcLogsResponse},
};
use solana_rpc_client_api::filter::RpcFilterType;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
use tokio::sync::oneshot;
use tokio::sync::broadcast;
use tracing::{debug, error, info};

use super::state::CompressibleAccountTracker;
use crate::Result;

/// Registry program ID for subscribing to compress_and_close logs
const REGISTRY_PROGRAM_ID_STR: &str = "Lighton6oQpVkeewmo2mcPTQQp7kYHr4fWpAgJyEmDX";

/// Log prefix emitted by registry program when closing accounts
const COMPRESS_AND_CLOSE_LOG_PREFIX: &str = "compress_and_close:";

/// Subscribes to account changes for all compressible CToken accounts
pub struct AccountSubscriber {
ws_url: String,
tracker: Arc<CompressibleAccountTracker>,
shutdown_rx: oneshot::Receiver<()>,
shutdown_rx: broadcast::Receiver<()>,
}

impl AccountSubscriber {
pub fn new(
ws_url: String,
tracker: Arc<CompressibleAccountTracker>,
shutdown_rx: oneshot::Receiver<()>,
shutdown_rx: broadcast::Receiver<()>,
) -> Self {
Self {
ws_url,
Expand All @@ -45,7 +54,6 @@ impl AccountSubscriber {
.map_err(|e| anyhow::anyhow!("Failed to connect to WebSocket: {}", e))?;

let program_id = Pubkey::new_from_array(CTOKEN_PROGRAM_ID);

// Subscribe to compressed token program accounts with filter for compressible account size
let (mut subscription, unsubscribe) = pubsub_client
.program_subscribe(
Expand Down Expand Up @@ -87,7 +95,7 @@ impl AccountSubscriber {
}
}
}
_ = &mut self.shutdown_rx => {
_ = self.shutdown_rx.recv() => {
info!("Shutdown signal received");
unsubscribe().await;
break;
Expand Down Expand Up @@ -154,3 +162,125 @@ impl AccountSubscriber {
}
}
}

/// Subscribes to registry program logs to detect compress_and_close operations
/// and remove closed accounts from the tracker by parsing log messages directly
pub struct LogSubscriber {
ws_url: String,
tracker: Arc<CompressibleAccountTracker>,
shutdown_rx: broadcast::Receiver<()>,
}

impl LogSubscriber {
pub fn new(
ws_url: String,
tracker: Arc<CompressibleAccountTracker>,
shutdown_rx: broadcast::Receiver<()>,
) -> Self {
Self {
ws_url,
tracker,
shutdown_rx,
}
}

pub async fn run(&mut self) -> Result<()> {
info!("Starting log subscriber at {}", self.ws_url);

// Connect to WebSocket
let pubsub_client = PubsubClient::new(&self.ws_url)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect to WebSocket: {}", e))?;

let registry_program_id = Pubkey::from_str(REGISTRY_PROGRAM_ID_STR)
.map_err(|e| anyhow::anyhow!("Invalid registry program ID: {}", e))?;

// Subscribe to logs mentioning the registry program
let filter = RpcTransactionLogsFilter::Mentions(vec![registry_program_id.to_string()]);
let config = RpcTransactionLogsConfig {
commitment: Some(CommitmentConfig::confirmed()),
};

let (mut subscription, unsubscribe) = pubsub_client
.logs_subscribe(filter, config)
.await
.map_err(|e| anyhow::anyhow!("Failed to subscribe to logs: {}", e))?;

info!(
"Log subscription established for registry program {}",
registry_program_id
);

// Process subscription messages
loop {
tokio::select! {
result = subscription.next() => {
match result {
Some(response) => {
self.handle_log_notification(response);
}
None => {
error!("Log subscription stream closed unexpectedly");
unsubscribe().await;
return Err(anyhow::anyhow!("Log subscription stream closed"));
}
}
}
_ = self.shutdown_rx.recv() => {
info!("Shutdown signal received for log subscriber");
unsubscribe().await;
break;
}
}
}

info!("Log subscriber stopped");
Ok(())
}

fn handle_log_notification(&self, response: RpcResponse<RpcLogsResponse>) {
let logs_response = response.value;

// Skip failed transactions
if logs_response.err.is_some() {
debug!("Skipping failed transaction {}", logs_response.signature);
return;
}

// Parse logs looking for compress_and_close entries
let mut removed_count = 0;
for log in &logs_response.logs {
// Look for our log prefix: "Program log: compress_and_close:<pubkey>"
// The actual log format is "Program log: compress_and_close:<pubkey>"
if let Some(pubkey_str) = log
.strip_prefix("Program log: ")
.and_then(|s| s.strip_prefix(COMPRESS_AND_CLOSE_LOG_PREFIX))
{
match Pubkey::from_str(pubkey_str) {
Ok(pubkey) => {
if self.tracker.remove(&pubkey).is_some() {
debug!(
"Removed closed account {} from tracker (compress_and_close log)",
pubkey
);
removed_count += 1;
}
}
Err(e) => {
error!(
"Invalid pubkey in compress_and_close log '{}': {}",
pubkey_str, e
);
}
}
}
}

if removed_count > 0 {
info!(
"Removed {} closed accounts from transaction {}",
removed_count, logs_response.signature
);
}
}
Comment on lines +241 to +285
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for the log format in the registry program source
rg -n "compress_and_close:" --type rust

Repository: Lightprotocol/light-protocol

Length of output: 1734


Log parsing is robust but tightly coupled to log format.

The log parsing logic correctly:

  1. Skips failed transactions
  2. Strips the "Program log: " prefix before checking for the close prefix
  3. Handles invalid pubkey parsing with error logging
  4. Tracks removed accounts for batch logging

The log format "Program log: compress_and_close:<pubkey>" is an implicit contract with the registry program (emitted via msg!() in programs/registry/src/compressible/compress_and_close.rs). If the registry changes its log format, this will silently stop working. Consider adding a metric or periodic health check to verify the subscriber is still receiving expected events.

}
Comment on lines +241 to +286
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider trimming whitespace when parsing pubkeys from logs.

The current implementation extracts the pubkey string directly from the log without trimming whitespace. If the registry program's log format includes any trailing whitespace or newlines, Pubkey::from_str() will fail.

Consider adding .trim() to handle potential whitespace:

             if let Some(pubkey_str) = log
                 .strip_prefix("Program log: ")
                 .and_then(|s| s.strip_prefix(COMPRESS_AND_CLOSE_LOG_PREFIX))
             {
-                match Pubkey::from_str(pubkey_str) {
+                match Pubkey::from_str(pubkey_str.trim()) {
                     Ok(pubkey) => {

This makes the parser more robust against formatting variations.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn handle_log_notification(&self, response: RpcResponse<RpcLogsResponse>) {
let logs_response = response.value;
// Skip failed transactions
if logs_response.err.is_some() {
debug!("Skipping failed transaction {}", logs_response.signature);
return;
}
// Parse logs looking for compress_and_close entries
let mut removed_count = 0;
for log in &logs_response.logs {
// Look for our log prefix: "Program log: compress_and_close:<pubkey>"
// The actual log format is "Program log: compress_and_close:<pubkey>"
if let Some(pubkey_str) = log
.strip_prefix("Program log: ")
.and_then(|s| s.strip_prefix(COMPRESS_AND_CLOSE_LOG_PREFIX))
{
match Pubkey::from_str(pubkey_str) {
Ok(pubkey) => {
if self.tracker.remove(&pubkey).is_some() {
debug!(
"Removed closed account {} from tracker (compress_and_close log)",
pubkey
);
removed_count += 1;
}
}
Err(e) => {
error!(
"Invalid pubkey in compress_and_close log '{}': {}",
pubkey_str, e
);
}
}
}
}
if removed_count > 0 {
info!(
"Removed {} closed accounts from transaction {}",
removed_count, logs_response.signature
);
}
}
}
fn handle_log_notification(&self, response: RpcResponse<RpcLogsResponse>) {
let logs_response = response.value;
// Skip failed transactions
if logs_response.err.is_some() {
debug!("Skipping failed transaction {}", logs_response.signature);
return;
}
// Parse logs looking for compress_and_close entries
let mut removed_count = 0;
for log in &logs_response.logs {
// Look for our log prefix: "Program log: compress_and_close:<pubkey>"
// The actual log format is "Program log: compress_and_close:<pubkey>"
if let Some(pubkey_str) = log
.strip_prefix("Program log: ")
.and_then(|s| s.strip_prefix(COMPRESS_AND_CLOSE_LOG_PREFIX))
{
match Pubkey::from_str(pubkey_str.trim()) {
Ok(pubkey) => {
if self.tracker.remove(&pubkey).is_some() {
debug!(
"Removed closed account {} from tracker (compress_and_close log)",
pubkey
);
removed_count += 1;
}
}
Err(e) => {
error!(
"Invalid pubkey in compress_and_close log '{}': {}",
pubkey_str, e
);
}
}
}
}
if removed_count > 0 {
info!(
"Removed {} closed accounts from transaction {}",
removed_count, logs_response.signature
);
}
}
🤖 Prompt for AI Agents
In forester/src/compressible/subscriber.rs around lines 241 to 286, the code
extracts the pubkey substring from the log and passes it directly to
Pubkey::from_str, which fails if the log contains trailing whitespace/newlines;
update the parsing to trim whitespace (e.g., call .trim() on the extracted
pubkey_str before handing it to Pubkey::from_str) so the pubkey is validated
against a trimmed string and the error branch only triggers for truly invalid
pubkeys.

Loading
Loading