From 14c1be736e11c6defe18b29911779aa006dd317d Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Fri, 21 Nov 2025 20:40:35 +0000 Subject: [PATCH] refactor: replace grpc queue info with polling --- Cargo.lock | 277 ++------------- cli/src/commands/test-validator/index.ts | 7 - cli/src/utils/initTestEnv.ts | 3 - cli/src/utils/processPhotonIndexer.ts | 3 - forester/Cargo.toml | 10 +- forester/build.rs | 6 - forester/proto/photon.proto | 70 ---- forester/src/epoch_manager.rs | 315 ++++++------------ forester/src/grpc/mod.rs | 3 - forester/src/grpc/router.rs | 214 ------------ forester/src/lib.rs | 3 +- forester/src/polling/mod.rs | 5 + forester/src/polling/queue_poller.rs | 284 ++++++++++++++++ forester/src/work_coordinator.rs | 213 ------------ forester/tests/e2e_test.rs | 20 -- forester/tests/legacy/batched_address_test.rs | 1 - .../batched_state_async_indexer_test.rs | 1 - .../legacy/batched_state_indexer_test.rs | 1 - forester/tests/legacy/batched_state_test.rs | 1 - forester/tests/legacy/e2e_test.rs | 2 - forester/tests/legacy/e2e_v1_test.rs | 2 - forester/tests/test_batch_append_spent.rs | 1 - forester/tests/test_compressible_ctoken.rs | 2 - .../compressed-token-test/tests/v1.rs | 1 - .../system-cpi-v2-test/tests/event.rs | 1 - scripts/devenv/versions.sh | 2 +- sdk-libs/client/Cargo.toml | 5 - sdk-libs/client/src/indexer/indexer_trait.rs | 9 +- sdk-libs/client/src/indexer/mod.rs | 5 +- sdk-libs/client/src/indexer/photon_indexer.rs | 60 ++++ sdk-libs/client/src/indexer/types.rs | 14 + sdk-libs/client/src/lib.rs | 1 - sdk-libs/client/src/local_test_validator.rs | 6 - sdk-libs/client/src/rpc/indexer.rs | 18 +- sdk-libs/photon-api/src/apis/default_api.rs | 48 +++ .../_get_queue_info_post_200_response.rs | 47 +++ ...get_queue_info_post_200_response_result.rs | 46 +++ .../models/_get_queue_info_post_request.rs | 64 ++++ sdk-libs/photon-api/src/models/mod.rs | 8 + .../program-test/src/indexer/test_indexer.rs | 7 + .../program-test/src/program_test/indexer.rs | 12 + sdk-tests/client-test/tests/light_client.rs | 1 - 42 files changed, 769 insertions(+), 1030 deletions(-) delete mode 100644 forester/build.rs delete mode 100644 forester/proto/photon.proto delete mode 100644 forester/src/grpc/mod.rs delete mode 100644 forester/src/grpc/router.rs create mode 100644 forester/src/polling/mod.rs create mode 100644 forester/src/polling/queue_poller.rs delete mode 100644 forester/src/work_coordinator.rs create mode 100644 sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs create mode 100644 sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs create mode 100644 sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs diff --git a/Cargo.lock b/Cargo.lock index 8b94c8701a..e9f252ecb5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -909,49 +909,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" -[[package]] -name = "axum" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a18ed336352031311f4e0b4dd2ff392d4fbb370777c9d18d7fc9d7359f73871" -dependencies = [ - "axum-core", - "bytes", - "futures-util", - "http 1.3.1", - "http-body 1.0.1", - "http-body-util", - "itoa", - "matchit", - "memchr", - "mime", - "percent-encoding", - "pin-project-lite", - "serde_core", - "sync_wrapper 1.0.2", - "tower", - "tower-layer", - "tower-service", -] - -[[package]] -name = "axum-core" -version = "0.5.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59446ce19cd142f8833f856eb31f3eb097812d1479ab224f54d72428ca21ea22" -dependencies = [ - "bytes", - "futures-core", - "http 1.3.1", - "http-body 1.0.1", - "http-body-util", - "mime", - "pin-project-lite", - "sync_wrapper 1.0.2", - "tower-layer", - "tower-service", -] - [[package]] name = "base64" version = "0.12.3" @@ -1938,6 +1895,12 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "downcast-rs" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "117240f60069e65410b3ae1bb213295bd828f707b5bec6596a1afc8793ce0cbc" + [[package]] name = "dyn-clone" version = "1.0.20" @@ -2243,12 +2206,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2551bf44bc5f776c15044b9b94153a00198be06743e262afaaa61f11ac7523a5" -[[package]] -name = "fixedbitset" -version = "0.5.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" - [[package]] name = "flate2" version = "1.1.4" @@ -2301,6 +2258,7 @@ dependencies = [ "forester-utils", "futures", "itertools 0.14.0", + "kameo", "lazy_static", "light-account-checks", "light-batched-merkle-tree", @@ -2326,8 +2284,6 @@ dependencies = [ "once_cell", "photon-api", "prometheus", - "prost", - "prost-types", "rand 0.8.5", "reqwest 0.12.24", "scopeguard", @@ -2342,10 +2298,6 @@ dependencies = [ "solana-transaction-status", "thiserror 2.0.17", "tokio", - "tokio-stream", - "tonic", - "tonic-prost", - "tonic-prost-build", "tracing", "tracing-appender", "tracing-subscriber", @@ -2923,7 +2875,6 @@ dependencies = [ "http 1.3.1", "http-body 1.0.1", "httparse", - "httpdate", "itoa", "pin-project-lite", "pin-utils", @@ -2963,19 +2914,6 @@ dependencies = [ "webpki-roots 1.0.3", ] -[[package]] -name = "hyper-timeout" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" -dependencies = [ - "hyper 1.7.0", - "hyper-util", - "pin-project-lite", - "tokio", - "tower-service", -] - [[package]] name = "hyper-tls" version = "0.5.0" @@ -3368,6 +3306,33 @@ dependencies = [ "serde", ] +[[package]] +name = "kameo" +version = "0.19.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c4af7638c67029fd6821d02813c3913c803784648725d4df4082c9b91d7cbb1" +dependencies = [ + "downcast-rs", + "dyn-clone", + "futures", + "kameo_macros", + "serde", + "tokio", + "tracing", +] + +[[package]] +name = "kameo_macros" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a13c324e2d8c8e126e63e66087448b4267e263e6cb8770c56d10a9d0d279d9e2" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "keccak" version = "0.1.5" @@ -3536,7 +3501,6 @@ dependencies = [ "base64 0.13.1", "borsh 0.10.4", "bs58", - "bytemuck", "lazy_static", "light-compressed-account", "light-compressed-token-sdk", @@ -3549,7 +3513,6 @@ dependencies = [ "light-sdk", "litesvm", "num-bigint 0.4.6", - "num-traits", "photon-api", "rand 0.8.5", "solana-account", @@ -3559,7 +3522,6 @@ dependencies = [ "solana-clock", "solana-commitment-config", "solana-compute-budget-interface", - "solana-epoch-info", "solana-hash 2.3.0", "solana-instruction", "solana-keypair", @@ -3569,7 +3531,6 @@ dependencies = [ "solana-rpc-client", "solana-rpc-client-api", "solana-signature", - "solana-signer", "solana-transaction", "solana-transaction-error", "solana-transaction-status-client-types", @@ -4395,12 +4356,6 @@ dependencies = [ "regex-automata", ] -[[package]] -name = "matchit" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" - [[package]] name = "md-5" version = "0.10.6" @@ -4521,12 +4476,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "multimap" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" - [[package]] name = "native-tls" version = "0.2.14" @@ -4936,16 +4885,6 @@ dependencies = [ "num", ] -[[package]] -name = "petgraph" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" -dependencies = [ - "fixedbitset", - "indexmap 2.11.4", -] - [[package]] name = "phf" version = "0.13.1" @@ -5245,86 +5184,12 @@ dependencies = [ "thiserror 1.0.69", ] -[[package]] -name = "prost" -version = "0.14.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" -dependencies = [ - "bytes", - "prost-derive", -] - -[[package]] -name = "prost-build" -version = "0.14.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" -dependencies = [ - "heck 0.5.0", - "itertools 0.14.0", - "log", - "multimap", - "once_cell", - "petgraph", - "prettyplease", - "prost", - "prost-types", - "pulldown-cmark", - "pulldown-cmark-to-cmark", - "regex", - "syn 2.0.106", - "tempfile", -] - -[[package]] -name = "prost-derive" -version = "0.14.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" -dependencies = [ - "anyhow", - "itertools 0.14.0", - "proc-macro2", - "quote", - "syn 2.0.106", -] - -[[package]] -name = "prost-types" -version = "0.14.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9b4db3d6da204ed77bb26ba83b6122a73aeb2e87e25fbf7ad2e84c4ccbf8f72" -dependencies = [ - "prost", -] - [[package]] name = "protobuf" version = "2.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" -[[package]] -name = "pulldown-cmark" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e8bbe1a966bd2f362681a44f6edce3c2310ac21e4d5067a6e7ec396297a6ea0" -dependencies = [ - "bitflags 2.9.4", - "memchr", - "unicase", -] - -[[package]] -name = "pulldown-cmark-to-cmark" -version = "21.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5b6a0769a491a08b31ea5c62494a8f144ee0987d86d670a8af4df1e1b7cde75" -dependencies = [ - "pulldown-cmark", -] - [[package]] name = "qstring" version = "0.7.2" @@ -10608,6 +10473,7 @@ dependencies = [ "signal-hook-registry", "socket2 0.6.1", "tokio-macros", + "tracing", "windows-sys 0.61.2", ] @@ -10703,7 +10569,6 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", - "tokio-util 0.7.16", ] [[package]] @@ -10862,74 +10727,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df8b2b54733674ad286d16267dcfc7a71ed5c776e4ac7aa3c3e2561f7c637bf2" -[[package]] -name = "tonic" -version = "0.14.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" -dependencies = [ - "async-trait", - "axum", - "base64 0.22.1", - "bytes", - "h2 0.4.12", - "http 1.3.1", - "http-body 1.0.1", - "http-body-util", - "hyper 1.7.0", - "hyper-timeout", - "hyper-util", - "percent-encoding", - "pin-project", - "socket2 0.6.1", - "sync_wrapper 1.0.2", - "tokio", - "tokio-stream", - "tower", - "tower-layer", - "tower-service", - "tracing", -] - -[[package]] -name = "tonic-build" -version = "0.14.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c40aaccc9f9eccf2cd82ebc111adc13030d23e887244bc9cfa5d1d636049de3" -dependencies = [ - "prettyplease", - "proc-macro2", - "quote", - "syn 2.0.106", -] - -[[package]] -name = "tonic-prost" -version = "0.14.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" -dependencies = [ - "bytes", - "prost", - "tonic", -] - -[[package]] -name = "tonic-prost-build" -version = "0.14.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4a16cba4043dc3ff43fcb3f96b4c5c154c64cbd18ca8dce2ab2c6a451d058a2" -dependencies = [ - "prettyplease", - "proc-macro2", - "prost-build", - "prost-types", - "quote", - "syn 2.0.106", - "tempfile", - "tonic-build", -] - [[package]] name = "tower" version = "0.5.2" @@ -10938,15 +10735,11 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", - "indexmap 2.11.4", "pin-project-lite", - "slab", "sync_wrapper 1.0.2", "tokio", - "tokio-util 0.7.16", "tower-layer", "tower-service", - "tracing", ] [[package]] diff --git a/cli/src/commands/test-validator/index.ts b/cli/src/commands/test-validator/index.ts index dee831c578..332fadf278 100644 --- a/cli/src/commands/test-validator/index.ts +++ b/cli/src/commands/test-validator/index.ts @@ -69,12 +69,6 @@ class SetupCommand extends Command { default: 8784, exclusive: ["skip-indexer"], }), - "grpc-port": Flags.integer({ - description: "Enable Photon indexer gRPC on this port.", - required: false, - default: 50051, - exclusive: ["skip-indexer"], - }), "prover-port": Flags.integer({ description: "Enable Light Prover server on this port.", required: false, @@ -200,7 +194,6 @@ class SetupCommand extends Command { rpcPort: flags["rpc-port"], gossipHost: flags["gossip-host"], indexerPort: flags["indexer-port"], - grpcPort: flags["grpc-port"], proverPort: flags["prover-port"], prover: !flags["skip-prover"], skipSystemAccounts: flags["skip-system-accounts"], diff --git a/cli/src/utils/initTestEnv.ts b/cli/src/utils/initTestEnv.ts index e298f7c2cc..013eefe706 100644 --- a/cli/src/utils/initTestEnv.ts +++ b/cli/src/utils/initTestEnv.ts @@ -86,7 +86,6 @@ export async function initTestEnv({ prover = true, rpcPort = 8899, indexerPort = 8784, - grpcPort = 50051, proverPort = 3001, gossipHost = "127.0.0.1", checkPhotonVersion = true, @@ -101,7 +100,6 @@ export async function initTestEnv({ prover: boolean; rpcPort?: number; indexerPort?: number; - grpcPort?: number; proverPort?: number; gossipHost?: string; checkPhotonVersion?: boolean; @@ -130,7 +128,6 @@ export async function initTestEnv({ await startIndexer( `http://127.0.0.1:${rpcPort}`, indexerPort, - grpcPort, checkPhotonVersion, photonDatabaseUrl, ); diff --git a/cli/src/utils/processPhotonIndexer.ts b/cli/src/utils/processPhotonIndexer.ts index 2f7b1c33df..1945818bc1 100644 --- a/cli/src/utils/processPhotonIndexer.ts +++ b/cli/src/utils/processPhotonIndexer.ts @@ -39,7 +39,6 @@ function getPhotonInstallMessage(): string { export async function startIndexer( rpcUrl: string, indexerPort: number, - grpcPort: number = 50051, checkPhotonVersion: boolean = true, photonDatabaseUrl?: string, ) { @@ -58,8 +57,6 @@ export async function startIndexer( indexerPort.toString(), "--rpc-url", rpcUrl, - "--grpc-port", - grpcPort.toString(), ]; if (photonDatabaseUrl) { args.push("--db-url", photonDatabaseUrl); diff --git a/forester/Cargo.toml b/forester/Cargo.toml index 714d2f2e70..5e57fd80f2 100644 --- a/forester/Cargo.toml +++ b/forester/Cargo.toml @@ -59,17 +59,9 @@ dashmap = "6.1.0" scopeguard = "1.2.0" itertools = "0.14.0" num-bigint = { workspace = true } - -tonic = "0.14.2" -prost = "0.14.1" -prost-types = "0.14.1" -tonic-prost = "0.14.2" -tokio-stream = { version = "0.1", features = ["sync"] } +kameo = "0.19" once_cell = "1.21.3" -[build-dependencies] -tonic-prost-build = "0.14.2" - [dev-dependencies] serial_test = { workspace = true } light-prover-client = { workspace = true, features = ["devenv"] } diff --git a/forester/build.rs b/forester/build.rs deleted file mode 100644 index 2760c2372a..0000000000 --- a/forester/build.rs +++ /dev/null @@ -1,6 +0,0 @@ -fn main() -> Result<(), Box> { - tonic_prost_build::configure().compile_protos(&["proto/photon.proto"], &["proto"])?; - println!("cargo:rerun-if-changed=proto/photon.proto"); - - Ok(()) -} diff --git a/forester/proto/photon.proto b/forester/proto/photon.proto deleted file mode 100644 index 8ba5a52c0e..0000000000 --- a/forester/proto/photon.proto +++ /dev/null @@ -1,70 +0,0 @@ -syntax = "proto3"; - -package photon; - -// Queue information service -service QueueService { - // Get current queue information for all or specific trees - rpc GetQueueInfo(GetQueueInfoRequest) returns (GetQueueInfoResponse); - - // Subscribe to queue updates - rpc SubscribeQueueUpdates(SubscribeQueueUpdatesRequest) returns (stream QueueUpdate); -} - -// Request message for GetQueueInfo -message GetQueueInfoRequest { - // Optional list of tree pubkeys to filter by (base58 encoded) - // If empty, returns info for all trees - repeated string trees = 1; -} - -// Response message for GetQueueInfo -message GetQueueInfoResponse { - repeated QueueInfo queues = 1; - uint64 slot = 2; -} - -// Information about a single queue -message QueueInfo { - // Tree public key (base58 encoded) - string tree = 1; - - // Queue public key (base58 encoded) - string queue = 2; - - // Queue type: 3 = InputStateV2, 4 = AddressV2, 5 = OutputStateV2 - uint32 queue_type = 3; - - // Current number of items in the queue - uint64 queue_size = 4; -} - -// Request message for SubscribeQueueUpdates -message SubscribeQueueUpdatesRequest { - // Optional list of tree pubkeys to subscribe to (base58 encoded) - // If empty, subscribes to all trees - repeated string trees = 1; - - // Whether to send initial state before streaming updates - bool send_initial_state = 2; -} - -// Streamed queue update message -message QueueUpdate { - // The queue that was updated - QueueInfo queue_info = 1; - - // Slot at which the update occurred - uint64 slot = 2; - - // Type of update - UpdateType update_type = 3; -} - -// Type of queue update -enum UpdateType { - UPDATE_TYPE_UNSPECIFIED = 0; - UPDATE_TYPE_INITIAL = 1; // Initial state sent at subscription - UPDATE_TYPE_ITEM_ADDED = 2; // Item added to queue - UPDATE_TYPE_ITEM_REMOVED = 3; // Item removed from queue -} diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index 2688e6ff17..9030614310 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -14,6 +14,7 @@ use forester_utils::{ rpc_pool::SolanaRpcPool, }; use futures::future::join_all; +use kameo::actor::{ActorRef, Spawn}; use light_client::{ indexer::{MerkleProof, NewAddressProofWithContext}, rpc::{LightClient, LightClientConfig, RetryConfig, Rpc, RpcError}, @@ -44,9 +45,9 @@ use crate::{ errors::{ ChannelError, ForesterError, InitializationError, RegistrationError, WorkReportError, }, - grpc::{QueueEventRouter, QueueUpdateMessage}, metrics::{push_metrics, queue_metric_update, update_forester_sol_balance}, pagerduty::send_pagerduty_alert, + polling::{QueueInfoPoller, QueueUpdateMessage, RegisterTree}, processor::{ tx_cache::ProcessedHashCache, v1::{ @@ -108,7 +109,7 @@ pub struct EpochManager { new_tree_sender: broadcast::Sender, tx_cache: Arc>, ops_cache: Arc>, - coordinator: Option>, + queue_poller: Option>, compressible_tracker: Option>, } @@ -126,7 +127,7 @@ impl Clone for EpochManager { new_tree_sender: self.new_tree_sender.clone(), tx_cache: self.tx_cache.clone(), ops_cache: self.ops_cache.clone(), - coordinator: self.coordinator.clone(), + queue_poller: self.queue_poller.clone(), compressible_tracker: self.compressible_tracker.clone(), } } @@ -146,29 +147,22 @@ impl EpochManager { ops_cache: Arc>, compressible_tracker: Option>, ) -> Result { - let coordinator = if let Some(url) = &config.external_services.photon_grpc_url { - match QueueEventRouter::new(url.clone()).await { - Ok(coord) => { - let coord_arc = Arc::new(coord); - - tokio::spawn({ - let coord_clone = Arc::clone(&coord_arc); - async move { - if let Err(e) = coord_clone.run_dispatcher().await { - error!("dispatcher error: {:?}", e); - } - } - }); + let queue_poller = if let Some(indexer_url) = &config.external_services.indexer_url { + info!( + "Spawning QueueInfoPoller actor for indexer at {}", + indexer_url + ); - Some(coord_arc) - } - Err(e) => { - warn!("{:?}. V2 trees will use polling fallback.", e); - None - } - } + let poller = QueueInfoPoller::new( + indexer_url.clone(), + config.external_services.photon_api_key.clone(), + ); + + let actor_ref = QueueInfoPoller::spawn(poller); + info!("QueueInfoPoller actor spawn initiated"); + Some(actor_ref) } else { - info!("photon_grpc_url not configured, V2 trees will use polling mode"); + info!("indexer_url not configured, V2 trees will not have queue updates"); None }; @@ -184,7 +178,7 @@ impl EpochManager { new_tree_sender, tx_cache, ops_cache, - coordinator, + queue_poller, compressible_tracker, }) } @@ -956,18 +950,10 @@ impl EpochManager { ) }); - let coordinator = self.coordinator.clone(); + let queue_poller = self.queue_poller.clone(); - if let Some(ref coord) = coordinator { - if coord.is_healthy() { - info!("Using WorkCoordinator for {} V2 trees", v2_trees.len()); - } else { - info!( - "WorkCoordinator exists but not yet healthy. V2 trees will use polling fallback until connection establishes." - ); - } - } else if !v2_trees.is_empty() { - info!("No WorkCoordinator available. V2 trees will use polling mode."); + if queue_poller.is_some() { + info!("Using QueueInfoPoller for {} V2 trees", v2_trees.len()); } let self_arc = Arc::new(self.clone()); @@ -983,10 +969,35 @@ impl EpochManager { tree.tree_accounts.tree_type, TreeType::StateV2 | TreeType::AddressV2 ) { - if let Some(ref coord) = coordinator { - Some(coord.register_tree(tree.tree_accounts.merkle_tree).await) + if let Some(ref poller) = queue_poller { + match poller + .ask(RegisterTree { + tree_pubkey: tree.tree_accounts.merkle_tree, + }) + .send() + .await + { + Ok(rx) => Some(rx), + Err(e) => { + error!( + "Failed to register V2 tree {} with queue poller: {:?}.", + tree.tree_accounts.merkle_tree, e + ); + return Err(anyhow::anyhow!( + "Failed to register V2 tree {} with queue poller: {}. Cannot process without queue updates.", + tree.tree_accounts.merkle_tree, e + )); + } + } } else { - None + error!( + "No queue poller available for V2 tree {}.", + tree.tree_accounts.merkle_tree + ); + return Err(anyhow::anyhow!( + "No queue poller available for V2 tree {}. Cannot process without queue updates.", + tree.tree_accounts.merkle_tree + )); } } else { None @@ -1001,7 +1012,6 @@ impl EpochManager { let self_clone = self_arc.clone(); let epoch_info_clone = epoch_info_arc.clone(); let tree = tree.clone(); - let coordinator_clone = coordinator.clone(); let handle = tokio::spawn(async move { self_clone @@ -1010,7 +1020,6 @@ impl EpochManager { &epoch_info_clone.forester_epoch_pda, tree.clone(), queue_update_rx, - coordinator_clone.clone(), ) .await }); @@ -1105,7 +1114,7 @@ impl EpochManager { #[instrument( level = "debug", - skip(self, epoch_info, epoch_pda, tree_schedule, queue_update_rx, coordinator), + skip(self, epoch_info, epoch_pda, tree_schedule, queue_update_rx), fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch_info.epoch, tree = %tree_schedule.tree_accounts.merkle_tree) )] @@ -1115,7 +1124,6 @@ impl EpochManager { epoch_pda: &ForesterEpochPda, mut tree_schedule: TreeForesterSchedule, mut queue_update_rx: Option>, - coordinator: Option>, ) -> Result<()> { let mut current_slot = self.slot_tracker.estimated_current_slot(); @@ -1132,8 +1140,6 @@ impl EpochManager { epoch_info.phases.active.end ); - let use_events = queue_update_rx.is_some(); - 'outer_slot_loop: while current_slot < epoch_info.phases.active.end { let next_slot_to_process = tree_schedule .slots @@ -1153,24 +1159,24 @@ impl EpochManager { .await } TreeType::StateV2 | TreeType::AddressV2 => { - if use_events && queue_update_rx.is_some() { - self.process_light_slot_v2_event( + if let Some(ref mut rx) = queue_update_rx { + self.process_light_slot_v2( epoch_info, epoch_pda, &tree_schedule.tree_accounts, &light_slot_details, - queue_update_rx.as_mut().unwrap(), - coordinator.clone(), + rx, ) .await } else { - self.process_light_slot_v2_fallback( - epoch_info, - epoch_pda, - &tree_schedule.tree_accounts, - &light_slot_details, - ) - .await + error!( + "No queue update channel available for V2 tree {}.", + tree_schedule.tree_accounts.merkle_tree + ); + Err(anyhow::anyhow!( + "No queue update channel for V2 tree {}", + tree_schedule.tree_accounts.merkle_tree + )) } } TreeType::Unknown => { @@ -1323,17 +1329,16 @@ impl EpochManager { #[instrument( level = "debug", - skip(self, epoch_info, epoch_pda, tree_accounts, forester_slot_details, queue_update_rx, coordinator), + skip(self, epoch_info, epoch_pda, tree_accounts, forester_slot_details, queue_update_rx), fields(tree = %tree_accounts.merkle_tree) )] - async fn process_light_slot_v2_event( + async fn process_light_slot_v2( &self, epoch_info: &Epoch, epoch_pda: &ForesterEpochPda, tree_accounts: &TreeAccounts, forester_slot_details: &ForesterSlot, queue_update_rx: &mut mpsc::Receiver, - coordinator: Option>, ) -> Result<()> { info!( "Processing V2 light slot {} ({}-{})", @@ -1353,13 +1358,14 @@ impl EpochManager { let tree_pubkey = tree_accounts.merkle_tree; let mut estimated_slot = self.slot_tracker.estimated_current_slot(); - let mut fallback_timer = tokio::time::interval(Duration::from_secs(5)); - fallback_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut timeouts = 0u32; + const MAX_TIMEOUTS: u32 = 100; + const QUEUE_UPDATE_TIMEOUT: Duration = Duration::from_millis(150); 'inner_processing_loop: loop { if estimated_slot >= forester_slot_details.end_solana_slot { trace!( - "Ending V2 event processing for slot {:?}", + "Ending V2 processing for slot {:?}", forester_slot_details.slot ); break 'inner_processing_loop; @@ -1385,8 +1391,10 @@ impl EpochManager { break 'inner_processing_loop; } - tokio::select! { - Some(update) = queue_update_rx.recv() => { + match tokio::time::timeout(QUEUE_UPDATE_TIMEOUT, queue_update_rx.recv()).await { + Ok(Some(update)) => { + timeouts = 0; + if update.queue_size > 0 { info!( "V2 Queue update received for tree {}: {} items (type: {:?})", @@ -1394,63 +1402,61 @@ impl EpochManager { ); let processing_start_time = Instant::now(); - match self.dispatch_tree_processing( - epoch_info, - epoch_pda, - tree_accounts, - forester_slot_details, - estimated_slot, - Some(&update), // Pass gRPC queue update hint - ).await { + match self + .dispatch_tree_processing( + epoch_info, + epoch_pda, + tree_accounts, + forester_slot_details, + estimated_slot, + Some(&update), + ) + .await + { Ok(count) => { if count > 0 { - info!("V2 event processed {} items", count); + info!("V2 processed {} items", count); self.update_metrics_and_counts( epoch_info.epoch, count, processing_start_time.elapsed(), - ).await; + ) + .await; } } Err(e) => { - error!("V2 event processing failed: {:?}", e); + error!("V2 processing failed: {:?}", e); } } } else { trace!("V2 received empty queue update for tree {}", tree_pubkey); } } + Ok(None) => { + info!("Queue update channel closed for tree {}.", tree_pubkey); + break 'inner_processing_loop; + } + Err(_elapsed) => { + timeouts += 1; - _ = fallback_timer.tick() => { - let is_healthy = coordinator - .as_ref() - .map(|c| c.is_healthy()) - .unwrap_or(false); - - if !is_healthy { - warn!("V2 gRPC connection unhealthy, running fallback check for tree {}", tree_pubkey); - let processing_start_time = Instant::now(); - match self.dispatch_tree_processing( - epoch_info, - epoch_pda, - tree_accounts, - forester_slot_details, - estimated_slot, - None, // No queue update hint in fallback path - ).await { - Ok(count) if count > 0 => { - info!("V2 fallback found {} items", count); - self.update_metrics_and_counts( - epoch_info.epoch, - count, - processing_start_time.elapsed(), - ).await; - } - Ok(_) => trace!("V2 fallback check: no work"), - Err(e) => error!("V2 fallback check failed: {:?}", e), - } + if timeouts >= MAX_TIMEOUTS { + error!( + "Queue poller has not sent updates for tree {} after {} timeouts ({} total).", + tree_pubkey, + timeouts, + timeouts as u64 * QUEUE_UPDATE_TIMEOUT.as_millis() as u64 + ); + return Err(anyhow::anyhow!( + "Queue poller health check failed: {} consecutive timeouts for tree {}", + timeouts, + tree_pubkey + )); } else { - trace!("V2 fallback check skipped (gRPC healthy)"); + trace!( + "Queue update timeout for tree {} (timeout #{}, continuing to check slot window)", + tree_pubkey, + timeouts + ); } } } @@ -1462,107 +1468,6 @@ impl EpochManager { Ok(()) } - /// V2 polling fallback (when gRPC unavailable) - #[instrument( - level = "debug", - skip(self, epoch_info, epoch_pda, tree_accounts, forester_slot_details), - fields(tree = %tree_accounts.merkle_tree) - )] - async fn process_light_slot_v2_fallback( - &self, - epoch_info: &Epoch, - epoch_pda: &ForesterEpochPda, - tree_accounts: &TreeAccounts, - forester_slot_details: &ForesterSlot, - ) -> Result<()> { - info!( - "Processing V2 light slot {} fallback ({}-{})", - forester_slot_details.slot, - forester_slot_details.start_solana_slot, - forester_slot_details.end_solana_slot - ); - - let mut rpc = self.rpc_pool.get_connection().await?; - wait_until_slot_reached( - &mut *rpc, - &self.slot_tracker, - forester_slot_details.start_solana_slot, - ) - .await?; - - let mut estimated_slot = self.slot_tracker.estimated_current_slot(); - - 'inner_processing_loop: loop { - if estimated_slot >= forester_slot_details.end_solana_slot { - break 'inner_processing_loop; - } - - let current_light_slot = (estimated_slot - epoch_info.phases.active.start) - / epoch_pda.protocol_config.slot_length; - if current_light_slot != forester_slot_details.slot { - break 'inner_processing_loop; - } - - if !self - .check_forester_eligibility( - epoch_pda, - current_light_slot, - &tree_accounts.queue, - epoch_info.epoch, - epoch_info, - ) - .await? - { - break 'inner_processing_loop; - } - - let processing_start_time = Instant::now(); - let items_processed_this_iteration = match self - .dispatch_tree_processing( - epoch_info, - epoch_pda, - tree_accounts, - forester_slot_details, - estimated_slot, - None, // No queue update hint for regular processing - ) - .await - { - Ok(count) => count, - Err(e) => { - error!("Failed V2 polling fallback: {:?}", e); - break 'inner_processing_loop; - } - }; - - if items_processed_this_iteration > 0 { - info!( - "V2 polling fallback processed {} items", - items_processed_this_iteration - ); - } - - self.update_metrics_and_counts( - epoch_info.epoch, - items_processed_this_iteration, - processing_start_time.elapsed(), - ) - .await; - - push_metrics(&self.config.external_services.pushgateway_url).await?; - estimated_slot = self.slot_tracker.estimated_current_slot(); - - let sleep_duration_ms = if items_processed_this_iteration > 0 { - 1_000 - } else { - 5_000 - }; - - tokio::time::sleep(Duration::from_millis(sleep_duration_ms)).await; - } - Ok(()) - } - async fn check_forester_eligibility( &self, epoch_pda: &ForesterEpochPda, diff --git a/forester/src/grpc/mod.rs b/forester/src/grpc/mod.rs deleted file mode 100644 index 7ec8ce7b16..0000000000 --- a/forester/src/grpc/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod router; - -pub use router::{QueueEventRouter, QueueUpdateMessage}; diff --git a/forester/src/grpc/router.rs b/forester/src/grpc/router.rs deleted file mode 100644 index e2b41d5408..0000000000 --- a/forester/src/grpc/router.rs +++ /dev/null @@ -1,214 +0,0 @@ -use std::{ - collections::HashMap, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - time::Duration, -}; - -use anyhow::{anyhow, Context, Result}; -use light_compressed_account::QueueType; -use proto::{queue_service_client::QueueServiceClient, SubscribeQueueUpdatesRequest}; -use solana_sdk::pubkey::Pubkey; -use tokio::sync::{mpsc, RwLock}; -use tokio_stream::StreamExt; -use tonic::transport::Channel; -use tracing::{debug, error, info, trace, warn}; - -// Generated protobuf code -pub mod proto { - tonic::include_proto!("photon"); -} - -/// Message sent to tree tasks when queue updates occur -#[derive(Debug, Clone)] -pub struct QueueUpdateMessage { - pub tree: Pubkey, - pub queue: Pubkey, - pub queue_type: QueueType, - pub queue_size: u64, - pub slot: u64, - pub update_type: proto::UpdateType, -} - -#[derive(Debug)] -pub struct QueueEventRouter { - grpc_client: RwLock>, - tree_notifiers: Arc>>>, - connection_healthy: Arc, - photon_grpc_url: String, -} - -impl QueueEventRouter { - pub async fn new(photon_grpc_url: String) -> Result { - info!("Connecting to Photon gRPC at {}", photon_grpc_url); - - let grpc_client = QueueServiceClient::connect(photon_grpc_url.clone()) - .await - .context("Failed to connect to Photon gRPC service")?; - - info!("Successfully connected to Photon gRPC"); - - Ok(Self { - grpc_client: RwLock::new(grpc_client), - tree_notifiers: Arc::new(RwLock::new(HashMap::new())), - // Initialize as healthy since connection just succeeded - // Will be set to false if subscription fails in run_dispatcher - connection_healthy: Arc::new(AtomicBool::new(true)), - photon_grpc_url, - }) - } - - pub async fn register_tree(&self, tree_pubkey: Pubkey) -> mpsc::Receiver { - let (tx, rx) = mpsc::channel(100); - self.tree_notifiers.write().await.insert(tree_pubkey, tx); - debug!("Registered tree {} for queue updates", tree_pubkey); - rx - } - - pub async fn unregister_tree(&self, tree_pubkey: &Pubkey) { - self.tree_notifiers.write().await.remove(tree_pubkey); - debug!("Unregistered tree {}", tree_pubkey); - } - - pub async fn run_dispatcher(self: Arc) -> Result<()> { - let mut reconnect_delay = Duration::from_secs(1); - const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30); - - loop { - match self.dispatch_loop().await { - Ok(()) => { - warn!("gRPC stream ended; attempting to reconnect…"); - self.connection_healthy.store(false, Ordering::Relaxed); - tokio::time::sleep(reconnect_delay).await; - let _ = self.reconnect().await; - reconnect_delay = Duration::from_secs(1); - continue; - } - Err(e) => { - error!("gRPC dispatcher error: {:?}", e); - self.connection_healthy.store(false, Ordering::Relaxed); - - warn!("Reconnecting in {:?}...", reconnect_delay); - tokio::time::sleep(reconnect_delay).await; - reconnect_delay = std::cmp::min(reconnect_delay * 2, MAX_RECONNECT_DELAY); - - match self.reconnect().await { - Ok(()) => { - info!("Successfully reconnected to Photon gRPC"); - reconnect_delay = Duration::from_secs(1); - } - Err(e) => { - error!("Failed to reconnect: {:?}", e); - } - } - } - } - } - } - - async fn reconnect(&self) -> Result<()> { - let new_client = QueueServiceClient::connect(self.photon_grpc_url.clone()) - .await - .context("Failed to reconnect to Photon gRPC service")?; - *self.grpc_client.write().await = new_client; - Ok(()) - } - - async fn dispatch_loop(&self) -> Result<()> { - info!("Starting gRPC queue update subscription"); - - let request = SubscribeQueueUpdatesRequest { - trees: vec![], - send_initial_state: true, - }; - - let mut stream = self - .grpc_client - .read() - .await - .clone() - .subscribe_queue_updates(request) - .await - .context("Failed to subscribe to queue updates")? - .into_inner(); - - self.connection_healthy.store(true, Ordering::Relaxed); - info!("gRPC subscription established successfully"); - - while let Some(update_result) = stream.next().await { - let update = update_result.context("Error receiving queue update")?; - - let queue_info = update - .queue_info - .ok_or_else(|| anyhow!("Missing queue_info in update"))?; - - let tree_pubkey = queue_info - .tree - .parse::() - .context("Failed to parse tree pubkey")?; - - let queue_pubkey = queue_info - .queue - .parse::() - .context("Failed to parse queue pubkey")?; - - let queue_type = QueueType::from(queue_info.queue_type as u64); - - let update_type = proto::UpdateType::try_from(update.update_type) - .unwrap_or(proto::UpdateType::Unspecified); - - let message = QueueUpdateMessage { - tree: tree_pubkey, - queue: queue_pubkey, - queue_type, - queue_size: queue_info.queue_size, - slot: update.slot, - update_type, - }; - - let notifiers = self.tree_notifiers.read().await; - if let Some(tx) = notifiers.get(&tree_pubkey) { - match tx.try_send(message.clone()) { - Ok(()) => { - trace!( - "Routed update to tree {}: {} items (type: {:?})", - tree_pubkey, - message.queue_size, - queue_type - ); - } - Err(mpsc::error::TrySendError::Full(_)) => { - warn!( - "Tree {} channel full, dropping update (tree processing slower than updates)", - tree_pubkey - ); - } - Err(mpsc::error::TrySendError::Closed(_)) => { - debug!("Tree {} channel closed (task likely finished)", tree_pubkey); - } - } - } else { - trace!("Received update for unregistered tree {}", tree_pubkey); - } - } - - warn!("gRPC stream ended"); - self.connection_healthy.store(false, Ordering::Relaxed); - Ok(()) - } - - pub fn is_healthy(&self) -> bool { - self.connection_healthy.load(Ordering::Relaxed) - } - - pub async fn registered_tree_count(&self) -> usize { - self.tree_notifiers.read().await.len() - } - - pub async fn shutdown(&self) { - info!("Shutting down WorkCoordinator"); - self.connection_healthy.store(false, Ordering::Relaxed); - } -} diff --git a/forester/src/lib.rs b/forester/src/lib.rs index 17b37830ff..ade1fb161b 100644 --- a/forester/src/lib.rs +++ b/forester/src/lib.rs @@ -6,11 +6,11 @@ pub mod config; pub mod epoch_manager; pub mod errors; pub mod forester_status; -pub mod grpc; pub mod health_check; pub mod helius_priority_fee_types; pub mod metrics; pub mod pagerduty; +pub mod polling; pub mod processor; pub mod pubsub_client; pub mod queue_helpers; @@ -21,7 +21,6 @@ pub mod telemetry; pub mod tree_data_sync; pub mod tree_finder; pub mod utils; -pub mod work_coordinator; use std::{sync::Arc, time::Duration}; diff --git a/forester/src/polling/mod.rs b/forester/src/polling/mod.rs new file mode 100644 index 0000000000..ac64bba8d8 --- /dev/null +++ b/forester/src/polling/mod.rs @@ -0,0 +1,5 @@ +pub mod queue_poller; + +pub use queue_poller::{ + QueueInfoPoller, QueueUpdateMessage, RegisterTree, RegisteredTreeCount, UnregisterTree, +}; diff --git a/forester/src/polling/queue_poller.rs b/forester/src/polling/queue_poller.rs new file mode 100644 index 0000000000..df36d8b8da --- /dev/null +++ b/forester/src/polling/queue_poller.rs @@ -0,0 +1,284 @@ +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; + +use anyhow::Result; +use kameo::{ + actor::{ActorRef, WeakActorRef}, + error::ActorStopReason, + message::Message, + Actor, +}; +use light_client::indexer::{photon_indexer::PhotonIndexer, Indexer}; +use light_compressed_account::QueueType; +use solana_sdk::pubkey::Pubkey; +use tokio::sync::mpsc; +use tracing::{debug, error, info, trace, warn}; + +const POLLING_INTERVAL_SECS: u64 = 1; + +#[derive(Debug, Clone)] +pub struct QueueUpdateMessage { + pub tree: Pubkey, + pub queue: Pubkey, + pub queue_type: QueueType, + pub queue_size: u64, + pub slot: u64, +} + +pub struct QueueInfoPoller { + indexer: PhotonIndexer, + tree_notifiers: HashMap>, + polling_active: Arc, +} + +impl Actor for QueueInfoPoller { + type Args = Self; + type Error = anyhow::Error; + + async fn on_start(state: Self::Args, actor_ref: ActorRef) -> Result { + info!("QueueInfoPoller actor starting"); + + let polling_active = state.polling_active.clone(); + tokio::spawn(async move { + polling_loop(actor_ref, polling_active).await; + }); + + Ok(state) + } + + async fn on_stop( + &mut self, + _actor_ref: WeakActorRef, + _reason: ActorStopReason, + ) -> Result<()> { + info!("QueueInfoPoller actor stopping"); + // Use Release ordering to synchronize with Acquire loads in polling_loop + self.polling_active.store(false, Ordering::Release); + Ok(()) + } +} + +impl QueueInfoPoller { + pub fn new(indexer_url: String, api_key: Option) -> Self { + let indexer = PhotonIndexer::new(format!("{}/v1", indexer_url), api_key); + + Self { + indexer, + tree_notifiers: HashMap::new(), + polling_active: Arc::new(AtomicBool::new(true)), + } + } + + async fn poll_queue_info(&mut self) -> Result> { + match self.indexer.get_queue_info(None).await { + Ok(response) => { + let result = response.value; + let slot = result.slot; + + let queue_infos = result + .queues + .into_iter() + .map(|queue| QueueInfo { + tree: queue.tree, + queue: queue.queue, + queue_type: QueueType::from(queue.queue_type as u64), + queue_size: queue.queue_size, + slot, + }) + .collect(); + + Ok(queue_infos) + } + Err(e) => { + error!("Failed to call getQueueInfo: {:?}", e); + Err(anyhow::anyhow!("Failed to call getQueueInfo").context(e)) + } + } + } + + fn distribute_updates(&self, queue_infos: Vec) { + for info in queue_infos { + if let Some(tx) = self.tree_notifiers.get(&info.tree) { + let message = QueueUpdateMessage { + tree: info.tree, + queue: info.queue, + queue_type: info.queue_type, + queue_size: info.queue_size, + slot: info.slot, + }; + + match tx.try_send(message.clone()) { + Ok(()) => { + trace!( + "Routed update to tree {}: {} items (type: {:?})", + info.tree, + message.queue_size, + info.queue_type + ); + } + Err(mpsc::error::TrySendError::Full(_)) => { + warn!( + "Tree {} channel full, dropping update (tree processing slower than updates)", + info.tree + ); + } + Err(mpsc::error::TrySendError::Closed(_)) => { + trace!("Tree {} channel closed (task likely finished)", info.tree); + } + } + } + } + } +} + +#[derive(Debug, Clone)] +struct QueueInfo { + tree: Pubkey, + queue: Pubkey, + queue_type: QueueType, + queue_size: u64, + slot: u64, +} + +#[derive(Debug, Clone)] +pub struct RegisterTree { + pub tree_pubkey: Pubkey, +} + +impl Message for QueueInfoPoller { + type Reply = mpsc::Receiver; + + async fn handle( + &mut self, + msg: RegisterTree, + _ctx: &mut kameo::message::Context, + ) -> Self::Reply { + let (tx, rx) = mpsc::channel(100); + + // Check if there's already a sender registered for this tree + if let Some(old_sender) = self.tree_notifiers.insert(msg.tree_pubkey, tx) { + warn!( + "Double registration detected for tree {}. Replacing existing sender (previous receiver will be closed).", + msg.tree_pubkey + ); + // The old sender is dropped here, which will close the old receiver + drop(old_sender); + } else { + debug!("Registered tree {} for queue updates", msg.tree_pubkey); + } + + rx + } +} + +#[derive(Debug, Clone)] +pub struct UnregisterTree { + pub tree_pubkey: Pubkey, +} + +impl Message for QueueInfoPoller { + type Reply = (); + + async fn handle( + &mut self, + msg: UnregisterTree, + _ctx: &mut kameo::message::Context, + ) -> Self::Reply { + // Check if the tree was actually registered before removing + if let Some(sender) = self.tree_notifiers.remove(&msg.tree_pubkey) { + debug!("Unregistered tree {}", msg.tree_pubkey); + // Drop the sender to close the receiver + drop(sender); + } else { + warn!( + "Attempted to unregister non-existent tree {}. This may indicate a mismatch between receiver drops and explicit unregistration.", + msg.tree_pubkey + ); + } + } +} + +#[derive(Debug, Clone, Copy)] +pub struct RegisteredTreeCount; + +impl Message for QueueInfoPoller { + type Reply = usize; + + async fn handle( + &mut self, + _msg: RegisteredTreeCount, + _ctx: &mut kameo::message::Context, + ) -> Self::Reply { + self.tree_notifiers.len() + } +} + +#[derive(Debug, Clone, Copy)] +struct PollNow; + +impl Message for QueueInfoPoller { + type Reply = (); + + async fn handle( + &mut self, + _msg: PollNow, + _ctx: &mut kameo::message::Context, + ) -> Self::Reply { + if self.tree_notifiers.is_empty() { + debug!("No trees registered; skipping queue info poll"); + return; + } + + match self.poll_queue_info().await { + Ok(queue_infos) => { + self.distribute_updates(queue_infos); + } + Err(e) => { + error!("Failed to poll queue info: {:?}", e); + } + } + } +} + +async fn polling_loop(actor_ref: ActorRef, polling_active: Arc) { + info!("Starting queue info polling loop (1 second interval)"); + + let mut interval = tokio::time::interval(Duration::from_secs(POLLING_INTERVAL_SECS)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + // Check if polling should continue + if !polling_active.load(Ordering::Acquire) { + info!("Polling loop shutting down (polling_active=false)"); + break; + } + + interval.tick().await; + + // Check again after the tick in case shutdown was signaled during sleep + if !polling_active.load(Ordering::Acquire) { + info!("Polling loop shutting down (polling_active=false)"); + break; + } + + match actor_ref.tell(PollNow).send().await { + Ok(_) => {} + Err(e) => { + if polling_active.load(Ordering::Acquire) { + error!("Failed to send poll message to actor: {:?}", e); + } else { + info!("Poll message send failed during shutdown: {:?}", e); + } + break; + } + } + } + + info!("Polling loop stopped"); +} diff --git a/forester/src/work_coordinator.rs b/forester/src/work_coordinator.rs deleted file mode 100644 index 0c6bd529b3..0000000000 --- a/forester/src/work_coordinator.rs +++ /dev/null @@ -1,213 +0,0 @@ -use std::{ - collections::HashMap, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - time::Duration, -}; - -use anyhow::{anyhow, Context, Result}; -use light_compressed_account::QueueType; -use solana_sdk::pubkey::Pubkey; -use tokio::sync::{mpsc, RwLock}; -use tokio_stream::StreamExt; -use tonic::transport::Channel; -use tracing::{debug, error, info, trace, warn}; - -// Generated protobuf code -pub mod proto { - tonic::include_proto!("photon"); -} - -use proto::{queue_service_client::QueueServiceClient, SubscribeQueueUpdatesRequest}; - -/// Message sent to tree tasks when queue updates occur -#[derive(Debug, Clone)] -pub struct QueueUpdateMessage { - pub tree: Pubkey, - pub queue: Pubkey, - pub queue_type: QueueType, - pub queue_size: u64, - pub slot: u64, - pub update_type: proto::UpdateType, -} - -#[derive(Debug)] -pub struct WorkCoordinator { - grpc_client: RwLock>, - tree_notifiers: Arc>>>, - connection_healthy: Arc, - photon_grpc_url: String, -} - -impl WorkCoordinator { - pub async fn new(photon_grpc_url: String) -> Result { - info!("Connecting to Photon gRPC at {}", photon_grpc_url); - - let grpc_client = QueueServiceClient::connect(photon_grpc_url.clone()) - .await - .context("Failed to connect to Photon gRPC service")?; - - info!("Successfully connected to Photon gRPC"); - - Ok(Self { - grpc_client: RwLock::new(grpc_client), - tree_notifiers: Arc::new(RwLock::new(HashMap::new())), - connection_healthy: Arc::new(AtomicBool::new(false)), - photon_grpc_url, - }) - } - - pub async fn register_tree(&self, tree_pubkey: Pubkey) -> mpsc::Receiver { - let (tx, rx) = mpsc::channel(100); - self.tree_notifiers.write().await.insert(tree_pubkey, tx); - debug!("Registered tree {} for queue updates", tree_pubkey); - rx - } - - pub async fn unregister_tree(&self, tree_pubkey: &Pubkey) { - self.tree_notifiers.write().await.remove(tree_pubkey); - debug!("Unregistered tree {}", tree_pubkey); - } - - pub async fn run_dispatcher(self: Arc) -> Result<()> { - let mut reconnect_delay = Duration::from_secs(1); - const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30); - - loop { - match self.dispatch_loop().await { - Ok(()) => { - warn!("gRPC stream ended; attempting to reconnect…"); - self.connection_healthy.store(false, Ordering::Relaxed); - tokio::time::sleep(reconnect_delay).await; - let _ = self.reconnect().await; - reconnect_delay = Duration::from_secs(1); - continue; - } - Err(e) => { - error!("gRPC dispatcher error: {:?}", e); - self.connection_healthy.store(false, Ordering::Relaxed); - - warn!("Reconnecting in {:?}...", reconnect_delay); - tokio::time::sleep(reconnect_delay).await; - reconnect_delay = std::cmp::min(reconnect_delay * 2, MAX_RECONNECT_DELAY); - - match self.reconnect().await { - Ok(()) => { - info!("Successfully reconnected to Photon gRPC"); - reconnect_delay = Duration::from_secs(1); - } - Err(e) => { - error!("Failed to reconnect: {:?}", e); - } - } - } - } - } - } - - async fn reconnect(&self) -> Result<()> { - let new_client = QueueServiceClient::connect(self.photon_grpc_url.clone()) - .await - .context("Failed to reconnect to Photon gRPC service")?; - *self.grpc_client.write().await = new_client; - Ok(()) - } - - async fn dispatch_loop(&self) -> Result<()> { - info!("Starting gRPC queue update subscription"); - - let request = SubscribeQueueUpdatesRequest { - trees: vec![], - send_initial_state: true, - }; - - let mut stream = self - .grpc_client - .read() - .await - .clone() - .subscribe_queue_updates(request) - .await - .context("Failed to subscribe to queue updates")? - .into_inner(); - - self.connection_healthy.store(true, Ordering::Relaxed); - info!("gRPC subscription established successfully"); - - while let Some(update_result) = stream.next().await { - let update = update_result.context("Error receiving queue update")?; - - let queue_info = update - .queue_info - .ok_or_else(|| anyhow!("Missing queue_info in update"))?; - - let tree_pubkey = queue_info - .tree - .parse::() - .context("Failed to parse tree pubkey")?; - - let queue_pubkey = queue_info - .queue - .parse::() - .context("Failed to parse queue pubkey")?; - - let queue_type = QueueType::from(queue_info.queue_type as u64); - - let update_type = proto::UpdateType::try_from(update.update_type) - .unwrap_or(proto::UpdateType::Unspecified); - - let message = QueueUpdateMessage { - tree: tree_pubkey, - queue: queue_pubkey, - queue_type, - queue_size: queue_info.queue_size, - slot: update.slot, - update_type, - }; - - let notifiers = self.tree_notifiers.read().await; - if let Some(tx) = notifiers.get(&tree_pubkey) { - match tx.try_send(message.clone()) { - Ok(()) => { - trace!( - "Routed update to tree {}: {} items (type: {:?})", - tree_pubkey, - message.queue_size, - queue_type - ); - } - Err(mpsc::error::TrySendError::Full(_)) => { - warn!( - "Tree {} channel full, dropping update (tree processing slower than updates)", - tree_pubkey - ); - } - Err(mpsc::error::TrySendError::Closed(_)) => { - debug!("Tree {} channel closed (task likely finished)", tree_pubkey); - } - } - } else { - trace!("Received update for unregistered tree {}", tree_pubkey); - } - } - - warn!("gRPC stream ended"); - self.connection_healthy.store(false, Ordering::Relaxed); - Ok(()) - } - - pub fn is_healthy(&self) -> bool { - self.connection_healthy.load(Ordering::Relaxed) - } - - pub async fn registered_tree_count(&self) -> usize { - self.tree_notifiers.read().await.len() - } - - pub async fn shutdown(&self) { - info!("Shutting down WorkCoordinator"); - self.connection_healthy.store(false, Ordering::Relaxed); - } -} diff --git a/forester/tests/e2e_test.rs b/forester/tests/e2e_test.rs index dd51f28efb..ff7f8b4b25 100644 --- a/forester/tests/e2e_test.rs +++ b/forester/tests/e2e_test.rs @@ -269,7 +269,6 @@ async fn e2e_test() { "../target/deploy/create_address_test_program.so".to_string(), )], limit_ledger_size: None, - grpc_port: Some(50051), })) .await; spawn_prover().await; @@ -1353,25 +1352,6 @@ async fn create_v1_address( println!("indexer root: {:?}, index: {}", root, index); - { - let account = rpc - .get_anchor_account::(merkle_tree_pubkey) - .await - .unwrap(); - println!("address merkle tree account: {:?}", account); - let merkle_tree = - get_indexed_merkle_tree::( - rpc, - *merkle_tree_pubkey, - ) - .await; - - for (idx, root) in merkle_tree.roots.iter().enumerate() { - println!("root[{}]: {:?}", idx, root); - } - println!("root index: {}", merkle_tree.root_index()); - } - let instruction = create_invoke_instruction( &payer.pubkey(), &payer.pubkey(), diff --git a/forester/tests/legacy/batched_address_test.rs b/forester/tests/legacy/batched_address_test.rs index fcf328b960..7b6db499d7 100644 --- a/forester/tests/legacy/batched_address_test.rs +++ b/forester/tests/legacy/batched_address_test.rs @@ -40,7 +40,6 @@ async fn test_address_batched() { "../target/deploy/create_address_test_program.so".to_string(), )], limit_ledger_size: None, - grpc_port: None, })) .await; let tree_params = InitAddressTreeAccountsInstructionData::test_default(); diff --git a/forester/tests/legacy/batched_state_async_indexer_test.rs b/forester/tests/legacy/batched_state_async_indexer_test.rs index 2fd28955bb..bab17a37d9 100644 --- a/forester/tests/legacy/batched_state_async_indexer_test.rs +++ b/forester/tests/legacy/batched_state_async_indexer_test.rs @@ -82,7 +82,6 @@ async fn test_state_indexer_async_batched() { wait_time: 30, sbf_programs: vec![], limit_ledger_size: None, - grpc_port: None, })) .await; spawn_prover().await; diff --git a/forester/tests/legacy/batched_state_indexer_test.rs b/forester/tests/legacy/batched_state_indexer_test.rs index fe3ca8d3ab..32d3a229a8 100644 --- a/forester/tests/legacy/batched_state_indexer_test.rs +++ b/forester/tests/legacy/batched_state_indexer_test.rs @@ -43,7 +43,6 @@ async fn test_state_indexer_batched() { wait_time: 90, sbf_programs: vec![], limit_ledger_size: None, - grpc_port: None, })) .await; diff --git a/forester/tests/legacy/batched_state_test.rs b/forester/tests/legacy/batched_state_test.rs index 2559954e59..559af5b181 100644 --- a/forester/tests/legacy/batched_state_test.rs +++ b/forester/tests/legacy/batched_state_test.rs @@ -47,7 +47,6 @@ async fn test_state_batched() { wait_time: 30, sbf_programs: vec![], limit_ledger_size: None, - grpc_port: None, })) .await; diff --git a/forester/tests/legacy/e2e_test.rs b/forester/tests/legacy/e2e_test.rs index 8c0789ddd1..9dc712eb3f 100644 --- a/forester/tests/legacy/e2e_test.rs +++ b/forester/tests/legacy/e2e_test.rs @@ -39,7 +39,6 @@ async fn test_epoch_monitor_with_2_foresters() { wait_time: 90, sbf_programs: vec![], limit_ledger_size: None, - grpc_port: None, })) .await; let forester_keypair1 = Keypair::new(); @@ -386,7 +385,6 @@ async fn test_epoch_double_registration() { wait_time: 90, sbf_programs: vec![], limit_ledger_size: None, - grpc_port: None, })) .await; diff --git a/forester/tests/legacy/e2e_v1_test.rs b/forester/tests/legacy/e2e_v1_test.rs index 092e4dd419..050ece14af 100644 --- a/forester/tests/legacy/e2e_v1_test.rs +++ b/forester/tests/legacy/e2e_v1_test.rs @@ -40,7 +40,6 @@ async fn test_e2e_v1() { wait_time: 90, sbf_programs: vec![], limit_ledger_size: None, - grpc_port: None, })) .await; let forester_keypair1 = Keypair::new(); @@ -383,7 +382,6 @@ async fn test_epoch_double_registration() { wait_time: 90, sbf_programs: vec![], limit_ledger_size: None, - grpc_port: None, })) .await; diff --git a/forester/tests/test_batch_append_spent.rs b/forester/tests/test_batch_append_spent.rs index 8adfb4df7a..a0b189e191 100644 --- a/forester/tests/test_batch_append_spent.rs +++ b/forester/tests/test_batch_append_spent.rs @@ -50,7 +50,6 @@ async fn test_batch_sequence() { wait_time: 10, sbf_programs: vec![], limit_ledger_size: None, - grpc_port: None, })) .await; diff --git a/forester/tests/test_compressible_ctoken.rs b/forester/tests/test_compressible_ctoken.rs index 53bfedc5a9..a27036e5a9 100644 --- a/forester/tests/test_compressible_ctoken.rs +++ b/forester/tests/test_compressible_ctoken.rs @@ -191,7 +191,6 @@ async fn test_compressible_ctoken_compression() { wait_time: 10, sbf_programs: vec![], limit_ledger_size: None, - grpc_port: None, }) .await; let mut rpc = LightClient::new(LightClientConfig::local()) @@ -352,7 +351,6 @@ async fn test_compressible_ctoken_bootstrap() { wait_time: 10, sbf_programs: vec![], limit_ledger_size: None, - grpc_port: None, }) .await; diff --git a/program-tests/compressed-token-test/tests/v1.rs b/program-tests/compressed-token-test/tests/v1.rs index c037652362..f666bafd61 100644 --- a/program-tests/compressed-token-test/tests/v1.rs +++ b/program-tests/compressed-token-test/tests/v1.rs @@ -5404,7 +5404,6 @@ async fn test_transfer_with_photon_and_batched_tree() { wait_time: 15, sbf_programs: vec![], limit_ledger_size: None, - grpc_port: None, }) .await; diff --git a/program-tests/system-cpi-v2-test/tests/event.rs b/program-tests/system-cpi-v2-test/tests/event.rs index 6c7c13a0fe..47625f9ef9 100644 --- a/program-tests/system-cpi-v2-test/tests/event.rs +++ b/program-tests/system-cpi-v2-test/tests/event.rs @@ -538,7 +538,6 @@ async fn generate_photon_test_data_multiple_events() { "../../target/deploy/create_address_test_program.so".to_string(), )], limit_ledger_size: None, - grpc_port: None, }) .await; diff --git a/scripts/devenv/versions.sh b/scripts/devenv/versions.sh index 37dc48a1ad..7c0d6f9b7f 100755 --- a/scripts/devenv/versions.sh +++ b/scripts/devenv/versions.sh @@ -13,7 +13,7 @@ export SOLANA_VERSION="2.2.15" export ANCHOR_VERSION="0.31.1" export JQ_VERSION="1.8.0" export PHOTON_VERSION="0.51.0" -export PHOTON_COMMIT="94b3688b08477668bb946a689b0267319f5c1ae1" +export PHOTON_COMMIT="2cffb6132a21ce148268129ccbbb24c1f3cabc61" export REDIS_VERSION="8.0.1" export ANCHOR_TAG="anchor-v${ANCHOR_VERSION}" diff --git a/sdk-libs/client/Cargo.toml b/sdk-libs/client/Cargo.toml index 3f33c3935d..565939fd51 100644 --- a/sdk-libs/client/Cargo.toml +++ b/sdk-libs/client/Cargo.toml @@ -27,8 +27,6 @@ solana-clock = { workspace = true } solana-signature = { workspace = true } solana-commitment-config = { workspace = true } solana-account = { workspace = true } -solana-signer = { workspace = true } -solana-epoch-info = { workspace = true } solana-keypair = { workspace = true } solana-compute-budget-interface = { workspace = true } solana-banks-client = { workspace = true, optional = true } @@ -57,15 +55,12 @@ borsh = { workspace = true } async-trait = { workspace = true } thiserror = { workspace = true } num-bigint = { workspace = true } -num-traits = { workspace = true } base64 = { workspace = true } bs58 = { workspace = true } tokio = { workspace = true, features = ["rt", "time"] } -bytemuck = { workspace = true } tracing = { workspace = true } lazy_static = { workspace = true } rand = { workspace = true } # Tests are in program-tests/client-test/tests/light-client.rs -# [dev-dependencies] diff --git a/sdk-libs/client/src/indexer/indexer_trait.rs b/sdk-libs/client/src/indexer/indexer_trait.rs index ab08965ecf..81d1fbf99d 100644 --- a/sdk-libs/client/src/indexer/indexer_trait.rs +++ b/sdk-libs/client/src/indexer/indexer_trait.rs @@ -5,7 +5,7 @@ use super::{ response::{Items, ItemsWithCursor, Response}, types::{ CompressedAccount, CompressedTokenAccount, OwnerBalance, QueueElementsResult, - SignatureWithMetadata, TokenBalance, ValidityProofWithContext, + QueueInfoResult, SignatureWithMetadata, TokenBalance, ValidityProofWithContext, }, Address, AddressWithTree, BatchAddressUpdateIndexerResponse, GetCompressedAccountsByOwnerConfig, GetCompressedTokenAccountsByOwnerOrDelegateOptions, Hash, @@ -201,6 +201,13 @@ pub trait Indexer: std::marker::Send + std::marker::Sync { config: Option, ) -> Result, IndexerError>; + /// Returns information about all queues in the system. + /// Includes tree pubkey, queue pubkey, queue type, and queue size for each queue. + async fn get_queue_info( + &self, + config: Option, + ) -> Result, IndexerError>; + async fn get_subtrees( &self, merkle_tree_pubkey: [u8; 32], diff --git a/sdk-libs/client/src/indexer/mod.rs b/sdk-libs/client/src/indexer/mod.rs index 5ec991c09e..b03dcedac7 100644 --- a/sdk-libs/client/src/indexer/mod.rs +++ b/sdk-libs/client/src/indexer/mod.rs @@ -17,8 +17,9 @@ pub use types::{ AccountProofInputs, Address, AddressMerkleTreeAccounts, AddressProofInputs, AddressQueueIndex, AddressWithTree, BatchAddressUpdateIndexerResponse, CompressedAccount, CompressedTokenAccount, Hash, MerkleProof, MerkleProofWithContext, NewAddressProofWithContext, NextTreeInfo, - OwnerBalance, ProofOfLeaf, QueueElementsResult, RootIndex, SignatureWithMetadata, - StateMerkleTreeAccounts, TokenBalance, TreeInfo, ValidityProofWithContext, + OwnerBalance, ProofOfLeaf, QueueElementsResult, QueueInfo, QueueInfoResult, RootIndex, + SignatureWithMetadata, StateMerkleTreeAccounts, TokenBalance, TreeInfo, + ValidityProofWithContext, }; mod options; pub use options::*; diff --git a/sdk-libs/client/src/indexer/photon_indexer.rs b/sdk-libs/client/src/indexer/photon_indexer.rs index ec70157acd..70970dd2a6 100644 --- a/sdk-libs/client/src/indexer/photon_indexer.rs +++ b/sdk-libs/client/src/indexer/photon_indexer.rs @@ -1737,6 +1737,66 @@ impl Indexer for PhotonIndexer { } } + async fn get_queue_info( + &self, + config: Option, + ) -> Result, IndexerError> { + let config = config.unwrap_or_default(); + self.retry(config.retry_config, || async { + let request = photon_api::models::GetQueueInfoPostRequest { + ..Default::default() + }; + + let result = + photon_api::apis::default_api::get_queue_info_post(&self.configuration, request) + .await?; + + let api_response = Self::extract_result_with_error_check( + "get_queue_info", + result.error.map(|e| { + Box::new( + photon_api::models::GetBatchAddressUpdateInfoPost200ResponseError { + code: Some(e.code), + message: Some(e.message), + }, + ) + }), + result.result, + )?; + + if api_response.slot < config.slot { + return Err(IndexerError::IndexerNotSyncedToSlot); + } + + let queues = api_response + .queues + .iter() + .map(|q| -> Result<_, IndexerError> { + let tree_bytes = super::base58::decode_base58_to_fixed_array(&q.tree)?; + let queue_bytes = super::base58::decode_base58_to_fixed_array(&q.queue)?; + + Ok(super::QueueInfo { + tree: Pubkey::new_from_array(tree_bytes), + queue: Pubkey::new_from_array(queue_bytes), + queue_type: q.queue_type, + queue_size: q.queue_size, + }) + }) + .collect::, _>>()?; + + Ok(Response { + context: Context { + slot: api_response.slot, + }, + value: super::QueueInfoResult { + queues, + slot: api_response.slot, + }, + }) + }) + .await + } + async fn get_subtrees( &self, _merkle_tree_pubkey: [u8; 32], diff --git a/sdk-libs/client/src/indexer/types.rs b/sdk-libs/client/src/indexer/types.rs index 723d450cf3..7638274eef 100644 --- a/sdk-libs/client/src/indexer/types.rs +++ b/sdk-libs/client/src/indexer/types.rs @@ -29,6 +29,20 @@ pub struct ProofOfLeaf { pub type Address = [u8; 32]; pub type Hash = [u8; 32]; +#[derive(Debug, Clone, PartialEq)] +pub struct QueueInfo { + pub tree: Pubkey, + pub queue: Pubkey, + pub queue_type: u8, + pub queue_size: u64, +} + +#[derive(Debug, Clone, PartialEq, Default)] +pub struct QueueInfoResult { + pub queues: Vec, + pub slot: u64, +} + #[derive(Debug, Clone, PartialEq, Default)] pub struct QueueElementsResult { pub output_queue_elements: Option>, diff --git a/sdk-libs/client/src/lib.rs b/sdk-libs/client/src/lib.rs index 39394c9e63..5ab761c25e 100644 --- a/sdk-libs/client/src/lib.rs +++ b/sdk-libs/client/src/lib.rs @@ -41,7 +41,6 @@ //! wait_time: 75, //! sbf_programs: vec![], //! limit_ledger_size: None, -//! grpc_port: None, //! }; //! spawn_validator(config).await; //! diff --git a/sdk-libs/client/src/local_test_validator.rs b/sdk-libs/client/src/local_test_validator.rs index 0418cf5de1..2d46ba7e72 100644 --- a/sdk-libs/client/src/local_test_validator.rs +++ b/sdk-libs/client/src/local_test_validator.rs @@ -9,7 +9,6 @@ pub struct LightValidatorConfig { pub wait_time: u64, pub sbf_programs: Vec<(String, String)>, pub limit_ledger_size: Option, - pub grpc_port: Option, } impl Default for LightValidatorConfig { @@ -20,7 +19,6 @@ impl Default for LightValidatorConfig { wait_time: 35, sbf_programs: vec![], limit_ledger_size: None, - grpc_port: None, } } } @@ -48,10 +46,6 @@ pub async fn spawn_validator(config: LightValidatorConfig) { path.push_str(" --skip-prover"); } - if let Some(grpc_port) = config.grpc_port { - path.push_str(&format!(" --grpc-port {}", grpc_port)); - } - println!("Starting validator with command: {}", path); let child = Command::new("sh") diff --git a/sdk-libs/client/src/rpc/indexer.rs b/sdk-libs/client/src/rpc/indexer.rs index acafeac398..5ddd6c0372 100644 --- a/sdk-libs/client/src/rpc/indexer.rs +++ b/sdk-libs/client/src/rpc/indexer.rs @@ -7,8 +7,8 @@ use crate::indexer::{ CompressedTokenAccount, GetCompressedAccountsByOwnerConfig, GetCompressedTokenAccountsByOwnerOrDelegateOptions, Hash, Indexer, IndexerError, IndexerRpcConfig, Items, ItemsWithCursor, MerkleProof, NewAddressProofWithContext, - OwnerBalance, PaginatedOptions, Response, RetryConfig, SignatureWithMetadata, TokenBalance, - ValidityProofWithContext, + OwnerBalance, PaginatedOptions, QueueElementsResult, QueueInfoResult, Response, RetryConfig, + SignatureWithMetadata, TokenBalance, ValidityProofWithContext, }; #[async_trait] @@ -209,7 +209,7 @@ impl Indexer for LightClient { input_queue_start_index: Option, input_queue_limit: Option, config: Option, - ) -> Result, IndexerError> { + ) -> Result, IndexerError> { Ok(self .indexer .as_mut() @@ -225,6 +225,18 @@ impl Indexer for LightClient { .await?) } + async fn get_queue_info( + &self, + config: Option, + ) -> Result, IndexerError> { + Ok(self + .indexer + .as_ref() + .ok_or(IndexerError::NotInitialized)? + .get_queue_info(config) + .await?) + } + async fn get_subtrees( &self, merkle_tree_pubkey: [u8; 32], diff --git a/sdk-libs/photon-api/src/apis/default_api.rs b/sdk-libs/photon-api/src/apis/default_api.rs index dd523674ea..d0dd52fa51 100644 --- a/sdk-libs/photon-api/src/apis/default_api.rs +++ b/sdk-libs/photon-api/src/apis/default_api.rs @@ -295,6 +295,15 @@ pub enum GetMultipleNewAddressProofsV2PostError { UnknownValue(serde_json::Value), } +/// struct for typed errors of method [`get_queue_info_post`] +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] +pub enum GetQueueInfoPostError { + Status429(models::GetBatchAddressUpdateInfoPost429Response), + Status500(models::GetBatchAddressUpdateInfoPost429Response), + UnknownValue(serde_json::Value), +} + /// struct for typed errors of method [`get_queue_elements_post`] #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(untagged)] @@ -1740,6 +1749,45 @@ pub async fn get_multiple_new_address_proofs_v2_post( } } +pub async fn get_queue_info_post( + configuration: &configuration::Configuration, + get_queue_info_post_request: models::GetQueueInfoPostRequest, +) -> Result> { + let local_var_configuration = configuration; + + let local_var_client = &local_var_configuration.client; + + let local_var_uri_str = format!("{}/getQueueInfo", local_var_configuration.base_path); + let local_var_uri_str = append_api_key(local_var_configuration, &local_var_uri_str); + let mut local_var_req_builder = + local_var_client.request(reqwest::Method::POST, local_var_uri_str.as_str()); + + if let Some(ref local_var_user_agent) = local_var_configuration.user_agent { + local_var_req_builder = + local_var_req_builder.header(reqwest::header::USER_AGENT, local_var_user_agent.clone()); + } + local_var_req_builder = local_var_req_builder.json(&get_queue_info_post_request); + + let local_var_req = local_var_req_builder.build()?; + let local_var_resp = local_var_client.execute(local_var_req).await?; + + let local_var_status = local_var_resp.status(); + let local_var_content = local_var_resp.text().await?; + + if !local_var_status.is_client_error() && !local_var_status.is_server_error() { + serde_json::from_str(&local_var_content).map_err(Error::from) + } else { + let local_var_entity: Option = + serde_json::from_str(&local_var_content).ok(); + let local_var_error = ResponseContent { + status: local_var_status, + content: local_var_content, + entity: local_var_entity, + }; + Err(Error::ResponseError(local_var_error)) + } +} + pub async fn get_queue_elements_post( configuration: &configuration::Configuration, get_queue_elements_post_request: models::GetQueueElementsPostRequest, diff --git a/sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs b/sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs new file mode 100644 index 0000000000..101456b52e --- /dev/null +++ b/sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs @@ -0,0 +1,47 @@ +/* + * photon-indexer + * + * Solana indexer for general compression + * + * The version of the OpenAPI document: 0.50.0 + * + * Generated by: https://openapi-generator.tech + */ + +use crate::models; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct JsonRpcError { + pub code: i32, + pub message: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub data: Option, +} + +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct GetQueueInfoPost200Response { + #[serde(rename = "jsonrpc")] + pub jsonrpc: String, + #[serde(rename = "result", skip_serializing_if = "Option::is_none")] + pub result: Option, + #[serde(rename = "error", skip_serializing_if = "Option::is_none")] + pub error: Option, + #[serde(rename = "id")] + pub id: Option, +} + +impl GetQueueInfoPost200Response { + pub fn new( + jsonrpc: String, + result: Option, + id: Option, + ) -> GetQueueInfoPost200Response { + GetQueueInfoPost200Response { + jsonrpc, + result, + error: None, + id, + } + } +} diff --git a/sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs b/sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs new file mode 100644 index 0000000000..00435c3aaa --- /dev/null +++ b/sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs @@ -0,0 +1,46 @@ +/* + * photon-indexer + * + * Solana indexer for general compression + * + * The version of the OpenAPI document: 0.50.0 + * + * Generated by: https://openapi-generator.tech + */ + +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct GetQueueInfoPost200ResponseResult { + #[serde(rename = "queues")] + pub queues: Vec, + #[serde(rename = "slot")] + pub slot: u64, +} + +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct QueueInfo { + #[serde(rename = "tree")] + pub tree: String, + #[serde(rename = "queue")] + pub queue: String, + #[serde(rename = "queueType")] + pub queue_type: u8, + #[serde(rename = "queueSize")] + pub queue_size: u64, +} + +impl GetQueueInfoPost200ResponseResult { + pub fn new(queues: Vec, slot: u64) -> GetQueueInfoPost200ResponseResult { + GetQueueInfoPost200ResponseResult { queues, slot } + } +} + +impl QueueInfo { + pub fn new(tree: String, queue: String, queue_type: u8, queue_size: u64) -> QueueInfo { + QueueInfo { + tree, + queue, + queue_type, + queue_size, + } + } +} diff --git a/sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs b/sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs new file mode 100644 index 0000000000..29511189a3 --- /dev/null +++ b/sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs @@ -0,0 +1,64 @@ +/* + * photon-indexer + * + * Solana indexer for general compression + * + * The version of the OpenAPI document: 0.50.0 + * + * Generated by: https://openapi-generator.tech + */ + +#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] +pub struct GetQueueInfoPostRequest { + /// An ID to identify the request. + #[serde(rename = "id")] + pub id: i32, + /// The version of the JSON-RPC protocol. + #[serde(rename = "jsonrpc")] + pub jsonrpc: Jsonrpc, + /// The name of the method to invoke. + #[serde(rename = "method")] + pub method: Method, + #[serde(rename = "params")] + pub params: GetQueueInfoParams, +} + +impl GetQueueInfoPostRequest { + pub fn new(id: i32, jsonrpc: Jsonrpc, method: Method) -> GetQueueInfoPostRequest { + GetQueueInfoPostRequest { + id, + jsonrpc, + method, + params: GetQueueInfoParams::default(), + } + } +} + +#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] +pub struct GetQueueInfoParams {} + +/// The version of the JSON-RPC protocol. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] +pub enum Jsonrpc { + #[serde(rename = "2.0")] + Variant2Period0, +} + +impl Default for Jsonrpc { + fn default() -> Jsonrpc { + Self::Variant2Period0 + } +} + +/// The name of the method to invoke. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] +pub enum Method { + #[serde(rename = "getQueueInfo")] + GetQueueInfo, +} + +impl Default for Method { + fn default() -> Method { + Self::GetQueueInfo + } +} diff --git a/sdk-libs/photon-api/src/models/mod.rs b/sdk-libs/photon-api/src/models/mod.rs index fc925f9215..b8aa4810dd 100644 --- a/sdk-libs/photon-api/src/models/mod.rs +++ b/sdk-libs/photon-api/src/models/mod.rs @@ -322,3 +322,11 @@ pub mod token_data; pub use self::token_data::TokenData; pub mod tree_context_info; pub use self::tree_context_info::TreeContextInfo; +pub mod _get_queue_info_post_request; +pub use self::_get_queue_info_post_request::GetQueueInfoPostRequest; +pub mod _get_queue_info_post_200_response; +pub use self::_get_queue_info_post_200_response::GetQueueInfoPost200Response; +pub mod _get_queue_info_post_200_response_result; +pub use self::_get_queue_info_post_200_response_result::{ + GetQueueInfoPost200ResponseResult, QueueInfo, +}; diff --git a/sdk-libs/program-test/src/indexer/test_indexer.rs b/sdk-libs/program-test/src/indexer/test_indexer.rs index bc5a07e6e6..cac5d82f84 100644 --- a/sdk-libs/program-test/src/indexer/test_indexer.rs +++ b/sdk-libs/program-test/src/indexer/test_indexer.rs @@ -865,6 +865,13 @@ impl Indexer for TestIndexer { } } + async fn get_queue_info( + &self, + _config: Option, + ) -> Result, IndexerError> { + unimplemented!("get_queue_info") + } + async fn get_subtrees( &self, _merkle_tree_pubkey: [u8; 32], diff --git a/sdk-libs/program-test/src/program_test/indexer.rs b/sdk-libs/program-test/src/program_test/indexer.rs index 86191b1972..26beca72eb 100644 --- a/sdk-libs/program-test/src/program_test/indexer.rs +++ b/sdk-libs/program-test/src/program_test/indexer.rs @@ -221,6 +221,18 @@ impl Indexer for LightProgramTest { .await?) } + async fn get_queue_info( + &self, + config: Option, + ) -> Result, IndexerError> { + Ok(self + .indexer + .as_ref() + .ok_or(IndexerError::NotInitialized)? + .get_queue_info(config) + .await?) + } + async fn get_subtrees( &self, merkle_tree_pubkey: [u8; 32], diff --git a/sdk-tests/client-test/tests/light_client.rs b/sdk-tests/client-test/tests/light_client.rs index d36fd37e90..b0d4e4b19f 100644 --- a/sdk-tests/client-test/tests/light_client.rs +++ b/sdk-tests/client-test/tests/light_client.rs @@ -54,7 +54,6 @@ async fn test_all_endpoints() { wait_time: 10, sbf_programs: vec![], limit_ledger_size: None, - grpc_port: None, }; spawn_validator(config).await;