Skip to content

Conversation

@mkysel
Copy link
Collaborator

@mkysel mkysel commented Mar 17, 2025

Adds all metrics from #568

On top of that:

  • refactors self-termination watchContract
  • mutes loud debug logs
  • uses waitgroups in some tests to speed them up
  • some other driveby refactors

Fixes #568

Summary by CodeRabbit

  • New Features

    • Introduced enhanced monitoring that displays new metrics for key blockchain processing statistics, including maximum block processed, block lag, and storage error counts.
    • Added a configurable disconnect timeout to help ensure a more graceful recovery during prolonged network interruptions.
  • Refactor

    • Streamlined error handling and retry processes to improve overall system stability and reliability during blockchain operations.

@mkysel mkysel requested a review from a team as a code owner March 17, 2025 18:40
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Mar 17, 2025

Walkthrough

This pull request introduces a new timer mechanism in the blockchain log streamer's watchContract method to manage maximum disconnect duration and improve graceful shutdown. It also updates error handling in the indexer by incorporating a retry function and modifying error types with new, explicit unrecoverable and retryable errors. The changes include enhanced Prometheus metrics for block progress and storage error counts, as well as a type update for block numbers. Additionally, test synchronization has been improved using wait groups, and a new configuration field for disconnect time has been added.

Changes

File(s) Change Summary
pkg/blockchain/rpcLogStreamer.go Introduced a time.Timer in watchContract for disconnect management, updated logging level for reorganization events, and adjusted block number types in GetNextPage and buildFilterQuery.
pkg/indexer/indexer.go, pkg/indexer/indexer_test.go Enhanced error handling in indexLogs with a new retry function; added contractAddress parameter; replaced fixed sleep durations with sync.WaitGroup for test synchronization.
pkg/indexer/storer/error.go, pkg/indexer/storer/groupMessage.go, pkg/indexer/storer/identityUpdate.go Replaced the generic log storage error function with distinct UnrecoverableLogStorageError and RetryableLogStorageError types, refining error handling flows.
pkg/metrics/indexer.go, pkg/metrics/metrics.go Added new Prometheus metrics (indexerMaxBlock, indexerCurrentBlockLag, and indexerCountRetryableStorageErrors), adjusted block number type for metric emission, and provided new emission functions.
pkg/testutils/config.go Added a new field MaxChainDisconnectTime to the ContractsOptions struct.

Sequence Diagram(s)

sequenceDiagram
  participant RS as RpcLogStreamer
  participant T as Timer
  participant BN as Blockchain Node
  participant L as Logger

  RS->>T: Start timer for maximum disconnect time
  RS->>BN: Request logs
  alt Logs received or re-org detected
      RS->>T: Reset timer
      RS->>L: Log successful operation or warning (if re-org)
  else No logs / timeout condition
      T->>RS: Trigger timeout event
      RS->>L: Emit fatal shutdown log
  end
Loading
sequenceDiagram
  participant IL as IndexLogs
  participant R as Retry Function
  participant F as Operation Function
  participant L as Logger

  IL->>R: Invoke retry with operation, sleep duration, & contract address
  loop Retry attempts
      R->>F: Execute storage operation
      alt Operation succeeds
          R->>IL: Return success result
      else Operation fails with retryable error
          R->>L: Emit metric for retryable error
          Note over R: Wait for sleep duration before retrying
      end
  end
  R->>IL: Return error after maximum retries
Loading

Assessment against linked issues

Objective Addressed Explanation
Indexer Metrics - Block Tracking (gauges for current block, current chain block, and lag) [#568]
Indexer Metrics - Retryable Errors (count of retryable storage errors) [#568]
Indexer Metrics - Processed Message Rates and Validation/Storage Success Rates [#568] Metrics for processed messages and validation/storage success rates were not implemented.

Possibly related PRs

  • More sync metrics #634: Updates to the GetNextPage method and metrics emission, indicating a direct relationship in handling block number types and log streaming metrics.
  • Update blockTracker as messages are indexed #284: Modifications involving the conversion of block number types from int to uint64 in the blockchain log streamer, closely aligning with the changes in this PR.
  • feat: detect chain reorgs #411: Changes to the watchContract method for handling blockchain reorganization events and error logging, similar to the modifications introduced in this PR.

Suggested reviewers

  • fbac
  • neekolas

Warning

There were issues while running some tools. Please review the errors and either fix the tool’s configuration or disable the tool if it’s a critical failure.

🔧 golangci-lint (1.62.2)

Error: can't load config: the Go language version (go1.23) used to build golangci-lint is lower than the targeted Go version (1.24)
Failed executing command with error: can't load config: the Go language version (go1.23) used to build golangci-lint is lower than the targeted Go version (1.24)

Tip

⚡🧪 Multi-step agentic review comment chat (experimental)
  • We're introducing multi-step agentic chat in review comments. This experimental feature enhances review discussions with the CodeRabbit agentic chat by enabling advanced interactions, including the ability to create pull requests directly from comments.
    - To enable this feature, set early_access to true under in the settings.
✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
pkg/indexer/storer/identityUpdate.go (1)

107-109: Logging new members is clear, but confirm verbosity.
While logging new members is helpful, consider whether Info is the appropriate level if these updates occur frequently.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cc88300 and 7ce63ae.

📒 Files selected for processing (9)
  • pkg/blockchain/rpcLogStreamer.go (5 hunks)
  • pkg/indexer/indexer.go (5 hunks)
  • pkg/indexer/indexer_test.go (5 hunks)
  • pkg/indexer/storer/error.go (1 hunks)
  • pkg/indexer/storer/groupMessage.go (5 hunks)
  • pkg/indexer/storer/identityUpdate.go (11 hunks)
  • pkg/metrics/indexer.go (2 hunks)
  • pkg/metrics/metrics.go (1 hunks)
  • pkg/testutils/config.go (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: Test (Node)
  • GitHub Check: Build pre-baked anvil-xmtpd
🔇 Additional comments (54)
pkg/metrics/metrics.go (1)

72-74: New metrics added for better observability.

The addition of these three metrics enhances the observability of the indexer component:

  • indexerMaxBlock likely tracks the highest block processed
  • indexerCurrentBlockLag provides visibility into how far behind the indexer is from the chain head
  • indexerCountRetryableStorageErrors helps monitor storage failures that can be retried

These metrics align well with the PR's objective of adding metrics related to issue #568.

pkg/testutils/config.go (1)

22-22: MaxChainDisconnectTime configuration added for watchContract.

This new configuration parameter sets a reasonable default timeout (10 seconds) for blockchain disconnect scenarios in test environments. This directly supports the PR objective of refactoring the self-termination functionality of watchContract.

pkg/indexer/storer/groupMessage.go (1)

46-46: Error handling improved with explicit error types.

The error handling has been refactored to use more specific error types:

  • NewUnrecoverableLogStorageError for parsing, validation, and marshaling errors
  • NewRetryableLogStorageError for database operations that might succeed on retry

This change improves code readability and error classification, supporting the metrics for tracking retryable errors.

Also applies to: 54-54, 65-67, 73-73, 82-82, 88-88, 100-100, 111-111

pkg/indexer/indexer.go (4)

9-9: Added metrics import for error tracking.

The addition of the metrics import supports the emission of metrics for retryable storage errors, aligning with the PR objective to add metrics.


107-107: Contract address parameter added for metrics tracking.

Adding the contract address as a parameter to indexLogs allows for more granular tracking of errors by contract, enhancing observability.

Also applies to: 137-137


343-354: Retry logic refactored for better maintainability.

The old retry loop structure has been replaced with a dedicated retry function, which:

  1. Improves code readability by centralizing retry logic
  2. Provides a clearer control flow
  3. Enables metric emission for retryable errors

This is a good refactoring that maintains the same functionality while making the code more maintainable.


359-377: Well-designed retry function with metrics integration.

The new retry function:

  1. Takes the necessary context (logger, sleep duration, address)
  2. Uses a clean loop structure for retries
  3. Handles error classification via the ShouldRetry() method
  4. Integrates metrics emission for retryable errors

This implementation is clean, maintainable, and supports the metrics objectives of the PR.

pkg/indexer/storer/identityUpdate.go (12)

6-6: Import is appropriate for new error handling logic.
This import is required for the recently introduced errors.As usage.


59-59: Switching to UnrecoverableLogStorageError is correct.
Creating a distinct error type clarifies that parse failures cannot be retried.


69-69: Proper use of UnrecoverableLogStorageError.
Indicating an irrecoverable condition for sequence ID retrieval aligns with the new categorization.


119-119: Correctly marks insertion errors as retryable.
Database insertion failures often merit a retry if they are transient in nature.


147-147: Consistent retryable error usage for revocation failures.
Revocation logic can also fail for temporary DB reasons. This is aligned with the new approach.


161-161: UnrecoverableLogStorageError for originator envelope errors.
Building envelopes is typically logic-based rather than transient, thus not retryable.


171-171: Same pattern for signing envelope failures.
Signing errors also likely indicate a permanent issue.


177-177: Unrecoverable on marshalling originator envelope.
Marshalling failures often signal malformed data rather than a transient condition.


187-188: Switch to retryable error for gateway envelope insertion.
This is correct if an insertion failure might be due to a temporary DB condition.


198-198: Retryable insertion error for blockchain messages.
Storing blockchain messages could fail due to transient issues, fitting the retryable pattern well.


206-207: Leveraging errors.As for custom error interface.
This pattern neatly distinguishes internal errors from system/database exceptions.


211-212: Default to retryable for unknown errors.
A safe fallback approach if the error type is not recognized.

pkg/indexer/indexer_test.go (9)

6-6: Introduced sync package.
WaitGroup usage replaces arbitrary sleep for more robust test synchronization.


45-47: Set up WaitGroup for two asynchronous calls.
Appropriately increments the counter to track StoreLog and UpdateLatestBlock completions.


51-53: Signaling wg.Done() inside the mock run function.
Ensures that the test knows exactly when the mock call completes.


78-89: Replaces sleep with WaitGroup and channel signaling.
This approach is less error-prone and more deterministic than using time.Sleep.


118-120: WaitGroup usage in TestIndexLogsRetryableError.
Consistent approach for tracking asynchronous test operations.


127-127: Properly invoking wg.Done() after StoreLog.
Ensures that the test waits for the storer mock call to finish.


129-133: Simulating a retryable error and then an unrecoverable error.
This is a realistic test scenario that aligns perfectly with the new error infrastructure.


147-148: Passing testContract parameter.
Verifies that the contract reference is correctly assigned in the test.


150-161: WaitGroup-based synchronization with channel closure.
Again, a robust replacement for fixed sleeps in tests.

pkg/indexer/storer/error.go (8)

8-9: New UnrecoverableLogStorageError struct.
Helps differentiate permanent failures from transient ones.


12-14: Implements the Error() method properly.
Exposes the wrapped error’s message as expected.


16-18: ShouldRetry() returning false for unrecoverable errors.
Clearly encodes that these errors cannot be retried.


20-21: Constructor for UnrecoverableLogStorageError.
Simple and clear factory function.


24-26: RetryableLogStorageError struct for transient errors.
Separating concerns for retry logic is a smart design move.


28-28: Error() implementation for retryable errors.
Directly wraps the underlying error message.


32-34: ShouldRetry() set to true for retryable scenario.
Ensures that callers can differentiate between permanent vs. temporary conditions.


36-37: Constructor for RetryableLogStorageError.
Maintains parallelism with the unrecoverable error constructor.

pkg/metrics/indexer.go (7)

26-32: Good addition of max block metric for tracking indexer progress.

This gauge metric will help monitor the maximum block on the chain to be processed, providing better visibility into the indexer's progress.


34-40: Great addition of block lag metric.

The block lag metric provides valuable information about how far behind the indexer is, which is critical for monitoring and alerting on indexer health.


42-48: Good implementation of storage error tracking.

This counter will help monitor and analyze retryable storage errors, which aligns with improving the indexer's error handling and reliability.


72-75: Proper type safety improvement from int to uint64.

Changing the block parameter from int to uint64 is a good improvement as block numbers are always positive and this aligns with Ethereum's native block number representation.


77-80: Good implementation of max block emission function.

This function complements the new indexerMaxBlock metric and follows the established pattern for the metrics emitter functions.


87-90: Well-designed block lag emission function.

This implementation correctly follows the pattern of other metric emitters and will be useful for tracking indexing progress.


92-95: Good implementation of error counter increment.

This function provides a clean way to track retryable storage errors by contract address, which will be valuable for debugging and monitoring.

pkg/blockchain/rpcLogStreamer.go (11)

132-134: Good implementation of self-termination timer.

The timer implementation is a cleaner approach than using manual timestamp tracking. The defer timer.Stop() ensures proper cleanup.


140-144: Improved shutdown handling for prolonged disconnections.

Using a timer-based approach with a fatal log provides a more graceful and deterministic way to handle excessive disconnect times rather than panicking.


151-156: Better log level for blockchain reorganization.

Changing from Info to Warn is appropriate as a reorg is an exceptional event that operators should be aware of. The timer reset ensures proper tracking of disconnect time after a reorg.


169-170: Good timer reset after successful operation.

Properly resetting the timer after successfully getting logs ensures accurate tracking of disconnect time.


171-174: Simplified no-logs handling logic.

The simplified conditional is cleaner and more maintainable than the previous implementation, while preserving the same behavior.


201-202: Good integration of max block metric.

This properly ties the blockchain client's block number to the metrics system, providing visibility into chain progress.


205-210: Well-implemented block lag metrics.

The code correctly handles both the case when the indexer is caught up (lag = 0) and when it's lagging behind, providing accurate metrics.


226-227: Proper updating of current block metric.

Using the toBlock value for the current block metric is correct as it represents the highest block that has been fully processed in the current iteration.


229-230: Clear next block calculation.

The code clearly defines the next block to process as one past the current range's end.


240-242: Good type consistency for block numbers.

Changing from int64 to uint64 for block numbers provides better type consistency throughout the codebase and aligns with Ethereum's native block number representation.

Also applies to: 250-252


215-220:

Details

❓ Verification inconclusive

Verify error handling for FilterLogs call.

The error handling for the FilterLogs call looks good, but you might want to consider specific handling for different types of RPC errors that might occur, such as rate limiting or temporary network issues.


🏁 Script executed:

#!/bin/bash
# Check how RPC errors are handled across the codebase
rg -A 10 -B 5 "FilterLogs.*err\s*!=" --glob "*.go"

Length of output: 52


ACTION: Review and potentially refine error handling for the FilterLogs RPC call

The current implementation wraps the call in metrics.MeasureGetLogs, and while this provides basic error propagation, it's unclear if different RPC error cases—such as rate limiting or transient network errors—are handled distinctly. Please verify manually whether:

  • Specialized error handling for these scenarios is needed at this call site or within MeasureGetLogs
  • The broader codebase already addresses such distinctions elsewhere

If such differentiation would improve robustness, consider adding explicit checks for these error types.

@mkysel mkysel merged commit 5378eb5 into main Mar 17, 2025
8 checks passed
@mkysel mkysel deleted the mkysel/indexer-metrics branch March 17, 2025 21:05
mkysel added a commit that referenced this pull request Mar 20, 2025
Adds all metrics from #568

On top of that:
- refactors self-termination watchContract
- mutes loud debug logs
- uses waitgroups in some tests to speed them up
- some other driveby refactors

Fixes #568

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

- **New Features**
- Introduced enhanced monitoring that displays new metrics for key
blockchain processing statistics, including maximum block processed,
block lag, and storage error counts.
- Added a configurable disconnect timeout to help ensure a more graceful
recovery during prolonged network interruptions.

- **Refactor**
- Streamlined error handling and retry processes to improve overall
system stability and reliability during blockchain operations.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Indexer Metrics

3 participants