Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions forester/src/forester_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,22 +179,28 @@ pub async fn fetch_forester_status(args: &StatusArgs) -> crate::Result<()> {
fetch_active_tree: false,
})
.await?;
let trees = fetch_trees(&rpc).await?;
let trees = fetch_trees(&rpc)
.await?
.iter()
.sorted_by_key(|t| t.merkle_tree.to_string())
.cloned()
.collect::<Vec<_>>();

if trees.is_empty() {
warn!("No trees found. Exiting.");
}
run_queue_info(config.clone(), trees.clone(), TreeType::StateV1).await?;
run_queue_info(config.clone(), trees.clone(), TreeType::AddressV1).await?;
run_queue_info(config.clone(), &trees, TreeType::StateV1).await?;
run_queue_info(config.clone(), &trees, TreeType::AddressV1).await?;

run_queue_info(config.clone(), trees.clone(), TreeType::StateV2).await?;
run_queue_info(config.clone(), trees.clone(), TreeType::AddressV2).await?;
run_queue_info(config.clone(), &trees, TreeType::StateV2).await?;
run_queue_info(config.clone(), &trees, TreeType::AddressV2).await?;

for tree in &trees {
let tree_type = format!("[{}]", tree.tree_type);
let tree_type = format!("{}", tree.tree_type);
let tree_info = get_tree_fullness(&mut rpc, tree.merkle_tree, tree.tree_type).await?;
let fullness_percentage = tree_info.fullness * 100.0;
println!(
"{} Tree {}: Fullness: {:.4}% | Next Index: {} | Threshold: {}",
"{} {}: Fullness: {:.4}% | Next Index: {} | Threshold: {}",
tree_type,
&tree.merkle_tree,
format!("{:.2}%", fullness_percentage),
Expand Down Expand Up @@ -323,9 +329,7 @@ fn print_current_forester_assignments(
);

println!("Queue processors for the current light slot:");
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
println!("│ Tree Type │ Tree Address │ Forester │");
println!("┼───────────┼──────────────────────────────────────────┼──────────────────────────────────────────┤");
println!("Tree Type\t\tTree Address\tForester");

for tree in trees {
let eligible_forester_slot_index = match ForesterEpochPda::get_eligible_forester_index(
Expand All @@ -337,7 +341,7 @@ fn print_current_forester_assignments(
Ok(idx) => idx,
Err(e) => {
println!(
"│ {:9} │ {} │ ERROR: {:?}",
"{:12}\t\t{}\tERROR: {:?}",
tree.tree_type, tree.merkle_tree, e
);
continue;
Expand All @@ -350,17 +354,16 @@ fn print_current_forester_assignments(

if let Some(forester_pda) = assigned_forester {
println!(
"│ {:9} │ {} │ {} │",
"{:12}\t\t{}\t{}",
tree.tree_type, tree.merkle_tree, forester_pda.authority
);
} else {
println!(
"│ {:9} │ {} │ UNASSIGNED (Eligible Index: {})",
"{:12}\t\t{}\tUNASSIGNED (Eligible Index: {})",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could also use a table library for this, we use one in xtask

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have another idea. I want to remove this command entirely and instead create a small http server with a /status endpoint, so then, we could create a custom dashboard

tree.tree_type, tree.merkle_tree, eligible_forester_slot_index
);
}
}
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
} else {
println!(
"ERROR: Could not find EpochPda for active epoch {}. Cannot determine forester assignments.",
Expand Down
114 changes: 77 additions & 37 deletions forester/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub use config::{ForesterConfig, ForesterEpochInfo};
use forester_utils::{
forester_epoch::TreeAccounts, rate_limiter::RateLimiter, rpc_pool::SolanaRpcPoolBuilder,
};
use itertools::Itertools;
use light_client::rpc::{LightClient, LightClientConfig, Rpc};
use light_compressed_account::TreeType;
use solana_sdk::commitment_config::CommitmentConfig;
Expand All @@ -37,15 +38,16 @@ use crate::{
metrics::QUEUE_LENGTH,
processor::tx_cache::ProcessedHashCache,
queue_helpers::{
fetch_address_v2_queue_length, fetch_queue_item_data, fetch_state_v2_queue_length,
fetch_queue_item_data, print_address_v2_queue_info, print_state_v2_input_queue_info,
print_state_v2_output_queue_info,
},
slot_tracker::SlotTracker,
utils::get_protocol_config,
};

pub async fn run_queue_info(
config: Arc<ForesterConfig>,
trees: Vec<TreeAccounts>,
trees: &[TreeAccounts],
queue_type: TreeType,
) -> Result<()> {
let mut rpc = LightClient::new(LightClientConfig {
Expand All @@ -60,49 +62,87 @@ pub async fn run_queue_info(
let trees: Vec<_> = trees
.iter()
.filter(|t| t.tree_type == queue_type)
.sorted_by_key(|t| t.merkle_tree.to_string())
.cloned()
.collect();

for tree_data in trees {
let queue_length = match tree_data.tree_type {
TreeType::StateV1 => fetch_queue_item_data(
&mut rpc,
&tree_data.queue,
0,
STATE_NULLIFIER_QUEUE_VALUES,
STATE_NULLIFIER_QUEUE_VALUES,
)
.await?
.len(),
TreeType::AddressV1 => fetch_queue_item_data(
&mut rpc,
&tree_data.queue,
0,
ADDRESS_QUEUE_VALUES,
ADDRESS_QUEUE_VALUES,
)
.await?
.len(),
TreeType::StateV2 => fetch_state_v2_queue_length(&mut rpc, &tree_data.queue).await?,
TreeType::AddressV2 => {
fetch_address_v2_queue_length(&mut rpc, &tree_data.merkle_tree).await?
match tree_data.tree_type {
TreeType::StateV1 => {
let queue_length = fetch_queue_item_data(
&mut rpc,
&tree_data.queue,
0,
STATE_NULLIFIER_QUEUE_VALUES,
STATE_NULLIFIER_QUEUE_VALUES,
)
.await?
.len();
QUEUE_LENGTH
.with_label_values(&[
&*queue_type.to_string(),
&tree_data.merkle_tree.to_string(),
])
.set(queue_length as i64);

println!(
"{:?} queue {} length: {}",
queue_type, tree_data.queue, queue_length
);
}
};
TreeType::AddressV1 => {
let queue_length = fetch_queue_item_data(
&mut rpc,
&tree_data.queue,
0,
ADDRESS_QUEUE_VALUES,
ADDRESS_QUEUE_VALUES,
)
.await?
.len();
QUEUE_LENGTH
.with_label_values(&[
&*queue_type.to_string(),
&tree_data.merkle_tree.to_string(),
])
.set(queue_length as i64);

println!(
"{:?} queue {} length: {}",
queue_type, tree_data.queue, queue_length
);
}
TreeType::StateV2 => {
println!("\n=== StateV2 {} ===", tree_data.merkle_tree);

QUEUE_LENGTH
.with_label_values(&[&*queue_type.to_string(), &tree_data.merkle_tree.to_string()])
.set(queue_length as i64);
println!("\n1. APPEND OPERATIONS:");
let append_unprocessed =
print_state_v2_output_queue_info(&mut rpc, &tree_data.queue).await?;

let queue_identifier = if tree_data.tree_type == TreeType::AddressV2 {
tree_data.merkle_tree.to_string()
} else {
tree_data.queue.to_string()
};
println!("\n2. NULLIFY OPERATIONS:");
let nullify_unprocessed =
print_state_v2_input_queue_info(&mut rpc, &tree_data.merkle_tree).await?;

println!("===========================================\n");

println!(
"{:?} queue {} length: {}",
queue_type, queue_identifier, queue_length
);
QUEUE_LENGTH
.with_label_values(&["StateV2.Append", &tree_data.queue.to_string()])
.set(append_unprocessed as i64);

QUEUE_LENGTH
.with_label_values(&["StateV2.Nullify", &tree_data.merkle_tree.to_string()])
.set(nullify_unprocessed as i64);
}
TreeType::AddressV2 => {
println!("\n=== AddressV2 {} ===", tree_data.merkle_tree);
let queue_length =
print_address_v2_queue_info(&mut rpc, &tree_data.merkle_tree).await?;
println!("===========================================\n");
QUEUE_LENGTH
.with_label_values(&["AddressV2", &tree_data.merkle_tree.to_string()])
.set(queue_length as i64);
}
};
}
Ok(())
}
Expand Down
Loading