Skip to content

refactor: replace grpc queue info with polling#2068

Merged
sergeytimoshin merged 1 commit intomainfrom
sergey/get_queue_info_poll
Nov 22, 2025
Merged

refactor: replace grpc queue info with polling#2068
sergeytimoshin merged 1 commit intomainfrom
sergey/get_queue_info_poll

Conversation

@sergeytimoshin
Copy link
Contributor

@sergeytimoshin sergeytimoshin commented Nov 21, 2025

  • remove grpc
  • add QueueInfoPoller actor
  • adjust EpochManager to use polling
  • remove grpc-port from CLI flags
  • update client sdk to support new queue info endpoint

Summary by CodeRabbit

  • New Features

    • Added a queue-info API and SDK types to fetch current queue metadata.
    • Introduced an actor-based poller for periodic queue monitoring and notifications.
  • Refactor

    • Replaced gRPC streaming/router with HTTP-based polling and actor-driven routing.
    • Shifted from build-time protobuf compilation to runtime/API-driven queue integration.
  • Chores

    • Removed protobuf artifact and build script; updated dependencies and test/validator configs (explicit gRPC port removed).

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Nov 21, 2025

Walkthrough

Replaces Prost/Tonic gRPC-based Photon queue streaming with a kameo actor-based QueueInfoPoller backed by an indexer RPC; removes photon.proto, gRPC router/work_coordinator, build-time proto compilation, and grpc_port config; adds indexer get_queue_info APIs and models; updates EpochManager to use the actor.

Changes

Cohort / File(s) Summary
Dependency Migration
forester/Cargo.toml
Removed Tonic/Prost crates and tonic-prost-build; added kameo = "0.19".
Build Script Removal
forester/build.rs
Deleted protobuf compile steps and rerun directive.
Protocol Buffer Removal
forester/proto/photon.proto
Removed photon.proto and all QueueService/messages/enums.
gRPC Router Removal
forester/src/grpc/mod.rs, forester/src/grpc/router.rs
Removed router module and public re-exports + gRPC routing logic.
Work Coordinator Removal
forester/src/work_coordinator.rs
Deleted WorkCoordinator, its QueueUpdateMessage type, and subscription/dispatch logic.
New Polling Module
forester/src/polling/mod.rs, forester/src/polling/queue_poller.rs
Added QueueInfoPoller actor, QueueUpdateMessage, Register/Unregister messages, polling loop, distribution logic and public re-exports.
EpochManager Integration
forester/src/epoch_manager.rs
Replaced coordinator with queue_poller: Option<ActorRef<QueueInfoPoller>>; spawn poller when indexer_url is set; updated V2 registration to use actor ask/send; removed coordinator params.
Public API / Module Changes
forester/src/lib.rs
Removed grpc and work_coordinator modules; added polling module.
Indexer Client APIs & Types
sdk-libs/client/src/indexer/types.rs, sdk-libs/client/src/indexer/indexer_trait.rs, sdk-libs/client/src/indexer/mod.rs
Added QueueInfo and QueueInfoResult types; added Indexer::get_queue_info async method and re-exports.
Photon Indexer Impl
sdk-libs/client/src/indexer/photon_indexer.rs
Implemented get_queue_info calling Photon API, decoding base58 IDs, validating slot, returning QueueInfoResult.
RPC Wrapper
sdk-libs/client/src/rpc/indexer.rs
Exposed get_queue_info on LightClient Indexer and adjusted queue result types.
Photon API Client Models
sdk-libs/photon-api/src/apis/default_api.rs, sdk-libs/photon-api/src/models/...
Added get_queue_info_post endpoint, request/response/result models, and error enum; added related model modules and re-exports.
Program Test Stubs
sdk-libs/program-test/src/indexer/test_indexer.rs, sdk-libs/program-test/src/program_test/indexer.rs
Added get_queue_info stub/forwarding methods for test indexer and LightProgramTest.
Local Validator / Tests
sdk-libs/client/src/local_test_validator.rs, forester/tests/*, forester/tests/legacy/*
Removed grpc_port field from LightValidatorConfig and removed explicit grpc_port settings from multiple tests (and a debug logging block in one test).
Client Cargo Cleanup
sdk-libs/client/Cargo.toml
Removed workspace dependencies: solana-signer, solana-epoch-info, num-traits, bytemuck.
Docs / Examples
sdk-libs/client/src/lib.rs
Removed grpc_port: None from documentation example.

Sequence Diagram(s)

sequenceDiagram
    participant App as Application
    participant EM as EpochManager
    participant QP as QueueInfoPoller (Actor)
    participant PI as PhotonIndexer

    Note over EM,QP: startup when indexer_url configured
    App->>EM: new(config with indexer_url)
    EM->>QP: Spawn QueueInfoPoller::new(indexer_url, api_key)

    Note over QP,PI: periodic poll loop
    QP->>PI: get_queue_info(config)
    PI-->>QP: QueueInfoResult (queues + slot)
    QP->>QP: distribute_updates() -> mpsc channels to registered trees

    Note over EM,QP: registration flow
    App->>EM: process_light_slot_v2_event(tree_pubkey)
    EM->>QP: ask(RegisterTree{tree_pubkey}).send()
    QP-->>EM: Receiver<QueueUpdateMessage>

    Note over EM,QP: unregister
    App->>EM: unregister_tree(tree_pubkey)
    EM->>QP: ask(UnregisterTree{tree_pubkey}).send()
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50–70 minutes

Areas needing extra attention:

  • forester/src/polling/queue_poller.rs — actor lifecycle, concurrency, channel/backpressure handling, error paths.
  • forester/src/epoch_manager.rs — ActorRef.ask().send() usage, Clone/Drop semantics, instrumentation updates.
  • sdk-libs/photon-api and sdk-libs/client indexer paths — serde shapes, JSON-RPC model correctness, base58 decoding and slot sync validation.
  • Tests & local validator changes — ensure defaults remain correct after removing grpc_port and removed test logging.

Possibly related PRs

Suggested labels

ai-review

Suggested reviewers

  • SwenSchaeferjohann

Poem

Actors hum where streams once flowed,
Polls wake photons, data showed.
Trees subscribe, then channels sing,
Slot by slot the updates bring. 🌲✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 10.81% which is insufficient. The required threshold is 70.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and specifically summarizes the main architectural change: replacing gRPC-based queue info with a polling-based approach using actors.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch sergey/get_queue_info_poll

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 10

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
forester/src/epoch_manager.rs (1)

1381-1422: Channel closure creates a metrics-spamming tight loop—implement the suggested fix

The analysis is correct. When all senders to queue_update_rx are dropped (e.g., the QueueInfoPoller shuts down), the if let Some(update) block at line 1381 becomes a no-op on every iteration. The loop then repeatedly executes lines 1419–1420 without backoff until the time-based exit condition, hammering your metrics endpoint and wasting CPU.

The suggested refactor to match on the None case and break from the labeled loop is the right approach. Your loop label 'inner_processing_loop exists at line 1352, so the syntax is valid. The fix preserves event-driven processing during healthy operation while preventing the runaway loop once the channel closes.

Implement the suggested patch at lines 1381–1422 to handle channel closure gracefully.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b09cda4 and 700ab65.

⛔ Files ignored due to path filters (8)
  • Cargo.lock is excluded by !**/*.lock and included by none
  • cli/src/commands/test-validator/index.ts is excluded by none and included by none
  • cli/src/utils/initTestEnv.ts is excluded by none and included by none
  • cli/src/utils/processPhotonIndexer.ts is excluded by none and included by none
  • program-tests/compressed-token-test/tests/v1.rs is excluded by none and included by none
  • program-tests/system-cpi-v2-test/tests/event.rs is excluded by none and included by none
  • scripts/devenv/versions.sh is excluded by none and included by none
  • sdk-tests/client-test/tests/light_client.rs is excluded by none and included by none
📒 Files selected for processing (33)
  • forester/Cargo.toml (1 hunks)
  • forester/build.rs (0 hunks)
  • forester/proto/photon.proto (0 hunks)
  • forester/src/epoch_manager.rs (11 hunks)
  • forester/src/grpc/mod.rs (0 hunks)
  • forester/src/grpc/router.rs (0 hunks)
  • forester/src/lib.rs (1 hunks)
  • forester/src/polling/mod.rs (1 hunks)
  • forester/src/polling/queue_poller.rs (1 hunks)
  • forester/src/work_coordinator.rs (0 hunks)
  • forester/tests/e2e_test.rs (0 hunks)
  • forester/tests/legacy/batched_address_test.rs (0 hunks)
  • forester/tests/legacy/batched_state_async_indexer_test.rs (0 hunks)
  • forester/tests/legacy/batched_state_indexer_test.rs (0 hunks)
  • forester/tests/legacy/batched_state_test.rs (0 hunks)
  • forester/tests/legacy/e2e_test.rs (0 hunks)
  • forester/tests/legacy/e2e_v1_test.rs (0 hunks)
  • forester/tests/test_batch_append_spent.rs (0 hunks)
  • forester/tests/test_compressible_ctoken.rs (0 hunks)
  • sdk-libs/client/src/indexer/indexer_trait.rs (2 hunks)
  • sdk-libs/client/src/indexer/mod.rs (1 hunks)
  • sdk-libs/client/src/indexer/photon_indexer.rs (1 hunks)
  • sdk-libs/client/src/indexer/types.rs (1 hunks)
  • sdk-libs/client/src/lib.rs (0 hunks)
  • sdk-libs/client/src/local_test_validator.rs (0 hunks)
  • sdk-libs/client/src/rpc/indexer.rs (3 hunks)
  • sdk-libs/photon-api/src/apis/default_api.rs (2 hunks)
  • sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1 hunks)
  • sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (1 hunks)
  • sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs (1 hunks)
  • sdk-libs/photon-api/src/models/mod.rs (1 hunks)
  • sdk-libs/program-test/src/indexer/test_indexer.rs (1 hunks)
  • sdk-libs/program-test/src/program_test/indexer.rs (1 hunks)
💤 Files with no reviewable changes (16)
  • forester/tests/legacy/batched_address_test.rs
  • forester/tests/e2e_test.rs
  • forester/tests/test_compressible_ctoken.rs
  • forester/tests/legacy/batched_state_async_indexer_test.rs
  • sdk-libs/client/src/lib.rs
  • forester/build.rs
  • forester/tests/test_batch_append_spent.rs
  • forester/tests/legacy/e2e_v1_test.rs
  • forester/src/grpc/mod.rs
  • forester/proto/photon.proto
  • forester/tests/legacy/batched_state_indexer_test.rs
  • sdk-libs/client/src/local_test_validator.rs
  • forester/tests/legacy/e2e_test.rs
  • forester/src/grpc/router.rs
  • forester/src/work_coordinator.rs
  • forester/tests/legacy/batched_state_test.rs
🧰 Additional context used
🧬 Code graph analysis (10)
sdk-libs/program-test/src/indexer/test_indexer.rs (8)
sdk-libs/client/src/indexer/indexer_trait.rs (1)
  • get_queue_info (206-209)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • get_queue_info (1740-1791)
sdk-libs/client/src/rpc/indexer.rs (1)
  • get_queue_info (228-238)
sdk-libs/program-test/src/program_test/indexer.rs (1)
  • get_queue_info (224-234)
sdk-libs/program-test/src/program_test/light_program_test.rs (1)
  • indexer (382-384)
sdk-libs/client/src/rpc/rpc_trait.rs (1)
  • indexer (199-199)
sdk-libs/program-test/src/program_test/rpc.rs (1)
  • indexer (238-240)
sdk-libs/client/src/rpc/client.rs (1)
  • indexer (684-686)
sdk-libs/client/src/indexer/indexer_trait.rs (4)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • get_queue_info (1740-1791)
sdk-libs/client/src/rpc/indexer.rs (1)
  • get_queue_info (228-238)
sdk-libs/program-test/src/indexer/test_indexer.rs (1)
  • get_queue_info (868-873)
sdk-libs/program-test/src/program_test/indexer.rs (1)
  • get_queue_info (224-234)
sdk-libs/program-test/src/program_test/indexer.rs (7)
sdk-libs/client/src/indexer/indexer_trait.rs (1)
  • get_queue_info (206-209)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • get_queue_info (1740-1791)
sdk-libs/client/src/rpc/indexer.rs (1)
  • get_queue_info (228-238)
sdk-libs/program-test/src/indexer/test_indexer.rs (1)
  • get_queue_info (868-873)
sdk-libs/program-test/src/program_test/light_program_test.rs (1)
  • indexer (382-384)
sdk-libs/program-test/src/program_test/rpc.rs (1)
  • indexer (238-240)
sdk-libs/client/src/rpc/client.rs (1)
  • indexer (684-686)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (2)
