refactor: batch ops#1838

Merged
SwenSchaeferjohann merged 20 commits into main from sergey/batch-process-optimisation
Jul 9, 2025
Conversation

@sergeytimoshin
Contributor

@sergeytimoshin sergeytimoshin commented Jun 21, 2025

Summary by CodeRabbit

  • New Features

    • Added comprehensive end-to-end integration tests for both legacy and V2 pipelines, improving test coverage and validation of Merkle tree rollovers, queue processing, and registration logic.
    • Introduced Python scripts for analyzing and comparing performance metrics and TPS statistics from log files.
    • Added public functions for state and address Merkle tree rollovers.
  • Enhancements

    • Refactored batch processing to use asynchronous streaming, enabling more efficient, incremental, and concurrent transaction handling.
    • Improved error handling and logging for queue metrics and processing rates.
    • Increased default batch instructions per transaction and RPC pool size for improved throughput.
    • Added support for API keys in client and connection configurations for enhanced security and flexibility.
    • Updated client configurations to include indexer URLs and API keys for improved integration.
    • Enhanced Forester status fetching to use indexer URL and API key.
  • Bug Fixes

    • Fixed queue metric logging to provide clearer insights into queue states.
  • Documentation

    • Expanded README with detailed environment variable documentation for running e2e V2 tests.
  • Chores

    • Updated dependencies and script hashes for Photon indexer installation.
    • Simplified and consolidated GitHub Actions test workflows.
    • Removed deprecated traits and error types across the codebase.
  • Tests

    • Added new utility functions and removed ignored attributes to enable more robust automated test execution.
    • Improved test utilities for slot and protocol config management.
    • Added legacy priority fee tests to validate fee capping and RPC fee estimation.
  • Refactor

    • Removed deprecated traits and error types, simplifying codebase and generic constraints.
    • Streamlined batch and transaction processing logic for maintainability and performance.
    • Consolidated streaming abstractions for batch instruction generation and transaction submission.
    • Simplified trait bounds and removed IndexerType trait usage throughout the codebase.

@coderabbitai
Contributor

coderabbitai bot commented Jun 21, 2025

Walkthrough

This update refactors Forester's batch processing system for state and address Merkle trees from synchronous, chunked vector-based logic to an asynchronous streaming model. It introduces new stream-based APIs, enhances error propagation, simplifies trait bounds by removing the IndexerType abstraction, and adds new integration tests and performance analysis scripts. The changes affect both the core pipeline and testing infrastructure, improving modularity and concurrency.

Changes

File(s) / Path(s) Change Summary
forester-utils/src/instructions/address_batch_update.rs, state_batch_append.rs, state_batch_nullify.rs Refactored batch instruction data generation from synchronous vector-based functions to asynchronous streams, introducing stream-based APIs, chunked processing, concurrent proof generation, and improved error propagation.
forester-utils/src/error.rs Added new error variants to ForesterUtilsError for additional error types, supporting automatic conversion from related errors.
forester-utils/src/utils.rs Changed wait_for_indexer to accept an immutable RPC reference instead of mutable.
forester-utils/src/lib.rs Made the error module public. Added new structs ParsedMerkleTreeData and ParsedQueueData to encapsulate parsed Merkle tree and queue state data.
forester-utils/Cargo.toml, forester/Cargo.toml Added async-stream dependency. Fixed formatting in dev-dependencies.
forester-utils/src/rpc_pool.rs Added optional api_key to SolanaConnectionManager and SolanaRpcPoolBuilder, propagating through constructors and connection creation.
cli/src/utils/constants.ts, scripts/install.sh Updated Photon git commit hash constant and install script to new revision.
forester/src/cli.rs, forester/src/config.rs Updated default values for batch_ixs_per_tx (from 1 to 4 in CLI, from 4 to 3 in config) and rpc_pool_size (from 50 to 100).
forester/src/epoch_manager.rs, forester/src/lib.rs, forester/src/processor/v2/mod.rs, common.rs, address.rs, state.rs Removed IndexerType trait bound and related abstractions throughout the pipeline and batch processor logic. Updated function signatures and imports accordingly.
forester/src/rollover/mod.rs Added new async public functions for state and address Merkle tree rollovers.
forester/src/errors.rs Removed BatchProcessing variant from ForesterError.
forester/src/forester_status.rs Updated LightClientConfig initialization to use indexer URL and Photon API key.
sdk-libs/client/src/rpc/rpc_trait.rs Added optional api_key field to LightClientConfig and updated constructors/factories.
sdk-libs/client/src/indexer/photon_indexer.rs Removed debug logs and added error extraction after API response in get_validity_proof.
forester/src/indexer_type.rs, forester/src/processor/v2/error.rs Deleted IndexerType trait and batch process error type, removing related implementations and conversions.
forester/package.json, .github/workflows/forester-tests.yml Updated npm scripts and CI workflow to focus on new e2e v1/v2 tests, removing legacy and granular test scripts.
forester/README.md Added documentation for e2e_v2 test environment variables and test configuration.
forester/scripts/compare_performance.py, forester/scripts/v2_stats.py Added new Python scripts for performance and TPS analysis of Forester logs.
forester/tests/address_v2_test.rs, forester/tests/e2e_v1_test.rs, forester/tests/e2e_v2_test.rs, forester/tests/test_utils.rs, forester/tests/legacy/priority_fee_test.rs, forester/tests/legacy/test_utils.rs Added and extended comprehensive integration and utility tests for v1/v2 pipelines, priority fee logic, and indexer consistency. Improved test setup, queue assertions, and pipeline coordination.
forester/tests/legacy/batched_state_async_indexer_test.rs Re-enabled the async batched state indexer test (removed the ignore and serial-execution attributes) and updated it to use an immutable RPC reference.
program-tests/utils/src/setup_accounts.rs, xtask/src/create_batch_address_tree.rs, xtask/src/create_batch_state_tree.rs, xtask/src/create_state_tree.rs, xtask/src/create_update_protocol_config_ix.rs, xtask/src/new_deployment.rs Added explicit api_key: None to LightClientConfig initialization in test and xtask utilities.
forester/src/processor/v1/helpers.rs Updated to use immutable RPC reference for wait_for_indexer.
forester/src/processor/v1/send_transaction.rs Added info-level logs for queue state metrics in batch prerequisites.

Sequence Diagram(s)

sequenceDiagram
    participant Forester as Forester Pipeline
    participant RPC as SolanaRpcPool
    participant Indexer as Indexer (Mutex)
    participant Prover as ProofClient
    participant Stream as Async Stream

    Forester->>RPC: Acquire connection
    Forester->>Indexer: Lock and fetch Merkle tree/queue metadata
    alt No pending work
        Forester-->>Stream: Return empty stream
    else Pending work
        Forester->>Indexer: Wait for indexer catch-up
        Forester->>Stream: Start async stream
        loop For each chunk in queue
            Stream->>Indexer: Fetch chunk data
            Stream->>Prover: Concurrently generate ZKPs for inputs
            Prover-->>Stream: Return proofs/results
            Stream-->>Forester: Yield instruction data or errors
        end
    end
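The chunk → prove → yield loop in the diagram can be sketched as a simplified, synchronous std-only analogue. The real pipeline yields from an async_stream with concurrent ZK proof generation; every name below is illustrative, not the actual Forester API.

```rust
// Simplified, synchronous std-only analogue of the chunk -> prove -> yield
// loop in the sequence diagram. All names here are illustrative.

fn fetch_chunk(start: usize, size: usize, queue_len: usize) -> Vec<usize> {
    // Stand-in for "Fetch chunk data" from the indexer.
    (start..queue_len.min(start + size)).collect()
}

fn generate_proof(leaf: usize) -> Result<String, String> {
    // Stand-in for the prover call; always succeeds in this sketch.
    Ok(format!("proof-{leaf}"))
}

// Walk the queue in fixed-size chunks and yield one instruction per proof,
// mirroring how the stream yields instruction data (or errors) per element.
fn stream_instructions(queue_len: usize, chunk_size: usize) -> Vec<Result<String, String>> {
    let mut out = Vec::new();
    let mut start = 0;
    while start < queue_len {
        for leaf in fetch_chunk(start, chunk_size, queue_len) {
            out.push(generate_proof(leaf));
        }
        start += chunk_size;
    }
    out
}
```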

Possibly related PRs

  • Lightprotocol/light-protocol#1831: Refactors batch instruction data generation to use configurable prover URL and timing parameters, which is directly extended by this PR's streaming/concurrency refactor.
  • Lightprotocol/light-protocol#1814: Updates the SolanaRpcPool and related connection manager to include an optional api_key field, related to this PR's addition of api_key support.
  • Lightprotocol/light-protocol#1799: Changes the RPC client configuration type from RpcConfig to LightClientConfig and updates usages accordingly, related to this PR's modifications to RPC client configuration and connection setup.

Suggested reviewers

  • SwenSchaeferjohann
  • ananas-block

Poem

Hopping through streams, not chunks but a flow,
Forester bunnies now process on the go!
With async and streams, and errors that sing,
Batch proofs are delivered as fast as a spring.
Old traits are now gone, the code’s feeling light—
This bunny’s delighted: the pipeline’s just right!
🐇✨

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 Clippy (1.86.0)
Updating crates.io index

warning: failed to write cache, path: /usr/local/registry/index/index.crates.io-1949cf8c6b5b557f/.cache/pi/no/pinocchio, error: Permission denied (os error 13)
Downloading crates ...
Downloaded adler2 v2.0.1
error: failed to create directory /usr/local/registry/cache/index.crates.io-1949cf8c6b5b557f

Caused by:
Permission denied (os error 13)


@sergeytimoshin sergeytimoshin marked this pull request as ready for review June 22, 2025 03:27
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (1)
forester-utils/src/instructions/state_batch_nullify.rs (1)

137-138: Apply same bounded concurrency suggestion as append module.

Similar to the append module, consider implementing bounded concurrency for proof generation to avoid overwhelming the prover service.

Also applies to: 171-183

🧹 Nitpick comments (5)
forester/tests/test_utils.rs (1)

310-319: Consider making the function more generic.

The function could accept a generic R: Rpc parameter instead of specifically LightClient for better reusability. Also, consider making the sleep interval configurable.

-pub async fn wait_for_slot(rpc: &mut LightClient, target_slot: u64) {
+pub async fn wait_for_slot<R: Rpc>(rpc: &mut R, target_slot: u64) {
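The generic version suggested above could look roughly like the following std-only sketch. The `SlotSource` trait stands in for the real `Rpc` trait, and the poll interval is made configurable as the comment suggests; names and shapes are assumptions.

```rust
use std::thread;
use std::time::Duration;

// Illustrative stand-in for the `Rpc` trait: anything that can report the
// current slot. Trait name and method shape are assumptions for this sketch.
pub trait SlotSource {
    fn get_slot(&mut self) -> u64;
}

// Generic over any slot source, with a configurable poll interval.
pub fn wait_for_slot<R: SlotSource>(rpc: &mut R, target_slot: u64, poll: Duration) {
    while rpc.get_slot() < target_slot {
        thread::sleep(poll);
    }
}

// A mock source whose slot advances by one on every query, for testing.
pub struct MockSlots(pub u64);
impl SlotSource for MockSlots {
    fn get_slot(&mut self) -> u64 {
        self.0 += 1;
        self.0
    }
}
```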
forester-utils/src/instructions/address_batch_update.rs (1)

124-224: Excellent streaming implementation with concurrent proof generation!

The chunked processing and concurrent proof generation significantly improve performance. The indexer root validation on the first chunk is a good safety measure.

Consider adding a comment explaining why indexer root validation is only done on the first chunk, to help future maintainers understand this design decision.

forester-utils/src/instructions/state_batch_append.rs (2)

71-93: Consider consistent error handling in the stream.

The stream correctly handles errors by yielding them, but there's an inconsistency:

  • Line 89-91: Yields error and returns early
  • This pattern is good but could be extracted into a macro for consistency

Consider creating a macro for consistent error handling:

+macro_rules! yield_err_return {
+    ($expr:expr, $msg:expr) => {
+        match $expr {
+            Ok(val) => val,
+            Err(e) => {
+                yield Err(ForesterUtilsError::Indexer(format!("{}: {}", $msg, e)));
+                return;
+            }
+        }
+    };
+}

111-149: Consider adding bounded concurrency for proof generation.

The current implementation creates all proof futures at once (line 139), which could overwhelm the prover service with many concurrent requests.

Consider using futures::stream::FuturesUnordered with a concurrency limit:

-        let mut proof_futures = Vec::new();
+        use futures::stream::{FuturesUnordered, StreamExt};
+        let mut proof_futures = FuturesUnordered::new();
+        const MAX_CONCURRENT_PROOFS: usize = 10;

         for (batch_idx, leaves_hash_chain) in leaves_hash_chains.iter().enumerate() {
             // ... existing batch processing ...
-            proof_futures.push(generate_zkp_proof(circuit_inputs, client));
+            proof_futures.push(generate_zkp_proof(circuit_inputs, client));
+            
+            // Process when reaching concurrency limit
+            if proof_futures.len() >= MAX_CONCURRENT_PROOFS {
+                if let Some(result) = proof_futures.next().await {
+                    match result {
+                        Ok(data) => yield Ok(data),
+                        Err(e) => yield Err(e),
+                    }
+                }
+            }
         }

-        let proof_results = future::join_all(proof_futures).await;
-        for proof_result in proof_results {
-            match proof_result {
-                Ok(data) => yield Ok(data),
-                Err(e) => yield Err(e),
-            }
-        }
+        // Process remaining futures
+        while let Some(result) = proof_futures.next().await {
+            match result {
+                Ok(data) => yield Ok(data),
+                Err(e) => yield Err(e),
+            }
+        }
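The same cap-then-drain idea can be illustrated without the async machinery. The review's actual suggestion uses `futures::stream::FuturesUnordered`; this std-only analogue with threads and a channel only demonstrates the bounded-concurrency pattern, and all names are illustrative.

```rust
use std::sync::mpsc;
use std::thread;

// Run `jobs` with at most `limit` executing concurrently, collecting results
// in completion order -- the same shape as the FuturesUnordered suggestion.
pub fn run_bounded<F, T>(jobs: Vec<F>, limit: usize) -> Vec<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    assert!(limit > 0, "concurrency limit must be at least 1");
    let total = jobs.len();
    let (tx, rx) = mpsc::channel();
    let mut in_flight = 0usize;
    let mut results = Vec::with_capacity(total);
    for job in jobs {
        if in_flight == limit {
            // Drain one finished result before spawning the next job,
            // keeping at most `limit` jobs running at any time.
            results.push(rx.recv().unwrap());
            in_flight -= 1;
        }
        let tx = tx.clone();
        thread::spawn(move || tx.send(job()).unwrap());
        in_flight += 1;
    }
    // Drain the remaining in-flight results (completion order).
    for _ in 0..in_flight {
        results.push(rx.recv().unwrap());
    }
    results
}
```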
forester-utils/src/instructions/state_batch_nullify.rs (1)

82-85: Consider using consistent logging level.

The append module uses trace! for empty hash chains, while this uses debug!. Consider using the same logging level for consistency.

-        debug!("No hash chains to process for nullification, returning empty stream.");
+        trace!("No hash chains to process for nullification, returning empty stream.");
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ec32e2c and 27b8d01.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (24)
  • cli/src/utils/constants.ts (1 hunks)
  • forester-utils/Cargo.toml (1 hunks)
  • forester-utils/src/error.rs (2 hunks)
  • forester-utils/src/instructions/address_batch_update.rs (5 hunks)
  • forester-utils/src/instructions/state_batch_append.rs (3 hunks)
  • forester-utils/src/instructions/state_batch_nullify.rs (1 hunks)
  • forester-utils/src/lib.rs (1 hunks)
  • forester-utils/src/utils.rs (1 hunks)
  • forester/Cargo.toml (2 hunks)
  • forester/src/cli.rs (2 hunks)
  • forester/src/epoch_manager.rs (2 hunks)
  • forester/src/errors.rs (0 hunks)
  • forester/src/indexer_type.rs (4 hunks)
  • forester/src/processor/v1/helpers.rs (1 hunks)
  • forester/src/processor/v2/address.rs (1 hunks)
  • forester/src/processor/v2/common.rs (7 hunks)
  • forester/src/processor/v2/error.rs (0 hunks)
  • forester/src/processor/v2/mod.rs (2 hunks)
  • forester/src/processor/v2/state.rs (1 hunks)
  • forester/tests/address_v2_test.rs (6 hunks)
  • forester/tests/batched_state_async_indexer_test.rs (2 hunks)
  • forester/tests/test_utils.rs (2 hunks)
  • scripts/install.sh (4 hunks)
  • sdk-libs/client/src/indexer/photon_indexer.rs (1 hunks)
💤 Files with no reviewable changes (2)
  • forester/src/errors.rs
  • forester/src/processor/v2/error.rs
🧰 Additional context used
🪛 GitHub Actions: forester-tests
forester/src/epoch_manager.rs

[error] 704-704: Thread panicked due to attempt to subtract with overflow at epoch_manager.rs:704:29. This caused the test to fail.

🔇 Additional comments (43)
forester-utils/Cargo.toml (1)

37-37: LGTM: Dependency addition supports streaming refactor.

The async-stream dependency addition aligns with the refactoring to asynchronous streaming batch processing mentioned in the PR objectives.

forester/Cargo.toml (2)

35-35: LGTM: Consistent dependency addition.

The async-stream dependency addition is consistent with the same dependency added to forester-utils and supports the streaming refactor.


56-56: LGTM: Formatting improvement.

The spacing normalization around the features array improves code consistency.

forester-utils/src/lib.rs (1)

3-3: LGTM: Exposing error types for external use.

Making the error module public enables other crates to access ForesterUtilsError types, which supports the expanded error handling in the streaming refactor.

forester-utils/src/utils.rs (1)

32-32: LGTM: Improved function signature for better concurrency.

Changing from &mut R to &R is a good improvement as the function only needs read access to the RPC client. This change enables better concurrency and simplifies borrowing semantics without affecting functionality.

forester/src/processor/v1/helpers.rs (1)

93-95: LGTM! Mutability reduction improves code safety.

The change from mutable to immutable RPC reference aligns with the broader refactor to reduce unnecessary mutable borrowing. This is a positive change that improves code safety without affecting functionality.

cli/src/utils/constants.ts (2)

22-22: Version revert looks intentional.

The revert from "0.50.1" to "0.50.0" appears to be intentional based on the PR context and coordinated git commit hash update.


27-27: Verification that the updated Photon commit exists in the remote repository:

```shell
#!/bin/bash
# Verify that the specified commit exists in the remote Photon repository
echo "Checking if commit e53cbf27faa595301e8a6a7d79fd4e17f8c811bc exists remotely..."
git ls-remote --exit-code https://github.com/lightprotocol/photon.git e53cbf27faa595301e8a6a7d79fd4e17f8c811bc && echo "✅ Commit exists." || echo "❌ Commit not found."
```

forester/src/cli.rs (2)

`59-59`: **Increased batch size aligns with streaming improvements.**

The increase from 1 to 4 instructions per transaction makes sense with the new asynchronous streaming batch processing model, which can handle larger batches more efficiently.

---

`91-91`: **RPC pool size increase supports higher concurrency.**

Doubling the RPC pool size from 50 to 100 appropriately supports the increased concurrency demands of the new streaming RPC calls and proof generation.

forester/src/epoch_manager.rs (1)

`997-1002`: **Conditional logging reduces noise effectively.**

The conditional logging that only outputs when items are processed (`if items_processed_this_iteration > 0`) is a good improvement that reduces log noise while maintaining important debugging information.

scripts/install.sh (1)

`213-213`: **Git commit hash update is consistent with constants.**

The updated commit hash `e53cbf27faa595301e8a6a7d79fd4e17f8c811bc` matches the `PHOTON_GIT_COMMIT` constant in `cli/src/utils/constants.ts`, ensuring consistency between installation and configuration.

forester/src/processor/v2/mod.rs (1)

`9-10`: **LGTM! Clean error handling consolidation.**

The removal of the local error module and the addition of the `'static` lifetime bound align well with the async streaming refactoring. The lifetime constraint is necessary for the new streaming architecture.



Also applies to: 20-20

forester-utils/src/error.rs (1)

`1-33`: **Well-structured error handling enhancements.**

The new error variants properly support the async streaming batch processing with automatic conversions via `#[from]`. The error messages are clear and follow a consistent pattern.
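As background, `#[from]`-style automatic conversion works because a `From` impl lets `?` translate the source error. A std-only sketch of the pattern (variant names are illustrative, not the real `ForesterUtilsError`):

```rust
use std::fmt;

// A manual `From` impl gives the same automatic conversion that
// thiserror's `#[from]` attribute derives.
#[derive(Debug)]
pub enum UtilsError {
    Parse(std::num::ParseIntError),
    Indexer(String),
}

impl fmt::Display for UtilsError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            UtilsError::Parse(e) => write!(f, "parse error: {e}"),
            UtilsError::Indexer(msg) => write!(f, "indexer error: {msg}"),
        }
    }
}

impl From<std::num::ParseIntError> for UtilsError {
    fn from(e: std::num::ParseIntError) -> Self {
        UtilsError::Parse(e)
    }
}

// `?` now converts ParseIntError into UtilsError without explicit mapping.
pub fn parse_slot(s: &str) -> Result<u64, UtilsError> {
    Ok(s.parse::<u64>()?)
}
```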

forester/tests/address_v2_test.rs (3)

`60-60`: **Good test coverage improvements.**

The reduced validator wait time and increased batch count (10x) enhance test efficiency and coverage.




Also applies to: 100-100

> Likely an incorrect or invalid review comment.

---

`108-127`: **Excellent test synchronization logic.**

The addition of protocol phase synchronization ensures deterministic test execution by waiting for the active phase before creating addresses.

---

`294-298`: **Helpful debugging additions.**

The additional logging of root, root_index, and root history improves test observability without affecting functionality.



Also applies to: 450-450

forester/src/processor/v2/address.rs (3)

`20-43`: **Clean stream creation abstraction.**

The `create_stream_future` function properly encapsulates the streaming setup with appropriate error handling and configuration extraction.

---

`45-54`: **Well-structured finalization logic.**

The `create_finalizer_future` properly handles RPC connection pooling and delegates to the appropriate finalization function.

---

`57-81`: **Excellent streaming pipeline refactor.**

The transformation to async streaming with delegated batch processing via `process_stream` significantly improves modularity and follows the new architectural pattern consistently. The instruction builder closure pattern is clean and reusable.

forester/tests/test_utils.rs (1)

`299-307`: **Clean protocol phase calculation.**

The `get_active_phase_start_slot` function provides a reusable way to determine the active phase timing for test synchronization.

forester/tests/batched_state_async_indexer_test.rs (1)

`3-5`: **Good refactoring to use shared test utilities!**

The consolidation of test utilities into a shared module improves code reuse and maintainability across the test suite.



Also applies to: 47-47

forester/src/indexer_type.rs (1)

`51-51`: **Appropriate change to immutable RPC references.**

The switch from `&mut R` to `&R` for the RPC parameter is correct since these methods only perform read operations. This change aligns well with the concurrent streaming architecture and allows better shared access patterns.



Also applies to: 100-100, 247-247, 340-340

forester/src/processor/v2/state.rs (5)

`27-50`: **Well-structured stream creation function.**

The function properly encapsulates stream creation with appropriate error handling.

---

`52-66`: **Clean finalizer implementation.**

Good separation of concerns with the dedicated finalizer function for post-processing.

---

`68-94`: **Excellent refactoring to streaming architecture!**

The refactored function elegantly uses the generic `process_stream` infrastructure, eliminating code duplication and improving maintainability.

---

`96-169`: **Consistent implementation of nullify operations.**

The nullify functions follow the same streaming pattern as append operations, maintaining consistency across the codebase.

---

`171-188`: **Appropriate use of immutable RPC reference.**

The function correctly uses an immutable RPC reference since it only performs read operations.

forester-utils/src/instructions/address_batch_update.rs (2)

`29-121`: **Well-designed streaming API with proper configuration.**

The `AddressUpdateConfig` struct and `get_address_update_stream` function provide a clean API for stream creation. Good handling of edge cases like empty hash chains.

---

`226-332`: **Well-structured helper functions with proper validation.**

The circuit input preparation is thorough with appropriate bounds checking and hash chain validation.

forester/src/processor/v2/common.rs (3)

`40-104`: **Excellent generic stream processing abstraction!**

The `process_stream` function provides a reusable, well-tested abstraction for batch processing. Great handling of buffering, edge cases, and comprehensive logging.

---

`137-137`: **Good improvements to BatchProcessor.**

The addition of the `'static` bound enables proper async streaming, and the unified error handling improves consistency across the codebase.



Also applies to: 201-201, 360-360, 399-399

---

`106-122`: **Clean transaction helper function.**

Good encapsulation of transaction sending logic with proper connection management.

forester-utils/src/instructions/state_batch_append.rs (6)

`1-1`: **LGTM! Appropriate imports for streaming architecture.**

The new imports support the asynchronous streaming architecture well:
- `Pin` for pinning streams in memory
- `async_stream::stream` for creating async streams
- `Mutex` for thread-safe access to the indexer
- `trace` for improved logging




Also applies to: 4-5, 20-21, 23-23

---

`25-53`: **Well-designed function signature for streaming architecture.**

The function properly uses:
- Arc wrappers for thread-safe shared access to RPC pool and indexer
- Lifetime bounds to ensure stream safety
- Appropriate trait bounds (Send + Sync) for concurrent execution

The trace logging at line 51 is helpful for debugging.

---

`53-70`: **Good resource management with explicit drops.**

Excellent practice to:
1. Get the RPC connection and indexer lock
2. Fetch necessary metadata
3. Wait for indexer synchronization
4. Explicitly drop both resources before starting the stream

This prevents holding locks unnecessarily during stream execution.

---

`95-109`: **Thorough validation of queue elements.**

Good checks for:
1. Correct number of elements (lines 95-102)
2. Root consistency between indexer and on-chain state (lines 104-109)

These validations help catch data inconsistencies early.
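A minimal sketch of those two checks with simplified types (the real validation operates on indexer queue elements and Merkle roots; the function and parameter names here are hypothetical):

```rust
// Hedged sketch of the validations described above: element-count and
// indexer-vs-on-chain root consistency, checked before building proofs.
fn validate_chunk(
    elements: &[u64],
    expected_len: usize,
    indexer_root: [u8; 32],
    onchain_root: [u8; 32],
) -> Result<(), String> {
    if elements.len() != expected_len {
        return Err(format!(
            "expected {expected_len} queue elements, got {}",
            elements.len()
        ));
    }
    if indexer_root != onchain_root {
        return Err("indexer root does not match on-chain root".to_string());
    }
    Ok(())
}
```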

---

`155-171`: **Clean refactoring of proof generation function.**

Good improvements:
- Simplified return type from tuple to struct
- Clear error propagation
- Proper struct construction

---

`179-180`: **Excellent simplification of error handling.**

The use of the `?` operator instead of explicit error mapping makes the code cleaner and more idiomatic.




Also applies to: 185-185, 200-201, 204-204

forester-utils/src/instructions/state_batch_nullify.rs (4)

`1-1`: **Consistent imports with append module.**

The imports mirror those in `state_batch_append.rs`, maintaining consistency across the refactored modules.




Also applies to: 4-5, 18-19, 21-21

---

`48-80`: **Well-structured metadata fetching with proper error handling.**

Good improvements:
- Proper error handling for empty root history (lines 70-72)
- Clear destructuring of metadata
- Appropriate use of RPC connection without unnecessary mutability

---

`150-156`: **Good validation of required tx_hash field.**

The nullify operation correctly validates that tx_hash is present for each leaf (line 155), which is essential for nullification tracking.

---

`188-204`: **Consistent refactoring of proof generation function.**

The changes mirror those in the append module, maintaining consistency across the codebase.


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

♻️ Duplicate comments (1)
forester/tests/e2e_v2_test.rs (1)

643-644: Improve empty account handling.

Similar to the previous issue, returning a default signature masks the actual test condition.

🧹 Nitpick comments (5)
forester/src/processor/v2/state.rs (1)

95-98: Remove commented code if no longer needed.

There's substantial commented code related to batch index handling. If this functionality is no longer required after the streaming refactor, consider removing it entirely to maintain code cleanliness.

-    // let batch_index = {
-    //     let rpc = context.rpc_pool.get_connection().await?;
-    //     get_batch_index(context, &*rpc).await?
-    // };
-
...
-// async fn get_batch_index<R: Rpc>(
-//     context: &BatchContext<R, impl Indexer>,
-//     rpc: &R,
-// ) -> Result<usize> {
-//     let mut account = rpc.get_account(context.merkle_tree).await?.ok_or_else(|| {
-//         Error::msg(format!(
-//             "State tree account not found: {}",
-//             context.merkle_tree
-//         ))
-//     })?;
-
-//     let merkle_tree = BatchedMerkleTreeAccount::state_from_bytes(
-//         account.data.as_mut_slice(),
-//         &context.merkle_tree.into(),
-//     )?;
-
-//     Ok(merkle_tree.queue_batches.pending_batch_index as usize)
-// }

Also applies to: 116-133

forester/src/processor/v2/common.rs (1)

37-97: LGTM: Well-designed streaming abstraction with minor concern.

The process_stream function provides excellent reusable streaming batch processing logic. The implementation correctly handles stream buffering, batch sending, and cleanup.

However, consider the memory implications: the instruction_buffer accumulates items up to ixs_per_tx before processing. For high-throughput scenarios, you might want to add explicit backpressure handling if transaction sending becomes a bottleneck.
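One way to add the suggested backpressure is a bounded channel between instruction generation and transaction sending. A std-only sketch (the channel capacity, batch shape, and `run` helper are illustrative, not the actual `process_stream` API):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Bounded channel between instruction generation and transaction
// sending: send() blocks once `capacity` batches are queued, so the
// producer cannot outrun the sender.
fn run() -> usize {
    let (tx, rx) = sync_channel::<Vec<u32>>(2); // at most 2 unsent batches in flight
    let producer = thread::spawn(move || {
        for batch_id in 0..5u32 {
            // Blocks when the channel is full -- this is the backpressure.
            tx.send(vec![batch_id; 3]).unwrap();
        }
    });
    let mut sent = 0;
    for batch in rx {
        // Stand-in for sending one transaction per buffered batch.
        sent += batch.len();
    }
    producer.join().unwrap();
    sent
}

fn main() {
    assert_eq!(run(), 15);
}
```

In an async pipeline the same effect comes from a bounded `tokio::sync::mpsc` channel rather than an OS thread, but the blocking-send semantics are the point here.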

forester/tests/e2e_v2_test.rs (3)

89-92: Consider security implications of insecure_clone() usage.

While this is test code, using insecure_clone() on keypairs can mask security issues and establish bad patterns. Consider using a test-safe cloning method or documenting why the insecure variant is necessary here.


132-133: Replace debug prints with structured logging.

Debug println statements should be replaced with proper logging using the log crate for better control over output levels and formatting in tests.

-    println!("batch payer pubkey: {:?}", batch_payer.pubkey());
-    println!("legacy payer pubkey: {:?}", legacy_payer.pubkey());
+    log::debug!("batch payer pubkey: {:?}", batch_payer.pubkey());
+    log::debug!("legacy payer pubkey: {:?}", legacy_payer.pubkey());

826-830: Remove commented code.

Commented code should be removed to maintain code cleanliness. If this code is needed, it should be uncommented or documented why it's kept.

-        // let queue_account = rpc
-        //     .get_anchor_account::<QueueAccount>(&account.unwrap().metadata.associated_queue.into())
-        //     .await
-        //     .unwrap();
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 27b8d01 and 0c05113.

📒 Files selected for processing (16)
  • forester/package.json (1 hunks)
  • forester/src/epoch_manager.rs (6 hunks)
  • forester/src/indexer_type.rs (0 hunks)
  • forester/src/lib.rs (1 hunks)
  • forester/src/processor/v2/address.rs (1 hunks)
  • forester/src/processor/v2/common.rs (7 hunks)
  • forester/src/processor/v2/mod.rs (2 hunks)
  • forester/src/processor/v2/state.rs (1 hunks)
  • forester/src/rollover/mod.rs (1 hunks)
  • forester/tests/address_v2_test.rs (0 hunks)
  • forester/tests/batched_address_test.rs (0 hunks)
  • forester/tests/batched_state_async_indexer_test.rs (0 hunks)
  • forester/tests/batched_state_indexer_test.rs (0 hunks)
  • forester/tests/batched_state_test.rs (0 hunks)
  • forester/tests/e2e_v1_test.rs (1 hunks)
  • forester/tests/e2e_v2_test.rs (1 hunks)
💤 Files with no reviewable changes (6)
  • forester/tests/batched_state_test.rs
  • forester/tests/batched_state_indexer_test.rs
  • forester/tests/batched_address_test.rs
  • forester/tests/address_v2_test.rs
  • forester/tests/batched_state_async_indexer_test.rs
  • forester/src/indexer_type.rs
✅ Files skipped from review due to trivial changes (1)
  • forester/tests/e2e_v1_test.rs
🚧 Files skipped from review as they are similar to previous changes (3)
  • forester/src/processor/v2/mod.rs
  • forester/src/processor/v2/address.rs
  • forester/src/epoch_manager.rs
⏰ Context from checks skipped due to timeout of 90000ms (20)
  • GitHub Check: Test sdk-libs
  • GitHub Check: Test program-libs-slow
  • GitHub Check: Test program-libs-fast
  • GitHub Check: Test concurrent-merkle-tree
  • GitHub Check: Test batched-merkle-tree-simulate
  • GitHub Check: stateless-js-v1
  • GitHub Check: test-double-registration
  • GitHub Check: test-state-batched
  • GitHub Check: test-state-photon-batched
  • GitHub Check: test-2-foresters
  • GitHub Check: system-programs (sdk-test-program, ["cargo-test-sbf -p sdk-test"])
  • GitHub Check: test-address-batched
  • GitHub Check: system-programs (token-escrow-test, [ "cargo test-sbf -p token-escrow" ])
  • GitHub Check: test-e2e
  • GitHub Check: system-programs (sdk-anchor-test-program, ["cargo-test-sbf -p sdk-anchor-test", "cargo-test-sbf -...
  • GitHub Check: stateless-js-v2
  • GitHub Check: system-programs (counter-test, ["cargo test-sbf -p counter"])
  • GitHub Check: cli-v1
  • GitHub Check: cli-v2
  • GitHub Check: lint
🔇 Additional comments (13)
forester/src/lib.rs (1)

112-112: LGTM: Clean trait bound simplification.

The removal of the IndexerType<R> trait bound aligns with the broader architectural simplification where the IndexerType trait and its implementations were removed entirely. The remaining Indexer + 'static constraint should provide all necessary functionality.

forester/package.json (1)

8-10: LGTM: Well-organized test script consolidation.

The consolidation from legacy batched test scripts to comprehensive e2e tests (test_e2e_v2 and test_e2e_v1) provides better coverage of the new streaming batch processing pipeline. The naming is clear and logging configuration is consistent.

forester/src/rollover/mod.rs (2)

17-43: LGTM: Well-structured rollover function.

The rollover_state_merkle_tree function is properly implemented with:

  • Correct async signature and error handling
  • Fresh keypair generation for new tree components
  • Appropriate logging of transaction signatures
  • Proper error propagation using ForesterError

45-69: LGTM: Consistent rollover implementation.

The rollover_address_merkle_tree function follows the same well-structured pattern as the state rollover function, with appropriate keypair generation, function calls, and logging.

forester/src/processor/v2/state.rs (4)

22-45: LGTM: Well-implemented streaming helper.

The create_append_stream_future function properly creates an asynchronous stream from the instruction generation function, with appropriate error handling and return type.


47-65: LGTM: Clean streaming refactor.

The perform_append function is much cleaner with the streaming approach, using the shared process_stream abstraction and eliminating manual chunking logic. The instruction builder closure is well-structured.


67-89: LGTM: Consistent streaming implementation.

The create_nullify_stream_future function follows the same well-structured pattern as the append version, maintaining consistency across the codebase.


91-114: LGTM: Simplified nullify processing.

The perform_nullify function benefits from the same streaming simplification, making the code more maintainable and consistent.

forester/src/processor/v2/common.rs (5)

90-90: Verify the total items calculation logic.

The calculation total_instructions_processed * zkp_batch_size as usize assumes each instruction processes zkp_batch_size items. Please verify this is the correct relationship between instructions and the actual items being processed.


99-115: LGTM: Clean transaction batch helper.

The send_transaction_batch function is well-implemented with appropriate logging and error handling.


130-130: LGTM: Proper trait bound simplification.

The removal of the IndexerType<R> trait bound and addition of 'static lifetime bound aligns with the architectural simplification while maintaining necessary constraints.


194-194: LGTM: Improved error handling.

The update to use ForesterError::InvalidTreeType provides more specific error information compared to the previous generic error type.


353-353: LGTM: Simplified function calls.

The removal of the mutable RPC argument from state::perform_append and state::perform_nullify calls aligns with the new streaming approach where RPC connections are managed internally.

Also applies to: 392-392

@sergeytimoshin sergeytimoshin force-pushed the sergey/batch-process-optimisation branch from 0c05113 to 693ca82 on June 22, 2025 at 16:31
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (4)
forester/tests/e2e_v2_test.rs (4)

361-375: Refactor function to reduce parameter count.

This function has 10 parameters which violates clean code principles. Consider grouping related parameters into structs for better maintainability.


516-518: Improve empty account handling.

Returning a default signature when no accounts are found can mask test failures. Consider returning a Result or logging this condition.


770-772: Replace panic with assertion or error propagation.

In test code, using assert! or expect with a descriptive message is more idiomatic than explicit panics.


235-237: Replace panic with proper error handling.

Instead of panicking on unsupported tree types, return a proper error that can be handled by the caller.

🧹 Nitpick comments (4)
forester/src/processor/v2/state.rs (1)

95-133: Remove commented-out code.

The commented-out get_batch_index function should be removed entirely rather than left as commented code to keep the codebase clean.

-    // let batch_index = {
-    //     let rpc = context.rpc_pool.get_connection().await?;
-    //     get_batch_index(context, &*rpc).await?
-    // };
-
+

And remove the entire commented get_batch_index function:

-// async fn get_batch_index<R: Rpc>(
-//     context: &BatchContext<R, impl Indexer>,
-//     rpc: &R,
-// ) -> Result<usize> {
-//     let mut account = rpc.get_account(context.merkle_tree).await?.ok_or_else(|| {
-//         Error::msg(format!(
-//             "State tree account not found: {}",
-//             context.merkle_tree
-//         ))
-//     })?;
-
-//     let merkle_tree = BatchedMerkleTreeAccount::state_from_bytes(
-//         account.data.as_mut_slice(),
-//         &context.merkle_tree.into(),
-//     )?;
-
-//     Ok(merkle_tree.queue_batches.pending_batch_index as usize)
-// }
+
forester/tests/e2e_v2_test.rs (3)

66-66: Consider making timeout configurable.

The 15-minute timeout is hardcoded. Consider making this configurable via environment variable to accommodate different CI environments or debugging scenarios.

-const DEFAULT_TIMEOUT_SECONDS: u64 = 60 * 15;
+fn default_timeout_seconds() -> u64 {
+    std::env::var("E2E_TIMEOUT_SECONDS")
+        .ok()
+        .and_then(|s| s.parse().ok())
+        .unwrap_or(60 * 15)
+}

Note that `std::env::var` cannot be called in a `const` initializer, so the override has to live in a function (or a `std::sync::LazyLock`).

158-160: Make RNG seed configurable for test reproducibility.

While logging the seed is helpful, consider making it configurable via environment variable to reproduce specific test failures.

-    let rng_seed = rand::thread_rng().gen::<u64>();
+    let rng_seed = std::env::var("TEST_RNG_SEED")
+        .ok()
+        .and_then(|s| s.parse().ok())
+        .unwrap_or_else(|| rand::thread_rng().gen::<u64>());
     println!("seed {}", rng_seed);

383-384: Extract magic numbers to named constants.

The compress amount values (1_000_000 for first iteration, 10_000 for others) should be extracted to named constants for clarity.

+const INITIAL_COMPRESS_AMOUNT: u64 = 1_000_000;
+const REGULAR_COMPRESS_AMOUNT: u64 = 10_000;
+
         let batch_compress_sig = compress(
             rpc,
             &env.v2_state_trees[0].output_queue,
             batch_payer,
-            if i == 0 { 1_000_000 } else { 10_000 },
+            if i == 0 { INITIAL_COMPRESS_AMOUNT } else { REGULAR_COMPRESS_AMOUNT },
             sender_batched_accs_counter,
         )
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0c05113 and 693ca82.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (33)
  • .github/workflows/forester-tests.yml (1 hunks)
  • cli/src/utils/constants.ts (1 hunks)
  • forester-utils/Cargo.toml (1 hunks)
  • forester-utils/src/error.rs (2 hunks)
  • forester-utils/src/instructions/address_batch_update.rs (5 hunks)
  • forester-utils/src/instructions/state_batch_append.rs (3 hunks)
  • forester-utils/src/instructions/state_batch_nullify.rs (1 hunks)
  • forester-utils/src/lib.rs (1 hunks)
  • forester-utils/src/utils.rs (1 hunks)
  • forester/Cargo.toml (2 hunks)
  • forester/package.json (1 hunks)
  • forester/src/cli.rs (2 hunks)
  • forester/src/epoch_manager.rs (6 hunks)
  • forester/src/errors.rs (0 hunks)
  • forester/src/indexer_type.rs (0 hunks)
  • forester/src/lib.rs (1 hunks)
  • forester/src/processor/v1/helpers.rs (1 hunks)
  • forester/src/processor/v2/address.rs (1 hunks)
  • forester/src/processor/v2/common.rs (7 hunks)
  • forester/src/processor/v2/error.rs (0 hunks)
  • forester/src/processor/v2/mod.rs (2 hunks)
  • forester/src/processor/v2/state.rs (1 hunks)
  • forester/src/rollover/mod.rs (1 hunks)
  • forester/tests/address_v2_test.rs (0 hunks)
  • forester/tests/batched_address_test.rs (0 hunks)
  • forester/tests/batched_state_async_indexer_test.rs (0 hunks)
  • forester/tests/batched_state_indexer_test.rs (0 hunks)
  • forester/tests/batched_state_test.rs (0 hunks)
  • forester/tests/e2e_v1_test.rs (1 hunks)
  • forester/tests/e2e_v2_test.rs (1 hunks)
  • forester/tests/test_utils.rs (2 hunks)
  • scripts/install.sh (4 hunks)
  • sdk-libs/client/src/indexer/photon_indexer.rs (1 hunks)
💤 Files with no reviewable changes (8)
  • forester/src/errors.rs
  • forester/tests/batched_state_indexer_test.rs
  • forester/src/processor/v2/error.rs
  • forester/tests/batched_state_test.rs
  • forester/tests/batched_address_test.rs
  • forester/tests/address_v2_test.rs
  • forester/src/indexer_type.rs
  • forester/tests/batched_state_async_indexer_test.rs
✅ Files skipped from review due to trivial changes (2)
  • cli/src/utils/constants.ts
  • forester/tests/test_utils.rs
🚧 Files skipped from review as they are similar to previous changes (16)
  • forester-utils/src/lib.rs
  • forester-utils/Cargo.toml
  • forester/tests/e2e_v1_test.rs
  • forester/Cargo.toml
  • forester/src/processor/v1/helpers.rs
  • forester-utils/src/utils.rs
  • forester/src/lib.rs
  • forester/package.json
  • sdk-libs/client/src/indexer/photon_indexer.rs
  • scripts/install.sh
  • forester/src/cli.rs
  • forester/src/processor/v2/mod.rs
  • forester-utils/src/error.rs
  • forester/src/rollover/mod.rs
  • forester/src/processor/v2/address.rs
  • forester/src/epoch_manager.rs
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: system-programs (sdk-anchor-test-program, ["cargo-test-sbf -p sdk-anchor-test", "cargo-test-sbf -...
  • GitHub Check: system-programs (token-escrow-test, [ "cargo test-sbf -p token-escrow" ])
  • GitHub Check: system-programs (sdk-test-program, ["cargo-test-sbf -p sdk-test"])
  • GitHub Check: system-programs (counter-test, ["cargo test-sbf -p counter"])
  • GitHub Check: Test program-libs-slow
  • GitHub Check: Test batched-merkle-tree-simulate
  • GitHub Check: Test sdk-libs
  • GitHub Check: Test program-libs-fast
  • GitHub Check: Test concurrent-merkle-tree
  • GitHub Check: stateless-js-v1
  • GitHub Check: stateless-js-v2
  • GitHub Check: test-e2e (legacy)
  • GitHub Check: test-e2e
  • GitHub Check: cli-v1
  • GitHub Check: lint
  • GitHub Check: cli-v2
🔇 Additional comments (18)
.github/workflows/forester-tests.yml (1)

38-52: LGTM! Workflow simplification aligns with the refactoring goals.

The consolidation from multiple granular test configurations to two comprehensive end-to-end tests makes sense given the broader architectural changes moving to streaming batch processing. This should provide better overall test coverage while reducing CI complexity.

forester/src/processor/v2/state.rs (2)

22-45: Well-designed streaming helper function.

The create_append_stream_future function properly encapsulates the stream creation logic with appropriate error mapping from the underlying instruction stream functions to anyhow::Error. The size hint return along with the stream enables proper batch processing downstream.


47-65: Clean refactor to use streaming abstraction.

The perform_append function is now much cleaner by delegating to the shared process_stream function. The instruction builder closure is a good pattern for converting instruction data to Solana instructions.

forester-utils/src/instructions/address_batch_update.rs (4)

29-40: Well-designed configuration struct.

The AddressUpdateConfig struct properly encapsulates all the necessary parameters for address updates with appropriate trait bounds. Using Arc<Mutex<I>> for the indexer enables safe concurrent access across the streaming pipeline.


42-121: Excellent streaming implementation.

The get_address_update_stream function properly:

  • Fetches initial on-chain state to set up the stream
  • Validates the merkle tree account and extracts necessary data
  • Returns an empty stream when no work is needed
  • Delegates to the streaming implementation with proper error handling

The early return with an empty stream when leaves_hash_chains.is_empty() is a good optimization.


138-224: Robust concurrent streaming implementation.

The stream_instruction_data function demonstrates excellent async streaming patterns:

  • Proper chunking based on photon element limits
  • Concurrent proof generation using future::join_all
  • Comprehensive error handling with early returns on failures
  • Proper indexer synchronization and root validation
  • Good logging for debugging

The concurrent proof generation approach should significantly improve performance compared to sequential processing.


226-228: Good chunk size calculation.

The calculate_max_zkp_batches_per_call function properly ensures at least one batch while respecting the photon element limit. The std::cmp::max(1, ...) guards against a zero chunk size when the integer division rounds down to 0 (it does not, by itself, guard against a zero zkp_batch_size divisor).
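A std-only sketch of this guard (parameter names are assumed; the real function's signature may differ):

```rust
// Chunk-size guard: photon_element_limit / zkp_batch_size, floored at 1
// so the caller never receives a zero chunk size. A zero zkp_batch_size
// divisor would still need a separate check.
fn calculate_max_zkp_batches_per_call(photon_element_limit: usize, zkp_batch_size: usize) -> usize {
    std::cmp::max(1, photon_element_limit / zkp_batch_size)
}

fn main() {
    assert_eq!(calculate_max_zkp_batches_per_call(1000, 250), 4);
    assert_eq!(calculate_max_zkp_batches_per_call(10, 250), 1); // division rounds to 0, clamped to 1
}
```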

forester/src/processor/v2/common.rs (4)

37-97: Excellent generic streaming abstraction.

The process_stream function provides a well-designed abstraction that:

  • Properly handles asynchronous streams with buffering
  • Maintains transaction size limits via ixs_per_tx
  • Provides good error propagation and logging
  • Returns meaningful metrics (total items processed)
  • Uses generic types effectively for reusability

The buffering logic correctly handles both full buffers and remaining items after stream completion.
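The buffer-and-flush policy described above can be sketched synchronously with std only (names and the u32 item type are illustrative stand-ins for instruction data):

```rust
// Buffer items up to `ixs_per_tx`, "send" each full buffer as one
// transaction, and flush whatever remains once the stream ends.
fn batch_instructions(items: Vec<u32>, ixs_per_tx: usize) -> Vec<Vec<u32>> {
    let mut batches = Vec::new();
    let mut buffer = Vec::with_capacity(ixs_per_tx);
    for item in items {
        buffer.push(item);
        if buffer.len() == ixs_per_tx {
            batches.push(std::mem::take(&mut buffer)); // full buffer -> one transaction
        }
    }
    if !buffer.is_empty() {
        batches.push(buffer); // remainder after stream completion
    }
    batches
}

fn main() {
    let batches = batch_instructions((0..10).collect(), 4);
    assert_eq!(batches.len(), 3);
    assert_eq!(batches[2], vec![8, 9]); // partial final batch is still sent
}
```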


99-115: Clean transaction sending abstraction.

The send_transaction_batch helper function properly encapsulates transaction creation and sending with good logging. The use of the RPC pool for connection management is appropriate.


130-130: Good simplification of trait bounds.

The removal of the IndexerType<R> bound and addition of 'static lifetime bound simplifies the generic constraints while maintaining the necessary functionality for the streaming approach.


353-353: Simplified function calls align with refactor.

The removal of the mutable RPC argument from state::perform_append and state::perform_nullify calls aligns with the new streaming approach where RPC connections are managed by the shared context.

forester-utils/src/instructions/state_batch_nullify.rs (3)

24-85: Well-implemented streaming nullify function.

The get_nullify_instruction_stream function properly:

  • Extracts on-chain state and validates consistency
  • Returns appropriate stream types with size hints
  • Handles empty hash chains gracefully with early return
  • Synchronizes with indexer before starting stream processing
  • Uses proper error types throughout

The early return with empty stream when no hash chains need processing is a good optimization.


92-183: Excellent concurrent streaming implementation.

The streaming logic demonstrates strong async patterns:

  • Proper validation of queue elements against expected counts
  • Root consistency checks between indexer and on-chain state
  • Concurrent proof generation using future::join_all
  • Clean error propagation through the stream
  • Good separation of batch processing logic

The concurrent proof generation approach should provide significant performance improvements.
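`future::join_all` resolves its futures concurrently and returns results in input order. A std-only thread analogue of that ordering guarantee (the checksum is an illustrative stand-in for an expensive zkp proof):

```rust
use std::thread;

// One worker per chunk, joined in spawn order, so the i-th result
// corresponds to the i-th input chunk -- the same ordering join_all
// gives for futures.
fn generate_proofs_concurrently(chunks: Vec<Vec<u8>>) -> Vec<u64> {
    let handles: Vec<_> = chunks
        .into_iter()
        .map(|chunk| thread::spawn(move || chunk.iter().map(|&b| b as u64).sum::<u64>()))
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    assert_eq!(generate_proofs_concurrently(vec![vec![1, 2], vec![3]]), vec![3, 3]);
}
```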


188-204: Clean proof generation helper.

The generate_nullify_zkp_proof function properly encapsulates proof generation with good error mapping from the prover client to ForesterUtilsError. The return type change to a single struct aligns well with the streaming interface.

forester-utils/src/instructions/state_batch_append.rs (4)

53-54: Good use of connection pooling.

The implementation correctly uses the RPC connection pool and properly releases the connection after use, improving resource utilization.


54-54: Efficient mutex usage pattern.

The implementation correctly acquires and releases the indexer mutex - first for initial validation, then dropping it before the stream starts, and re-acquiring it within the stream. This pattern maximizes concurrency by not holding the lock for the entire stream lifetime.

Also applies to: 69-70, 76-76
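The lock-scoping pattern can be sketched with std types (the `Vec<u32>` "indexer" and per-batch pop are illustrative stand-ins for the real `Arc<Mutex<I>>` indexer):

```rust
use std::sync::{Arc, Mutex};

// Take the lock in a short block for up-front validation, drop it, then
// re-acquire per batch -- the mutex is never held across the whole run.
fn process(indexer: Arc<Mutex<Vec<u32>>>) -> u32 {
    let initial_len = {
        let guard = indexer.lock().unwrap();
        guard.len() as u32
    }; // guard dropped here, before the "stream" starts
    let mut processed = 0;
    for _ in 0..initial_len {
        let mut guard = indexer.lock().unwrap(); // short, per-batch critical section
        guard.pop();
        processed += 1;
    }
    processed
}

fn main() {
    assert_eq!(process(Arc::new(Mutex::new(vec![1, 2, 3]))), 3);
}
```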


88-93: Robust error handling in streaming context.

The error handling pattern correctly yields errors through the stream and terminates processing on critical failures. Error messages are descriptive and will aid in debugging.

Also applies to: 95-108, 129-133


111-150: Excellent concurrent proof generation pattern.

The implementation efficiently generates all proofs concurrently using future::join_all, maximizing throughput while properly handling both successful results and errors through the stream.

@sergeytimoshin sergeytimoshin force-pushed the sergey/batch-process-optimisation branch from 693ca82 to bdf1922 on July 1, 2025 at 13:49
@sergeytimoshin sergeytimoshin force-pushed the sergey/batch-process-optimisation branch from bdf1922 to 667d9b0 on July 1, 2025 at 13:54
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 12

♻️ Duplicate comments (3)
forester/tests/e2e_v2_test.rs (3)

1192-1194: Replace panic with proper error propagation in tests.

-        Err(e) => {
-            panic!("compress error: {:?}", e);
-        }
+        Err(e) => panic!("Failed to compress account: {:?}", e),

724-737: Refactor function to reduce parameter count.

This function has too many parameters which makes it difficult to maintain and test.

Group related parameters into structs:

+struct TransactionCounters {
+    sender_batched_accs: u64,
+    sender_legacy_accs: u64,
+    sender_batched_token: u64,
+    address_v1: u64,
+    address_v2: u64,
+}
+
+struct MintPubkeys {
+    v1_mint: Option<Pubkey>,
+    v2_mint: Option<Pubkey>,
+}
+
 async fn execute_test_transactions<R: Rpc + Indexer + MerkleTreeExt, I: Indexer>(
     rpc: &mut R,
     indexer: &mut I,
     rng: &mut StdRng,
     env: &TestAccounts,
     payer: &Keypair,
-    v1_mint_pubkey: Option<&Pubkey>,
-    v2_mint_pubkey: Option<&Pubkey>,
-    sender_batched_accs_counter: &mut u64,
-    sender_legacy_accs_counter: &mut u64,
-    sender_batched_token_counter: &mut u64,
-    address_v1_counter: &mut u64,
-    address_v2_counter: &mut u64,
+    mint_pubkeys: &MintPubkeys,
+    counters: &mut TransactionCounters,
 ) {

907-909: Improve empty account handling to avoid masking test failures.

Returning a default signature when no accounts are found can hide test failures.

-    if input_compressed_accounts.is_empty() {
-        return Signature::default();
-    }
+    assert!(!input_compressed_accounts.is_empty(), 
+            "No compressed token accounts found for transfer");
🧹 Nitpick comments (11)
forester/scripts/compare_performance.py (2)

125-128: Simplify nested if statements

Combine the conditions into a single if statement for better readability.

-            # Response time: empty -> has_elements (filter out immediate responses)
-            elif prev_state == 'queue_empty' and curr_state == 'queue_has_elements':
-                if duration > 0.01:  # Filter immediate responses
-                    results['queue_response_times'].append(duration)
+            # Response time: empty -> has_elements (filter out immediate responses)
+            elif prev_state == 'queue_empty' and curr_state == 'queue_has_elements' and duration > 0.01:
+                results['queue_response_times'].append(duration)

190-303: Refactor large method into smaller, focused methods

This method is too complex with 79 statements, 19 local variables, and 14 branches. Consider breaking it down for better maintainability.

Consider extracting the following into separate methods:

  • Queue performance analysis (lines 212-240)
  • Transaction performance analysis (lines 241-252)
  • Processing rate analysis (lines 253-264)
  • Key insights generation (lines 265-301)

Example refactor:

def analyze_and_compare(self, old_file: str, new_file: str):
    """Main analysis and comparison function."""
    self._print_header()
    old_results, new_results = self._analyze_files(old_file, new_file)
    self._print_summary(old_results, new_results)
    
    queue_comparison = self._analyze_queue_performance(old_results, new_results)
    tx_comparison = self._analyze_transaction_performance(old_results, new_results)
    rate_comparison = self._analyze_processing_rate(old_results, new_results)
    
    self._generate_insights(queue_comparison, tx_comparison, rate_comparison)
forester/README.md (1)

113-113: Remove trailing colons from headings

The static analysis correctly identifies trailing punctuation in these headings. Remove the colons to follow Markdown best practices.

-#### Required for Devnet mode:
+#### Required for Devnet mode
-#### Required for both modes:
+#### Required for both modes
-#### Example configurations:
+#### Example configurations

Also applies to: 121-121, 125-125

forester/tests/legacy/priority_fee_test.rs (1)

13-14: Move module declarations to the top of the file

Module declarations should be at the top of the file for better organization and Rust conventions.

Move lines 13-14 to after line 12:

 use solana_sdk::signature::Signer;
 
+mod test_utils;
 use crate::test_utils::init;
-mod test_utils;
forester/tests/legacy/test_utils.rs (1)

19-24: Consider making init function more explicit about side effects

The init function spawns a test validator as a side effect. Consider renaming to clarify this behavior.

-pub async fn init(config: Option<LightValidatorConfig>) {
+pub async fn init_with_validator(config: Option<LightValidatorConfig>) {
     setup_telemetry();
     register_metrics();
     spawn_test_validator(config).await;
 }
forester/src/processor/v2/common.rs (2)

121-130: Potential division by zero in metrics calculation.

While checking total_duration.as_secs_f64() > 0.0 prevents division by zero, the current approach sets TPS/IPS to 0.0 when duration is very small. This could mask performance in fast operations.

Consider using a minimum duration threshold:

-    let tps = if total_duration.as_secs_f64() > 0.0 {
-        transactions_sent as f64 / total_duration.as_secs_f64()
-    } else {
-        0.0
-    };
+    let duration_secs = total_duration.as_secs_f64().max(0.001); // Minimum 1ms
+    let tps = transactions_sent as f64 / duration_secs;
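The floored-duration metric, as a self-contained std-only sketch:

```rust
use std::time::Duration;

// Clamp elapsed time to a 1 ms floor so very fast runs report a finite,
// non-zero rate instead of 0.0.
fn tps(transactions_sent: u64, total_duration: Duration) -> f64 {
    let duration_secs = total_duration.as_secs_f64().max(0.001);
    transactions_sent as f64 / duration_secs
}

fn main() {
    assert!((tps(10, Duration::from_secs(2)) - 5.0).abs() < 1e-9);
    assert!((tps(10, Duration::ZERO) - 10_000.0).abs() < 1e-9); // floored, not 0.0
}
```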

291-300: Valuable queue metrics addition.

The queue state logging provides excellent visibility for monitoring. Consider standardizing the metric format for easier parsing.

Consider using consistent key-value formatting:

-            info!("QUEUE_METRIC: queue_has_elements tree_type={} tree={} input_ready={} output_ready={}",
-                self.tree_type, self.context.merkle_tree, input_ready, output_ready);
+            info!(
+                "QUEUE_METRIC: queue_has_elements tree_type={} tree={} input_ready={} output_ready={}",
+                self.tree_type, self.context.merkle_tree, input_ready, output_ready
+            );
forester/tests/e2e_v1_test.rs (2)

462-483: Extract magic number to constant.

+const REGISTRATION_TEST_ITERATIONS: usize = 10;
+
-    for _ in 0..10 {
+    for _ in 0..REGISTRATION_TEST_ITERATIONS {

509-556: Consider returning the total work instead of mutating parameter.

The function correctly validates queue lengths, but mutating the total_expected_work parameter is an unusual pattern.

Consider returning the accumulated work:

-pub async fn assert_queue_len(
-    pool: &SolanaRpcPool<LightClient>,
-    state_trees: &[StateMerkleTreeAccounts],
-    address_trees: &[AddressMerkleTreeAccounts],
-    total_expected_work: &mut u64,
-    expected_len: usize,
-    not_empty: bool,
-) {
+pub async fn assert_queue_len(
+    pool: &SolanaRpcPool<LightClient>,
+    state_trees: &[StateMerkleTreeAccounts],
+    address_trees: &[AddressMerkleTreeAccounts],
+    expected_len: usize,
+    not_empty: bool,
+) -> u64 {
+    let mut total_work = 0u64;
     // ... existing logic ...
-        *total_expected_work += queue_length as u64;
+        total_work += queue_length as u64;
     // ...
+    total_work
+}
forester-utils/src/instructions/address_batch_update.rs (1)

159-164: Consider improving error handling in the stream.

Using return in an async stream might not properly clean up resources. Consider yielding errors and continuing when possible, or ensure proper cleanup.

Consider yielding all errors instead of early return:

-                    Err(e) => {
-                        yield Err(ForesterUtilsError::Indexer(format!("Failed to get address queue with proofs: {}", e)));
-                        return;
-                    }
+                    Err(e) => {
+                        yield Err(ForesterUtilsError::Indexer(format!("Failed to get address queue with proofs: {}", e)));
+                        continue; // Skip this chunk but continue processing
+                    }
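The two error policies can be contrasted with a plain iterator of Results (types and the chunk checksum are illustrative, not the actual stream API):

```rust
// Early-return aborts at the first error; skip-and-continue records the
// error and keeps processing the remaining chunks.
fn process_chunks(chunks: &[Result<u32, &str>], skip_errors: bool) -> (u32, usize) {
    let (mut total, mut errors) = (0u32, 0usize);
    for chunk in chunks {
        match chunk {
            Ok(v) => total += v,
            Err(_) if skip_errors => errors += 1, // yield the error, keep going
            Err(_) => {
                errors += 1;
                break; // early return: the rest of the stream is dropped
            }
        }
    }
    (total, errors)
}

fn main() {
    assert_eq!(process_chunks(&[Ok(1), Err("e"), Ok(2)], true), (3, 1));
    assert_eq!(process_chunks(&[Ok(1), Err("e"), Ok(2)], false), (1, 1));
}
```

Whether skipping is safe depends on whether later chunks assume earlier ones landed on-chain; for root-dependent proof batches, aborting may be the only correct choice.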
forester/tests/e2e_v2_test.rs (1)

86-124: Consider consolidating configuration functions.

The multiple get_* functions follow a similar pattern. Consider creating a configuration struct to reduce repetition.

Create a configuration struct:

+struct TestConfig {
+    rpc_url: String,
+    ws_rpc_url: String,
+    indexer_url: String,
+    prover_url: String,
+    api_key: Option<String>,
+}
+
+impl TestConfig {
+    fn from_env() -> Self {
+        match TestMode::from_env() {
+            TestMode::Local => Self {
+                rpc_url: "http://localhost:8899".to_string(),
+                ws_rpc_url: "ws://localhost:8900".to_string(),
+                indexer_url: "http://localhost:8784".to_string(),
+                prover_url: "http://localhost:3001".to_string(),
+                api_key: None,
+            },
+            TestMode::Devnet => Self {
+                rpc_url: get_env_var("PHOTON_RPC_URL"),
+                ws_rpc_url: get_env_var("PHOTON_WSS_RPC_URL"),
+                indexer_url: get_env_var("PHOTON_INDEXER_URL"),
+                prover_url: get_env_var("PHOTON_PROVER_URL"),
+                api_key: Some(get_env_var("PHOTON_API_KEY")),
+            },
+        }
+    }
+}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 693ca82 and 979e569.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (46)
  • .github/workflows/forester-tests.yml (1 hunks)
  • cli/src/utils/constants.ts (1 hunks)
  • forester-utils/Cargo.toml (1 hunks)
  • forester-utils/src/error.rs (2 hunks)
  • forester-utils/src/instructions/address_batch_update.rs (5 hunks)
  • forester-utils/src/instructions/state_batch_append.rs (3 hunks)
  • forester-utils/src/instructions/state_batch_nullify.rs (1 hunks)
  • forester-utils/src/lib.rs (1 hunks)
  • forester-utils/src/rpc_pool.rs (6 hunks)
  • forester-utils/src/utils.rs (1 hunks)
  • forester/Cargo.toml (2 hunks)
  • forester/README.md (2 hunks)
  • forester/package.json (1 hunks)
  • forester/scripts/compare_performance.py (1 hunks)
  • forester/scripts/v2_stats.py (1 hunks)
  • forester/src/cli.rs (2 hunks)
  • forester/src/config.rs (1 hunks)
  • forester/src/epoch_manager.rs (10 hunks)
  • forester/src/errors.rs (0 hunks)
  • forester/src/forester_status.rs (1 hunks)
  • forester/src/indexer_type.rs (0 hunks)
  • forester/src/lib.rs (2 hunks)
  • forester/src/processor/v1/helpers.rs (1 hunks)
  • forester/src/processor/v1/send_transaction.rs (2 hunks)
  • forester/src/processor/v2/address.rs (1 hunks)
  • forester/src/processor/v2/common.rs (8 hunks)
  • forester/src/processor/v2/error.rs (0 hunks)
  • forester/src/processor/v2/mod.rs (2 hunks)
  • forester/src/processor/v2/state.rs (1 hunks)
  • forester/src/rollover/mod.rs (1 hunks)
  • forester/tests/address_v2_test.rs (1 hunks)
  • forester/tests/batched_state_async_indexer_test.rs (2 hunks)
  • forester/tests/e2e_v1_test.rs (1 hunks)
  • forester/tests/e2e_v2_test.rs (1 hunks)
  • forester/tests/legacy/priority_fee_test.rs (1 hunks)
  • forester/tests/legacy/test_utils.rs (1 hunks)
  • forester/tests/test_utils.rs (2 hunks)
  • program-tests/utils/src/setup_accounts.rs (1 hunks)
  • scripts/install.sh (4 hunks)
  • sdk-libs/client/src/indexer/photon_indexer.rs (1 hunks)
  • sdk-libs/client/src/rpc/rpc_trait.rs (3 hunks)
  • xtask/src/create_batch_address_tree.rs (1 hunks)
  • xtask/src/create_batch_state_tree.rs (1 hunks)
  • xtask/src/create_state_tree.rs (1 hunks)
  • xtask/src/create_update_protocol_config_ix.rs (1 hunks)
  • xtask/src/new_deployment.rs (1 hunks)
💤 Files with no reviewable changes (3)
  • forester/src/processor/v2/error.rs
  • forester/src/errors.rs
  • forester/src/indexer_type.rs
✅ Files skipped from review due to trivial changes (6)
  • program-tests/utils/src/setup_accounts.rs
  • xtask/src/create_batch_state_tree.rs
  • forester/src/processor/v1/send_transaction.rs
  • forester/src/config.rs
  • forester/Cargo.toml
  • forester/tests/test_utils.rs
🚧 Files skipped from review as they are similar to previous changes (18)
  • forester-utils/src/lib.rs
  • forester-utils/Cargo.toml
  • forester/src/processor/v1/helpers.rs
  • scripts/install.sh
  • forester/tests/address_v2_test.rs
  • forester/package.json
  • forester/src/processor/v2/mod.rs
  • .github/workflows/forester-tests.yml
  • sdk-libs/client/src/indexer/photon_indexer.rs
  • forester-utils/src/utils.rs
  • cli/src/utils/constants.ts
  • forester/src/cli.rs
  • forester/src/lib.rs
  • forester/src/processor/v2/state.rs
  • forester-utils/src/error.rs
  • forester/src/rollover/mod.rs
  • forester/src/processor/v2/address.rs
  • forester/src/epoch_manager.rs
🧰 Additional context used
🧬 Code Graph Analysis (1)
forester/scripts/compare_performance.py (1)
forester/scripts/v2_stats.py (3)
  • clean_line (37-39)
  • parse_timestamp (41-46)
  • main (180-211)
🪛 Ruff (0.11.9)
forester/scripts/compare_performance.py

11-11: collections.defaultdict imported but unused

Remove unused import: collections.defaultdict

(F401)


12-12: typing.Tuple imported but unused

Remove unused import: typing.Tuple

(F401)


126-127: Use a single if statement instead of nested if statements

(SIM102)

forester/scripts/v2_stats.py

9-9: datetime.timedelta imported but unused

Remove unused import: datetime.timedelta

(F401)


11-11: typing.Tuple imported but unused

Remove unused import: typing.Tuple

(F401)


14-14: json imported but unused

Remove unused import: json

(F401)


113-113: f-string without any placeholders

Remove extraneous f prefix

(F541)


190-193: Use ternary operator log_file = sys.stdin if args.logfile == '-' else open(args.logfile, 'r') instead of if-else-block

Replace if-else-block with log_file = sys.stdin if args.logfile == '-' else open(args.logfile, 'r')

(SIM108)


193-193: Use a context manager for opening files

(SIM115)

🪛 Flake8 (7.2.0)
forester/scripts/compare_performance.py

[error] 11-11: 'collections.defaultdict' imported but unused

(F401)


[error] 12-12: 'typing.Tuple' imported but unused

(F401)


[error] 15-15: expected 2 blank lines, found 1

(E302)


[error] 304-304: expected 2 blank lines, found 1

(E302)


[error] 321-321: expected 2 blank lines after class or function definition, found 1

(E305)

forester/scripts/v2_stats.py

[error] 9-9: 'datetime.timedelta' imported but unused

(F401)


[error] 11-11: 'typing.Tuple' imported but unused

(F401)


[error] 14-14: 'json' imported but unused

(F401)


[error] 16-16: expected 2 blank lines, found 1

(E302)


[error] 113-113: f-string is missing placeholders

(F541)


[error] 180-180: expected 2 blank lines, found 1

(E302)


[error] 213-213: expected 2 blank lines after class or function definition, found 1

(E305)

🪛 Pylint (3.3.7)
forester/scripts/compare_performance.py

[refactor] 36-36: Too many local variables (17/15)

(R0914)


[error] 83-83: Value 'current_operation' is unsubscriptable

(E1136)


[error] 87-87: Value 'current_operation' is unsubscriptable

(E1136)


[refactor] 190-190: Too many local variables (19/15)

(R0914)


[refactor] 190-190: Too many branches (14/12)

(R0912)


[refactor] 190-190: Too many statements (79/50)

(R0915)

forester/scripts/v2_stats.py

[refactor] 193-193: Consider using 'with' for resource-allocating operations

(R1732)

🪛 markdownlint-cli2 (0.17.2)
forester/README.md

113-113: Trailing punctuation in heading
Punctuation: ':'

(MD026, no-trailing-punctuation)


121-121: Trailing punctuation in heading
Punctuation: ':'

(MD026, no-trailing-punctuation)


125-125: Trailing punctuation in heading
Punctuation: ':'

(MD026, no-trailing-punctuation)

⏰ Context from checks skipped due to timeout of 90000ms (25)
  • GitHub Check: system-programs (system-cpi-test-v2-other, ["cargo-test-sbf -p system-cpi-v2-test -- --skip funct...
  • GitHub Check: system-programs (system-cpi-test-v2-event, ["cargo-test-sbf -p system-cpi-v2-test -- event::parse"])
  • GitHub Check: system-programs (system-cpi-test, ["cargo-test-sbf -p system-cpi-test"])
  • GitHub Check: system-programs (light-compressed-token, ["cargo-test-sbf -p compressed-token-test"])
  • GitHub Check: system-programs (light-registry, ["cargo-test-sbf -p registry-test"])
  • GitHub Check: system-programs (account-compression, ["cargo-test-sbf -p account-compression-test"])
  • GitHub Check: system-programs (random-e2e-test, ["cargo-test-sbf -p e2e-test"])
  • GitHub Check: system-programs (light-system-program, ["cargo-test-sbf -p system-test"])
  • GitHub Check: system-programs (system-cpi-test-v2-functional, ["cargo-test-sbf -p system-cpi-v2-test -- functio...
  • GitHub Check: cli-v1
  • GitHub Check: lint
  • GitHub Check: system-programs (token-escrow-test, [ "cargo test-sbf -p token-escrow" ])
  • GitHub Check: system-programs (sdk-anchor-test-program, ["cargo-test-sbf -p sdk-anchor-test", "cargo-test-sbf -...
  • GitHub Check: system-programs (counter-test, ["cargo test-sbf -p counter"])
  • GitHub Check: system-programs (sdk-test-program, ["cargo-test-sbf -p sdk-test"])
  • GitHub Check: cli-v2
  • GitHub Check: test-e2e
  • GitHub Check: test-e2e (legacy)
  • GitHub Check: Test sdk-libs
  • GitHub Check: Test program-libs-fast
  • GitHub Check: Test concurrent-merkle-tree
  • GitHub Check: Test batched-merkle-tree-simulate
  • GitHub Check: Test program-libs-slow
  • GitHub Check: stateless-js-v1
  • GitHub Check: stateless-js-v2
🔇 Additional comments (35)
forester/scripts/v2_stats.py (1)

16-36: Well-structured initialization with comprehensive regex patterns

The regex patterns effectively capture all necessary V2 TPS metric fields and the ANSI escape pattern is standard.
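For reference, stripping ANSI color codes before applying metric regexes typically looks like the sketch below. This is a generic illustration, not the exact pattern from `v2_stats.py`; the `ANSI_ESCAPE` name and the SGR-only pattern are assumptions — real logs may use a broader set of escape sequences.

```python
import re

# Matches SGR color sequences such as "\x1b[32m" and "\x1b[0m".
# A minimal sketch: broader escape families (cursor movement, etc.) are not covered.
ANSI_ESCAPE = re.compile(r"\x1b\[[0-9;]*m")


def clean_line(line: str) -> str:
    """Strip ANSI color codes so metric regexes match the plain text."""
    return ANSI_ESCAPE.sub("", line)


print(clean_line("\x1b[32mINFO\x1b[0m V2_TPS_METRIC tps=12.5"))
```

Cleaning first keeps the downstream metric patterns simple, since they never have to account for color codes embedded mid-field.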

forester/scripts/compare_performance.py (1)

49-49: Good use of context manager for file handling

The file is properly opened using a context manager, ensuring it's closed even if an exception occurs.

xtask/src/create_update_protocol_config_ix.rs (1)

35-41: Consistent API key field addition

The addition of api_key: None is consistent with the pattern across other files in this PR.

xtask/src/create_batch_address_tree.rs (1)

42-48: Consistent API key field addition

The addition of api_key: None follows the same pattern as other files in this PR.

xtask/src/new_deployment.rs (1)

56-62: Consistent API key field addition

The addition of api_key: None maintains consistency with the API key support added throughout the codebase.

xtask/src/create_state_tree.rs (1)

52-52: LGTM! API key field added for consistency.

The addition of api_key: None aligns with the broader API key support implementation across the codebase. Setting it to None is appropriate for this CLI tool context.

forester/src/forester_status.rs (1)

176-177: LGTM! Proper configuration of indexer URL and API key.

This change correctly configures the LightClient with the indexer URL and API key from the external services configuration, enabling authenticated access to the Photon indexer service. This replaces the previous hardcoded None values and aligns with the broader API key support implementation.

sdk-libs/client/src/rpc/rpc_trait.rs (4)

31-31: LGTM! API key field added to support authenticated access.

The addition of the optional api_key field to LightClientConfig enables authenticated access to external services. Using Option<String> provides flexibility for both authenticated and non-authenticated use cases.


36-44: LGTM! Constructor properly handles API key parameter.

The new method correctly accepts and initializes the api_key parameter, maintaining consistency with other configuration fields.


45-63: LGTM! Factory methods maintain backward compatibility.

The local_no_indexer() and local() factory methods correctly default api_key to None, preserving backward compatibility for local development scenarios.


65-73: LGTM! Devnet method properly supports API key configuration.

The devnet method correctly accepts an api_key parameter and initializes it in the configuration, enabling authenticated access to devnet indexer services.

forester-utils/src/rpc_pool.rs (4)

30-30: LGTM! API key field added to connection manager.

The addition of the api_key field to SolanaConnectionManager enables passing authentication credentials through the connection pool infrastructure.


39-56: LGTM! Constructor properly handles API key parameter.

The constructor correctly accepts and stores the api_key parameter, following the same pattern as other configuration fields.


64-74: LGTM! API key correctly passed to LightClientConfig.

The connect method properly includes the api_key when creating the LightClientConfig, ensuring authenticated connections are established when an API key is provided.


94-224: LGTM! Builder pattern properly supports API key configuration.

The SolanaRpcPoolBuilder correctly implements the builder pattern for the api_key field, defaulting to None and passing it through to the connection manager during construction.

forester/README.md (1)

96-169: Well-structured test documentation

The test environment variables section is comprehensive and provides clear examples for different testing scenarios. The documentation effectively explains both local and devnet modes with all necessary configuration options.

forester/tests/legacy/priority_fee_test.rs (2)

16-18: Clarify why this test is ignored

The test has an #[ignore] attribute. Consider adding a comment explaining why this test is ignored and under what conditions it should run.


97-226: Comprehensive test coverage for priority fee capping

The unit test thoroughly covers various scenarios including edge cases, boundary conditions, and error cases. Good use of catch_unwind for testing panic behavior.

forester/tests/legacy/test_utils.rs (2)

114-121: Good implementation of truncated pubkey generation

The function correctly generates a pubkey with less than 254 bits by zeroing the first byte. The comment clearly explains the purpose.


124-201: Well-structured assertion helpers for indexer comparison

The assertion functions provide thorough validation between test indexer and Photon indexer states. Good error handling with descriptive panic messages.

Also applies to: 203-237, 240-287

forester-utils/src/instructions/state_batch_append.rs (2)

26-53: Well-designed async streaming interface

The function signature properly uses Arc wrappers for shared concurrent access and returns a pinned boxed stream with appropriate lifetime bounds. Good separation of concerns.


71-153: Excellent concurrent proof generation implementation

The streaming implementation efficiently processes batches concurrently using future::join_all. Good error propagation through the stream with detailed error messages.

forester-utils/src/instructions/state_batch_nullify.rs (3)

24-46: Consistent streaming interface design

The function signature matches the pattern established in state_batch_append.rs, which is good for API consistency.


155-155: Good error handling for missing tx_hash

Good validation of required fields. The error message includes the leaf index which aids debugging.


92-186: Well-structured streaming implementation with proper validation

The implementation includes:

  • Root consistency validation between indexer and on-chain state
  • Comprehensive error messages
  • Efficient concurrent proof generation
  • Clear tracing for debugging
forester/src/processor/v2/common.rs (4)

148-165: LGTM!

Clean implementation of transaction batch sending with appropriate logging.


180-180: LGTM! Trait bound simplification aligns with IndexerType removal.

The simplified trait bound with just 'static lifetime is appropriate for the async processing context.


244-244: Appropriate error type migration.

The change from BatchProcessError::UnsupportedTreeType to ForesterError::InvalidTreeType correctly aligns with the error type refactoring across the codebase.


414-414: Clean API simplification.

Removing the mutable RPC parameter and relying on the context improves encapsulation. The functions can now manage RPC connections internally through the context's RPC pool.

Also applies to: 453-453

forester/tests/e2e_v1_test.rs (1)

312-343: Well-structured rollover verification.

Good use of descriptive assertions with debug output for troubleshooting failures.

forester-utils/src/instructions/address_batch_update.rs (3)

29-40: Well-designed configuration struct.

Good use of Arc for shared ownership and Mutex for thread-safe indexer access. The structure properly encapsulates all necessary configuration for address updates.


89-91: Good error handling for empty root history.

The use of ok_or_else with a descriptive error message properly handles the edge case of empty root history.


269-284: Thorough bounds checking for batch processing.

Excellent validation of array bounds before processing. The error messages are descriptive and will help with debugging.

forester/tests/e2e_v2_test.rs (2)

1122-1196: Well-implemented balance management for compression.

Excellent handling of rent exemption requirements and balance checks. The fallback airdrop mechanism is appropriate for test environments.


1294-1387: Clean implementation of batch address creation.

Good use of iterators and proper error propagation. The HashMap for remaining accounts is handled correctly.


import re
import sys
from datetime import datetime, timedelta

⚠️ Potential issue

Remove unused imports

These imports are not used in the code.

-from datetime import datetime, timedelta
+from datetime import datetime
 from collections import defaultdict
-from typing import Dict, List, Tuple, Optional
+from typing import Dict, List, Optional
 import argparse
 import statistics
-import json

Also applies to: 11-11, 14-14

🧰 Tools
🪛 Ruff (0.11.9)

9-9: datetime.timedelta imported but unused

Remove unused import: datetime.timedelta

(F401)

🪛 Flake8 (7.2.0)

[error] 9-9: 'datetime.timedelta' imported but unused

(F401)

🤖 Prompt for AI Agents
In forester/scripts/v2_stats.py at lines 9, 11, and 14, remove the unused imports
timedelta, Tuple, and json; none of them is referenced anywhere in the script. Keep
datetime, which is used. Review the imports on these lines and delete only the
unreferenced ones to clean up the code.

Comment on lines +48 to +102
def parse_log_line(self, line: str) -> None:
"""Parse a single log line for V2 TPS metrics."""
clean_line = self.clean_line(line)
timestamp = self.parse_timestamp(clean_line)

if not timestamp:
return

# Parse operation start
start_match = self.operation_start_pattern.search(clean_line)
if start_match:
self.operations.append({
'type': 'start',
'timestamp': timestamp,
'tree_type': start_match.group(1),
'operation': start_match.group(2) or 'batch',
'tree': start_match.group(3),
'epoch': int(start_match.group(4))
})
return

# Parse operation complete
complete_match = self.operation_complete_pattern.search(clean_line)
if complete_match:
self.operation_summaries.append({
'timestamp': timestamp,
'tree_type': complete_match.group(1),
'operation': complete_match.group(2) or 'batch',
'tree': complete_match.group(3),
'epoch': int(complete_match.group(4)),
'zkp_batches': int(complete_match.group(5)),
'transactions': int(complete_match.group(6)),
'instructions': int(complete_match.group(7)),
'duration_ms': int(complete_match.group(8)),
'tps': float(complete_match.group(9)),
'ips': float(complete_match.group(10)),
'items_processed': int(complete_match.group(11)) if complete_match.group(11) else 0
})
return

# Parse transaction sent
tx_match = self.transaction_sent_pattern.search(clean_line)
if tx_match:
self.transactions.append({
'timestamp': timestamp,
'tree_type': tx_match.group(1),
'operation': tx_match.group(2) or 'batch',
'tree': tx_match.group(3),
'tx_num': int(tx_match.group(4)),
'total_txs': int(tx_match.group(5)),
'signature': tx_match.group(6),
'instructions': int(tx_match.group(7)),
'tx_duration_ms': int(tx_match.group(8))
})


🛠️ Refactor suggestion

Add error handling for type conversions

The method converts strings to int/float without error handling, which could raise ValueError if the regex captures unexpected data.

For example, `'epoch': int(start_match.group(4))` should be guarded:

try:
    epoch = int(start_match.group(4))
except ValueError:
    return  # or log the error

Apply similar error handling to all int() and float() conversions throughout this method.

🤖 Prompt for AI Agents
In forester/scripts/v2_stats.py between lines 48 and 102, the parse_log_line
method performs multiple int() and float() conversions on regex capture groups
without error handling, which can raise ValueError if the data is unexpected. To
fix this, wrap each int() and float() conversion in try-except blocks that catch
ValueError. In the except block, either return early from the method or log the
error to avoid crashing. Apply this pattern consistently for all conversions in
the method to ensure robust parsing.
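One way to apply this consistently without scattering try-except blocks through `parse_log_line` is a pair of small conversion helpers. This is a sketch of the suggested pattern, not code from the PR; the `to_int`/`to_float` names are hypothetical.

```python
def to_int(value, default=0):
    """Convert a regex capture group to int, tolerating malformed log data."""
    try:
        return int(value)
    except (TypeError, ValueError):  # TypeError also covers a missing (None) group
        return default


def to_float(value, default=0.0):
    """Convert a regex capture group to float, tolerating malformed log data."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


# Dictionary entries then become e.g.:
#   'epoch': to_int(start_match.group(4)),
#   'tps': to_float(complete_match.group(9)),
print(to_int("42"), to_float("1.5"), to_int(None, default=-1))
```

Folding the `None` case into the helper also replaces the existing `int(...) if group else 0` pattern used for optional captures like `items_processed`.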

print("No V2 TPS metrics found in logs")
return

print(f"\nSUMMARY:")

⚠️ Potential issue

Remove unnecessary f-string prefix

The string has no placeholders.

-        print(f"\nSUMMARY:")
+        print("\nSUMMARY:")
🧰 Tools
🪛 Ruff (0.11.9)

113-113: f-string without any placeholders

Remove extraneous f prefix

(F541)

🪛 Flake8 (7.2.0)

[error] 113-113: f-string is missing placeholders

(F541)

🤖 Prompt for AI Agents
In forester/scripts/v2_stats.py at line 113, remove the unnecessary f-string
prefix from the print statement since the string contains no placeholders.
Change the line to a regular string without the 'f' prefix.

Comment on lines +189 to +204
# Read and parse log file
if args.logfile == '-':
log_file = sys.stdin
else:
log_file = open(args.logfile, 'r')

try:
for line in log_file:
if 'V2_TPS_METRIC' not in line:
continue

analyzer.parse_log_line(line)
finally:
if args.logfile != '-':
log_file.close()


🛠️ Refactor suggestion

Improve file handling with context manager

The current implementation doesn't use a context manager for file operations and could be simplified.

-    # Read and parse log file
-    if args.logfile == '-':
-        log_file = sys.stdin
-    else:
-        log_file = open(args.logfile, 'r')
-    
-    try:
-        for line in log_file:
-            if 'V2_TPS_METRIC' not in line:
-                continue
-                
-            analyzer.parse_log_line(line)
-    finally:
-        if args.logfile != '-':
-            log_file.close()
+    # Read and parse log file
+    if args.logfile == '-':
+        log_file = sys.stdin
+        for line in log_file:
+            if 'V2_TPS_METRIC' in line:
+                analyzer.parse_log_line(line)
+    else:
+        with open(args.logfile, 'r') as log_file:
+            for line in log_file:
+                if 'V2_TPS_METRIC' in line:
+                    analyzer.parse_log_line(line)
🧰 Tools
🪛 Ruff (0.11.9)

190-193: Use ternary operator log_file = sys.stdin if args.logfile == '-' else open(args.logfile, 'r') instead of if-else-block

Replace if-else-block with log_file = sys.stdin if args.logfile == '-' else open(args.logfile, 'r')

(SIM108)


193-193: Use a context manager for opening files

(SIM115)

🪛 Pylint (3.3.7)

[refactor] 193-193: Consider using 'with' for resource-allocating operations

(R1732)

🤖 Prompt for AI Agents
In forester/scripts/v2_stats.py around lines 189 to 204, the file opening and
closing logic should be refactored to use a context manager (with statement) for
safer and cleaner file handling. Replace the manual open and close calls with a
with block that opens the file if args.logfile is not '-', otherwise use
sys.stdin directly inside the with block. This ensures the file is properly
closed automatically and simplifies the code.
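An alternative that avoids duplicating the read loop is `contextlib.nullcontext`, which wraps `sys.stdin` in a no-op context manager so both cases share a single `with` block. A sketch under the same assumptions as the review comment; the `open_log` helper name is hypothetical, not part of the PR.

```python
import sys
from contextlib import nullcontext


def open_log(path):
    """Return a context manager yielding the log stream.

    '-' yields sys.stdin (left open on exit); anything else opens the file,
    which the with-block then closes automatically.
    """
    return nullcontext(sys.stdin) if path == '-' else open(path, 'r')


# Usage sketch: one loop serves both stdin and file input.
# with open_log(args.logfile) as log_file:
#     for line in log_file:
#         if 'V2_TPS_METRIC' in line:
#             analyzer.parse_log_line(line)
```

`nullcontext` (Python 3.7+) yields its argument unchanged and does nothing on exit, which is exactly the behavior wanted for stdin.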

import sys
import argparse
from datetime import datetime
from collections import defaultdict

⚠️ Potential issue

Remove unused imports

These imports are not used in the code.

-from collections import defaultdict
-from typing import Dict, List, Tuple, Optional
+from typing import Dict, List, Optional

Also applies to: 12-12

🧰 Tools
🪛 Ruff (0.11.9)

11-11: collections.defaultdict imported but unused

Remove unused import: collections.defaultdict

(F401)

🪛 Flake8 (7.2.0)

[error] 11-11: 'collections.defaultdict' imported but unused

(F401)

🤖 Prompt for AI Agents
In forester/scripts/compare_performance.py at lines 11 and 12, remove the unused
imports collections.defaultdict and typing.Tuple; neither is referenced anywhere in
the script. Review the imports on these lines and delete only the unreferenced ones
to clean up the code.

Comment on lines +83 to +85
let mut rpc = LightClient::new(LightClientConfig::local()).await.unwrap();
rpc.payer = config.payer_keypair.insecure_clone();


⚠️ Potential issue

Verify RPC configuration consistency

The test sets up environment-based configuration but then uses LightClientConfig::local() which might not respect the environment variables for indexer URL and API key.

Consider using the config from the environment:

-    let mut rpc = LightClient::new(LightClientConfig::local()).await.unwrap();
+    let rpc_config = LightClientConfig {
+        rpc_url: config.external_services.rpc_url.clone(),
+        indexer_url: Some(config.external_services.indexer_url.clone().unwrap()),
+        photon_api_key: config.external_services.photon_api_key.clone(),
+        ..Default::default()
+    };
+    let mut rpc = LightClient::new(rpc_config).await.unwrap();
🤖 Prompt for AI Agents
In forester/tests/legacy/priority_fee_test.rs around lines 83 to 85, the test
initializes the LightClient with LightClientConfig::local(), which may ignore
environment variables for indexer URL and API key. To fix this, replace
LightClientConfig::local() with the configuration loaded from the environment
(e.g., using config.load_from_env() or equivalent) to ensure the RPC client
respects the environment-based settings.

Comment on lines +53 to +54
let mut rpc = rpc_pool.get_connection().await?;
let indexer_guard = indexer.lock().await;

💡 Verification agent

🧩 Analysis chain

Consider connection lifecycle management

The RPC connection is obtained and dropped early. Ensure this doesn't cause issues if the connection pool has limited connections.


🏁 Script executed:

#!/bin/bash
# Check how RPC connections are managed in the pool
ast-grep --pattern 'impl$$$SolanaRpcPool$$${ $$$ get_connection($$$) { $$$ } $$$ }'

Length of output: 85


🏁 Script executed:

#!/bin/bash
# Locate the definition(s) of `get_connection` to inspect how connections are returned/dropped
rg -n "get_connection" -R .

Length of output: 432


🏁 Script executed:

#!/bin/bash
# Find where SolanaRpcPool is defined
rg -n "SolanaRpcPool" .
# Search for get_connection definitions with context
rg -n "get_connection" -C3 .

Length of output: 32340


🏁 Script executed:

#!/bin/bash
# Display the implementation of `get_connection` method in SolanaRpcPool
sed -n '200,260p' forester-utils/src/rpc_pool.rs

Length of output: 2266


🏁 Script executed:

#!/bin/bash
# Display the remainder of `get_connection` implementation in SolanaRpcPool
sed -n '260,340p' forester-utils/src/rpc_pool.rs

Length of output: 765


Minimize RPC connection hold span

Currently, the code checks out an RPC connection before acquiring the indexer lock, which means the connection is held while awaiting indexer.lock(). In a pool with limited connections, this can lead to unnecessary contention.

Consider swapping the order so you only hold the RPC connection when you’re ready to use it:

- let mut rpc = rpc_pool.get_connection().await?;
- let indexer_guard = indexer.lock().await;
+ let indexer_guard = indexer.lock().await;
+ let mut rpc = rpc_pool.get_connection().await?;

This change ensures the pool connection is only checked out for the brief window in which it’s actually needed.

🤖 Prompt for AI Agents
In forester-utils/src/instructions/state_batch_append.rs around lines 53 to 54,
the RPC connection is acquired before awaiting the indexer lock, causing the
connection to be held longer than necessary. To fix this, reorder the code to
first acquire the indexer lock and only then get the RPC connection from the
pool. This minimizes the duration the RPC connection is held, reducing
contention in the connection pool.

Comment on lines +359 to +372
// If one forester is first for both queues there will be no work left
// - this assert is flaky
// assert!(
// forester_epoch_pda.work_counter > 0,
// "forester {} did not perform any work",
// i
// );
performed_work += forester_epoch_pda.work_counter;
} else {
return Err(RpcError::CustomError(format!(
"Forester {} not registered",
i,
)));
}

🛠️ Refactor suggestion

Address or remove commented flaky assertion.

The commented assertion indicates a known flakiness issue. Either fix the underlying race condition or remove the commented code to avoid confusion.

Either fix the race condition or remove the comment:

-            // If one forester is first for both queues there will be no work left
-            // - this assert is flaky
-            // assert!(
-            //     forester_epoch_pda.work_counter > 0,
-            //     "forester {} did not perform any work",
-            //     i
-            // );
+            // Note: work_counter may be 0 if another forester processed all work
🤖 Prompt for AI Agents
In forester/tests/e2e_v1_test.rs around lines 359 to 372, there is a
commented-out assertion marked as flaky due to a race condition. To resolve
this, either implement synchronization or logic changes to fix the race
condition causing the flakiness and then re-enable the assertion, or remove the
commented assertion entirely to avoid confusion and keep the test clean.

Comment on lines +35 to +311
async fn test_e2e_v1() {
init(Some(LightValidatorConfig {
enable_indexer: false,
wait_time: 90,
prover_config: Some(ProverConfig::default()),
sbf_programs: vec![],
limit_ledger_size: None,
}))
.await;
let forester_keypair1 = Keypair::new();
let forester_keypair2 = Keypair::new();

let mut test_accounts = TestAccounts::get_local_test_validator_accounts();
test_accounts.protocol.forester = forester_keypair1.insecure_clone();

let mut config1 = forester_config();
config1.payer_keypair = forester_keypair1.insecure_clone();

let mut config2 = forester_config();
config2.payer_keypair = forester_keypair2.insecure_clone();

let pool = SolanaRpcPoolBuilder::<LightClient>::default()
.url(config1.external_services.rpc_url.to_string())
.commitment(CommitmentConfig::confirmed())
.build()
.await
.unwrap();

let mut rpc = LightClient::new(LightClientConfig::local_no_indexer())
.await
.unwrap();
rpc.payer = forester_keypair1.insecure_clone();

// Airdrop to both foresters and governance authority
for keypair in [
&forester_keypair1,
&forester_keypair2,
&test_accounts.protocol.governance_authority,
] {
rpc.airdrop_lamports(&keypair.pubkey(), LAMPORTS_PER_SOL * 100_000)
.await
.unwrap();
}

// Register both foresters
for forester_keypair in [&forester_keypair1, &forester_keypair2] {
register_test_forester(
&mut rpc,
&test_accounts.protocol.governance_authority,
&forester_keypair.pubkey(),
light_registry::ForesterConfig::default(),
)
.await
.unwrap();
}

let new_forester_keypair1 = Keypair::new();
let new_forester_keypair2 = Keypair::new();

for forester_keypair in [&new_forester_keypair1, &new_forester_keypair2] {
rpc.airdrop_lamports(&forester_keypair.pubkey(), LAMPORTS_PER_SOL * 100_000)
.await
.unwrap();
}

update_test_forester(
&mut rpc,
&forester_keypair1,
&forester_keypair1.pubkey(),
Some(&new_forester_keypair1),
light_registry::ForesterConfig::default(),
)
.await
.unwrap();

update_test_forester(
&mut rpc,
&forester_keypair2,
&forester_keypair2.pubkey(),
Some(&new_forester_keypair2),
light_registry::ForesterConfig::default(),
)
.await
.unwrap();

config1.derivation_pubkey = forester_keypair1.pubkey();
config1.payer_keypair = new_forester_keypair1.insecure_clone();

config2.derivation_pubkey = forester_keypair2.pubkey();
config2.payer_keypair = new_forester_keypair2.insecure_clone();

let config1 = Arc::new(config1);
let config2 = Arc::new(config2);

let indexer: TestIndexer =
TestIndexer::init_from_acounts(&config1.payer_keypair, &test_accounts, 0).await;

let mut env = E2ETestEnv::<LightClient, TestIndexer>::new(
rpc,
indexer,
&test_accounts,
keypair_action_config(),
general_action_config(),
0,
Some(0),
)
.await;
// removing batched Merkle tree
env.indexer.state_merkle_trees.remove(1);
// removing batched address tree
env.indexer.address_merkle_trees.remove(1);
let user_index = 0;
let balance = env
.rpc
.get_balance(&env.users[user_index].keypair.pubkey())
.await
.unwrap();
env.compress_sol(user_index, balance).await;
// Create state and address trees which can be rolled over
env.create_address_tree(Some(0)).await;
env.create_state_tree(Some(0)).await;
let state_tree_with_rollover_threshold_0 =
env.indexer.state_merkle_trees[1].accounts.merkle_tree;
let address_tree_with_rollover_threshold_0 =
env.indexer.address_merkle_trees[1].accounts.merkle_tree;

println!(
"State tree with rollover threshold 0: {:?}",
state_tree_with_rollover_threshold_0
);
println!(
"Address tree with rollover threshold 0: {:?}",
address_tree_with_rollover_threshold_0
);

let state_trees: Vec<StateMerkleTreeAccounts> = env
.indexer
.state_merkle_trees
.iter()
.map(|x| x.accounts)
.collect();
let address_trees: Vec<AddressMerkleTreeAccounts> = env
.indexer
.address_merkle_trees
.iter()
.map(|x| x.accounts)
.collect();

println!("Address trees: {:?}", address_trees);

// Two rollovers plus other work
let mut total_expected_work = 2;
{
let iterations = 5;
for i in 0..iterations {
println!("Round {} of {}", i, iterations);
let user_keypair = env.users[0].keypair.insecure_clone();
env.transfer_sol_deterministic(&user_keypair, &user_keypair.pubkey(), Some(1))
.await
.unwrap();
env.transfer_sol_deterministic(&user_keypair, &user_keypair.pubkey().clone(), Some(0))
.await
.unwrap();
sleep(Duration::from_millis(100)).await;
env.create_address(None, Some(1)).await;
env.create_address(None, Some(0)).await;
}
assert_queue_len(
&pool,
&state_trees,
&address_trees,
&mut total_expected_work,
0,
true,
)
.await;
}

let (shutdown_sender1, shutdown_receiver1) = oneshot::channel();
let (shutdown_sender2, shutdown_receiver2) = oneshot::channel();
let (work_report_sender1, mut work_report_receiver1) = mpsc::channel(100);
let (work_report_sender2, mut work_report_receiver2) = mpsc::channel(100);

let indexer = Arc::new(Mutex::new(env.indexer));

let service_handle1 = tokio::spawn(run_pipeline::<LightClient, TestIndexer>(
config1.clone(),
None,
None,
indexer.clone(),
shutdown_receiver1,
work_report_sender1,
));
let service_handle2 = tokio::spawn(run_pipeline::<LightClient, TestIndexer>(
config2.clone(),
None,
None,
indexer,
shutdown_receiver2,
work_report_sender2,
));

const EXPECTED_EPOCHS: u64 = 3; // We expect to process 3 epochs

let mut processed_epochs = HashSet::new();
let mut total_processed = 0;
while processed_epochs.len() < EXPECTED_EPOCHS as usize {
tokio::select! {
Some(report) = work_report_receiver1.recv() => {
println!("Received work report from forester 1: {:?}", report);
total_processed += report.processed_items;
processed_epochs.insert(report.epoch);
}
Some(report) = work_report_receiver2.recv() => {
println!("Received work report from forester 2: {:?}", report);
total_processed += report.processed_items;
processed_epochs.insert(report.epoch);
}
else => break,
}
}

println!("Processed {} items", total_processed);

// Verify that we've processed the expected number of epochs
assert_eq!(
processed_epochs.len(),
EXPECTED_EPOCHS as usize,
"Processed {} epochs, expected {}",
processed_epochs.len(),
EXPECTED_EPOCHS
);

// Verify that we've processed epochs 0 and 1
// assert!(processed_epochs.contains(&0), "Epoch 0 was not processed");
assert!(processed_epochs.contains(&1), "Epoch 1 was not processed");

assert_trees_are_rolledover(
&pool,
&state_tree_with_rollover_threshold_0,
&address_tree_with_rollover_threshold_0,
)
.await;
// assert queues have been emptied
assert_queue_len(&pool, &state_trees, &address_trees, &mut 0, 0, false).await;
let mut rpc = pool.get_connection().await.unwrap();
let forester_pubkeys = [config1.derivation_pubkey, config2.derivation_pubkey];

// assert that foresters registered for epoch 1 and 2 (no new work is emitted after epoch 0)
// Assert that foresters have registered all processed epochs and the next epoch (+1)
for epoch in 1..=EXPECTED_EPOCHS {
let total_processed_work =
assert_foresters_registered(&forester_pubkeys[..], &mut rpc, epoch)
.await
.unwrap();
if epoch == 1 {
assert_eq!(
total_processed_work, total_expected_work,
"Not all items were processed."
);
} else {
assert_eq!(
total_processed_work, 0,
"Not all items were processed in prior epoch."
);
}
}

shutdown_sender1
.send(())
.expect("Failed to send shutdown signal to forester 1");
shutdown_sender2
.send(())
.expect("Failed to send shutdown signal to forester 2");
service_handle1.await.unwrap().unwrap();
service_handle2.await.unwrap().unwrap();
}
Copy link
Contributor


🛠️ Refactor suggestion

Consider breaking down this complex test and reducing flakiness potential.

This test is doing many things at once, making it difficult to maintain and debug failures. Consider:

  1. Extracting setup logic into helper functions
  2. Using constants for magic numbers
  3. Replacing sleep() with proper synchronization

Extract constants and reduce sleep usage:

+const AIRDROP_AMOUNT: u64 = LAMPORTS_PER_SOL * 100_000;
+const TEST_ITERATIONS: usize = 5;
+const WORK_WAIT_INTERVAL: Duration = Duration::from_millis(100);
+
 #[tokio::test(flavor = "multi_thread", worker_threads = 32)]
 async fn test_e2e_v1() {
     // ... existing code ...
-    rpc.airdrop_lamports(&keypair.pubkey(), LAMPORTS_PER_SOL * 100_000)
+    rpc.airdrop_lamports(&keypair.pubkey(), AIRDROP_AMOUNT)
     // ...
-    sleep(Duration::from_millis(100)).await;
+    // Consider using a retry mechanism or waiting for specific conditions
+    // instead of fixed sleep

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In forester/tests/e2e_v1_test.rs from lines 35 to 311, the test function is
overly complex and uses hardcoded magic numbers and sleep calls that can cause
flakiness. Refactor by extracting setup code into helper functions to improve
readability and maintainability. Define constants for repeated magic numbers
like iteration counts and lamport amounts. Replace the
sleep(Duration::from_millis(100)) calls with proper synchronization mechanisms
such as awaiting specific events or using channels to ensure deterministic
timing.
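One std-only way to implement the condition-based wait the prompt suggests (a hypothetical helper, not part of the forester crate; a real async test would poll via tokio instead of blocking the thread):

```rust
use std::time::{Duration, Instant};

/// Poll `check` every `interval` until it returns true or `timeout` elapses.
/// Returns whether the condition was met in time.
fn wait_until(mut check: impl FnMut() -> bool, timeout: Duration, interval: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if check() {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        std::thread::sleep(interval);
    }
}

fn main() {
    // Condition becomes true on the third poll; no fixed 100ms sleep needed.
    let mut calls = 0;
    let ok = wait_until(
        || {
            calls += 1;
            calls >= 3
        },
        Duration::from_secs(1),
        Duration::from_millis(1),
    );
    assert!(ok);
}
```

In the test above, the `check` closure would query a real condition such as "queue length reached N" instead of a counter.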

}
}

async fn verify_root_changed(
Copy link
Contributor

@ananas-block ananas-block Jul 1, 2025


The assert is too permissive imo.
Suggestion:

  • assert that the next index equals an expected next index instead of the root; this way we assert the exact number of updates

Copy link
Contributor


Additional suggestion:

  • assert that all queue batches that can be inserted have been inserted, to verify that the indexer is processing all trees
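The stricter check could look like this sketch (hypothetical `TreeState` struct with made-up values; the real test would read `next_index` and the root from the on-chain tree account):

```rust
// Hypothetical snapshot of a Merkle tree account's metadata.
struct TreeState {
    next_index: u64,
    root: [u8; 32],
}

/// Assert the exact number of updates instead of only "the root changed".
fn assert_exact_updates(pre: &TreeState, post: &TreeState, expected_updates: u64) {
    assert_eq!(
        post.next_index,
        pre.next_index + expected_updates,
        "unexpected number of tree updates"
    );
    assert_ne!(pre.root, post.root, "root must change when updates occurred");
}

fn main() {
    let pre = TreeState { next_index: 10, root: [0u8; 32] };
    let post = TreeState { next_index: 15, root: [1u8; 32] };
    assert_exact_updates(&pre, &post, 5);
}
```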

Comment on lines +56 to +67
let (merkle_tree_next_index, mut current_root, _) =
get_merkle_tree_metadata(&mut *rpc, merkle_tree_pubkey).await?;

let (merkle_tree_next_index, current_root, root_history) =
get_merkle_tree_metadata(rpc, merkle_tree_pubkey).await?;

trace!(
"merkle_tree_next_index: {:?} current_root: {:?}",
merkle_tree_next_index,
current_root
);

// Get output queue metadata and hash chains
let (zkp_batch_size, leaves_hash_chains) =
get_output_queue_metadata(rpc, output_queue_pubkey).await?;
get_output_queue_metadata(&mut *rpc, output_queue_pubkey).await?;

if leaves_hash_chains.is_empty() {
trace!("No hash chains to process");
return Ok(Vec::new());
trace!("No hash chains to process, returning empty stream.");
return Ok((Box::pin(futures::stream::empty()), zkp_batch_size));
}

wait_for_indexer(rpc, indexer).await?;
wait_for_indexer(&*rpc, &*indexer_guard).await?;
Copy link
Contributor


we can wait for all of these in parallel

Err(e) => {
error!("Failed to generate proof for batch {}: {:?}", i, e);
return Err(e);
let proof_results = future::join_all(proof_futures).await;
Copy link
Contributor


not sure it's necessary for this pr, but could be faster if we don't wait for all proofs but just for the first batch.

Copy link
Contributor


or we just request a batch in each function call

Copy link
Contributor


Or maybe I got it wrong lmk

instruction_buffer.push(data);
total_instructions_processed += 1;

if instruction_buffer.len() >= context.ixs_per_tx {
Copy link
Contributor


Since we need to execute proofs strictly in sequence: what do we do if we requested 10 proofs, ixs_per_tx is 5, and the 4th proof generation fails?

Copy link
Contributor Author


In this case we will send the first N successfully generated proofs and after that will try to regenerate the next ones
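A minimal sketch of that behavior — flush the successfully generated prefix, then regenerate from the first failure — using plain `Result`s in place of real proofs (hypothetical types, not the forester code):

```rust
/// Split an in-order batch of proof results into the sendable prefix and the
/// first error; proofs after the failure are discarded and regenerated later.
fn split_at_first_failure(
    proofs: Vec<Result<u32, String>>,
) -> (Vec<u32>, Option<String>) {
    let mut ok = Vec::new();
    for p in proofs {
        match p {
            Ok(v) => ok.push(v),
            Err(e) => return (ok, Some(e)),
        }
    }
    (ok, None)
}

fn main() {
    // 10 proofs requested, the 4th fails: send proofs 1-3, regenerate the rest.
    let proofs = vec![Ok(1), Ok(2), Ok(3), Err("prover timeout".to_string()), Ok(5)];
    let (sendable, err) = split_at_first_failure(proofs);
    assert_eq!(sendable, vec![1, 2, 3]);
    assert!(err.is_some());
}
```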

Copy link
Contributor

@ananas-block ananas-block left a comment


Flow summary:

  1. separate thread for every tree
  2. while in forester slot, do: dispatch_tree_processing
    0. check whether a batch is available
       (we need to fetch accounts here and refetch in step 1,
       verify_batch_ready(), then again in get_append_instruction_stream())
    1. get queue and tree accounts once
    2. get proofs
    3. stream proofs and send tx asap

Did I get that right?

Prettier flow, could add it as doc comment somewhere:

// 1. Separate async task per tree
  tokio::spawn(async move {
      // 2. While in active phase and forester has slots
      for assigned_light_slot in tree_schedule.slots {
          // Inner loop every 25ms during slot window
          loop {
              // 3. Check eligibility and batch readiness
              let ready_state = verify_batch_ready(); // FETCH ACCOUNTS

              match ready_state {
                  ReadyForAppend => {
                      // 4. Create instruction stream
                      let (stream, _) = get_append_instruction_stream(); // FETCH ACCOUNTS AGAIN

                      // 5. Process stream - proofs generated concurrently
                      while let Some(instruction_data) = stream.next().await {
                          // 6. Buffer instructions and send tx when buffer full
                          if buffer.len() >= ixs_per_tx {
                              send_transaction_batch();
                              buffer.clear();
                          }
                      }
                  }
                  ReadyForNullify => { /* similar flow */ }
                  NotReady => { /* skip */ }
              }

              sleep(25ms); // Rate limiting
          }
      }
  });

General feedback:

  1. code looks nice!

  2. we could try to remove some nesting layers; there are multiple functions which wrap other functions in 5 lines and are only used once, e.g.:

    • fn process_batched_operations<
    • async fn process_v2
    • async fn dispatch_tree_processing
  3. we need to fetch accounts here and refetch in step 1, verify_batch_ready(), then again in get_append_instruction_stream()

  4. What do you think about suffixing functions with a number (nesting depth)?
    (Sounds a bit stupid I know but could help to navigate the code.)

  5. it probably makes sense to search for more bottlenecks in the proof-inputs generation function, in particular refetching accounts.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3af7eaf and 8df8244.

📒 Files selected for processing (4)
  • forester-utils/src/instructions/address_batch_update.rs (5 hunks)
  • forester-utils/src/instructions/state_batch_append.rs (3 hunks)
  • forester-utils/src/instructions/state_batch_nullify.rs (1 hunks)
  • forester/src/processor/v2/common.rs (8 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
forester-utils/src/instructions/address_batch_update.rs (4)
forester-utils/src/utils.rs (1)
  • wait_for_indexer (31-77)
prover/client/src/proof_client.rs (2)
  • with_config (67-78)
  • generate_batch_address_append_proof (508-516)
sdk-libs/client/src/indexer/indexer_trait.rs (1)
  • get_address_queue_with_proofs (179-185)
program-libs/hasher/src/hash_chain.rs (1)
  • create_hash_chain_from_slice (23-32)
⏰ Context from checks skipped due to timeout of 90000ms (25)
  • GitHub Check: system-programs (system-cpi-test-v2-functional, ["cargo-test-sbf -p system-cpi-v2-test -- functio...
  • GitHub Check: system-programs (light-compressed-token, ["cargo-test-sbf -p compressed-token-test"])
  • GitHub Check: system-programs (random-e2e-test, ["cargo-test-sbf -p e2e-test"])
  • GitHub Check: system-programs (system-cpi-test-v2-event, ["cargo-test-sbf -p system-cpi-v2-test -- event::parse"])
  • GitHub Check: system-programs (system-cpi-test-v2-other, ["cargo-test-sbf -p system-cpi-v2-test -- --skip funct...
  • GitHub Check: system-programs (account-compression, ["cargo-test-sbf -p account-compression-test"])
  • GitHub Check: system-programs (system-cpi-test, ["cargo-test-sbf -p system-cpi-test"])
  • GitHub Check: system-programs (light-system-program, ["cargo-test-sbf -p system-test"])
  • GitHub Check: system-programs (light-registry, ["cargo-test-sbf -p registry-test"])
  • GitHub Check: cli-v1
  • GitHub Check: test-e2e (legacy)
  • GitHub Check: system-programs (sdk-test-program, ["cargo-test-sbf -p sdk-test"])
  • GitHub Check: test-e2e
  • GitHub Check: system-programs (token-escrow-test, [ "cargo test-sbf -p token-escrow" ])
  • GitHub Check: stateless-js-v2
  • GitHub Check: stateless-js-v1
  • GitHub Check: cli-v2
  • GitHub Check: Test sdk-libs
  • GitHub Check: Test program-libs-fast
  • GitHub Check: system-programs (sdk-anchor-test-program, ["cargo-test-sbf -p sdk-anchor-test", "cargo-test-sbf -...
  • GitHub Check: Test concurrent-merkle-tree
  • GitHub Check: system-programs (counter-test, ["cargo test-sbf -p counter"])
  • GitHub Check: Test program-libs-slow
  • GitHub Check: lint
  • GitHub Check: Test batched-merkle-tree-simulate
🔇 Additional comments (10)
forester-utils/src/instructions/state_batch_append.rs (3)

53-54: Concurrent acquisition improves connection utilization

The use of tokio::join! to concurrently acquire the indexer lock and RPC connection is an improvement over the sequential approach mentioned in past reviews. This reduces the time the RPC connection is held while waiting for the indexer lock.


70-92: Re-locking indexer within stream is appropriate

The stream correctly re-acquires the indexer lock when needed (line 75) rather than holding it for the entire stream duration. This allows proper resource cleanup between the initial setup and stream execution phases.


141-169: Concurrent proof generation with proper error handling

The implementation correctly:

  • Generates all proofs concurrently using future::join_all
  • Implements fail-fast behavior by collecting successful proofs only until the first error
  • Yields successful proofs individually before reporting any errors

This approach optimizes for throughput while maintaining error safety.

forester/src/processor/v2/common.rs (3)

37-54: Excellent documentation addresses complexity

The comprehensive documentation clearly explains each generic parameter, the function's purpose, and expected behavior. This significantly improves the maintainability of this complex generic function.


84-110: Proper fail-fast behavior for proof failures

The stream processing correctly implements fail-fast behavior - when a proof generation fails (error yielded from stream), the ? operator on line 85 terminates the entire operation. This prevents sending incomplete batches and maintains transaction consistency.


137-163: Comprehensive metrics and performance tracking

The detailed logging provides valuable operational metrics including TPS, instruction processing rates, and timing information. This will be crucial for monitoring the performance of the new streaming architecture.
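For reference, the TPS figure in such logs is just completed transactions over elapsed wall-clock time; a toy helper, not the forester's actual metrics code:

```rust
/// Transactions per second over an elapsed interval; 0.0 for a zero interval.
fn tps(transactions: u64, elapsed_secs: f64) -> f64 {
    if elapsed_secs <= 0.0 {
        0.0
    } else {
        transactions as f64 / elapsed_secs
    }
}

fn main() {
    assert!((tps(50, 10.0) - 5.0).abs() < 1e-9);
    assert_eq!(tps(50, 0.0), 0.0);
}
```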

forester-utils/src/instructions/state_batch_nullify.rs (2)

88-91: Proper resource cleanup after indexer synchronization

The code correctly drops both the RPC connection and indexer guard after waiting for the indexer to catch up. This ensures resources are released before the stream begins processing, which may take considerable time.


175-202: Consistent error handling pattern with other streaming functions

The error handling logic matches the pattern used in state_batch_append.rs:

  • Concurrent proof generation
  • Collection of successful proofs until first failure
  • Individual yielding of successful results followed by error reporting

This consistency across the codebase improves maintainability.

forester-utils/src/instructions/address_batch_update.rs (2)

29-40: Well-designed configuration struct

The AddressUpdateConfig struct properly encapsulates all required dependencies with appropriate trait bounds. This improves code organization and makes the function signature more manageable.


142-178: Correct chunking and offset calculation

The chunking logic properly handles pagination:

  • chunk_start and chunk_end correctly define chunk boundaries
  • processed_items_offset accurately calculates the global offset for indexer pagination
  • Root validation is performed only on the first chunk, which is appropriate

This ensures efficient processing of large batches while staying within the photon element limits.
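The pagination can be illustrated with a toy chunking helper (the `max_chunk` value stands in for the photon element limit, which is assumed here rather than taken from the crate):

```rust
/// Split `total` items into half-open (start, end) chunks of at most `max_chunk`.
/// `start` is also the global offset passed to the indexer for that chunk.
fn chunk_bounds(total: usize, max_chunk: usize) -> Vec<(usize, usize)> {
    (0..total)
        .step_by(max_chunk)
        .map(|start| (start, usize::min(start + max_chunk, total)))
        .collect()
}

fn main() {
    // 25 queued items, at most 10 per indexer request.
    assert_eq!(chunk_bounds(25, 10), vec![(0, 10), (10, 20), (20, 25)]);
}
```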

Comment on lines +323 to +325
let adjusted_start_index = global_start_index as usize
+ (chunk_start_idx * batch_size as usize)
+ (batch_idx * batch_size as usize);
Copy link
Contributor


💡 Verification agent

🧩 Analysis chain

Verify adjusted start index calculation

The calculation combines multiple offsets for the global start index. Ensure this correctly accounts for:

  • Global tree position (global_start_index)
  • Chunk position within the stream (chunk_start_idx * batch_size)
  • Batch position within the chunk (batch_idx * batch_size)

🏁 Script executed:

#!/bin/bash
# Check if there are any tests that validate the start index calculation
rg -A 10 -B 5 "adjusted_start_index|start_index.*chunk.*batch" --type rust

Length of output: 5461


Add unit tests for adjusted_start_index calculation

I didn’t find any existing tests that cover the combined offsets in address_batch_update.rs. Please add unit tests to verify that:

  • global_start_index
  • chunk_start_idx * batch_size
  • batch_idx * batch_size
    are all applied correctly when computing adjusted_start_index.

• File to cover:

  • forester-utils/src/instructions/address_batch_update.rs (around lines 323–325)

Suggested test scenarios:

  • batch at start of stream (all zeros)
  • non‐zero global_start_index with zero chunk/batch indices
  • advancing across chunk boundaries (chunk_start_idx > 0)
  • multiple batches within a chunk (batch_idx > 0)
🤖 Prompt for AI Agents
In forester-utils/src/instructions/address_batch_update.rs around lines 323 to
325, add unit tests for the adjusted_start_index calculation to verify that
global_start_index, chunk_start_idx * batch_size, and batch_idx * batch_size are
correctly combined. Create tests covering scenarios including batch at start of
stream (all zeros), non-zero global_start_index with zero chunk and batch
indices, advancing across chunk boundaries with chunk_start_idx > 0, and
multiple batches within a chunk with batch_idx > 0. Ensure each test asserts the
expected adjusted_start_index value based on the input parameters.
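A standalone version of the suggested tests; the function below reproduces the formula from the snippet above but is not the crate's actual code:

```rust
/// Mirrors the offset calculation in address_batch_update.rs.
fn adjusted_start_index(
    global_start_index: u64,
    chunk_start_idx: usize,
    batch_idx: usize,
    batch_size: u64,
) -> usize {
    global_start_index as usize
        + (chunk_start_idx * batch_size as usize)
        + (batch_idx * batch_size as usize)
}

fn main() {
    // batch at start of stream (all zeros)
    assert_eq!(adjusted_start_index(0, 0, 0, 10), 0);
    // non-zero global_start_index with zero chunk/batch indices
    assert_eq!(adjusted_start_index(100, 0, 0, 10), 100);
    // advancing across chunk boundaries (chunk_start_idx > 0)
    assert_eq!(adjusted_start_index(100, 2, 0, 10), 120);
    // multiple batches within a chunk (batch_idx > 0)
    assert_eq!(adjusted_start_index(100, 2, 3, 10), 150);
}
```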

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (4)
forester/tests/e2e_v2_test.rs (4)

558-633: Make assertions more specific as suggested in previous review.

The current assertion only checks that the root changed but doesn't verify the exact expected changes. This aligns with the previous suggestion to assert next index equals expected value.

Consider asserting:

  • The exact number of items processed (next_index increment)
  • That all expected queue batches have been processed
  • Specific metadata changes rather than just root changes
 async fn verify_root_changed(
     rpc: &mut LightClient,
     merkle_tree_pubkey: &Pubkey,
     pre_root: &[u8; 32],
+    expected_items_processed: u64,
     kind: TreeType,
 ) {
+    let (initial_next_index, _, _) = get_initial_merkle_tree_state(rpc, merkle_tree_pubkey, kind).await;
     let current_root = match kind {
         // ... existing match logic
     };
     
+    let (current_next_index, _, _) = get_initial_merkle_tree_state(rpc, merkle_tree_pubkey, kind).await;
+    assert_eq!(
+        current_next_index - initial_next_index,
+        expected_items_processed,
+        "Expected {} items to be processed for {:?}",
+        expected_items_processed,
+        kind
+    );
     
     assert_ne!(
         *pre_root, current_root,
         "Root should have changed for {:?}",
         kind
     );
 }

901-903: Replace default signature return with proper error handling.

This issue was already identified in previous reviews. Returning a default signature when no accounts are found can mask test failures.

The function should return a Result type to properly handle the case when no compressed token accounts are available for transfer, allowing the test to fail explicitly rather than silently continuing with a default signature.


1183-1184: Replace explicit panic with more descriptive error handling.

This matches the previous review suggestion to use more idiomatic error handling in test code.

         Err(e) => {
-            panic!("compress error: {:?}", e);
+            panic!("Failed to compress account: {:?}", e);
         }

717-731: Refactor function to reduce parameter count.

This function has 10+ parameters which violates clean code principles, as identified in previous reviews.

Consider grouping related parameters into structs:

  • Counters into a single struct
  • Mint pubkeys into an options struct
  • Test configuration into a struct

This would improve maintainability and make the function signature more manageable.

🧹 Nitpick comments (2)
forester/tests/e2e_v2_test.rs (2)

86-88: Improve error handling with more descriptive messages.

Consider providing more context about which environment variables are required and when they're needed, rather than panicking immediately.

-fn get_env_var(key: &str) -> String {
-    env::var(key).unwrap_or_else(|_| panic!("{} environment variable is not set", key))
-}
+fn get_env_var(key: &str) -> String {
+    env::var(key).unwrap_or_else(|_| {
+        panic!(
+            "{} environment variable is required for devnet mode. Please set it or use TEST_MODE=local",
+            key
+        )
+    })
+}

673-684: Extract magic numbers into named constants.

Several magic numbers are used that would benefit from being named constants for better readability and maintainability.

+const DEFAULT_ITERATIONS: usize = 10;
+const V1_COMPRESS_AMOUNT: u64 = 2_000_000;
+const V2_INITIAL_COMPRESS_AMOUNT: u64 = 5_000_000;
+const V2_SUBSEQUENT_COMPRESS_AMOUNT: u64 = 2_000_000;

 async fn wait_for_work_report(
     work_report_receiver: &mut mpsc::Receiver<WorkReport>,
     tree_params: &InitStateTreeAccountsInstructionData,
 ) {
     let batch_size = tree_params.output_queue_zkp_batch_size as usize;
     let minimum_processed_items: usize = tree_params.output_queue_batch_size as usize;

 async fn execute_test_transactions<R: Rpc + Indexer + MerkleTreeExt, I: Indexer>(
     // ... parameters
 ) {
-    let mut iterations = 10;
+    let mut iterations = DEFAULT_ITERATIONS;
     if is_v2_state_test_enabled() {
         iterations =
             get_state_v2_batch_size(rpc, &env.v2_state_trees[0].merkle_tree).await as usize;
     }

Also applies to: 734-736

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5f0db05 and 6fd6307.

📒 Files selected for processing (2)
  • .github/workflows/forester-tests.yml (3 hunks)
  • forester/tests/e2e_v2_test.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • .github/workflows/forester-tests.yml
⏰ Context from checks skipped due to timeout of 90000ms (25)
  • GitHub Check: system-programs (light-compressed-token, ["cargo-test-sbf -p compressed-token-test"])
  • GitHub Check: system-programs (random-e2e-test, ["cargo-test-sbf -p e2e-test"])
  • GitHub Check: system-programs (system-cpi-test-v2-other, ["cargo-test-sbf -p system-cpi-v2-test -- --skip funct...
  • GitHub Check: system-programs (light-system-program, ["cargo-test-sbf -p system-test"])
  • GitHub Check: system-programs (system-cpi-test-v2-functional, ["cargo-test-sbf -p system-cpi-v2-test -- functio...
  • GitHub Check: system-programs (light-registry, ["cargo-test-sbf -p registry-test"])
  • GitHub Check: system-programs (system-cpi-test-v2-event, ["cargo-test-sbf -p system-cpi-v2-test -- event::parse"])
  • GitHub Check: system-programs (account-compression, ["cargo-test-sbf -p account-compression-test"])
  • GitHub Check: system-programs (system-cpi-test, ["cargo-test-sbf -p system-cpi-test"])
  • GitHub Check: test-e2e (legacy)
  • GitHub Check: test-e2e
  • GitHub Check: system-programs (token-escrow-test, [ "cargo test-sbf -p token-escrow" ])
  • GitHub Check: system-programs (sdk-test-program, ["cargo-test-sbf -p sdk-test"])
  • GitHub Check: system-programs (sdk-anchor-test-program, ["cargo-test-sbf -p sdk-anchor-test", "cargo-test-sbf -...
  • GitHub Check: system-programs (counter-test, ["cargo test-sbf -p counter"])
  • GitHub Check: lint
  • GitHub Check: stateless-js-v1
  • GitHub Check: stateless-js-v2
  • GitHub Check: Test sdk-libs
  • GitHub Check: Test program-libs-slow
  • GitHub Check: Test concurrent-merkle-tree
  • GitHub Check: Test program-libs-fast
  • GitHub Check: cli-v1
  • GitHub Check: cli-v2
  • GitHub Check: Test batched-merkle-tree-simulate
🔇 Additional comments (1)
forester/tests/e2e_v2_test.rs (1)

1119-1145: Good defensive programming for test reliability.

The balance checking and airdrop retry logic is well-implemented and helps ensure test reliability in different environments by handling insufficient balance scenarios gracefully.

Comment on lines +182 to +448
#[tokio::test(flavor = "multi_thread", worker_threads = 16)]
#[serial]
async fn test_e2e_v2() {
let state_tree_params = InitStateTreeAccountsInstructionData::test_default();
let env = TestAccounts::get_local_test_validator_accounts();
let config = ForesterConfig {
external_services: ExternalServicesConfig {
rpc_url: get_rpc_url(),
ws_rpc_url: Some(get_ws_rpc_url()),
indexer_url: Some(get_indexer_url()),
prover_url: Some(get_prover_url()),
photon_api_key: get_api_key(),
pushgateway_url: None,
pagerduty_routing_key: None,
rpc_rate_limit: None,
photon_rate_limit: None,
send_tx_rate_limit: None,
},
retry_config: Default::default(),
queue_config: Default::default(),
indexer_config: Default::default(),
transaction_config: Default::default(),
general_config: GeneralConfig {
slot_update_interval_seconds: 10,
tree_discovery_interval_seconds: 5,
enable_metrics: false,
skip_v1_state_trees: false,
skip_v2_state_trees: false,
skip_v1_address_trees: false,
skip_v2_address_trees: false,
},
rpc_pool_config: RpcPoolConfig {
max_size: 50,
connection_timeout_secs: 15,
            idle_timeout_secs: 300,
            max_retries: 10,
            initial_retry_delay_ms: 1000,
            max_retry_delay_ms: 16000,
        },
        registry_pubkey: light_registry::ID,
        payer_keypair: env.protocol.forester.insecure_clone(),
        derivation_pubkey: env.protocol.forester.pubkey(),
        address_tree_data: vec![],
        state_tree_data: vec![],
    };
    let test_mode = TestMode::from_env();

    if test_mode == TestMode::Local {
        init(Some(LightValidatorConfig {
            enable_indexer: true,
            wait_time: 60,
            prover_config: None,
            sbf_programs: vec![(
                "FNt7byTHev1k5x2cXZLBr8TdWiC3zoP5vcnZR4P682Uy".to_string(),
                "../target/deploy/create_address_test_program.so".to_string(),
            )],
            limit_ledger_size: None,
        }))
        .await;
        spawn_prover(ProverConfig::default()).await;
    }

    let mut rpc = setup_rpc_connection(&env.protocol.forester).await;
    if test_mode == TestMode::Local {
        ensure_sufficient_balance(
            &mut rpc,
            &env.protocol.forester.pubkey(),
            LAMPORTS_PER_SOL * 100,
        )
        .await;
        ensure_sufficient_balance(
            &mut rpc,
            &env.protocol.governance_authority.pubkey(),
            LAMPORTS_PER_SOL * 100,
        )
        .await;
    }

    let mut photon_indexer = create_photon_indexer();
    let protocol_config = get_protocol_config(&mut rpc).await;

    let active_phase_slot = get_active_phase_start_slot(&mut rpc, &protocol_config).await;
    wait_for_slot(&mut rpc, active_phase_slot).await;

    // Get initial state for V1 state tree if enabled
    let pre_state_v1_root = if is_v1_state_test_enabled() {
        let (_, _, root) = get_initial_merkle_tree_state(
            &mut rpc,
            &env.v1_state_trees[0].merkle_tree,
            TreeType::StateV1,
        )
        .await;
        Some(root)
    } else {
        None
    };

    // Get initial state for V1 address tree if enabled
    let pre_address_v1_root = if is_v1_address_test_enabled() {
        let (_, _, root) = get_initial_merkle_tree_state(
            &mut rpc,
            &env.v1_address_trees[0].merkle_tree,
            TreeType::AddressV1,
        )
        .await;
        Some(root)
    } else {
        None
    };

    // Get initial state for V2 state tree if enabled
    let pre_state_v2_root = if is_v2_state_test_enabled() {
        let (_, _, root) = get_initial_merkle_tree_state(
            &mut rpc,
            &env.v2_state_trees[0].merkle_tree,
            TreeType::StateV2,
        )
        .await;
        Some(root)
    } else {
        None
    };

    // Get initial state for V2 address tree if enabled
    let pre_address_v2_root = if is_v2_address_test_enabled() {
        let (_, _, root) =
            get_initial_merkle_tree_state(&mut rpc, &env.v2_address_trees[0], TreeType::AddressV2)
                .await;
        Some(root)
    } else {
        None
    };

    let payer = get_forester_keypair();
    println!("payer pubkey: {:?}", payer.pubkey());

    if test_mode == TestMode::Local {
        ensure_sufficient_balance(&mut rpc, &payer.pubkey(), LAMPORTS_PER_SOL * 100).await;
    } else {
        ensure_sufficient_balance(&mut rpc, &payer.pubkey(), LAMPORTS_PER_SOL).await;
    }

    // V1 mint if V1 test enabled
    let legacy_mint_pubkey = if is_v1_state_test_enabled() {
        let legacy_mint_keypair = Keypair::new();
        let pubkey = create_mint_helper_with_keypair(&mut rpc, &payer, &legacy_mint_keypair).await;

        let sig = mint_to(
            &mut rpc,
            &env.v1_state_trees[0].merkle_tree,
            &payer,
            &pubkey,
        )
        .await;
        println!("v1 mint_to: {:?}", sig);
        Some(pubkey)
    } else {
        println!("Skipping V1 mint - V1 state test disabled");
        None
    };

    // V2 mint if V2 test enabled
    let batch_mint_pubkey = if is_v2_state_test_enabled() {
        let batch_mint_keypair = Keypair::new();
        let pubkey = create_mint_helper_with_keypair(&mut rpc, &payer, &batch_mint_keypair).await;

        let sig = mint_to(
            &mut rpc,
            &env.v2_state_trees[0].output_queue,
            &payer,
            &pubkey,
        )
        .await;
        println!("v2 mint_to: {:?}", sig);
        Some(pubkey)
    } else {
        println!("Skipping V2 mint - V2 state test disabled");
        None
    };

    let mut sender_batched_accs_counter = 0;
    let mut sender_legacy_accs_counter = 0;
    let mut sender_batched_token_counter: u64 = MINT_TO_NUM * 2;
    let mut address_v1_counter = 0;
    let mut address_v2_counter = 0;

    wait_for_indexer(&rpc, &photon_indexer).await.unwrap();

    let rng_seed = rand::thread_rng().gen::<u64>();
    println!("seed {}", rng_seed);
    let rng = &mut StdRng::seed_from_u64(rng_seed);

    let (service_handle, shutdown_sender, mut work_report_receiver) =
        setup_forester_pipeline(&config).await;

    execute_test_transactions(
        &mut rpc,
        &mut photon_indexer,
        rng,
        &env,
        &payer,
        legacy_mint_pubkey.as_ref(),
        batch_mint_pubkey.as_ref(),
        &mut sender_batched_accs_counter,
        &mut sender_legacy_accs_counter,
        &mut sender_batched_token_counter,
        &mut address_v1_counter,
        &mut address_v2_counter,
    )
    .await;

    wait_for_work_report(&mut work_report_receiver, &state_tree_params).await;

    // Verify root changes based on enabled tests
    if is_v1_state_test_enabled() {
        if let Some(pre_root) = pre_state_v1_root {
            verify_root_changed(
                &mut rpc,
                &env.v1_state_trees[0].merkle_tree,
                &pre_root,
                TreeType::StateV1,
            )
            .await;
        }
    }

    if is_v2_state_test_enabled() {
        if let Some(pre_root) = pre_state_v2_root {
            verify_root_changed(
                &mut rpc,
                &env.v2_state_trees[0].merkle_tree,
                &pre_root,
                TreeType::StateV2,
            )
            .await;
        }
    }

    if is_v1_address_test_enabled() {
        if let Some(pre_root) = pre_address_v1_root {
            verify_root_changed(
                &mut rpc,
                &env.v1_address_trees[0].merkle_tree,
                &pre_root,
                TreeType::AddressV1,
            )
            .await;
        }
    }

    if is_v2_address_test_enabled() {
        if let Some(pre_root) = pre_address_v2_root {
            verify_root_changed(
                &mut rpc,
                &env.v2_address_trees[0],
                &pre_root,
                TreeType::AddressV2,
            )
            .await;
        }
    }

    shutdown_sender
        .send(())
        .expect("Failed to send shutdown signal");
    service_handle.await.unwrap().unwrap();
}
🛠️ Refactor suggestion

Break down the monolithic test function for better maintainability.

The test function is very long and handles multiple responsibilities (setup, execution, verification). Consider extracting logical sections into separate functions.

+async fn setup_test_environment() -> (TestAccounts, ForesterConfig, LightClient, PhotonIndexer) {
+    // Move setup logic here
+}
+
+async fn execute_test_scenarios(/* parameters */) {
+    // Move test execution logic here  
+}
+
+async fn verify_test_results(/* parameters */) {
+    // Move verification logic here
+}
+
 #[tokio::test(flavor = "multi_thread", worker_threads = 16)]
 #[serial]
 async fn test_e2e_v2() {
+    let (env, config, mut rpc, mut photon_indexer) = setup_test_environment().await;
+    execute_test_scenarios(/* parameters */).await;
+    verify_test_results(/* parameters */).await;
-    // Move existing logic to extracted functions
 }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In forester/tests/e2e_v2_test.rs from lines 182 to 448, the test_e2e_v2 function
is too long and mixes setup, execution, and verification logic. Refactor by
extracting these logical sections into separate helper functions such as
setup_environment, perform_minting, execute_transactions, and verify_results.
Then call these functions sequentially in the main test function to improve
readability and maintainability.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (3)
forester-utils/src/instructions/state_batch_append.rs (1)

156-156: Consider streaming proof generation for improved responsiveness

As suggested by @ananas-block, waiting for all proofs before yielding any results reduces responsiveness. Consider yielding proofs as they complete rather than waiting for all to finish.

Instead of collecting all futures and awaiting them together, consider using FuturesUnordered to yield results as they complete:

-let proof_results = future::join_all(proof_futures).await;
+use futures::stream::FuturesUnordered;
+let mut proof_stream = proof_futures.into_iter().collect::<FuturesUnordered<_>>();
+
+while let Some(proof_result) = proof_stream.next().await {
+    match proof_result {
+        Ok(data) => yield Ok(data),
+        Err(e) => {
+            yield Err(ForesterUtilsError::Prover(format!("Proof generation failed: {}", e)));
+            return;
+        }
+    }
+}
forester-utils/src/instructions/address_batch_update.rs (1)

242-244: Add unit tests for critical index calculation

The adjusted_start_index calculation combines multiple offsets and is critical for correctness. Please add comprehensive unit tests as suggested in the previous review.

forester/src/processor/v2/common.rs (1)

89-109: Clarify transaction atomicity when proof generation fails mid-stream

@ananas-block raised a valid concern: if 10 proofs are requested with ixs_per_tx=5 and the 4th proof fails, the first transaction (with 5 proofs) would already be sent and committed, while the second transaction would never be sent. This creates a partial state update.

Consider documenting this behavior or implementing a two-phase approach where all proofs are validated before sending any transactions.
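To make the atomicity concern concrete, here is a minimal, std-only Rust sketch of the streaming send loop described above (names are illustrative, not the actual `process_stream` API): proofs arrive one at a time, a "transaction" is sent as soon as the buffer holds `ixs_per_tx` proofs, and an error mid-stream cannot undo earlier sends.

```rust
// Hypothetical model of the streaming send loop. A "send" is irreversible,
// so a failure in a later proof leaves earlier transactions committed.
fn simulate_stream_send(
    proofs: Vec<Result<u32, String>>,
    ixs_per_tx: usize,
) -> (Vec<Vec<u32>>, Option<String>) {
    let mut sent_txs: Vec<Vec<u32>> = Vec::new(); // transactions already sent
    let mut buffer: Vec<u32> = Vec::new();
    for proof in proofs {
        match proof {
            Ok(p) => {
                buffer.push(p);
                if buffer.len() == ixs_per_tx {
                    // Buffer is full: send a transaction (irreversible).
                    sent_txs.push(std::mem::take(&mut buffer));
                }
            }
            // Fail-fast: the partially filled buffer is dropped, but
            // everything in `sent_txs` has already landed on-chain.
            Err(e) => return (sent_txs, Some(e)),
        }
    }
    if !buffer.is_empty() {
        sent_txs.push(buffer);
    }
    (sent_txs, None)
}
```

With 10 proofs, `ixs_per_tx = 5`, and a failure at the 7th proof, the first transaction (proofs 1–5) is already committed while proofs 6–10 never land — exactly the partial-state scenario raised above. A two-phase variant would buffer all proofs and send nothing until every one succeeds.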

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6fd6307 and 6fa912f.

📒 Files selected for processing (7)
  • forester-utils/src/instructions/address_batch_update.rs (5 hunks)
  • forester-utils/src/instructions/state_batch_append.rs (2 hunks)
  • forester-utils/src/instructions/state_batch_nullify.rs (1 hunks)
  • forester-utils/src/lib.rs (1 hunks)
  • forester/src/processor/v2/address.rs (1 hunks)
  • forester/src/processor/v2/common.rs (7 hunks)
  • forester/src/processor/v2/state.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • forester/src/processor/v2/address.rs
  • forester/src/processor/v2/state.rs
⏰ Context from checks skipped due to timeout of 90000ms (25)
  • GitHub Check: system-programs (sdk-anchor-test-program, ["cargo-test-sbf -p sdk-anchor-test", "cargo-test-sbf -...
  • GitHub Check: system-programs (sdk-test-program, ["cargo-test-sbf -p sdk-test"])
  • GitHub Check: system-programs (counter-test, ["cargo test-sbf -p counter"])
  • GitHub Check: system-programs (token-escrow-test, [ "cargo test-sbf -p token-escrow" ])
  • GitHub Check: Test program-libs-fast
  • GitHub Check: Test program-libs-slow
  • GitHub Check: Test sdk-libs
  • GitHub Check: Test batched-merkle-tree-simulate
  • GitHub Check: Test concurrent-merkle-tree
  • GitHub Check: stateless-js-v2
  • GitHub Check: test-e2e (legacy)
  • GitHub Check: test-e2e
  • GitHub Check: system-programs (light-system-program, ["cargo-test-sbf -p system-test"])
  • GitHub Check: lint
  • GitHub Check: system-programs (light-compressed-token, ["cargo-test-sbf -p compressed-token-test"])
  • GitHub Check: system-programs (system-cpi-test-v2-event, ["cargo-test-sbf -p system-cpi-v2-test -- event::parse"])
  • GitHub Check: system-programs (account-compression, ["cargo-test-sbf -p account-compression-test"])
  • GitHub Check: stateless-js-v1
  • GitHub Check: system-programs (system-cpi-test, ["cargo-test-sbf -p system-cpi-test"])
  • GitHub Check: system-programs (random-e2e-test, ["cargo-test-sbf -p e2e-test"])
  • GitHub Check: system-programs (system-cpi-test-v2-functional, ["cargo-test-sbf -p system-cpi-v2-test -- functio...
  • GitHub Check: system-programs (light-registry, ["cargo-test-sbf -p registry-test"])
  • GitHub Check: system-programs (system-cpi-test-v2-other, ["cargo-test-sbf -p system-cpi-v2-test -- --skip funct...
  • GitHub Check: cli-v2
  • GitHub Check: cli-v1
🔇 Additional comments (1)
forester-utils/src/instructions/state_batch_nullify.rs (1)

170-194: LGTM! Good fail-fast implementation with partial recovery

The implementation correctly collects all successful proofs before yielding the first error, ensuring that completed work isn't lost. This is a good pattern for batch processing.
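The pattern can be reduced to a small, std-only helper (a sketch with hypothetical names, not the actual `state_batch_nullify.rs` code): accumulate every success seen before the first error, then report that error, so completed work is preserved.

```rust
// Sketch of "fail-fast with partial recovery": keep all results completed
// before the first error, then surface that error instead of discarding work.
fn collect_until_first_error<T, E>(
    results: impl IntoIterator<Item = Result<T, E>>,
) -> (Vec<T>, Option<E>) {
    let mut ok = Vec::new();
    for r in results {
        match r {
            Ok(v) => ok.push(v),
            // Stop immediately, but return the successes gathered so far.
            Err(e) => return (ok, Some(e)),
        }
    }
    (ok, None)
}
```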

Comment on lines +11 to +22
/// Parsed merkle tree data extracted from account
#[derive(Debug, Clone)]
pub struct ParsedMerkleTreeData {
pub next_index: u64,
pub current_root: [u8; 32],
pub root_history: Vec<[u8; 32]>,
pub zkp_batch_size: u16,
pub pending_batch_index: u32,
pub num_inserted_zkps: u64,
pub current_zkp_batch_index: u64,
pub leaves_hash_chains: Vec<[u8; 32]>,
}
🛠️ Refactor suggestion

Add documentation for public struct ParsedMerkleTreeData

This public struct lacks documentation. Please add doc comments explaining its purpose, usage, and the meaning of each field.

+/// Parsed merkle tree data extracted from on-chain account
+/// 
+/// This struct contains the essential data parsed from a BatchedMerkleTreeAccount
+/// to avoid redundant RPC calls and parsing operations.
 #[derive(Debug, Clone)]
 pub struct ParsedMerkleTreeData {
+    /// The next available index in the merkle tree
     pub next_index: u64,
+    /// The current root hash of the merkle tree
     pub current_root: [u8; 32],
+    /// History of root hashes for verification
     pub root_history: Vec<[u8; 32]>,
+    /// Size of each ZKP batch
     pub zkp_batch_size: u16,
+    /// Index of the current pending batch
     pub pending_batch_index: u32,
+    /// Number of ZKPs that have been inserted
     pub num_inserted_zkps: u64,
+    /// Current index within the ZKP batch
     pub current_zkp_batch_index: u64,
+    /// Hash chains for the leaves in the current batch
     pub leaves_hash_chains: Vec<[u8; 32]>,
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// Parsed merkle tree data extracted from account
#[derive(Debug, Clone)]
pub struct ParsedMerkleTreeData {
pub next_index: u64,
pub current_root: [u8; 32],
pub root_history: Vec<[u8; 32]>,
pub zkp_batch_size: u16,
pub pending_batch_index: u32,
pub num_inserted_zkps: u64,
pub current_zkp_batch_index: u64,
pub leaves_hash_chains: Vec<[u8; 32]>,
}
/// Parsed merkle tree data extracted from on-chain account
///
/// This struct contains the essential data parsed from a BatchedMerkleTreeAccount
/// to avoid redundant RPC calls and parsing operations.
#[derive(Debug, Clone)]
pub struct ParsedMerkleTreeData {
/// The next available index in the merkle tree
pub next_index: u64,
/// The current root hash of the merkle tree
pub current_root: [u8; 32],
/// History of root hashes for verification
pub root_history: Vec<[u8; 32]>,
/// Size of each ZKP batch
pub zkp_batch_size: u16,
/// Index of the current pending batch
pub pending_batch_index: u32,
/// Number of ZKPs that have been inserted
pub num_inserted_zkps: u64,
/// Current index within the ZKP batch
pub current_zkp_batch_index: u64,
/// Hash chains for the leaves in the current batch
pub leaves_hash_chains: Vec<[u8; 32]>,
}
🤖 Prompt for AI Agents
In forester-utils/src/lib.rs around lines 11 to 22, the public struct
ParsedMerkleTreeData lacks documentation. Add doc comments above the struct
definition describing its overall purpose and usage. Then add individual doc
comments for each field explaining what data it holds and its role in the
struct. Use clear and concise language to improve code readability and
maintainability.

Comment on lines +24 to +32
/// Parsed output queue data extracted from account
#[derive(Debug, Clone)]
pub struct ParsedQueueData {
pub zkp_batch_size: u16,
pub pending_batch_index: u32,
pub num_inserted_zkps: u64,
pub current_zkp_batch_index: u64,
pub leaves_hash_chains: Vec<[u8; 32]>,
}
🛠️ Refactor suggestion

Add documentation for public struct ParsedQueueData

This public struct lacks documentation. Please add doc comments explaining its purpose and the meaning of each field.

+/// Parsed output queue data extracted from on-chain account
+/// 
+/// This struct contains the essential data parsed from a BatchedQueueAccount
+/// to support batch processing operations.
 #[derive(Debug, Clone)]
 pub struct ParsedQueueData {
+    /// Size of each ZKP batch in the queue
     pub zkp_batch_size: u16,
+    /// Index of the current pending batch
     pub pending_batch_index: u32,
+    /// Number of ZKPs that have been inserted
     pub num_inserted_zkps: u64,
+    /// Current index within the ZKP batch
     pub current_zkp_batch_index: u64,
+    /// Hash chains for the leaves in the current batch
     pub leaves_hash_chains: Vec<[u8; 32]>,
 }
🤖 Prompt for AI Agents
In forester-utils/src/lib.rs around lines 24 to 32, the public struct
ParsedQueueData lacks documentation. Add doc comments above the struct
definition explaining its overall purpose and add comments for each field
describing what data it holds and its role in the context of the output queue
data extracted from an account.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (2)
forester-utils/src/instructions/address_batch_update.rs (1)

241-243: Verify the complex index calculation logic.

The adjusted_start_index calculation combines three different offsets:

  • global_start_index: The tree's global position
  • chunk_start_idx * batch_size: Position within the overall stream
  • batch_idx * batch_size: Position within the current chunk

This calculation appears correct but is complex enough to warrant additional unit testing to ensure all edge cases are handled properly.

#!/bin/bash
# Check for existing tests that validate the adjusted_start_index calculation
rg -A 10 -B 5 "adjusted_start_index|get_all_circuit_inputs_for_chunk" --type rust
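A unit test for this calculation could be as simple as the following std-only sketch. The function mirrors the three offsets named above (parameter names follow the review comment, not the actual `address_batch_update.rs` signature):

```rust
// Hypothetical model of the adjusted start index: the tree's global position,
// plus the offset of this chunk within the stream, plus the offset of this
// batch within the chunk, all measured in leaves (batch_size per zkp batch).
fn adjusted_start_index(
    global_start_index: u64, // tree's global position
    chunk_start_idx: u64,    // first zkp batch covered by this chunk
    batch_idx: u64,          // batch position within the current chunk
    batch_size: u64,         // leaves per zkp batch
) -> u64 {
    global_start_index + chunk_start_idx * batch_size + batch_idx * batch_size
}
```

Edge cases worth covering in real tests: the first batch of the first chunk (all offsets zero), the first batch of a later chunk, and batch sizes that do not divide the queue length evenly.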
forester/src/processor/v2/common.rs (1)

82-108: Potential issue with sequential proof execution requirement.

The streaming approach processes items as they arrive and sends transactions when the buffer fills. However, a past review comment mentioned that proofs need to be executed strictly in sequence, and asked what happens if proof generation fails partway through a batch. The current implementation doesn't appear to handle this sequential requirement or partial failure recovery.

If strict sequential processing is required, consider:

  1. Adding transaction ordering guarantees
  2. Implementing rollback mechanisms for partial failures
  3. Ensuring proof dependencies are maintained across transaction boundaries

Could you verify if the sequential proof execution requirement still applies and how partial failures should be handled in the new streaming model?

🧹 Nitpick comments (1)
forester/src/processor/v2/common.rs (1)

520-524: Potential memory concern with large batches.

For each parsing operation, a new leaves_hash_chains vector is created and populated. While this is cleaner than the previous approach, it could consume significant memory for large batches with many hash chains.

Consider whether the hash chains could be lazily loaded or if there's a way to share references when possible, especially if this data is short-lived.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c591e2a and 1aa1da3.

📒 Files selected for processing (6)
  • forester-utils/src/instructions/address_batch_update.rs (5 hunks)
  • forester-utils/src/instructions/state_batch_append.rs (2 hunks)
  • forester-utils/src/instructions/state_batch_nullify.rs (1 hunks)
  • forester/src/processor/v2/address.rs (1 hunks)
  • forester/src/processor/v2/common.rs (7 hunks)
  • forester/src/processor/v2/state.rs (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
forester/src/processor/v2/address.rs (3)
forester-utils/src/instructions/address_batch_update.rs (1)
  • get_address_update_stream (268-315)
programs/registry/src/account_compression_cpi/sdk.rs (1)
  • create_batch_update_address_tree_instruction (490-516)
forester/src/processor/v2/common.rs (1)
  • process_stream (53-162)
⏰ Context from checks skipped due to timeout of 90000ms (25)
  • GitHub Check: Test program-libs-slow
  • GitHub Check: Test program-libs-fast
  • GitHub Check: Test batched-merkle-tree-simulate
  • GitHub Check: Test sdk-libs
  • GitHub Check: Test concurrent-merkle-tree
  • GitHub Check: system-programs (random-e2e-test, ["cargo-test-sbf -p e2e-test"])
  • GitHub Check: system-programs (system-cpi-test-v2-functional, ["cargo-test-sbf -p system-cpi-v2-test -- functio...
  • GitHub Check: system-programs (system-cpi-test-v2-event, ["cargo-test-sbf -p system-cpi-v2-test -- event::parse"])
  • GitHub Check: system-programs (light-system-program, ["cargo-test-sbf -p system-test"])
  • GitHub Check: system-programs (light-compressed-token, ["cargo-test-sbf -p compressed-token-test"])
  • GitHub Check: system-programs (system-cpi-test-v2-other, ["cargo-test-sbf -p system-cpi-v2-test -- --skip funct...
  • GitHub Check: system-programs (light-registry, ["cargo-test-sbf -p registry-test"])
  • GitHub Check: system-programs (account-compression, ["cargo-test-sbf -p account-compression-test"])
  • GitHub Check: system-programs (system-cpi-test, ["cargo-test-sbf -p system-cpi-test"])
  • GitHub Check: test-e2e
  • GitHub Check: test-e2e (legacy)
  • GitHub Check: system-programs (sdk-test-program, ["cargo-test-sbf -p sdk-test"])
  • GitHub Check: system-programs (sdk-anchor-test-program, ["cargo-test-sbf -p sdk-anchor-test", "cargo-test-sbf -...
  • GitHub Check: system-programs (counter-test, ["cargo test-sbf -p counter"])
  • GitHub Check: system-programs (token-escrow-test, [ "cargo test-sbf -p token-escrow" ])
  • GitHub Check: stateless-js-v2
  • GitHub Check: cli-v1
  • GitHub Check: cli-v2
  • GitHub Check: lint
  • GitHub Check: stateless-js-v1
🔇 Additional comments (23)
forester/src/processor/v2/state.rs (4)

22-46: Well-structured helper function with proper error handling.

The create_nullify_stream_future function correctly handles the async stream creation and error mapping. The error conversion from specific error types to anyhow::Error is appropriate for the abstraction level.


48-74: Clean abstraction for append stream creation.

The pattern is consistent with the nullify helper function, maintaining good code consistency across operations.


76-107: Excellent refactoring to use the streaming abstraction.

The function now properly delegates the complex streaming logic to the process_stream function while maintaining clear separation of concerns through the instruction builder closure.


109-141: Consistent implementation pattern.

The perform_append function follows the same clean pattern as perform_nullify, ensuring maintainability and consistency across the codebase.

forester/src/processor/v2/address.rs (2)

17-41: Consistent helper function implementation.

The create_stream_future follows the same pattern established in state.rs, which is good for code consistency and maintainability.


44-72: Clean refactoring to streaming model.

The process_batch function properly uses the new streaming abstraction while maintaining the same interface. The instruction builder closure is well-defined and the delegation to process_stream is appropriate.

forester-utils/src/instructions/state_batch_nullify.rs (3)

22-38: Clean helper function refactoring.

The generate_nullify_zkp_proof function is well-structured and properly handles the proof generation with appropriate error mapping.


84-194: Complex but well-structured streaming implementation.

The stream implementation handles several important aspects correctly:

  • Proper error propagation through the stream
  • Root validation to ensure consistency
  • Concurrent proof generation with fail-fast behavior
  • Early termination on errors to prevent unnecessary work

The fail-fast with partial recovery pattern (lines 169-193) is implemented correctly, yielding successful proofs before reporting the first error.


79-83: Consider connection lifecycle optimization.

The indexer guard is acquired, used briefly for waiting, then dropped. This pattern is efficient for releasing the mutex quickly.

forester-utils/src/instructions/state_batch_append.rs (3)

73-74: Good optimization for concurrent resource acquisition.

Using tokio::join! to concurrently acquire the indexer lock and RPC connection is an excellent optimization that reduces the total wait time compared to sequential acquisition.


95-193: Consistent streaming implementation with proper error handling.

The stream implementation follows the same pattern as the nullify function, maintaining consistency across the codebase. The concurrent proof generation and fail-fast behavior are implemented correctly.


91-94: Efficient resource management.

The early dropping of RPC connection and indexer guard after the initial validation is efficient and prevents holding resources longer than necessary.

forester-utils/src/instructions/address_batch_update.rs (4)

28-39: Well-designed configuration struct.

The AddressUpdateConfig struct provides a clean abstraction for the configuration parameters, improving code organization and reusability.


56-158: Complex but well-structured streaming implementation.

The chunked streaming approach correctly handles large batches by:

  • Limiting elements per call to prevent overwhelming the indexer
  • Proper offset calculation for pagination
  • Root validation on the first chunk
  • Concurrent proof generation with fail-fast behavior

The implementation maintains good separation of concerns and error handling.


160-162: Good helper function for batch calculation.

The calculate_max_zkp_batches_per_call function provides a clear abstraction for the chunking logic and ensures at least one batch per call.
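Assuming the semantics described above (cap the elements requested per indexer call, but never return zero batches), the helper reduces to something like this std-only sketch — the real signature may differ:

```rust
// Hedged sketch: integer-divide the per-call element budget by the zkp batch
// size, clamping to at least one batch so progress is always possible.
fn calculate_max_zkp_batches_per_call(max_elements_per_call: u64, zkp_batch_size: u64) -> u64 {
    (max_elements_per_call / zkp_batch_size).max(1)
}
```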


268-315: Clean public API with early exit optimization.

The function correctly handles the empty hash chains case by returning an empty stream immediately, avoiding unnecessary processing. The delegation to the streaming helper maintains good separation of concerns.

forester/src/processor/v2/common.rs (7)

35-66: Documentation addresses previous feedback well.

The comprehensive documentation for the process_stream function successfully addresses the past review comment about the complex function signature. The type parameters, arguments, and return value are clearly explained.


164-181: Clean transaction sending implementation.

The send_transaction_batch helper function is well-structured and properly handles RPC connection management and transaction creation.


188-198: Excellent improvement to BatchReadyState design.

The refactored BatchReadyState enum now carries parsed data with each variant, eliminating the need to re-parse account data later. This is a significant improvement in both performance and code organization.


206-206: Simplified trait bounds improve maintainability.

The removal of the IndexerType<R> bound and addition of 'static lifetime bound simplifies the generic constraints, making the code more maintainable while preserving necessary functionality.


345-353: Enhanced logging provides valuable metrics.

The new queue metrics logging distinguishes between empty and non-empty queue states, providing better observability for monitoring and debugging batch processing performance.


507-508: Improved error handling with unified error types.

The change from BatchProcessError::UnsupportedTreeType to ForesterError::InvalidTreeType unifies error handling across the codebase, which is a good architectural improvement.


537-538: Correct batch readiness logic implementation.

The batch readiness check properly validates both the batch state and the availability of unprocessed ZKP batches, ensuring processing only occurs when there's actual work to be done.
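The readiness predicate described above can be modeled with a std-only sketch (field names are illustrative; the real check reads them from the batch account): a batch is processable only when its state is ready AND there are zkp batches that have been filled but not yet inserted.

```rust
// Sketch of the batch readiness check: both conditions must hold, otherwise
// the processor would attempt work on an empty or already-consumed batch.
fn batch_is_ready(
    state_is_ready: bool,          // batch state allows processing
    current_zkp_batch_index: u64,  // zkp batches filled so far
    num_inserted_zkps: u64,        // zkp batches already inserted on-chain
) -> bool {
    state_is_ready && current_zkp_batch_index > num_inserted_zkps
}
```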

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (8)
forester/scripts/v2_stats.py (4)

5-5: Remove unused imports.

These imports are not used anywhere in the code and should be removed to clean up the script.

-from datetime import datetime, timedelta
+from datetime import datetime
 from collections import defaultdict
-from typing import Dict, List, Tuple, Optional
+from typing import Dict, List, Optional
 import argparse
 import statistics
-import json

Also applies to: 7-7, 10-10


66-66: Add error handling for type conversions.

The method performs multiple int() and float() conversions on regex capture groups without error handling, which could raise ValueError if the regex captures unexpected data.

Consider wrapping these conversions in try-except blocks:

-                'epoch': int(v1_start_match.group(3))
+                'epoch': int(v1_start_match.group(3)) if v1_start_match.group(3).isdigit() else 0

Apply similar error handling to all int() and float() conversions throughout the parsing methods, or wrap the entire parsing logic in a try-except block that logs parsing errors and continues processing.

Also applies to: 78-78, 99-99, 112-119, 132-135


148-148: Remove unnecessary f-string prefixes.

These strings have no placeholders and don't need the f prefix.

-        print(f"\nSUMMARY:")
+        print("\nSUMMARY:")
-                        print(f"      ^ This is the meaningful throughput metric for V2 (actual tree updates/sec)")
+                        print("      ^ This is the meaningful throughput metric for V2 (actual tree updates/sec)")

Also applies to: 225-225


248-261: Improve file handling with context manager.

The current file handling approach doesn't use context managers and could be simplified for better resource management.

-    # Read and parse log file
-    if args.logfile == '-':
-        log_file = sys.stdin
-    else:
-        log_file = open(args.logfile, 'r')
-    
-    try:
-        for line in log_file:
-            if 'TPS_METRIC' not in line:  # Match both V1 and V2
-                continue
-                
-            analyzer.parse_log_line(line)
-    finally:
-        if args.logfile != '-':
-            log_file.close()
+    # Read and parse log file
+    if args.logfile == '-':
+        log_file = sys.stdin
+        for line in log_file:
+            if 'TPS_METRIC' in line:
+                analyzer.parse_log_line(line)
+    else:
+        with open(args.logfile, 'r') as log_file:
+            for line in log_file:
+                if 'TPS_METRIC' in line:
+                    analyzer.parse_log_line(line)
forester-utils/src/instructions/state_batch_append.rs (1)

173-192: Duplicate buffer management logic.

Similar to the nullify implementation, the proof buffer yielding logic is duplicated. Consider creating a shared helper function for buffer management across both state operations.

forester-utils/src/instructions/address_batch_update.rs (1)

271-273: Verify complex offset calculation logic.

The adjusted_start_index calculation combines multiple offsets. While the logic appears correct, the complexity warrants additional verification through unit tests as mentioned in the previous review.

forester/tests/e2e_v2_test.rs (2)

730-860: Function has too many parameters.

This function still has 10 parameters which makes it difficult to maintain. Consider grouping related parameters into structs.


1193-1195: Replace panic with proper error handling.

Using panic for error handling in tests is not idiomatic. Consider using assertions or propagating the error.

🧹 Nitpick comments (3)
forester/scripts/v2_stats.py (1)

12-12: Fix PEP 8 spacing violations.

The code has several spacing issues that don't conform to PEP 8 standards.

+

 class V2TpsAnalyzer:
+

 def main():
 
+
 if __name__ == '__main__':

Also applies to: 238-238, 271-271

forester/src/processor/v2/state.rs (1)

88-96: Consider graceful error handling for serialization.

The instruction builder uses .unwrap() which could panic if serialization fails. While this is likely acceptable for guaranteed serializable data, consider using a more explicit error handling approach for better debugging.

-        data.try_to_vec().unwrap(),
+        data.try_to_vec().expect("Failed to serialize nullify instruction data"),
forester-utils/src/instructions/state_batch_nullify.rs (1)

179-198: Consider optimizing buffer yielding logic.

The buffer management strategy yields proofs when pending_count >= yield_batch_size. However, this logic is duplicated in two places. Consider extracting this into a helper function to reduce code duplication.

fn yield_proofs_if_ready(
    proof_buffer: &mut Vec<InstructionDataBatchNullifyInputs>,
    yield_batch_size: usize,
) -> Option<Vec<InstructionDataBatchNullifyInputs>> {
    if proof_buffer.len() >= yield_batch_size {
        let result = proof_buffer.clone();
        proof_buffer.clear();
        Some(result)
    } else {
        None
    }
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2e87f8d and fc0ff3b.

📒 Files selected for processing (12)
  • cli/src/utils/constants.ts (1 hunks)
  • forester-utils/src/instructions/address_batch_update.rs (4 hunks)
  • forester-utils/src/instructions/state_batch_append.rs (2 hunks)
  • forester-utils/src/instructions/state_batch_nullify.rs (1 hunks)
  • forester/scripts/v2_stats.py (1 hunks)
  • forester/src/epoch_manager.rs (12 hunks)
  • forester/src/processor/v1/send_transaction.rs (4 hunks)
  • forester/src/processor/v2/address.rs (1 hunks)
  • forester/src/processor/v2/common.rs (6 hunks)
  • forester/src/processor/v2/state.rs (1 hunks)
  • forester/tests/e2e_v2_test.rs (1 hunks)
  • scripts/install.sh (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • scripts/install.sh
  • cli/src/utils/constants.ts
  • forester/src/processor/v1/send_transaction.rs
  • forester/src/epoch_manager.rs
🧰 Additional context used
🧬 Code Graph Analysis (1)
forester/src/processor/v2/address.rs (3)
forester-utils/src/instructions/address_batch_update.rs (1)
  • get_address_update_instruction_stream (299-352)
programs/registry/src/account_compression_cpi/sdk.rs (1)
  • create_batch_update_address_tree_instruction (490-516)
forester/src/processor/v2/common.rs (1)
  • process_stream (58-144)
🪛 Ruff (0.11.9)
forester/scripts/v2_stats.py

5-5: datetime.timedelta imported but unused

Remove unused import: datetime.timedelta

(F401)


7-7: typing.Tuple imported but unused

Remove unused import: typing.Tuple

(F401)


10-10: json imported but unused

Remove unused import: json

(F401)


148-148: f-string without any placeholders

Remove extraneous f prefix

(F541)


225-225: f-string without any placeholders

Remove extraneous f prefix

(F541)


248-251: Use ternary operator log_file = sys.stdin if args.logfile == '-' else open(args.logfile, 'r') instead of if-else-block

Replace if-else-block with log_file = sys.stdin if args.logfile == '-' else open(args.logfile, 'r')

(SIM108)


251-251: Use a context manager for opening files

(SIM115)

🪛 Flake8 (7.2.0)
forester/scripts/v2_stats.py

[error] 5-5: 'datetime.timedelta' imported but unused

(F401)


[error] 7-7: 'typing.Tuple' imported but unused

(F401)


[error] 10-10: 'json' imported but unused

(F401)


[error] 12-12: expected 2 blank lines, found 1

(E302)


[error] 148-148: f-string is missing placeholders

(F541)


[error] 225-225: f-string is missing placeholders

(F541)


[error] 238-238: expected 2 blank lines, found 1

(E302)


[error] 271-271: expected 2 blank lines after class or function definition, found 1

(E305)

🪛 Pylint (3.3.7)
forester/scripts/v2_stats.py

[refactor] 12-12: Too many instance attributes (9/7)

(R0902)


[refactor] 251-251: Consider using 'with' for resource-allocating operations

(R1732)

⏰ Context from checks skipped due to timeout of 90000ms (25)
  • GitHub Check: test-e2e
  • GitHub Check: test-e2e (legacy)
  • GitHub Check: lint
  • GitHub Check: cli-v2
  • GitHub Check: Test program-libs-fast
  • GitHub Check: Test program-libs-slow
  • GitHub Check: Test sdk-libs
  • GitHub Check: system-programs (light-compressed-token, ["cargo-test-sbf -p compressed-token-test"])
  • GitHub Check: Test batched-merkle-tree-simulate
  • GitHub Check: system-programs (random-e2e-test, ["cargo-test-sbf -p e2e-test"])
  • GitHub Check: Test concurrent-merkle-tree
  • GitHub Check: system-programs (system-cpi-test-v2-functional, ["cargo-test-sbf -p system-cpi-v2-test -- functio...
  • GitHub Check: system-programs (system-cpi-test, ["cargo-test-sbf -p system-cpi-test"])
  • GitHub Check: system-programs (system-cpi-test-v2-other, ["cargo-test-sbf -p system-cpi-v2-test -- --skip funct...
  • GitHub Check: system-programs (light-registry, ["cargo-test-sbf -p registry-test"])
  • GitHub Check: cli-v1
  • GitHub Check: system-programs (system-cpi-test-v2-event, ["cargo-test-sbf -p system-cpi-v2-test -- event::parse"])
  • GitHub Check: system-programs (light-system-program, ["cargo-test-sbf -p system-test"])
  • GitHub Check: system-programs (account-compression, ["cargo-test-sbf -p account-compression-test"])
  • GitHub Check: system-programs (token-escrow-test, [ "cargo test-sbf -p token-escrow" ])
  • GitHub Check: system-programs (sdk-anchor-test-program, ["cargo-test-sbf -p sdk-anchor-test", "cargo-test-sbf -...
  • GitHub Check: system-programs (sdk-test-program, ["cargo-test-sbf -p sdk-test"])
  • GitHub Check: system-programs (counter-test, ["cargo test-sbf -p counter"])
  • GitHub Check: stateless-js-v2
  • GitHub Check: stateless-js-v1
🔇 Additional comments (24)
forester/scripts/v2_stats.py (2)

167-174: Excellent documentation of V1 vs V2 differences.

The detailed explanation of the architectural differences between V1 and V2 transaction models is very helpful for users interpreting the metrics. This contextual information prevents misinterpretation of the performance data.


221-230: Well-designed V2-specific metrics.

The Items Processed Per Second (IPPS) metric and batching efficiency analysis provide meaningful insights into V2 performance that go beyond traditional TPS measurements. This addresses the architectural differences between V1 and V2 appropriately.
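The distinction between the two metrics can be sketched as follows; function and parameter names are illustrative, not the script's actual API:

```rust
use std::time::Duration;

// One V2 transaction can carry many queue items, so items-per-second
// (IPPS) reflects real throughput better than raw TPS.
fn tps(transactions: u64, elapsed: Duration) -> f64 {
    transactions as f64 / elapsed.as_secs_f64()
}

fn ipps(items_processed: u64, elapsed: Duration) -> f64 {
    items_processed as f64 / elapsed.as_secs_f64()
}

/// Average number of queue items packed into each transaction.
fn batching_efficiency(items_processed: u64, transactions: u64) -> f64 {
    items_processed as f64 / transactions as f64
}

fn main() {
    let elapsed = Duration::from_secs(10);
    // 20 transactions that together processed 1000 queue items:
    assert_eq!(tps(20, elapsed), 2.0);
    assert_eq!(ipps(1000, elapsed), 100.0);
    assert_eq!(batching_efficiency(1000, 20), 50.0);
}
```

A V2 run with low TPS but high IPPS is healthy; judging it by TPS alone would understate throughput by the batching factor.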

forester/src/processor/v2/state.rs (2)

22-47: Well-designed async stream helper function.

The error mapping from the streaming infrastructure to anyhow::Error provides good error propagation while maintaining type compatibility with the existing error handling system.
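The mapping pattern can be sketched with standard-library types; the PR maps into anyhow::Error, but since anyhow is an external crate this sketch uses Box<dyn Error>, and the error type is hypothetical:

```rust
use std::error::Error;
use std::fmt;

// Hypothetical error produced by the streaming layer.
#[derive(Debug)]
struct StreamError(String);

impl fmt::Display for StreamError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "stream error: {}", self.0)
    }
}
impl Error for StreamError {}

// The helper's idea: convert the stream's error type into the caller's
// error type so batches can flow through `?` in the existing code.
fn next_batch(raw: Result<Vec<u8>, StreamError>) -> Result<Vec<u8>, Box<dyn Error>> {
    raw.map_err(|e| Box::new(e) as Box<dyn Error>)
}

fn main() {
    assert_eq!(next_batch(Ok(vec![1, 2])).unwrap(), vec![1, 2]);
    let err = next_batch(Err(StreamError("proof timeout".into()))).unwrap_err();
    assert_eq!(err.to_string(), "stream error: proof timeout");
}
```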


100-108: Excellent use of generic stream processing.

The delegation to process_stream with a closure-based instruction builder promotes code reuse and separation of concerns. This design pattern makes the streaming logic reusable across different operation types.
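The shape of the pattern, reduced to a synchronous sketch (the real process_stream is async and consumes a proof stream; the types here are simplified stand-ins):

```rust
struct Instruction {
    data: Vec<u8>,
}

// Generic processor: the batch-handling loop is shared, and each
// operation type supplies its own instruction-builder closure.
fn process_batches<F>(batches: Vec<Vec<u8>>, build_instruction: F) -> usize
where
    F: Fn(Vec<u8>) -> Instruction,
{
    let mut sent = 0;
    for batch in batches {
        let _ix = build_instruction(batch); // would be submitted on-chain
        sent += 1;
    }
    sent
}

fn main() {
    // Nullify, append, and address-update each pass a different closure;
    // the loop above never changes.
    let sent = process_batches(vec![vec![1], vec![2, 3]], |data| Instruction { data });
    assert_eq!(sent, 2);
}
```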

forester/src/processor/v2/address.rs (2)

28-42: Good use of configuration struct pattern.

The AddressUpdateConfig encapsulation reduces parameter passing complexity and makes the API more maintainable. The configuration approach is well-suited for the complex parameter requirements of the streaming infrastructure.
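The benefit of the pattern in miniature; the field names below are assumptions for illustration, not the actual AddressUpdateConfig definition:

```rust
// Bundling related parameters into one struct: call sites pass a single
// value, and adding a field later does not ripple through every signature.
struct AddressUpdateConfig {
    merkle_tree: [u8; 32],
    prover_url: String,
    zkp_batch_size: u16,
    yield_batch_size: usize,
}

fn run_address_update(config: &AddressUpdateConfig) -> String {
    format!(
        "updating tree with batches of {} via {}",
        config.zkp_batch_size, config.prover_url
    )
}

fn main() {
    let config = AddressUpdateConfig {
        merkle_tree: [0u8; 32],
        prover_url: "http://localhost:3001".into(),
        zkp_batch_size: 250,
        yield_batch_size: 4,
    };
    assert!(run_address_update(&config).contains("250"));
}
```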


53-62: Consistent error handling pattern.

The instruction builder follows the same serialization pattern as the state operations. The consistency across modules improves maintainability.

forester-utils/src/instructions/state_batch_nullify.rs (2)

88-92: Good practice: Early resource cleanup.

Properly dropping the RPC connection and indexer guard after the initial setup minimizes resource contention. This is especially important in a connection pool environment.


129-134: Critical: Root consistency validation.

The root mismatch validation between indexer and on-chain state is essential for preventing invalid operations. The early termination on mismatch protects against processing invalid data.
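The guard amounts to a simple comparison before any proof work begins; a hypothetical helper, not the PR's code:

```rust
// Compare the indexer's view of the tree root with the on-chain root
// and stop before generating proofs against stale indexer data.
fn check_root_consistency(
    indexer_root: &[u8; 32],
    onchain_root: &[u8; 32],
) -> Result<(), String> {
    if indexer_root != onchain_root {
        return Err(format!(
            "root mismatch: indexer {:?}.. vs on-chain {:?}..",
            &indexer_root[..4],
            &onchain_root[..4]
        ));
    }
    Ok(())
}

fn main() {
    let root = [7u8; 32];
    assert!(check_root_consistency(&root, &root).is_ok());

    let mut stale = root;
    stale[0] = 0; // indexer lagging behind the chain
    assert!(check_root_consistency(&stale, &root).is_err());
}
```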

forester-utils/src/instructions/state_batch_append.rs (2)

77-78: Excellent improvement: Parallel resource acquisition.

Using tokio::join! to acquire the indexer lock and RPC connection concurrently addresses the previous review concern about connection pool contention. This minimizes the time each resource is held.


132-137: Comprehensive root validation.

The root mismatch detection provides critical integrity checking. The error message is clear and actionable for debugging indexer synchronization issues.

forester-utils/src/instructions/address_batch_update.rs (3)

202-204: Good separation of concerns for batch size calculation.

Extracting the max ZKP batches calculation into a helper function improves readability and makes the logic testable independently.
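The extracted calculation reduces to integer division over the queue's pending items; a sketch with assumed names, not the actual helper:

```rust
// How many complete ZKP batches can be proven from the pending queue
// items. Partial batches cannot be proven, so remainders are dropped.
fn max_zkp_batches(pending_items: u64, zkp_batch_size: u64) -> u64 {
    if zkp_batch_size == 0 {
        return 0; // guard against a misconfigured batch size
    }
    pending_items / zkp_batch_size
}

fn main() {
    assert_eq!(max_zkp_batches(1000, 250), 4);
    assert_eq!(max_zkp_batches(999, 250), 3); // partial batch not counted
    assert_eq!(max_zkp_batches(10, 0), 0);
}
```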


88-99: Robust root validation with proper error handling.

Validating the root only on the first chunk is an optimization that preserves integrity while avoiding redundant checks. The error messaging provides clear context for debugging.


131-164: Efficient concurrent proof generation with backpressure.

The yielding strategy with pending_count >= yield_batch_size provides backpressure control to prevent unbounded memory growth while maintaining concurrency. This is a well-designed streaming pattern.

forester/src/processor/v2/common.rs (8)

57-144: Well-implemented streaming batch processor with comprehensive metrics.

The function effectively handles asynchronous stream processing with proper error propagation, empty batch handling, and detailed performance metrics (TPS/IPS). The documentation adequately explains the complex generic signature.


146-163: LGTM!

Simple and effective helper function for transaction submission.


165-253: Clean refactoring with improved data flow.

The removal of IndexerType trait bound simplifies the interface, and passing pre-parsed data to processing methods reduces redundant parsing operations. The caching mechanism effectively prevents duplicate batch processing.
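The duplicate-prevention idea can be sketched with a set of already-submitted batches; the key shape here is an assumption about the real cache:

```rust
use std::collections::HashSet;

// Remember which (tree, batch_index) pairs were already submitted and
// skip repeats on subsequent polling rounds.
struct ProcessedCache {
    seen: HashSet<([u8; 32], u64)>,
}

impl ProcessedCache {
    fn new() -> Self {
        Self { seen: HashSet::new() }
    }

    /// Returns true the first time a batch is seen, false on repeats.
    fn mark(&mut self, tree: [u8; 32], batch_index: u64) -> bool {
        self.seen.insert((tree, batch_index))
    }
}

fn main() {
    let mut cache = ProcessedCache::new();
    let tree = [1u8; 32];
    assert!(cache.mark(tree, 0));  // first time: process
    assert!(!cache.mark(tree, 0)); // duplicate: skip
    assert!(cache.mark(tree, 1));  // next batch: process
}
```

A real implementation would also evict entries once a batch is finalized on-chain, so the set does not grow without bound.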


255-383: Efficient batch readiness verification with improved observability.

The refactoring to fetch and parse accounts once improves efficiency. The queue metrics logging (empty vs. has_elements) provides valuable observability for monitoring queue states.


385-419: LGTM!

Clean refactoring that leverages pre-parsed data, reducing redundant account fetching.


455-503: Well-structured account parsing with proper error handling.

The function correctly extracts all necessary data from merkle tree accounts and determines batch readiness. The error handling for unsupported tree types is appropriate.


505-540: LGTM!

Consistent implementation with the merkle tree parsing function, maintaining good code structure.


542-554: LGTM!

Simple and correct implementation with proper edge case handling.

forester/tests/e2e_v2_test.rs (3)

70-180: Well-structured test configuration with good flexibility.

The environment-based configuration approach allows easy switching between local and devnet testing. The keypair parsing supports both byte array and base58 formats, improving usability.
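The dual-format acceptance can be sketched as format detection on the input string; the base58 branch is stubbed here because decoding it needs an external crate such as bs58:

```rust
// Accept a keypair either as a JSON byte array ("[12, 34, ...]") or a
// base58 string, as the test's env parsing does.
fn parse_keypair_bytes(input: &str) -> Result<Vec<u8>, String> {
    let trimmed = input.trim();
    if trimmed.starts_with('[') && trimmed.ends_with(']') {
        // Byte-array form: parse each comma-separated element as u8.
        trimmed[1..trimmed.len() - 1]
            .split(',')
            .map(|s| s.trim().parse::<u8>().map_err(|e| e.to_string()))
            .collect()
    } else {
        // Base58 form: delegate to a decoder, e.g. bs58::decode(trimmed);
        // stubbed in this sketch.
        Err("base58 decoding not implemented in this sketch".to_string())
    }
}

fn main() {
    assert_eq!(parse_keypair_bytes("[1, 2, 3]").unwrap(), vec![1, 2, 3]);
    assert!(parse_keypair_bytes("5Kd3NBUAdUnh...").is_err());
}
```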


182-455: Comprehensive end-to-end test with proper lifecycle management.

The test effectively covers all tree types (V1/V2 state/address) with feature flag support, proper setup for both local and devnet environments, and thorough verification of state changes.


862-1384: Well-organized test utilities with comprehensive coverage.

The helper functions provide thorough coverage of minting, compression, transfers, and address creation for both V1 and V2 trees. Good separation of concerns and clear function purposes.