sdk-libs/client/src/indexer/photon_indexer.rs (2)
  • new (115-126)
  • result (1760-1774)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (2)
  • new (32-34)
  • new (38-45)
sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs (5)
forester/src/epoch_manager.rs (1)
  • new (138-184)
forester/src/polling/queue_poller.rs (1)
  • new (66-74)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • new (115-126)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1)
  • new (24-34)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (2)
  • new (32-34)
  • new (38-45)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (3)
forester/src/polling/queue_poller.rs (1)
  • new (66-74)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • new (115-126)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1)
  • new (24-34)
sdk-libs/client/src/rpc/indexer.rs (4)
sdk-libs/client/src/indexer/indexer_trait.rs (1)
  • get_queue_info (206-209)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • get_queue_info (1740-1791)
sdk-libs/program-test/src/indexer/test_indexer.rs (1)
  • get_queue_info (868-873)
sdk-libs/program-test/src/program_test/indexer.rs (1)
  • get_queue_info (224-234)
sdk-libs/client/src/indexer/photon_indexer.rs (7)
sdk-libs/client/src/indexer/indexer_trait.rs (1)
  • get_queue_info (206-209)
sdk-libs/client/src/rpc/indexer.rs (1)
  • get_queue_info (228-238)
sdk-libs/program-test/src/indexer/test_indexer.rs (2)
  • get_queue_info (868-873)
  • new (1355-1416)
sdk-libs/program-test/src/program_test/indexer.rs (1)
  • get_queue_info (224-234)
sdk-libs/photon-api/src/apis/default_api.rs (1)
  • get_queue_info_post (1752-1789)
sdk-libs/client/src/indexer/base58.rs (1)
  • decode_base58_to_fixed_array (37-48)
program-libs/compressed-account/src/pubkey.rs (1)
  • new_from_array (79-81)
forester/src/epoch_manager.rs (2)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • new (115-126)
sdk-libs/client/src/rpc/rpc_trait.rs (2)
  • new (34-42)
  • new (76-78)
forester/src/polling/queue_poller.rs (2)
forester/src/epoch_manager.rs (1)
  • new (138-184)
sdk-libs/client/src/indexer/photon_indexer.rs (2)
  • new (115-126)
  • result (1760-1774)
⏰ Context from checks skipped due to timeout of 900000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (19)
  • GitHub Check: system-programs (sdk-libs, light-sdk-macros light-sdk light-program-test light-client light-compr...
  • GitHub Check: system-programs (anchor & pinocchio, ["cargo-test-sbf -p sdk-anchor-test", "cargo-test-sbf -p sdk...
  • GitHub Check: system-programs (token test, ["cargo-test-sbf -p sdk-token-test"])
  • GitHub Check: system-programs (native, ["cargo-test-sbf -p sdk-native-test", "cargo-test-sbf -p sdk-v1-native-t...
  • GitHub Check: programs (system-cpi-test-v2-functional-account-infos, ["cargo-test-sbf -p system-cpi-v2-test -- ...
  • GitHub Check: programs (system-cpi-test, ["cargo-test-sbf -p system-cpi-test", "cargo test -p light-system-prog...
  • GitHub Check: lint
  • GitHub Check: programs (system-cpi-test-v2-functional-read-only, ["cargo-test-sbf -p system-cpi-v2-test -- func...
  • GitHub Check: Test batched-merkle-tree-simulate
  • GitHub Check: programs (light-system-program-address, ["cargo-test-sbf -p system-test -- test_with_address", "c...
  • GitHub Check: Test program-libs-fast
  • GitHub Check: Test program-libs-slow
  • GitHub Check: programs (compressed-token-and-e2e, ["cargo-test-sbf -p compressed-token-test --test v1", "cargo-...
  • GitHub Check: programs (account-compression-and-registry, ["cargo-test-sbf -p account-compression-test", "cargo...
  • GitHub Check: cli-v1
  • GitHub Check: cli-v2
  • GitHub Check: stateless-js-v2
  • GitHub Check: stateless-js-v1
  • GitHub Check: Forester e2e test
🔇 Additional comments (13)
forester/src/lib.rs (1)

13-13: Polling module export is wired appropriately

Exposing pub mod polling; at the crate root keeps the new queue‑poller functionality discoverable in the same way as the old gRPC/work coordinator modules. No issues with this wiring from an API or layering perspective.

sdk-libs/client/src/indexer/mod.rs (1)

20-22: QueueInfo / QueueInfoResult re-exports are correct and complete

Re‑exporting QueueInfo and QueueInfoResult from types here keeps the indexer public surface coherent and lets downstream code use the new API without reaching into submodules. This matches how other indexer types are exposed.

forester/src/polling/mod.rs (1)

1-5: Nice façade over queue_poller internals

Using pub mod queue_poller; plus a focused pub use keeps the external API (QueueInfoPoller, messages, and counters) clean while allowing you to reorganize internals later without breaking callers. No issues here.

sdk-libs/client/src/indexer/indexer_trait.rs (1)

4-14: New get_queue_info API is well-shaped and consistent with existing Indexer methods

Pulling QueueInfoResult into the type imports and adding get_queue_info(&self, config: Option<IndexerRpcConfig>) -> Result<Response<QueueInfoResult>, IndexerError> fits neatly with the rest of the trait:

  • Read‑only call on &self with the standard IndexerRpcConfig envelope.
  • Return type matches the Photon indexer implementation and the client/RPC wrappers that proxy this method.
  • Doc comment clearly states what’s returned, so consumers have a good mental model.

No issues from an API‑design or correctness perspective.

Also applies to: 204-209

sdk-libs/program-test/src/program_test/indexer.rs (1)

224-235: Delegation for get_queue_info matches existing wrapper patterns

This implementation follows the same guard‑and‑delegate pattern as the rest of the Indexer for LightProgramTest impl: it checks for initialization and then forwards to the inner indexer. The signature and return type are consistent with the trait and the client implementation, so this should “just work” once the underlying indexer supports queue info.

forester/src/polling/queue_poller.rs (1)

76-99: Conversion is correct and intentional—original concern was unfounded

Verification confirms QueueType::from(queue.queue_type as u64) is the intended and safe conversion:

  • Indexer source: queue_type is u8 (from light_client::indexer::QueueInfo)
  • QueueType enum: 5 variants with explicit discriminants 1–5
  • From impl: Maps 1→NullifierV1, 2→AddressV1, 3→InputStateV2, 4→AddressV2, 5→OutputStateV2, and panics on any other value
  • Result: The cast is lossless and 1:1 within the valid range. Rather than silently misclassifying unexpected values, the code intentionally fails fast with a panic, which is the safer behavior.

No changes needed.

sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs (1)

11-20: GetQueueInfoPostRequest model is minimal and appropriate for current usage

The struct and new() constructor align with how get_queue_info_post is invoked (no params yet) and match other OpenAPI‑generated request types in style. No issues from a modeling perspective.

sdk-libs/photon-api/src/models/mod.rs (1)

325-332: Queue info models correctly exposed via models module

The new pub mod and pub use entries for GetQueueInfoPostRequest, GetQueueInfoPost200Response, GetQueueInfoPost200ResponseResult, and QueueInfo are consistent with the rest of the file and give downstream code a clean import surface.

sdk-libs/client/src/indexer/photon_indexer.rs (1)

1740-1791: get_queue_info implementation is consistent with other Photon indexer RPCs

The new method cleanly follows the existing pattern: wraps the call in retry, enforces result.slot >= config.slot, and converts API QueueInfo entries into internal QueueInfo using the shared base58 helpers and Pubkey::new_from_array. Error mapping into IndexerError::PhotonError is also in line with the rest of the file.

If the Photon API ever adds an error field to the 200‑response envelope, consider reusing the existing extract_result_with_error_check helper for uniform error handling, but the current implementation is correct for the present model.

sdk-libs/client/src/rpc/indexer.rs (1)

10-12: RPC facade correctly wires queue APIs through LightClient

Importing QueueElementsResult and QueueInfoResult into the local prelude and updating get_queue_elements’s return type keeps the signatures consistent and avoids overly verbose paths. The new get_queue_info method follows the same delegation + NotInitialized pattern as the other RPCs, so the public surface of LightClient stays uniform.

Also applies to: 204-238

sdk-libs/photon-api/src/apis/default_api.rs (1)

298-305: get_queue_info_post wiring matches the existing generated HTTP helpers

The new GetQueueInfoPostError enum and get_queue_info_post function follow the same pattern as every other endpoint in this file: same status handling, same ResponseContent construction on error, and the usual API‑key + User‑Agent handling. Reusing GetBatchAddressUpdateInfoPost429Response for 429/500 is consistent with the surrounding error enums.

Given the 200‑response model currently has only a result field (no error), this helper will treat any non‑2xx HTTP status as an error and any 2xx body as a successful queue info result. That’s fine as long as the Photon indexer doesn’t send logical JSON‑RPC errors inside 2xx responses for /getQueueInfo.

Also applies to: 1752-1789

sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (1)

11-46: QueueInfo result model aligns with downstream consumer expectations

GetQueueInfoPost200ResponseResult and QueueInfo are shaped as expected for the indexer client: queues is a flat list, slot is a simple u64, and queueType/queueSize match the fields the client maps into light_client::indexer::QueueInfo. As long as the backend’s queueType numeric values stay in sync with your QueueType enum, this model should deserialize cleanly.

forester/src/epoch_manager.rs (1)

17-20: QueueInfoPoller integration and V2 tree registration look solid

The new queue_poller: Option<ActorRef<QueueInfoPoller>> field, initialization in EpochManager::new, and per‑tree RegisterTree flow in perform_active_work/process_queue_v2 are all wired in a way that:

  • Only activates polling when indexer_url is configured.
  • Fails soft: if spawning/registering with the poller fails, V2 trees fall back to the existing polling path.
  • Keeps V1 and “compression tree” behavior unchanged.

The additional skip(.., queue_update_rx) annotations on the tracing spans are also a good touch to keep logs readable.

Also applies to: 50-51, 100-114, 136-183, 953-1001

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
forester/src/epoch_manager.rs (2)

972-994: V2 tree tasks spawn even when registration fails, leading to idle workers

When RegisterTree fails (lines 982-986), the code logs a warning but still spawns a processing task for that tree. Later, process_queue_v2 detects the missing channel (lines 1162-1166) and logs repeatedly that it will be "inactive," but the task continues running and consuming the slot schedule.

This wastes resources and produces confusing logs. Consider:

  • Skipping the task spawn entirely when registration fails (continue the loop at line 994), OR
  • Propagating the error and failing the epoch if V2 queue updates are critical to your operation.

If trees can legitimately run without the indexer, document that behavior clearly so operators understand why tasks show as "inactive."


1376-1418: Critical: recv().await blocks indefinitely and can prevent slot-end detection

At line 1377, the code blocks on queue_update_rx.recv().await waiting for a queue update. If no update arrives (e.g., the queue is empty or the poller is slow), the task will hang and never check whether estimated_slot >= forester_slot_details.end_solana_slot (line 1348).

This can cause the forester to miss its slot deadline, fail to exit the loop, and potentially stall the entire tree's processing schedule.

Fix: Use tokio::select! to wait for either:

  • An update from queue_update_rx.recv(), OR
  • A timeout/interval that periodically checks estimated_slot and breaks when the slot has ended.

Example refactor:

  'inner_processing_loop: loop {
+     // Re-check slot end before blocking on recv
      if estimated_slot >= forester_slot_details.end_solana_slot {
          trace!(/* ... */);
          break 'inner_processing_loop;
      }

-     // Wait for queue update from poller
-     if let Some(update) = queue_update_rx.recv().await {
+     // Wait for queue update OR timeout to check slot end
+     let update = tokio::select! {
+         maybe_update = queue_update_rx.recv() => {
+             match maybe_update {
+                 Some(u) => u,
+                 None => {
+                     warn!("Queue update channel closed for tree {}", tree_pubkey);
+                     break 'inner_processing_loop;
+                 }
+             }
+         }
+         _ = tokio::time::sleep(Duration::from_millis(500)) => {
+             // Re-check estimated_slot and continue
+             estimated_slot = self.slot_tracker.estimated_current_slot();
+             continue 'inner_processing_loop;
+         }
+     };
+
+     if update.queue_size > 0 {
          // existing processing logic...
      }

This ensures the task can always exit when the slot ends, even if no updates arrive.

♻️ Duplicate comments (7)
forester/src/polling/queue_poller.rs (7)

37-37: polling_active field is never read—either use it or remove it

The polling_active flag is set to true on construction (line 72) and false on stop (line 60), but it's never checked anywhere. This makes it dead state that suggests an incomplete shutdown mechanism.

Two options:

  1. Use it for graceful shutdown: Clone the Arc<AtomicBool> into polling_loop, check polling_active.load(Ordering::Acquire) each iteration, and break cleanly when false.
  2. Remove it: Delete the field and rely entirely on actor shutdown semantics (when tell().send() fails, the loop exits).

Right now it just adds maintenance overhead without providing any functionality.


228-238: Error log on normal shutdown produces false alarms

When the actor stops gracefully, tell().send() will fail and trigger the error! log at line 234: "Failed to send poll message to actor." This is expected behavior during shutdown, not an error condition, so the log is misleading.

Fix: Change line 234 to:

-                error!("Failed to send poll message to actor: {:?}", e);
+                debug!("Polling loop ending (actor stopped): {:?}", e);

Alternatively, if you implement the polling_active flag check mentioned in the previous comment, you can distinguish intentional shutdown (debug log) from unexpected failure (error log).


103-135: Avoid cloning message and remove closed channels to reduce log spam

At line 114, message.clone() is unnecessary—you can move message into try_send and use info.* fields for logging. Additionally, closed receivers will trigger Closed errors on every poll until unregistered, filling logs with repeated warnings.

Refactor:

  fn distribute_updates(&mut self, queue_infos: Vec<QueueInfo>) {
      for info in queue_infos {
          if let Some(tx) = self.tree_notifiers.get(&info.tree) {
              let message = QueueUpdateMessage {
                  tree: info.tree,
                  queue: info.queue,
                  queue_type: info.queue_type,
                  queue_size: info.queue_size,
                  slot: info.slot,
              };

-             match tx.try_send(message.clone()) {
+             match tx.try_send(message) {
                  Ok(()) => {
                      trace!(
                          "Routed update to tree {}: {} items (type: {:?})",
-                         info.tree, message.queue_size, info.queue_type
+                         info.tree, info.queue_size, info.queue_type
                      );
                  }
                  Err(mpsc::error::TrySendError::Full(_)) => {
                      warn!(/* ... */);
                  }
                  Err(mpsc::error::TrySendError::Closed(_)) => {
-                     debug!("Tree {} channel closed (task likely finished)", info.tree);
+                     debug!("Removing closed channel for tree {}", info.tree);
+                     self.tree_notifiers.remove(&info.tree);
                  }
              }
          }
      }
  }

This requires changing the signature to &mut self to allow removal.


155-164: Double registration silently replaces existing subscription

If RegisterTree is called twice for the same tree, the insert at line 161 overwrites the previous sender, and the original receiver suddenly stops receiving updates. This could be confusing for callers who don't expect their subscription to be replaced.

Consider:

  async fn handle(/* ... */) -> Self::Reply {
      let (tx, rx) = mpsc::channel(100);
-     self.tree_notifiers.insert(msg.tree_pubkey, tx);
+     if let Some(old_tx) = self.tree_notifiers.insert(msg.tree_pubkey, tx) {
+         warn!("Tree {} was already registered; replacing previous subscription", msg.tree_pubkey);
+     }
      debug!("Registered tree {} for queue updates", msg.tree_pubkey);
      rx
  }

Alternatively, return the existing receiver if one exists, or return a Result indicating double registration.


175-182: Log when unregistering a non-existent tree

UnregisterTree silently does nothing if the tree isn't registered. Logging when remove returns None helps detect mismatches (e.g., unregistering before registering, or after the receiver was already dropped).

  async fn handle(/* ... */) -> Self::Reply {
-     self.tree_notifiers.remove(&msg.tree_pubkey);
+     if self.tree_notifiers.remove(&msg.tree_pubkey).is_none() {
+         debug!("Attempted to unregister tree {} but it was not registered", msg.tree_pubkey);
+     } else {
+         debug!("Unregistered tree {}", msg.tree_pubkey);
+     }
-     debug!("Unregistered tree {}", msg.tree_pubkey);
  }

206-219: Short-circuit PollNow when no trees are registered to reduce indexer load

The handler calls poll_queue_info even when tree_notifiers is empty, wasting indexer bandwidth since all results will be discarded in distribute_updates.

Add early exit:

  async fn handle(/* ... */) -> Self::Reply {
+     if self.tree_notifiers.is_empty() {
+         trace!("No trees registered; skipping queue info poll");
+         return;
+     }
+
      match self.poll_queue_info().await {
          Ok(queue_infos) => {
              self.distribute_updates(queue_infos);
          }
          Err(e) => {
              error!("Failed to poll queue info: {:?}", e);
          }
      }
  }

This eliminates unnecessary indexer calls when idle. For further optimization, consider adaptive backoff (increase interval when queues are empty) to reduce steady-state polling load.


23-23: Consider making polling interval configurable

POLLING_INTERVAL_SECS is hard-coded to 1 second. For operational tuning (e.g., adjusting for indexer rate limits, reducing load when queues are empty, or speeding up during high activity), a configurable interval would be helpful.

Suggestion: Add a polling_interval field to ForesterConfig and pass it through to QueueInfoPoller::new. This allows runtime/config-based tuning without code changes.

For now, 1 second is reasonable, but consider this for future operational flexibility.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 700ab65 and c70cf8d.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock and included by none
📒 Files selected for processing (7)
  • forester/src/epoch_manager.rs (13 hunks)
  • forester/src/polling/queue_poller.rs (1 hunks)
  • forester/tests/e2e_test.rs (0 hunks)
  • sdk-libs/client/Cargo.toml (1 hunks)
  • sdk-libs/client/src/indexer/photon_indexer.rs (1 hunks)
  • sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1 hunks)
  • sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs (1 hunks)
💤 Files with no reviewable changes (1)
  • forester/tests/e2e_test.rs
🧰 Additional context used
🧬 Code graph analysis (5)
forester/src/epoch_manager.rs (2)
forester/src/polling/queue_poller.rs (1)
  • new (66-74)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • new (115-126)
sdk-libs/client/src/indexer/photon_indexer.rs (6)
sdk-libs/client/src/indexer/indexer_trait.rs (1)
  • get_queue_info (206-209)
sdk-libs/client/src/rpc/indexer.rs (1)
  • get_queue_info (228-238)
sdk-libs/program-test/src/program_test/indexer.rs (1)
  • get_queue_info (224-234)
sdk-libs/photon-api/src/apis/default_api.rs (1)
  • get_queue_info_post (1752-1789)
sdk-libs/client/src/indexer/base58.rs (1)
  • decode_base58_to_fixed_array (37-48)
program-libs/compressed-account/src/pubkey.rs (1)
  • new_from_array (79-81)
sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs (3)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • new (115-126)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1)
  • new (35-46)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (2)
  • new (32-34)
  • new (38-45)
forester/src/polling/queue_poller.rs (5)
forester/src/epoch_manager.rs (1)
  • new (138-184)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • new (115-126)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1)
  • new (35-46)
sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs (1)
  • new (27-34)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (2)
  • new (32-34)
  • new (38-45)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (2)
  • new (32-34)
  • new (38-45)
⏰ Context from checks skipped due to timeout of 900000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test program-libs-fast
  • GitHub Check: programs (system-cpi-test, ["cargo-test-sbf -p system-cpi-test", "cargo test -p light-system-prog...
  • GitHub Check: Test batched-merkle-tree-simulate
  • GitHub Check: Test program-libs-slow
  • GitHub Check: programs (system-cpi-test-v2-functional-read-only, ["cargo-test-sbf -p system-cpi-v2-test -- func...
  • GitHub Check: programs (light-system-program-compression, ["cargo-test-sbf -p system-test -- test_with_compress...
  • GitHub Check: programs (system-cpi-test-v2-functional-account-infos, ["cargo-test-sbf -p system-cpi-v2-test -- ...
  • GitHub Check: Forester e2e test
🔇 Additional comments (6)
sdk-libs/client/src/indexer/photon_indexer.rs (2)

1775-1776: LGTM: Base58 decoding with appropriate error handling

The use of decode_base58_to_fixed_array for both tree and queue properly handles decoding errors via the ? operator. The error will propagate as IndexerError::InvalidResponseData if the base58 string is malformed or the wrong length.


1767-1769: Verify slot staleness check is sufficient for your use case

The slot check if api_response.slot < config.slot only triggers when an explicit config.slot is provided. If callers omit the config or don't set a minimum slot, stale data could be returned without warning.

If queue info staleness is critical for correctness (e.g., for epoch scheduling), consider:

  • Logging when the returned slot is significantly behind the current slot, or
  • Documenting that callers must provide a slot threshold if they need freshness guarantees.

This is consistent with other methods in the file, so it's more of a usage note than a defect.

sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1)

11-47: LGTM: JSON-RPC 2.0 response structure is well-defined

The response model now includes the error field (addressing the earlier review concern), and both result and error are properly optional. The JsonRpcError structure follows JSON-RPC 2.0 conventions with code, message, and optional data.

The constructor defaults error to None, which is appropriate for success responses. Deserialization will handle cases where the server returns an error instead of a result.

sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs (1)

11-64: LGTM: Request model is correctly structured for parameterless JSON-RPC call

The request model properly represents a JSON-RPC 2.0 call to getQueueInfo:

  • Empty GetQueueInfoParams struct correctly indicates the method takes no parameters
  • Type-safe enums for jsonrpc (version) and method (name) prevent typos
  • Constructor properly defaults params to an empty struct

This follows standard JSON-RPC patterns and matches the usage in the PhotonIndexer implementation.

forester/src/epoch_manager.rs (2)

150-167: LGTM: QueueInfoPoller initialization with appropriate fallback

The initialization properly:

  • Spawns the actor only when indexer_url is configured
  • Logs the startup and fallback cases clearly
  • Uses Option<ActorRef<QueueInfoPoller>> to handle the optional indexer integration

The None fallback when indexer_url is not set allows the forester to run without V2 queue updates, which is appropriate for backward compatibility or testing scenarios.


1152-1166: Appropriate fallback when queue update channel is unavailable

The code correctly handles missing queue_update_rx by logging a warning and returning early. This prevents V2 trees from attempting to process without queue updates, which would be incorrect.

The warning message clearly explains the situation: "No queue update channel available for V2 tree ... Processing will be inactive."

This is consistent with the registration failure handling elsewhere in the file.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
forester/src/polling/queue_poller.rs (1)

104-136: Remove unnecessary clone and clean up closed senders.

Two related issues:

  1. Line 115: message.clone() is unnecessary—you can move message into try_send and still log using the fields from info (which are already copied into message).

  2. Lines 130–132: When a receiver is closed, you log at trace! but never remove the sender from tree_notifiers. This means every subsequent poll will attempt to send to a closed channel and log again, creating noise and wasting cycles.

Apply this refactor to eliminate the clone and clean up closed channels:

-    fn distribute_updates(&self, queue_infos: Vec<QueueInfo>) {
+    fn distribute_updates(&mut self, queue_infos: Vec<QueueInfo>) {
+        let mut closed_trees = Vec::new();
+
         for info in queue_infos {
             if let Some(tx) = self.tree_notifiers.get(&info.tree) {
                 let message = QueueUpdateMessage {
                     tree: info.tree,
                     queue: info.queue,
                     queue_type: info.queue_type,
                     queue_size: info.queue_size,
                     slot: info.slot,
                 };
 
-                match tx.try_send(message.clone()) {
+                match tx.try_send(message) {
                     Ok(()) => {
                         trace!(
                             "Routed update to tree {}: {} items (type: {:?})",
                             info.tree,
-                            message.queue_size,
+                            info.queue_size,
                             info.queue_type
                         );
                     }
                     Err(mpsc::error::TrySendError::Full(_)) => {
                         warn!(
                             "Tree {} channel full, dropping update (tree processing slower than updates)",
                             info.tree
                         );
                     }
                     Err(mpsc::error::TrySendError::Closed(_)) => {
-                        trace!("Tree {} channel closed (task likely finished)", info.tree);
+                        debug!("Tree {} channel closed (task likely finished), will unregister", info.tree);
+                        closed_trees.push(info.tree);
                     }
                 }
             }
         }
+
+        // Remove closed senders
+        for tree in closed_trees {
+            self.tree_notifiers.remove(&tree);
+        }
     }

This also requires changing the signature to &mut self since we're now mutating tree_notifiers. The PollNow handler already has &mut self, so this is safe.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c70cf8d and 594d3b2.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock and included by none
📒 Files selected for processing (2)
  • forester/src/polling/queue_poller.rs (1 hunks)
  • sdk-libs/client/Cargo.toml (0 hunks)
💤 Files with no reviewable changes (1)
  • sdk-libs/client/Cargo.toml
🧰 Additional context used
🧬 Code graph analysis (1)
forester/src/polling/queue_poller.rs (3)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • new (115-126)
forester/src/epoch_manager.rs (1)
  • new (138-184)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (2)
  • new (32-34)
  • new (38-45)
⏰ Context from checks skipped due to timeout of 900000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Test batched-merkle-tree-simulate
  • GitHub Check: Test program-libs-fast
  • GitHub Check: Test program-libs-slow
  • GitHub Check: Forester e2e test
  • GitHub Check: programs (system-cpi-test-v2-functional-read-only, ["cargo-test-sbf -p system-cpi-v2-test -- func...
  • GitHub Check: programs (system-cpi-test-v2-functional-account-infos, ["cargo-test-sbf -p system-cpi-v2-test -- ...
  • GitHub Check: programs (account-compression-and-registry, ["cargo-test-sbf -p account-compression-test", "cargo...
  • GitHub Check: programs (compressed-token-and-e2e, ["cargo-test-sbf -p compressed-token-test --test v1", "cargo-...
  • GitHub Check: programs (light-system-program-compression, ["cargo-test-sbf -p system-test -- test_with_compress...
  • GitHub Check: programs (system-cpi-test, ["cargo-test-sbf -p system-cpi-test", "cargo test -p light-system-prog...
🔇 Additional comments (5)
forester/src/polling/queue_poller.rs (5)

163-173: Good handling of double registration.

The warning log and explicit drop(old_sender) clearly communicate the behavior when a tree is registered multiple times. This addresses the concern from past reviews and makes the subscription lifecycle more explicit.

Optional enhancement: Consider making the channel buffer size (line 161) configurable alongside POLLING_INTERVAL_SECS if you expect different workloads or tree update rates.


192-202: Good handling of unregistration edge cases.

Checking the remove return value and logging when attempting to unregister a non-existent tree makes the subscription lifecycle more observable and easier to debug. This addresses the past review concern.


232-235: Good optimization: early-exit when no subscribers.

Skipping the indexer poll when tree_notifiers is empty avoids unnecessary load. This addresses the past review concern about wasted indexer traffic when idle.


248-279: Polling loop implementation looks solid.

Several good practices:

  • Line 252: MissedTickBehavior::Skip prevents tick buildup if processing is slow, which is correct for this use case.
  • Lines 256, 264: The double check of polling_active (before and after the tick) ensures responsive shutdown while being defensive about the timing of the stop signal.
  • Lines 269–274: The error log is appropriate here—since polling_active is checked before attempting to send, reaching this error means the actor has stopped unexpectedly (not a clean shutdown), so logging at error! level is justified.

The past review concern about noisy shutdown logs has been addressed by the polling_active checks that break the loop cleanly before attempting to send to a stopped actor.


89-89: The u8u64 conversion is safe but indirect; a From<u8> implementation would be more idiomatic.

The API returns queue_type as u8 (confirmed in git/sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs), which is cast to u64 before calling QueueType::from. The From<u64> implementation exists (in program-libs/compressed-account/src/lib.rs:152), but no From<u8> implementation was found.

The cast itself is safe—u8 to u64 is lossless—so the code works correctly. However, it's unnecessarily indirect. Since the source is always u8, implementing From<u8> for QueueType would eliminate the intermediate cast and make the intent clearer. Consider adding that trait implementation to the QueueType definition if it's a frequent conversion pattern across the codebase.

@sergeytimoshin sergeytimoshin force-pushed the sergey/get_queue_info_poll branch from 6513182 to f2549c3 Compare November 22, 2025 01:02
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
forester/src/epoch_manager.rs (1)

953-994: V2 tree registration logic is correct, but failed registrations may be silent.

The code properly uses poller.ask(RegisterTree).send().await to obtain the queue update receiver. However, if registration fails (lines 981-986), processing continues with queue_update_rx = None, which means that tree will silently not process any V2 updates.

Consider whether failed registration warrants more visibility:

                        Err(e) => {
-                            warn!(
+                            error!(
                                "Failed to register tree {} with poller: {:?}",
                                tree.tree_accounts.merkle_tree, e
                            );
                            None
                        }

This makes registration failures more prominent in logs since they represent a degraded operational state for that tree.

♻️ Duplicate comments (4)
sdk-libs/client/src/indexer/types.rs (1)

32-44: QueueInfo / QueueInfoResult match the Photon queue-info payloads

The field set (tree, queue, type, size, plus top-level queues + slot) and derives are appropriate for client use and align with how PhotonIndexer::get_queue_info populates them. The earlier comment about slot being duplicated between Response.context and QueueInfoResult::slot still applies but doesn’t block correctness.

sdk-libs/program-test/src/indexer/test_indexer.rs (1)

868-873: Avoid unimplemented! in TestIndexer::get_queue_info to prevent panics in tests

Right now this method will panic whenever a test (directly or via LightProgramTest/LightClient) calls get_queue_info, which makes failures noisy and harder to debug compared to a structured IndexerError.

You can keep the trait satisfied while returning a clear, non-retryable error, e.g.:

-    async fn get_queue_info(
-        &self,
-        _config: Option<IndexerRpcConfig>,
-    ) -> Result<Response<light_client::indexer::QueueInfoResult>, IndexerError> {
-        unimplemented!("get_queue_info")
-    }
+    async fn get_queue_info(
+        &self,
+        _config: Option<IndexerRpcConfig>,
+    ) -> Result<Response<light_client::indexer::QueueInfoResult>, IndexerError> {
+        Err(IndexerError::NotImplemented(
+            "get_queue_info is not implemented for TestIndexer".into(),
+        ))
+    }

This keeps the contract explicit and avoids crashing test processes when the API starts being used.

forester/Cargo.toml (1)

62-62: kameo version 0.19 does not exist—use 0.17.x or pin a git revision.

As flagged in the previous review, kameo 0.19 is not published on crates.io. The latest stable release is 0.17.2. This will cause cargo build to fail.

Apply one of these fixes:

Option 1: Use the latest stable release

-kameo = "0.19"
+kameo = "0.17"

Option 2: Pin to a specific git commit if you need unreleased features

-kameo = "0.19"
+kameo = { git = "https://github.com/tqwewe/kameo", rev = "COMMIT_HASH" }

After updating, run cargo update and cargo check to verify compatibility with your Tokio version (kameo 0.17.x requires Tokio ^1.44) and MSRV (1.85.1).

Based on learnings (previous review).

forester/src/polling/queue_poller.rs (1)

105-137: Consider removing closed senders to avoid repeated logs.

The method correctly routes updates to registered trees. However, when a receiver is closed (line 131), the sender remains in tree_notifiers, causing a trace log on every subsequent update until explicit unregistration.

For cleaner behavior, consider removing closed senders immediately:

 fn distribute_updates(&self, queue_infos: Vec<QueueInfo>) {
+    let mut closed_trees = Vec::new();
     for info in queue_infos {
         if let Some(tx) = self.tree_notifiers.get(&info.tree) {
             // ... existing code ...
             match tx.try_send(message.clone()) {
                 // ... existing cases ...
                 Err(mpsc::error::TrySendError::Closed(_)) => {
                     trace!("Tree {} channel closed (task likely finished)", info.tree);
+                    closed_trees.push(info.tree);
                 }
             }
         }
     }
+    // Note: This requires &mut self, so you'd need to change the signature or use interior mutability
 }

Alternatively, change the signature to fn distribute_updates(&mut self, ...) and remove closed senders inline. This is a minor optimization; current behavior is functionally correct.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 594d3b2 and f2549c3.

⛔ Files ignored due to path filters (8)
  • Cargo.lock is excluded by !**/*.lock and included by none
  • cli/src/commands/test-validator/index.ts is excluded by none and included by none
  • cli/src/utils/initTestEnv.ts is excluded by none and included by none
  • cli/src/utils/processPhotonIndexer.ts is excluded by none and included by none
  • program-tests/compressed-token-test/tests/v1.rs is excluded by none and included by none
  • program-tests/system-cpi-v2-test/tests/event.rs is excluded by none and included by none
  • scripts/devenv/versions.sh is excluded by none and included by none
  • sdk-tests/client-test/tests/light_client.rs is excluded by none and included by none
📒 Files selected for processing (34)
  • forester/Cargo.toml (1 hunks)
  • forester/build.rs (0 hunks)
  • forester/proto/photon.proto (0 hunks)
  • forester/src/epoch_manager.rs (13 hunks)
  • forester/src/grpc/mod.rs (0 hunks)
  • forester/src/grpc/router.rs (0 hunks)
  • forester/src/lib.rs (1 hunks)
  • forester/src/polling/mod.rs (1 hunks)
  • forester/src/polling/queue_poller.rs (1 hunks)
  • forester/src/work_coordinator.rs (0 hunks)
  • forester/tests/e2e_test.rs (0 hunks)
  • forester/tests/legacy/batched_address_test.rs (0 hunks)
  • forester/tests/legacy/batched_state_async_indexer_test.rs (0 hunks)
  • forester/tests/legacy/batched_state_indexer_test.rs (0 hunks)
  • forester/tests/legacy/batched_state_test.rs (0 hunks)
  • forester/tests/legacy/e2e_test.rs (0 hunks)
  • forester/tests/legacy/e2e_v1_test.rs (0 hunks)
  • forester/tests/test_batch_append_spent.rs (0 hunks)
  • forester/tests/test_compressible_ctoken.rs (0 hunks)
  • sdk-libs/client/Cargo.toml (0 hunks)
  • sdk-libs/client/src/indexer/indexer_trait.rs (2 hunks)
  • sdk-libs/client/src/indexer/mod.rs (1 hunks)
  • sdk-libs/client/src/indexer/photon_indexer.rs (1 hunks)
  • sdk-libs/client/src/indexer/types.rs (1 hunks)
  • sdk-libs/client/src/lib.rs (0 hunks)
  • sdk-libs/client/src/local_test_validator.rs (0 hunks)
  • sdk-libs/client/src/rpc/indexer.rs (3 hunks)
  • sdk-libs/photon-api/src/apis/default_api.rs (2 hunks)
  • sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1 hunks)
  • sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (1 hunks)
  • sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs (1 hunks)
  • sdk-libs/photon-api/src/models/mod.rs (1 hunks)
  • sdk-libs/program-test/src/indexer/test_indexer.rs (1 hunks)
  • sdk-libs/program-test/src/program_test/indexer.rs (1 hunks)
💤 Files with no reviewable changes (17)
  • sdk-libs/client/src/lib.rs
  • forester/tests/legacy/batched_state_indexer_test.rs
  • forester/tests/test_batch_append_spent.rs
  • forester/tests/e2e_test.rs
  • forester/tests/legacy/e2e_v1_test.rs
  • forester/tests/legacy/batched_state_async_indexer_test.rs
  • forester/tests/legacy/batched_state_test.rs
  • forester/build.rs
  • forester/src/grpc/mod.rs
  • sdk-libs/client/src/local_test_validator.rs
  • forester/tests/legacy/e2e_test.rs
  • forester/tests/test_compressible_ctoken.rs
  • forester/src/work_coordinator.rs
  • forester/tests/legacy/batched_address_test.rs
  • forester/src/grpc/router.rs
  • forester/proto/photon.proto
  • sdk-libs/client/Cargo.toml
🧰 Additional context used
🧬 Code graph analysis (10)
sdk-libs/client/src/indexer/indexer_trait.rs (4)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • get_queue_info (1740-1798)
sdk-libs/client/src/rpc/indexer.rs (1)
  • get_queue_info (228-238)
sdk-libs/program-test/src/indexer/test_indexer.rs (1)
  • get_queue_info (868-873)
sdk-libs/program-test/src/program_test/indexer.rs (1)
  • get_queue_info (224-234)
sdk-libs/program-test/src/indexer/test_indexer.rs (8)
sdk-libs/client/src/indexer/indexer_trait.rs (1)
  • get_queue_info (206-209)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • get_queue_info (1740-1798)
sdk-libs/client/src/rpc/indexer.rs (1)
  • get_queue_info (228-238)
sdk-libs/program-test/src/program_test/indexer.rs (1)
  • get_queue_info (224-234)
sdk-libs/program-test/src/program_test/light_program_test.rs (1)
  • indexer (382-384)
sdk-libs/client/src/rpc/rpc_trait.rs (1)
  • indexer (199-199)
sdk-libs/client/src/rpc/client.rs (1)
  • indexer (684-686)
sdk-libs/program-test/src/program_test/rpc.rs (1)
  • indexer (238-240)
sdk-libs/client/src/indexer/photon_indexer.rs (4)
sdk-libs/client/src/indexer/indexer_trait.rs (1)
  • get_queue_info (206-209)
sdk-libs/client/src/rpc/indexer.rs (1)
  • get_queue_info (228-238)
sdk-libs/client/src/indexer/base58.rs (1)
  • decode_base58_to_fixed_array (37-48)
program-libs/compressed-account/src/pubkey.rs (1)
  • new_from_array (79-81)
sdk-libs/client/src/rpc/indexer.rs (4)
sdk-libs/client/src/indexer/indexer_trait.rs (1)
  • get_queue_info (206-209)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • get_queue_info (1740-1798)
sdk-libs/program-test/src/indexer/test_indexer.rs (1)
  • get_queue_info (868-873)
sdk-libs/program-test/src/program_test/indexer.rs (1)
  • get_queue_info (224-234)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (1)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1)
  • new (35-46)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (2)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (2)
  • new (32-34)
  • new (38-45)
sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs (1)
  • new (27-34)
sdk-libs/program-test/src/program_test/indexer.rs (5)
sdk-libs/client/src/indexer/indexer_trait.rs (1)
  • get_queue_info (206-209)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • get_queue_info (1740-1798)
sdk-libs/program-test/src/indexer/test_indexer.rs (1)
  • get_queue_info (868-873)
sdk-libs/program-test/src/program_test/light_program_test.rs (1)
  • indexer (382-384)
sdk-libs/program-test/src/program_test/rpc.rs (1)
  • indexer (238-240)
forester/src/epoch_manager.rs (1)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • new (115-126)
sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs (2)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1)
  • new (35-46)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (2)
  • new (32-34)
  • new (38-45)
forester/src/polling/queue_poller.rs (4)
forester/src/epoch_manager.rs (1)
  • new (138-184)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • new (115-126)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1)
  • new (35-46)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (2)
  • new (32-34)
  • new (38-45)
⏰ Context from checks skipped due to timeout of 900000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: programs (compressed-token-and-e2e, ["cargo-test-sbf -p compressed-token-test --test v1", "cargo-...
  • GitHub Check: Test batched-merkle-tree-simulate
🔇 Additional comments (30)
sdk-libs/client/src/indexer/mod.rs (1)

16-23: Re-export wiring for QueueInfo / QueueInfoResult looks correct

The new queue-related types are exported alongside the rest of the indexer surface, so downstream crates can use them without extra imports; no issues here.

sdk-libs/program-test/src/program_test/indexer.rs (1)

224-235: LightProgramTest::get_queue_info delegation is consistent with existing indexer methods

The method correctly guards against NotInitialized and forwards config and the response from the inner indexer, matching the pattern of other delegated calls.

sdk-libs/client/src/indexer/indexer_trait.rs (2)

4-14: Import of QueueInfoResult into the trait module is correct

The new type is pulled in alongside other indexer value types, allowing the trait method to use it without fully-qualified paths.


204-209: New get_queue_info trait method is well-shaped

The async signature and documentation match the Photon/indexer implementations and queue-info types (tree, queue, type, size), so implementors have a clear contract to follow.

sdk-libs/photon-api/src/models/mod.rs (1)

325-332: Queue-info Photon models are wired into models::mod consistently

The new GetQueueInfoPost* modules and QueueInfo re-export follow the same pattern as other API endpoints, so downstream uses in photon_indexer will resolve cleanly.

sdk-libs/client/src/indexer/photon_indexer.rs (1)

1740-1798: PhotonIndexer::get_queue_info correctly adapts Photon API to client types

The method cleanly follows the existing retry pattern, enforces config.slot freshness, decodes base58 tree/queue fields into Pubkeys, and wraps everything in QueueInfoResult inside a Response; this is consistent with the rest of the indexer surface.

sdk-libs/client/src/rpc/indexer.rs (3)

5-12: Indexer RPC import list correctly includes queue-related types

Pulling QueueElementsResult and QueueInfoResult into scope simplifies signatures and keeps all indexer value types centralized here.


204-213: get_queue_elements return type refactor is purely cosmetic

Switching from Response<crate::indexer::QueueElementsResult> to Response<QueueElementsResult> just leverages the new import; it doesn’t alter behavior or API shape.


228-238: LightClient RPC now exposes get_queue_info consistently

The new method mirrors other RPC bridge methods: it guards against NotInitialized and forwards config and the inner indexer’s Response<QueueInfoResult> unchanged, which is exactly what callers would expect.

sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1)

13-47: LGTM - Previous concern about error field has been addressed

The response model now properly includes both result and error fields (line 27, 29), matching the JSON-RPC 2.0 pattern used by other endpoints. The JsonRpcError struct is correctly defined with standard JSON-RPC error fields. The constructor provides a convenient happy-path builder while still allowing direct field access for error cases.

sdk-libs/photon-api/src/apis/default_api.rs (2)

298-305: LGTM - Error enum follows established pattern

The GetQueueInfoPostError enum is consistent with all other endpoint error enums in this file, using the same variants and structure.


1752-1790: LGTM - Implementation follows established patterns

The get_queue_info_post function correctly implements the standard pattern used by all other POST endpoints in this file: proper configuration handling, User-Agent header attachment, request body serialization, and consistent error response handling.

sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (1)

11-46: LGTM - Queue info models are well-structured

The data models appropriately represent queue information:

  • GetQueueInfoPost200ResponseResult correctly bundles queue data with slot number
  • QueueInfo fields have appropriate types (u8 for queue_type, u64 for sizes)
  • Serde rename attributes properly map to camelCase JSON field names
  • Constructors provide convenient initialization
sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs (1)

11-64: LGTM - JSON-RPC request model is correctly structured

The request model properly implements JSON-RPC 2.0 structure:

  • Empty GetQueueInfoParams (line 37-38) is a valid pattern when the endpoint requires no parameters
  • Constructor appropriately defaults params (line 32) since it's empty
  • Jsonrpc and Method enums enforce valid values at the type level
  • Default implementations provide sensible values ("2.0" for jsonrpc, GetQueueInfo for method)
forester/src/lib.rs (1)

13-13: LGTM! Clean architectural boundary for the new polling subsystem.

The new polling module properly replaces the removed grpc and work_coordinator modules, providing a clear API boundary for the actor-based queue polling mechanism.

forester/src/polling/mod.rs (1)

1-5: LGTM! Clean module structure with well-defined public API.

The re-exports provide a clear interface for the polling subsystem: the actor (QueueInfoPoller), the update message (QueueUpdateMessage), and the three command messages (RegisterTree, UnregisterTree, RegisteredTreeCount).

forester/src/epoch_manager.rs (6)

17-17: LGTM! Imports correctly reflect the architectural migration.

The new imports properly replace the gRPC-based coordination with actor-based polling:

  • kameo::actor::{ActorRef, Spawn} for actor lifecycle management
  • polling::{QueueInfoPoller, QueueUpdateMessage, RegisterTree} for the new polling API

Also applies to: 50-50


112-112: LGTM! Field type and Clone implementation are correct.

The queue_poller field properly replaces the coordinator, and ActorRef<QueueInfoPoller> is the correct handle type for the kameo actor. The Option wrapper appropriately handles cases where indexer_url is not configured.

Also applies to: 130-130


1107-1107: LGTM! Unified processing function with optional event-driven updates.

The queue_update_rx parameter correctly enables V2 trees to receive event-driven queue updates while maintaining compatibility with V1 trees (which pass None).

Also applies to: 1111-1117


1152-1166: Clarify expected behavior when V2 trees lack queue update channels.

When queue_update_rx is None (lines 1162-1166), the code logs a warning and returns Ok(()), meaning the tree won't process any queue items during its assigned slots. This occurs when:

  1. No indexer URL is configured, or
  2. Tree registration with the poller failed

Is this the intended behavior, or should these trees fall back to periodic polling of the queue? Currently, a configuration issue or transient registration failure causes permanent inactivity for that tree in that epoch.

Consider documenting this behavior or implementing a fallback:

} else {
    warn!("No queue update channel for V2 tree {}. Falling back to periodic polling.",
        tree_schedule.tree_accounts.merkle_tree
    );
    // Optionally: implement fallback polling logic here
    Ok(())
}

1318-1318: LGTM! V2 event-driven processing properly integrated.

The signature correctly takes queue_update_rx: &mut mpsc::Receiver<QueueUpdateMessage>, and the initialization properly waits for the slot boundary before beginning processing.

Also applies to: 1321-1328


150-167: The original review comment is based on an incorrect assumption about kameo's API.

After verification, kameo's spawn() method returns an ActorRef directly—not a Result. The actor spawn function returns an actor reference (an ActorRef), not a Result. The code at lines 150–167 is correct as written and requires no error handling.

The QueueInfoPoller struct implements kameo's Actor trait, and when you call spawn() on an actor, it either succeeds and returns an ActorRef, or the function itself would panic (not return an error). The suggested match-based error handling is unnecessary and doesn't align with kameo's design.

Likely an incorrect or invalid review comment.

forester/src/polling/queue_poller.rs (8)

25-32: LGTM! QueueUpdateMessage is well-structured for queue updates.

The message type includes all necessary fields: tree and queue pubkeys, queue type, size, and slot. This provides complete context for processing decisions.


34-38: LGTM! Actor state properly encapsulates dependencies.

The structure holds the indexer client, registered tree subscriptions, and a shutdown signal. The Arc<AtomicBool> for polling_active is now properly used for coordinated shutdown (as verified in the polling loop).


40-65: LGTM! Actor lifecycle properly implements coordinated shutdown.

The on_start spawns the polling loop with the shared polling_active flag, and on_stop correctly uses Ordering::Release (line 62) to synchronize with the Ordering::Acquire loads in the polling loop. This ensures the shutdown signal is visible to the spawned task.


67-76: LGTM! Constructor properly initializes the actor state.

The indexer URL is correctly formatted with the /v1 suffix for the Photon API, and the initial state is properly set up.


78-103: LGTM! Queue info polling with appropriate error logging.

The method correctly fetches queue info from the indexer and transforms it into internal types. Error logging is at error! level (line 99), which is appropriate for indexer communication failures.


149-178: LGTM! Double registration properly handled with clear warnings.

The handler correctly detects when a tree is already registered (lines 165-171), logs a warning, and explicitly drops the old sender to close the previous receiver. This makes the behavior predictable and debuggable.


180-205: LGTM! Unregistration includes proper validation.

The handler checks whether the tree was actually registered (lines 194-203) and logs a warning for mismatches, helping detect potential bugs in registration/unregistration coordination.


222-247: LGTM! PollNow handler includes optimization for empty subscriptions.

The early exit at lines 233-236 avoids unnecessary indexer calls when no trees are registered, reducing load on both the poller and the indexer service.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

♻️ Duplicate comments (4)
forester/Cargo.toml (1)

62-62: kameo 0.19 does not exist on crates.io

As flagged in the previous review, kameo version 0.19 is not published on crates.io. The latest available version is 0.17.2. You'll need to either:

  • Use kameo = "0.17" or kameo = "0.17.2" (latest stable), or
  • Pin to a Git commit if you require unreleased features: kameo = { git = "https://github.com/tqwewe/kameo", rev = "..." }
sdk-libs/program-test/src/indexer/test_indexer.rs (1)

868-873: Replace unimplemented!() with a structured error

The unimplemented!() macro will panic at runtime, which creates noisy test failures. In a test harness, prefer returning a clear error:

async fn get_queue_info(
    &self,
    _config: Option<IndexerRpcConfig>,
) -> Result<Response<light_client::indexer::QueueInfoResult>, IndexerError> {
    Err(IndexerError::Unknown(
        "get_queue_info is not implemented for TestIndexer".into(),
    ))
}

This gives callers an actionable, non-panicking signal.

forester/src/polling/queue_poller.rs (2)

23-76: Consider making the polling interval configurable rather than hard‑coded

POLLING_INTERVAL_SECS: u64 = 1 is baked into the actor and polling_loop. Depending on indexer latency, rate limits, and deployment characteristics, a fixed 1s poll may be either too aggressive or too slow. Threading a Duration (or raw u64) from configuration into QueueInfoPoller::new and using it in interval(Duration::from_secs(...)) would make this tunable without code changes.

Also applies to: 249-280


105-137: Avoid cloning QueueUpdateMessage and consider pruning closed senders

distribute_updates currently clones each QueueUpdateMessage before try_send, and on TrySendError::Closed(_) it logs but leaves the closed sender in tree_notifiers. That means:

  • Extra per-update allocation/copy via message.clone().
  • Future updates for that tree keep hitting a closed channel and re-logging until something explicitly unregisters it.

You can tighten this by (a) moving the message into try_send and logging via info’s fields, and (b) removing closed senders so you don’t keep trying them:

-    fn distribute_updates(&self, queue_infos: Vec<QueueInfo>) {
-        for info in queue_infos {
-            if let Some(tx) = self.tree_notifiers.get(&info.tree) {
+    fn distribute_updates(&mut self, queue_infos: Vec<QueueInfo>) {
+        for info in queue_infos {
+            if let Some(tx) = self.tree_notifiers.get(&info.tree) {
                 let message = QueueUpdateMessage {
                     tree: info.tree,
                     queue: info.queue,
                     queue_type: info.queue_type,
                     queue_size: info.queue_size,
                     slot: info.slot,
                 };
 
-                match tx.try_send(message.clone()) {
+                match tx.try_send(message) {
                     Ok(()) => {
                         trace!(
                             "Routed update to tree {}: {} items (type: {:?})",
                             info.tree,
-                            message.queue_size,
+                            info.queue_size,
                             info.queue_type
                         );
                     }
                     Err(mpsc::error::TrySendError::Full(_)) => {
                         warn!(
                             "Tree {} channel full, dropping update (tree processing slower than updates)",
                             info.tree
                         );
                     }
                     Err(mpsc::error::TrySendError::Closed(_)) => {
-                        trace!("Tree {} channel closed (task likely finished)", info.tree);
+                        trace!("Tree {} channel closed; removing notifier", info.tree);
+                        self.tree_notifiers.remove(&info.tree);
                     }
                 }
             }
         }
     }

This keeps logs meaningful and avoids repeated work for trees whose receivers are gone.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 594d3b2 and f2549c3.

⛔ Files ignored due to path filters (8)
  • Cargo.lock is excluded by !**/*.lock and included by none
  • cli/src/commands/test-validator/index.ts is excluded by none and included by none
  • cli/src/utils/initTestEnv.ts is excluded by none and included by none
  • cli/src/utils/processPhotonIndexer.ts is excluded by none and included by none
  • program-tests/compressed-token-test/tests/v1.rs is excluded by none and included by none
  • program-tests/system-cpi-v2-test/tests/event.rs is excluded by none and included by none
  • scripts/devenv/versions.sh is excluded by none and included by none
  • sdk-tests/client-test/tests/light_client.rs is excluded by none and included by none
📒 Files selected for processing (34)
  • forester/Cargo.toml (1 hunks)
  • forester/build.rs (0 hunks)
  • forester/proto/photon.proto (0 hunks)
  • forester/src/epoch_manager.rs (13 hunks)
  • forester/src/grpc/mod.rs (0 hunks)
  • forester/src/grpc/router.rs (0 hunks)
  • forester/src/lib.rs (1 hunks)
  • forester/src/polling/mod.rs (1 hunks)
  • forester/src/polling/queue_poller.rs (1 hunks)
  • forester/src/work_coordinator.rs (0 hunks)
  • forester/tests/e2e_test.rs (0 hunks)
  • forester/tests/legacy/batched_address_test.rs (0 hunks)
  • forester/tests/legacy/batched_state_async_indexer_test.rs (0 hunks)
  • forester/tests/legacy/batched_state_indexer_test.rs (0 hunks)
  • forester/tests/legacy/batched_state_test.rs (0 hunks)
  • forester/tests/legacy/e2e_test.rs (0 hunks)
  • forester/tests/legacy/e2e_v1_test.rs (0 hunks)
  • forester/tests/test_batch_append_spent.rs (0 hunks)
  • forester/tests/test_compressible_ctoken.rs (0 hunks)
  • sdk-libs/client/Cargo.toml (0 hunks)
  • sdk-libs/client/src/indexer/indexer_trait.rs (2 hunks)
  • sdk-libs/client/src/indexer/mod.rs (1 hunks)
  • sdk-libs/client/src/indexer/photon_indexer.rs (1 hunks)
  • sdk-libs/client/src/indexer/types.rs (1 hunks)
  • sdk-libs/client/src/lib.rs (0 hunks)
  • sdk-libs/client/src/local_test_validator.rs (0 hunks)
  • sdk-libs/client/src/rpc/indexer.rs (3 hunks)
  • sdk-libs/photon-api/src/apis/default_api.rs (2 hunks)
  • sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1 hunks)
  • sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (1 hunks)
  • sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs (1 hunks)
  • sdk-libs/photon-api/src/models/mod.rs (1 hunks)
  • sdk-libs/program-test/src/indexer/test_indexer.rs (1 hunks)
  • sdk-libs/program-test/src/program_test/indexer.rs (1 hunks)
💤 Files with no reviewable changes (17)
  • forester/build.rs
  • forester/tests/legacy/e2e_test.rs
  • forester/tests/test_compressible_ctoken.rs
  • sdk-libs/client/src/local_test_validator.rs
  • forester/tests/legacy/batched_state_test.rs
  • sdk-libs/client/src/lib.rs
  • forester/tests/legacy/e2e_v1_test.rs
  • forester/tests/e2e_test.rs
  • forester/tests/legacy/batched_state_async_indexer_test.rs
  • forester/src/grpc/mod.rs
  • forester/tests/legacy/batched_address_test.rs
  • sdk-libs/client/Cargo.toml
  • forester/tests/legacy/batched_state_indexer_test.rs
  • forester/proto/photon.proto
  • forester/src/work_coordinator.rs
  • forester/src/grpc/router.rs
  • forester/tests/test_batch_append_spent.rs
🧰 Additional context used
🧬 Code graph analysis (9)
sdk-libs/client/src/indexer/photon_indexer.rs (5)
sdk-libs/client/src/indexer/indexer_trait.rs (1)
  • get_queue_info (206-209)
sdk-libs/client/src/rpc/indexer.rs (1)
  • get_queue_info (228-238)
sdk-libs/photon-api/src/apis/default_api.rs (1)
  • get_queue_info_post (1752-1789)
sdk-libs/client/src/indexer/base58.rs (1)
  • decode_base58_to_fixed_array (37-48)
program-libs/compressed-account/src/pubkey.rs (1)
  • new_from_array (79-81)
sdk-libs/client/src/indexer/indexer_trait.rs (4)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • get_queue_info (1740-1798)
sdk-libs/client/src/rpc/indexer.rs (1)
  • get_queue_info (228-238)
sdk-libs/program-test/src/indexer/test_indexer.rs (1)
  • get_queue_info (868-873)
sdk-libs/program-test/src/program_test/indexer.rs (1)
  • get_queue_info (224-234)
sdk-libs/client/src/rpc/indexer.rs (4)
sdk-libs/client/src/indexer/indexer_trait.rs (1)
  • get_queue_info (206-209)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • get_queue_info (1740-1798)
sdk-libs/program-test/src/indexer/test_indexer.rs (1)
  • get_queue_info (868-873)
sdk-libs/program-test/src/program_test/indexer.rs (1)
  • get_queue_info (224-234)
sdk-libs/program-test/src/program_test/indexer.rs (7)
sdk-libs/client/src/indexer/indexer_trait.rs (1)
  • get_queue_info (206-209)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • get_queue_info (1740-1798)
sdk-libs/client/src/rpc/indexer.rs (1)
  • get_queue_info (228-238)
sdk-libs/program-test/src/indexer/test_indexer.rs (1)
  • get_queue_info (868-873)
sdk-libs/program-test/src/program_test/light_program_test.rs (1)
  • indexer (382-384)
sdk-libs/program-test/src/program_test/rpc.rs (1)
  • indexer (238-240)
sdk-libs/client/src/rpc/client.rs (1)
  • indexer (684-686)
sdk-libs/program-test/src/indexer/test_indexer.rs (4)
sdk-libs/client/src/indexer/indexer_trait.rs (1)
  • get_queue_info (206-209)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • get_queue_info (1740-1798)
sdk-libs/client/src/rpc/indexer.rs (1)
  • get_queue_info (228-238)
sdk-libs/program-test/src/program_test/indexer.rs (1)
  • get_queue_info (224-234)
sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs (3)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • new (115-126)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1)
  • new (35-46)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (2)
  • new (32-34)
  • new (38-45)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (1)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1)
  • new (35-46)
forester/src/polling/queue_poller.rs (4)
forester/src/epoch_manager.rs (1)
  • new (138-184)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • new (115-126)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1)
  • new (35-46)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (2)
  • new (32-34)
  • new (38-45)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (2)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (2)
  • new (32-34)
  • new (38-45)
sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs (1)
  • new (27-34)
⏰ Context from checks skipped due to timeout of 900000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: programs (compressed-token-and-e2e, ["cargo-test-sbf -p compressed-token-test --test v1", "cargo-...
🔇 Additional comments (18)
forester/src/lib.rs (1)

13-13: LGTM! Clean module migration

The shift from grpc/work_coordinator to polling aligns well with the PR's architectural refactor toward actor-based queue polling. The change is minimal and focused.

forester/src/polling/mod.rs (1)

1-5: LGTM! Clean module structure

This module provides a clean facade for the polling subsystem. The re-exports are well-named and align with the actor-based architecture described in the PR.

sdk-libs/client/src/indexer/mod.rs (1)

20-22: LGTM! Clean public API extension

The new QueueInfo and QueueInfoResult types are correctly added to the public API surface, maintaining alphabetical ordering and consistency with the existing export pattern.

sdk-libs/program-test/src/program_test/indexer.rs (1)

224-234: LGTM! Consistent delegation pattern

The implementation correctly follows the established pattern used by all other trait methods in this file, with proper NotInitialized error handling.

sdk-libs/client/src/indexer/indexer_trait.rs (2)

8-8: LGTM! Clean type import

The QueueInfoResult import is correctly placed alphabetically among the other type imports.


204-209: LGTM! Well-documented trait method

The new get_queue_info method is clearly documented and follows the trait's established patterns. The signature is consistent with other indexer methods (async, takes optional config, returns Response wrapper).

sdk-libs/photon-api/src/models/mod.rs (1)

325-332: LGTM! Standard API model exports

The new queue info request/response models follow the established pattern used throughout this file. The module declarations and re-exports are consistent with the existing API surface.

sdk-libs/client/src/indexer/photon_indexer.rs (1)

1740-1798: get_queue_info wiring matches existing indexer patterns

The new method cleanly reuses retry, respects IndexerRpcConfig.slot, and correctly decodes tree/queue from base58 into Pubkey before constructing QueueInfoResult. The control flow and error handling are consistent with the other Photon indexer methods.

sdk-libs/client/src/indexer/types.rs (1)

32-44: QueueInfo and QueueInfoResult shapes look correct

Structs mirror the Photon API queue-info payload (tree/queue as Pubkey, raw queue_type and queue_size, plus a slot wrapper) and fit cleanly into the existing Response<Context> pattern.

sdk-libs/client/src/rpc/indexer.rs (1)

10-12: RPC facade correctly exposes queue elements and queue info

Importing QueueElementsResult/QueueInfoResult and delegating get_queue_elements and get_queue_info through the inner indexer with the existing NotInitialized guard keeps the LightClient surface consistent with other RPC methods.

Also applies to: 204-226, 228-238

sdk-libs/photon-api/src/apis/default_api.rs (1)

298-305: get_queue_info_post API wrapper is consistent with existing photon-api endpoints

The new GetQueueInfoPostError enum and get_queue_info_post function follow the established pattern: same error typing, API-key handling, JSON body serialization, and 2xx vs error-HTTP branching. This should integrate smoothly with callers like PhotonIndexer::get_queue_info.

Also applies to: 1752-1789

sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1)

11-47: JSON-RPC 200-response envelope is well-shaped

JsonRpcError plus GetQueueInfoPost200Response (with optional result and error) matches the JSON-RPC 2.0 pattern and gives callers access to both success payloads and logical errors without losing the data field.

sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (1)

11-46: QueueInfo result model aligns with client and poller usage

GetQueueInfoPost200ResponseResult and inner QueueInfo correctly model the wire format (including queueType/queueSize renames) and provide simple constructors, making the mapping into client QueueInfo straightforward.

sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs (1)

11-64: GetQueueInfoPostRequest JSON-RPC model looks correct and ergonomic

Having Jsonrpc and Method default to "2.0" and "getQueueInfo", plus GetQueueInfoParams as an empty struct, lets callers use Default::default() for simple requests while still supporting explicit construction via new.

forester/src/epoch_manager.rs (4)

17-17: LGTM: Import changes align with the polling refactor.

The imports correctly replace gRPC dependencies with kameo actor types and the new polling module.

Also applies to: 50-50


112-112: LGTM: Struct field updated correctly.

The field type change from coordinator to queue_poller with ActorRef is appropriate for the actor-based approach.


130-130: LGTM: Clone implementation updated.

The clone correctly handles the new queue_poller field.


1107-1107: LGTM: Function signature updated consistently.

The removal of coordinator from instrumentation and addition of queue_update_rx parameter align with the polling-based approach.

Also applies to: 1111-1117

@sergeytimoshin sergeytimoshin force-pushed the sergey/get_queue_info_poll branch from f2549c3 to 2645798 Compare November 22, 2025 02:16
@sergeytimoshin sergeytimoshin force-pushed the sergey/get_queue_info_poll branch from 2645798 to 14c1be7 Compare November 22, 2025 02:23
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
forester/src/epoch_manager.rs (2)

953-1044: V2 polling integration in perform_active_work is correct; minor allocation nit

The new logic that:

  • computes the number of V2 trees,
  • conditionally logs when a poller is available, and
  • registers V2 trees with the poller (failing fast when the poller is unavailable or registration fails)

is a big improvement in terms of correctness and observability.

You can avoid an extra allocation by not materializing v2_trees just to get its length:

-        let (_, v2_trees): (Vec<_>, Vec<_>) = epoch_info
-            .trees
-            .iter()
-            .filter(|tree| !should_skip_tree(&self.config, &tree.tree_accounts.tree_type))
-            .partition(|tree| {
-                matches!(
-                    tree.tree_accounts.tree_type,
-                    TreeType::StateV1 | TreeType::AddressV1
-                )
-            });
-
-        let queue_poller = self.queue_poller.clone();
-
-        if queue_poller.is_some() {
-            info!("Using QueueInfoPoller for {} V2 trees", v2_trees.len());
-        }
+        let v2_tree_count = epoch_info
+            .trees
+            .iter()
+            .filter(|tree| !should_skip_tree(&self.config, &tree.tree_accounts.tree_type))
+            .filter(|tree| matches!(tree.tree_accounts.tree_type, TreeType::StateV2 | TreeType::AddressV2))
+            .count();
+
+        let queue_poller = self.queue_poller.clone();
+
+        if queue_poller.is_some() {
+            info!("Using QueueInfoPoller for {} V2 trees", v2_tree_count);
+        }

Purely a micro-optimization/readability tweak.


1331-1469: V2 slot processing loop now has bounded wait and channel‑closure handling

The new process_light_slot_v2 logic addresses the earlier blocking/closure concerns:

  • wraps queue_update_rx.recv() in tokio::time::timeout(QUEUE_UPDATE_TIMEOUT),
  • tracks consecutive timeouts with MAX_TIMEOUTS and converts prolonged lack of updates into a clear error, and
  • handles Ok(None) by logging and breaking out of the inner loop instead of spinning forever.

This prevents a hung poller or closed channel from stalling the tree for the whole slot window without feedback. The specific constants (QUEUE_UPDATE_TIMEOUT = 150ms, MAX_TIMEOUTS = 100) might need tuning in production, but the structure is solid.

♻️ Duplicate comments (3)
sdk-libs/program-test/src/indexer/test_indexer.rs (1)

868-873: Avoid unimplemented!() in TestIndexer::get_queue_info

This method will currently panic at runtime if hit, which is brittle for a shared program-test harness.

Prefer returning a structured error so callers get a predictable signal, e.g.:

-    async fn get_queue_info(
-        &self,
-        _config: Option<IndexerRpcConfig>,
-    ) -> Result<Response<light_client::indexer::QueueInfoResult>, IndexerError> {
-        unimplemented!("get_queue_info")
-    }
+    async fn get_queue_info(
+        &self,
+        _config: Option<IndexerRpcConfig>,
+    ) -> Result<Response<light_client::indexer::QueueInfoResult>, IndexerError> {
+        Err(IndexerError::NotImplemented(
+            "get_queue_info is not implemented for TestIndexer".into(),
+        ))
+    }

This keeps the trait contract while making failures clearer and non-panicking.

sdk-libs/client/src/indexer/photon_indexer.rs (1)

1740-1798: Queue-info RPC integration looks correct; error mapping nuance remains

The get_queue_info implementation lines up with the rest of PhotonIndexer (retry semantics, slot freshness check, base58→Pubkey decoding, and returning QueueInfoResult wrapped in Response). Functionally this should behave as intended.

One remaining nuance is that the error mapping into GetBatchAddressUpdateInfoPost200ResponseError still discards any extra fields (like data) from the original error; that’s acceptable if you don’t rely on them, but worth revisiting if richer diagnostics are needed.

forester/src/polling/queue_poller.rs (1)

105-137: Tighten distribute_updates to avoid clones and prune closed channels

Right now distribute_updates does an unnecessary message.clone() and keeps tree_notifiers entries around after receivers have closed, which can lead to minor overhead and repeated “channel closed” logs.

You can:

  • move the message into try_send and log using fields from info, and
  • remove closed senders from tree_notifiers on TrySendError::Closed(_).

For example:

-    fn distribute_updates(&self, queue_infos: Vec<QueueInfo>) {
+    fn distribute_updates(&mut self, queue_infos: Vec<QueueInfo>) {
         for info in queue_infos {
-            if let Some(tx) = self.tree_notifiers.get(&info.tree) {
-                let message = QueueUpdateMessage {
-                    tree: info.tree,
-                    queue: info.queue,
-                    queue_type: info.queue_type,
-                    queue_size: info.queue_size,
-                    slot: info.slot,
-                };
-
-                match tx.try_send(message.clone()) {
+            if let Some(tx) = self.tree_notifiers.get(&info.tree) {
+                let message = QueueUpdateMessage {
+                    tree: info.tree,
+                    queue: info.queue,
+                    queue_type: info.queue_type,
+                    queue_size: info.queue_size,
+                    slot: info.slot,
+                };
+
+                let queue_size = message.queue_size;
+                let queue_type = message.queue_type;
+
+                match tx.try_send(message) {
                     Ok(()) => {
                         trace!(
                             "Routed update to tree {}: {} items (type: {:?})",
                             info.tree,
-                            message.queue_size,
-                            info.queue_type
+                            queue_size,
+                            queue_type
                         );
                     }
                     Err(mpsc::error::TrySendError::Full(_)) => {
                         warn!(
                             "Tree {} channel full, dropping update (tree processing slower than updates)",
                             info.tree
                         );
                     }
                     Err(mpsc::error::TrySendError::Closed(_)) => {
-                        trace!("Tree {} channel closed (task likely finished)", info.tree);
+                        trace!(
+                            "Tree {} channel closed (task likely finished); removing notifier",
+                            info.tree
+                        );
+                        self.tree_notifiers.remove(&info.tree);
                     }
                 }
             }
         }
     }

This keeps the behavior the same while reducing allocations and log spam.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f2549c3 and 14c1be7.

⛔ Files ignored due to path filters (8)
  • Cargo.lock is excluded by !**/*.lock and included by none
  • cli/src/commands/test-validator/index.ts is excluded by none and included by none
  • cli/src/utils/initTestEnv.ts is excluded by none and included by none
  • cli/src/utils/processPhotonIndexer.ts is excluded by none and included by none
  • program-tests/compressed-token-test/tests/v1.rs is excluded by none and included by none
  • program-tests/system-cpi-v2-test/tests/event.rs is excluded by none and included by none
  • scripts/devenv/versions.sh is excluded by none and included by none
  • sdk-tests/client-test/tests/light_client.rs is excluded by none and included by none
📒 Files selected for processing (34)
  • forester/Cargo.toml (1 hunks)
  • forester/build.rs (0 hunks)
  • forester/proto/photon.proto (0 hunks)
  • forester/src/epoch_manager.rs (13 hunks)
  • forester/src/grpc/mod.rs (0 hunks)
  • forester/src/grpc/router.rs (0 hunks)
  • forester/src/lib.rs (1 hunks)
  • forester/src/polling/mod.rs (1 hunks)
  • forester/src/polling/queue_poller.rs (1 hunks)
  • forester/src/work_coordinator.rs (0 hunks)
  • forester/tests/e2e_test.rs (0 hunks)
  • forester/tests/legacy/batched_address_test.rs (0 hunks)
  • forester/tests/legacy/batched_state_async_indexer_test.rs (0 hunks)
  • forester/tests/legacy/batched_state_indexer_test.rs (0 hunks)
  • forester/tests/legacy/batched_state_test.rs (0 hunks)
  • forester/tests/legacy/e2e_test.rs (0 hunks)
  • forester/tests/legacy/e2e_v1_test.rs (0 hunks)
  • forester/tests/test_batch_append_spent.rs (0 hunks)
  • forester/tests/test_compressible_ctoken.rs (0 hunks)
  • sdk-libs/client/Cargo.toml (0 hunks)
  • sdk-libs/client/src/indexer/indexer_trait.rs (2 hunks)
  • sdk-libs/client/src/indexer/mod.rs (1 hunks)
  • sdk-libs/client/src/indexer/photon_indexer.rs (1 hunks)
  • sdk-libs/client/src/indexer/types.rs (1 hunks)
  • sdk-libs/client/src/lib.rs (0 hunks)
  • sdk-libs/client/src/local_test_validator.rs (0 hunks)
  • sdk-libs/client/src/rpc/indexer.rs (3 hunks)
  • sdk-libs/photon-api/src/apis/default_api.rs (2 hunks)
  • sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1 hunks)
  • sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (1 hunks)
  • sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs (1 hunks)
  • sdk-libs/photon-api/src/models/mod.rs (1 hunks)
  • sdk-libs/program-test/src/indexer/test_indexer.rs (1 hunks)
  • sdk-libs/program-test/src/program_test/indexer.rs (1 hunks)
💤 Files with no reviewable changes (17)
  • sdk-libs/client/src/local_test_validator.rs
  • forester/tests/legacy/e2e_v1_test.rs
  • forester/tests/test_batch_append_spent.rs
  • forester/src/grpc/mod.rs
  • forester/build.rs
  • forester/tests/test_compressible_ctoken.rs
  • sdk-libs/client/src/lib.rs
  • forester/tests/e2e_test.rs
  • forester/tests/legacy/batched_address_test.rs
  • forester/tests/legacy/batched_state_async_indexer_test.rs
  • forester/tests/legacy/e2e_test.rs
  • sdk-libs/client/Cargo.toml
  • forester/tests/legacy/batched_state_test.rs
  • forester/tests/legacy/batched_state_indexer_test.rs
  • forester/src/grpc/router.rs
  • forester/proto/photon.proto
  • forester/src/work_coordinator.rs
🧰 Additional context used
🧬 Code graph analysis (10)
sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs (5)
forester/src/epoch_manager.rs (1)
  • new (138-184)
forester/src/polling/queue_poller.rs (1)
  • new (68-76)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • new (115-126)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1)
  • new (35-46)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (2)
  • new (32-34)
  • new (38-45)
sdk-libs/program-test/src/indexer/test_indexer.rs (7)
sdk-libs/client/src/indexer/indexer_trait.rs (1)
  • get_queue_info (206-209)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • get_queue_info (1740-1798)
sdk-libs/client/src/rpc/indexer.rs (1)
  • get_queue_info (228-238)
sdk-libs/program-test/src/program_test/indexer.rs (1)
  • get_queue_info (224-234)
sdk-libs/program-test/src/program_test/light_program_test.rs (1)
  • indexer (382-384)
sdk-libs/client/src/rpc/rpc_trait.rs (1)
  • indexer (199-199)
sdk-libs/program-test/src/program_test/rpc.rs (1)
  • indexer (238-240)
sdk-libs/client/src/rpc/indexer.rs (4)
sdk-libs/client/src/indexer/indexer_trait.rs (1)
  • get_queue_info (206-209)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • get_queue_info (1740-1798)
sdk-libs/program-test/src/indexer/test_indexer.rs (1)
  • get_queue_info (868-873)
sdk-libs/program-test/src/program_test/indexer.rs (1)
  • get_queue_info (224-234)
sdk-libs/client/src/indexer/photon_indexer.rs (6)
sdk-libs/client/src/indexer/indexer_trait.rs (1)
  • get_queue_info (206-209)
sdk-libs/client/src/rpc/indexer.rs (1)
  • get_queue_info (228-238)
sdk-libs/program-test/src/indexer/test_indexer.rs (2)
  • get_queue_info (868-873)
  • new (1355-1416)
sdk-libs/photon-api/src/apis/default_api.rs (1)
  • get_queue_info_post (1752-1789)
sdk-libs/client/src/indexer/base58.rs (1)
  • decode_base58_to_fixed_array (37-48)
program-libs/compressed-account/src/pubkey.rs (1)
  • new_from_array (79-81)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (4)
forester/src/epoch_manager.rs (1)
  • new (138-184)
forester/src/polling/queue_poller.rs (1)
  • new (68-76)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • new (115-126)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1)
  • new (35-46)
sdk-libs/program-test/src/program_test/indexer.rs (7)
sdk-libs/client/src/indexer/indexer_trait.rs (1)
  • get_queue_info (206-209)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • get_queue_info (1740-1798)
sdk-libs/client/src/rpc/indexer.rs (1)
  • get_queue_info (228-238)
sdk-libs/program-test/src/indexer/test_indexer.rs (1)
  • get_queue_info (868-873)
sdk-libs/program-test/src/program_test/light_program_test.rs (1)
  • indexer (382-384)
sdk-libs/program-test/src/program_test/rpc.rs (1)
  • indexer (238-240)
sdk-libs/client/src/rpc/client.rs (1)
  • indexer (684-686)
forester/src/polling/queue_poller.rs (4)
forester/src/epoch_manager.rs (1)
  • new (138-184)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • new (115-126)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (1)
  • new (35-46)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (2)
  • new (32-34)
  • new (38-45)
forester/src/epoch_manager.rs (1)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • new (115-126)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (5)
forester/src/epoch_manager.rs (1)
  • new (138-184)
forester/src/polling/queue_poller.rs (1)
  • new (68-76)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • new (115-126)
sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (2)
  • new (32-34)
  • new (38-45)
sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs (1)
  • new (27-34)
sdk-libs/client/src/indexer/indexer_trait.rs (4)
sdk-libs/client/src/indexer/photon_indexer.rs (1)
  • get_queue_info (1740-1798)
sdk-libs/client/src/rpc/indexer.rs (1)
  • get_queue_info (228-238)
sdk-libs/program-test/src/indexer/test_indexer.rs (1)
  • get_queue_info (868-873)
sdk-libs/program-test/src/program_test/indexer.rs (1)
  • get_queue_info (224-234)
⏰ Context from checks skipped due to timeout of 900000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: programs (compressed-token-batched-tree, ["cargo-test-sbf -p compressed-token-test -- test_transf...
  • GitHub Check: programs (compressed-token-and-e2e, ["cargo-test-sbf -p compressed-token-test --test v1", "cargo-...
  • GitHub Check: Test program-libs-slow
  • GitHub Check: Test batched-merkle-tree-simulate
  • GitHub Check: Test program-libs-fast
🔇 Additional comments (21)
sdk-libs/client/src/indexer/mod.rs (1)

16-23: Expose QueueInfo types from indexer module

Re-exporting QueueInfo and QueueInfoResult here is correct and keeps the public indexer surface coherent with the new APIs.

sdk-libs/program-test/src/program_test/indexer.rs (1)

224-234: LightProgramTest delegation for get_queue_info is consistent

The new get_queue_info mirrors the existing delegation pattern (NotInitialized guard + passthrough), so the program-test layer stays in sync with the trait.

sdk-libs/photon-api/src/models/mod.rs (1)

325-332: Queue-info models are wired into photon-api exports correctly

The new GetQueueInfoPost* and QueueInfo re-exports follow the same pattern as the other REST models; no issues spotted.

sdk-libs/client/src/indexer/types.rs (1)

32-44: QueueInfo / QueueInfoResult definitions are straightforward and consistent

The new structs have the right fields and visibility for the queue-info API and align with how PhotonIndexer::get_queue_info populates them.

sdk-libs/client/src/indexer/indexer_trait.rs (1)

4-14: New Indexer::get_queue_info API is well-shaped but breaking for external implementers

The get_queue_info signature and docs are consistent with the new types and PhotonIndexer implementation, and it belongs on the core Indexer trait.

Because this extends a public trait, any downstream Indexer implementations outside this repo will fail to compile until they add this method. If you haven’t already, it’s worth auditing consumers and calling this out in release notes or a changelog.

Also applies to: 204-210

sdk-libs/client/src/rpc/indexer.rs (1)

10-12: RPC layer forwards queue elements and queue info cleanly

Using the shared QueueElementsResult / QueueInfoResult types and delegating both methods through to the inner indexer with the usual NotInitialized guard keeps the RPC surface consistent with the core Indexer trait.

Also applies to: 204-213, 228-238

sdk-libs/photon-api/src/models/_get_queue_info_post_200_response.rs (2)

13-20: LGTM! Standard JSON-RPC 2.0 error structure.

The JsonRpcError struct correctly implements the JSON-RPC 2.0 error object specification with required code and message fields plus optional data.


34-47: Constructor only supports success path; error responses require manual construction.

The new() constructor initializes error to None, appropriate for success responses. If you need to construct error responses programmatically, you'll need to use struct initialization or add a separate constructor. This pattern is consistent with other 200 response models in the codebase.

sdk-libs/photon-api/src/apis/default_api.rs (2)

298-305: LGTM! Error enum follows established pattern.

The GetQueueInfoPostError enum correctly mirrors the structure of other endpoint error types in this file, reusing the common GetBatchAddressUpdateInfoPost429Response model for HTTP 429 and 500 errors.


1752-1789: LGTM! Implementation consistent with existing endpoints.

The get_queue_info_post function follows the exact same pattern as other API methods in this file: builds the request with API key and User-Agent, POSTs JSON, and handles success/error responses appropriately.

sdk-libs/photon-api/src/models/_get_queue_info_post_200_response_result.rs (2)

11-35: LGTM! Clean response result structure.

The GetQueueInfoPost200ResponseResult struct appropriately models the queue info response payload with a vector of queue details and the associated slot number.


19-46: LGTM! Well-structured queue metadata model.

The QueueInfo struct appropriately captures queue metadata. The u8 type for queue_type supports up to 256 queue types, which should be sufficient for the foreseeable future.

sdk-libs/photon-api/src/models/_get_queue_info_post_request.rs (3)

11-35: LGTM! Standard JSON-RPC 2.0 request structure.

The GetQueueInfoPostRequest correctly models a JSON-RPC request with all required fields. The constructor appropriately initializes params to default since this endpoint takes no parameters.


37-38: Empty params struct is intentional for JSON-RPC consistency.

The GetQueueInfoParams empty struct ensures the JSON-RPC request includes a params field (serialized as {}), maintaining protocol consistency even when no parameters are required.


40-64: LGTM! Type-safe enums enforce valid values.

The Jsonrpc and Method enums with single variants provide compile-time guarantees that only valid protocol versions and method names can be used. The Default implementations enhance ergonomics.

forester/src/lib.rs (1)

13-13: Expose polling module in crate root

Exporting polling alongside the other subsystems makes the new QueueInfoPoller surface available cleanly to the rest of the crate; no issues here.

forester/src/polling/mod.rs (1)

1-5: Queue poller façade looks clean

Using a dedicated polling module that re-exports QueueInfoPoller and its message types keeps consumer imports (e.g., in epoch_manager) tidy and hides the submodule layout; no further changes needed.

forester/src/polling/queue_poller.rs (1)

68-75: Double‑check indexer_url semantics when appending /v1

QueueInfoPoller::new builds the base path as format!("{}/v1", indexer_url). That’s fine as long as configs treat indexer_url as a bare host/base (e.g. https://…), but if any existing deployments already include /v1 you’ll end up with /v1/v1 and broken calls.

I’d either:

  • document clearly that indexer_url must not include a version path, or
  • stop appending /v1 here and instead expect the fully versioned base in config.
forester/src/epoch_manager.rs (3)

17-18: QueueInfoPoller wiring into EpochManager looks sound

Storing an Option<ActorRef<QueueInfoPoller>> on EpochManager and instantiating it only when config.external_services.indexer_url is set is a good separation of concerns, and cloning the ActorRef in impl Clone ensures all manager clones share the same actor.

The updated logging (“spawn initiated” vs. “started”) also better reflects the asynchronous nature of actor startup. No changes needed here.

Also applies to: 99-114, 136-184


968-1010: Good: V2 trees now fail fast when queue poller is missing or registration fails

In the V2 branch of perform_active_work, you now:

  • require a queue_poller to be present, and
  • treat RegisterTree failures as hard errors instead of silently proceeding with None channels.

This removes the previous “silent success but no work” failure mode for V2 trees. The early Err(anyhow!(...)) paths here are appropriate given that V2 processing fundamentally depends on queue updates.


1117-1222: process_queue_v2 V2 path correctly enforces presence of a queue channel

The refactored process_queue_v2 now:

  • uses the same scheduling logic for all tree types, and
  • for V2 trees, refuses to process slots if queue_update_rx is None, returning an error instead of pretending success.

That matches the new contract that V2 processing requires live queue updates; it’s much less surprising operationally than silently skipping work.

prost-types = "0.14.1"
tonic-prost = "0.14.2"
tokio-stream = { version = "0.1", features = ["sync"] }
kameo = "0.19"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

Validate kameo = "0.19" against your Tokio version and MSRV

kameo 0.19.x pulls in tokio with a ^1.47 requirement and serde 1.0, so this dependency line is reasonable, but you should confirm that:

  • your workspace’s Tokio version is compatible with ^1.47, and
  • your minimum supported Rust version meets kameo’s requirements. (docs.rs)

Once you’re happy with it in CI, consider pinning to a specific patch (e.g. 0.19.2) for reproducibility.


I need to verify the assumptions in the review comment. Let me examine the workspace configuration and check kameo's actual requirements.


🏁 Script executed:

#!/bin/bash
# Check Tokio version in workspace and MSRV settings
echo "=== Checking Tokio dependency in Cargo.toml ==="
rg -n "tokio\s*=" forester/Cargo.toml
echo ""
echo "=== Checking workspace root Cargo.toml ==="
rg -n "tokio\s*=" Cargo.toml
echo ""
echo "=== Looking for MSRV in Cargo.toml files ==="
rg -n "rust-version|MSRV" forester/Cargo.toml
echo ""
echo "=== Looking for rust-toolchain.toml ==="
fd rust-toolchain.toml
echo ""
echo "=== Displaying full forester/Cargo.toml for context ==="
head -70 forester/Cargo.toml

Length of output: 2835


🏁 Script executed:

#!/bin/bash
# Check for workspace root configuration
echo "=== Checking workspace root Cargo.toml for MSRV ==="
head -30 Cargo.toml

Length of output: 121


🏁 Script executed:

#!/bin/bash
# Search for any existing tokio usage to infer version compatibility
echo "=== Searching for tokio usage patterns ==="
rg -n "tokio::" --type rust | head -20

Length of output: 1695


🌐 Web query:

kameo 0.19 Rust minimum supported version MSRV requirements tokio

💡 Result:

I couldn't find an explicit, published MSRV for kameo v0.19. The project does declare a rust-version (MSRV) in its Cargo.toml files (per the changelog) and is built on Tokio (kameo is an async actor library using Tokio). If you want the exact MSRV and tokio-min-version for v0.19 I can fetch the Cargo.toml from the v0.19 tag (or the crate on crates.io) and report the exact rust-version and tokio dependency version — tell me which (GitHub tag or crates.io) you'd prefer. Sources: [kameo project listing/docs]. [1][2]

Sources
[1] Kameo crate listing/docs (lib.rs / docs.rs).
[2] Kameo changelog summary (notes that Cargo.toml contains rust-version / MSRV and that Kameo is built on Tokio).


I need to fetch kameo's actual Cargo.toml metadata to validate the review's claims about tokio requirements and MSRV.


🏁 Script executed:

#!/bin/bash
# Fetch kameo 0.19 metadata from crates.io API
echo "=== Fetching kameo 0.19 from crates.io API ==="
curl -s "https://crates.io/api/v1/crates/kameo/0.19.0" | jq '.crate | {name, version, repository, documentation}'

echo ""
echo "=== Checking for kameo in registry index ==="
# Try to get the Cargo.toml directly from GitHub if available
curl -s "https://raw.githubusercontent.com/tqwewe/kameo/refs/tags/v0.19.0/Cargo.toml" 2>/dev/null | head -40 || echo "Not found via GitHub"

Length of output: 992


Declare MSRV or verify Rust 1.88.0 compatibility for kameo 0.19

kameo 0.19.0 requires rust-version = "1.88.0", but forester doesn't declare a minimum supported Rust version. You should either:

  1. Add rust-version = "1.88.0" to forester/Cargo.toml to match kameo's MSRV, or
  2. Verify your codebase actually supports 1.88.0 and adjust if you target an earlier version.

Additionally, tighten the Tokio constraint from version = "1" to "1.47" at line 41 for clarity (kameo transitively requires it). Consider pinning kameo to a specific patch (e.g., "0.19.2") once you confirm CI stability.

🤖 Prompt for AI Agents
In forester/Cargo.toml around lines 41 and 62: declare the crate MSRV and
tighten Tokio/TOML versions to match kameo's requirements; add rust-version =
"1.88.0" to the Cargo.toml top-level package stanza (or confirm the codebase and
CI actually support an older Rust and document that decision), change the Tokio
dependency from version = "1" to version = "1.47" for clarity, and pin kameo to
a specific patch (e.g., "0.19.2") after verifying CI stability so transitive
MSRV/compatibility is explicit.

@sergeytimoshin sergeytimoshin merged commit e5f84a0 into main Nov 22, 2025
31 of 32 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants