@fbac fbac commented May 20, 2025

Add custom start block configuration for contract event indexing to enable indexing from specified block heights

Introduces configuration options and implementation for custom start block heights in contract event indexing. Key changes include:

  • Adds GroupMessageBroadcasterStartBlock and IdentityUpdateBroadcasterStartBlock configuration options in options.go
  • Modifies BlockTracker initialization in block_tracker.go to support starting from specified block numbers
  • Updates GroupMessageBroadcaster and IdentityUpdateBroadcaster in group_message.go and identity_update.go to accept start block parameters
  • Improves error handling and logging across the indexing system with block number context

📍Where to Start

Start with the configuration changes in options.go to understand the new start block parameters, then review the BlockTracker modifications in block_tracker.go which implements the core functionality.


Macroscope summarized f96b584.

Summary by CodeRabbit

  • New Features

    • Added support for specifying start block numbers for group message, identity update, payer registry, and payer report manager broadcasters via configuration options.
    • Introduced a two-phase log streaming approach with initial backfill and subsequent live subscription using WebSocket.
  • Improvements

    • Enhanced subscription management with automatic exponential backoff and retry logic for websocket connections.
    • Improved initialization of block trackers to start from a specified block on-chain if no database record exists.
    • Added detailed error wrapping and consistent error context during app chain initialization.
    • Enhanced log indexing with context-aware cancellation, detailed reorg event logging, and block number inclusion in retry error logs.
    • Updated configuration validation to actively verify WebSocket URLs.
    • Switched default RPC URLs and test setups from HTTP to WebSocket protocol for improved connectivity.
    • Refactored test setups for cleaner context and channel management.
  • Bug Fixes

    • Fixed handling of client and start block parameters during broadcaster and block tracker initialization.
  • Tests

    • Updated tests to reflect new API signatures, use mock blockchain clients, and verify block tracker initialization and log handling improvements.

@fbac fbac requested a review from a team as a code owner May 20, 2025 13:56

coderabbitai bot commented May 20, 2025

"""

Walkthrough

This change refactors RpcLogStreamer to implement a two-phase log streaming approach with initial backfill followed by websocket subscription with exponential backoff retries. It renames fields in ContractConfig, updates method signatures, and improves subscription management. Configuration structs are extended with WebSocket URLs and start block parameters. Broadcaster constructors and block tracker are updated to accept start blocks and Ethereum client references, enabling initialization from on-chain data if no DB record exists. Logging, error handling, and test setups are improved accordingly.

Changes

| Files/Paths | Change Summary |
| --- | --- |
| pkg/blockchain/rpcLogStreamer.go | Refactored watchContract for two-phase log streaming (backfill then subscription); renamed ContractAddress to Address and backfillChannel to eventChannel; updated GetNextPage signature; added buildSubscriptionChannel and buildSubscription methods with exponential backoff retry logic; split filter query construction; updated Start and GetEventChannel methods. |
| pkg/config/options.go | Deprecated RpcURL fields; added WssURL fields and start block fields (GroupMessageBroadcasterStartBlock, IdentityUpdateBroadcasterStartBlock, PayerRegistryStartBlock, PayerReportManagerStartBlock) to AppChainOptions and SettlementChainOptions. |
| pkg/indexer/app_chain/app_chain.go | Added sentinel error ErrInitializingAppChain; wrapped initialization errors with it; passed start block parameters to broadcaster constructors; renamed ContractAddress to Address in ContractConfig. |
| pkg/indexer/app_chain/contracts/group_message.go, pkg/indexer/app_chain/contracts/identity_update.go | Updated constructors to accept startBlock parameter; passed it and client to block tracker initialization. |
| pkg/indexer/common/block_tracker.go | Renamed fields for clarity; updated NewBlockTracker and loadLatestBlock to accept blockchain client and start block; fallback to on-chain block if no DB record; added Block.save method; improved error handling and naming consistency. |
| pkg/indexer/common/block_tracker_test.go | Updated tests to use mock blockchain client and pass start block to NewBlockTracker; adjusted assertions for new initialization logic using mock block data. |
| pkg/indexer/common/log_handler.go | Refactored IndexLogs to use select with context cancellation and channel close handling; added block number to retry function and error logs; added debug log for reorged events; improved logging after storing logs and updating latest block. |
| pkg/indexer/common/log_handler_test.go | Refactored test setup to return a struct encapsulating channels, context, event, block number, and hash; updated tests to use this struct for cleaner code. |
| pkg/indexer/settlement_chain/contracts/payer_registry.go, pkg/indexer/settlement_chain/contracts/payer_report_manager.go | Updated constructors to accept startBlock parameter and client; passed them to block tracker initialization. |
| pkg/indexer/settlement_chain/settlement_chain.go | Passed start block parameters to PayerRegistry and PayerReportManager constructors; renamed ContractAddress to Address in ContractConfig. |
| pkg/server/server_test.go | Changed test server RPC URL scheme from HTTP to WebSocket for AppChain. |
| pkg/testutils/anvil/anvil.go | Changed Anvil URL scheme from HTTP to WebSocket. |
| pkg/config/validation.go | Replaced simple RPC URL validation with active WebSocket connection check using Gorilla WebSocket; added validateWebsocketURL function. |
| cmd/cli/cli_test.go | Changed test input for --http-address flag from HTTP to WebSocket URL. |
| dev/local.env | Changed default RPC_URL from HTTP to WebSocket URL. |
| go.mod | Moved github.com/gorilla/websocket from indirect to direct dependency. |
| pkg/blockchain/rpcLogStreamer_test.go | Updated test code to reflect renamed ContractConfig field and changed GetNextPage signature with added context parameter and return values. |

Sequence Diagram(s)

sequenceDiagram
    participant RpcLogStreamer
    participant ContractConfig
    participant BlockchainNode
    participant SubscriptionChannel
    participant Logger

    RpcLogStreamer->>ContractConfig: Initialize watcher with Address and eventChannel
    RpcLogStreamer->>BlockchainNode: Backfill logs in pages until highest block considering unsafe reorg distance
    RpcLogStreamer-->>Logger: Log backfill progress and fromBlock
    RpcLogStreamer->>SubscriptionChannel: Build subscription channel with exponential backoff retries
    SubscriptionChannel->>BlockchainNode: Subscribe to live logs via websocket
    RpcLogStreamer->>SubscriptionChannel: Receive live logs and send to eventChannel

Suggested labels

indexer

Suggested reviewers

  • mkysel
  • neekolas
    """

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 golangci-lint (1.64.8)

Error: you are using a configuration file for golangci-lint v2 with golangci-lint v1: please use golangci-lint v2
Failed executing command with error: you are using a configuration file for golangci-lint v2 with golangci-lint v1: please use golangci-lint v2


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 327e181 and 66a6b70.

📒 Files selected for processing (1)
  • pkg/blockchain/rpcLogStreamer.go (7 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: fbac
PR: xmtp/xmtpd#828
File: pkg/blockchain/rpcLogStreamer.go:0-0
Timestamp: 2025-05-22T11:22:58.453Z
Learning: The log buffer for RpcLogStreamer will be implemented in a subsequent PR with a minimum size equal to lagFromHighestBlock to prevent potential log loss during transition from backfill to subscription mode.
pkg/blockchain/rpcLogStreamer.go (1)
Learnt from: fbac
PR: xmtp/xmtpd#800
File: pkg/blockchain/rpcLogStreamer.go:0-0
Timestamp: 2025-05-19T09:45:00.127Z
Learning: In the blockchain RpcLogStreamer implementation, backfillChannel and reorgChannel are meant to be internal implementation details that are managed by the streamer itself, not provided by external callers. This is why they are maintained as unexported fields with getter methods for access.
⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: Push Docker Images to GitHub Packages (xmtpd-cli)
  • GitHub Check: Test (Node)
  • GitHub Check: Upgrade Tests
  • GitHub Check: Push Docker Images to GitHub Packages (xmtpd)
  • GitHub Check: Code Review
🔇 Additional comments (8)
pkg/blockchain/rpcLogStreamer.go (8)

10-10: LGTM: Exponential backoff import added for subscription resilience.

The addition of the backoff package supports the new subscription retry logic with exponential backoff, which improves the robustness of websocket connections.


21-25: LGTM: Well-defined constants for streaming configuration.

The constants provide sensible defaults for backfill block size, subscription retries, and sleep intervals that align with the two-phase streaming approach.


31-31: LGTM: Field renamed for consistency.

Renaming ContractAddress to Address improves consistency across the codebase and aligns with Ethereum naming conventions.


34-34: LGTM: Channel renamed for clarity.

Renaming backfillChannel to eventChannel better reflects its purpose as the unified channel for both backfill and subscription events.


147-257: Excellent refactor implementing robust two-phase streaming.

The watchContract method has been completely rewritten to implement a sophisticated two-phase approach:

  1. Backfill phase: Efficiently pages through historical logs with proper error handling and context cancellation
  2. Subscription phase: Uses websocket subscription with exponential backoff retry logic

Key improvements:

  • Proper channel management with defer close statements
  • Context-aware cancellation throughout both phases
  • Unsafe reorg distance consideration for buffer management
  • Seamless transition between phases when backfill reaches current height

The implementation properly handles edge cases and provides excellent logging for observability.


259-299: LGTM: Enhanced method signature with proper context handling.

The updated GetNextPage method signature properly includes:

  • Context parameter for cancellation support
  • Returns highestBlock for improved downstream logic
  • Updated field references for the renamed struct fields

The implementation correctly handles the backfill logic with appropriate metrics emission.


301-384: Excellent subscription management with resilient retry logic.

The new subscription methods implement robust websocket subscription management:

buildSubscriptionChannel:

  • Creates properly buffered channel (100 capacity)
  • Manages subscription lifecycle in separate goroutine
  • Handles graceful shutdown with defer cleanup

buildSubscription:

  • Implements exponential backoff retry on subscription errors
  • Respects maximum disconnect time and retry limits
  • Provides excellent error logging and recovery

The use of the backoff library with configurable parameters ensures resilient connection management that can handle temporary network issues without losing events.


406-419: LGTM: Clean filter query builders updated for new struct fields.

Both buildFilterQuery and buildBaseFilterQuery are properly updated to use the renamed Address field. The separation of concerns between base query building and range-specific query building is well maintained.




@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (1)
pkg/indexer/common/block_tracker.go (1)

140-148: Persist the “bootstrap” block to avoid repeated RPC hits

When no DB row exists you fetch the startBlock from the chain but do not insert it into the database.
Every new BlockTracker for the same address will therefore query the RPC node again until UpdateLatestBlock is called elsewhere.

Consider persisting immediately:

	if err != nil && errors.Is(err, sql.ErrNoRows) {
		onchainBlock, err := client.BlockByNumber(ctx, big.NewInt(int64(startBlock)))
		if err != nil {
			return nil, err
		}
+		if err := querier.SetLatestBlock(ctx, queries.SetLatestBlockParams{
+			ContractAddress: address.Hex(),
+			BlockNumber:     int64(onchainBlock.NumberU64()),
+			BlockHash:       onchainBlock.Hash().Bytes(),
+		}); err != nil {
+			return nil, err
+		}
		block.save(onchainBlock.NumberU64(), onchainBlock.Hash().Bytes())
		return block, nil
	}

This prevents unnecessary RPC traffic and keeps all instances in sync.
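
The effect of persisting the bootstrap block can be shown with an in-memory stand-in. Everything here is hypothetical (`memStore`, `record`, `errNoRows` standing in for `sql.ErrNoRows`, and the `fetch` callback standing in for the RPC call); the point is only that the second load is served from the store instead of the node.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoRows is a stand-in for sql.ErrNoRows in this sketch.
var errNoRows = errors.New("no rows")

// record is a hypothetical persisted block entry.
type record struct {
	Number uint64
	Hash   string
}

// memStore is a hypothetical in-memory replacement for the database.
type memStore map[string]record

func (m memStore) get(address string) (record, error) {
	r, ok := m[address]
	if !ok {
		return record{}, errNoRows
	}
	return r, nil
}

func (m memStore) set(address string, r record) { m[address] = r }

// loadLatestBlock returns the stored block, or fetches the start block from
// the chain and persists it immediately so later loads skip the RPC call.
func loadLatestBlock(store memStore, address string, startBlock uint64,
	fetch func(uint64) (record, error)) (record, error) {
	rec, err := store.get(address)
	if err == nil {
		return rec, nil
	}
	if !errors.Is(err, errNoRows) {
		return record{}, err
	}
	rec, err = fetch(startBlock)
	if err != nil {
		return record{}, fmt.Errorf("fetching start block %d: %w", startBlock, err)
	}
	store.set(address, rec) // persist the bootstrap block
	return rec, nil
}

func main() {
	rpcCalls := 0
	fetch := func(n uint64) (record, error) {
		rpcCalls++
		return record{Number: n, Hash: fmt.Sprintf("hash-%d", n)}, nil
	}
	store := memStore{}
	for i := 0; i < 2; i++ {
		rec, err := loadLatestBlock(store, "0xabc", 42, fetch)
		fmt.Println(rec.Number, rec.Hash, err)
	}
	fmt.Println("rpc calls:", rpcCalls)
}
```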

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4a223be and f96b584.

📒 Files selected for processing (8)
  • pkg/blockchain/rpcLogStreamer.go (1 hunks)
  • pkg/config/options.go (1 hunks)
  • pkg/indexer/app_chain/app_chain.go (6 hunks)
  • pkg/indexer/app_chain/contracts/group_message.go (2 hunks)
  • pkg/indexer/app_chain/contracts/identity_update.go (2 hunks)
  • pkg/indexer/common/block_tracker.go (4 hunks)
  • pkg/indexer/common/block_tracker_test.go (6 hunks)
  • pkg/indexer/common/log_handler.go (4 hunks)
🧰 Additional context used
🧠 Learnings (1)
pkg/indexer/common/block_tracker.go (1)
Learnt from: fbac
PR: xmtp/xmtpd#800
File: pkg/indexer/app_chain/app_chain.go:70-71
Timestamp: 2025-05-19T09:19:00.735Z
Learning: The `GetLatestBlock()` method in `BlockTracker` returns `(uint64, []byte)` representing the latest block number and hash, with no error return. It defaults to returning 0 for the block number when no block has been stored yet, as it maintains values in an in-memory cache.
🪛 GitHub Check: Lint-Go
pkg/indexer/common/block_tracker_test.go

[failure] 10-10:
"github.com/stretchr/testify/assert" imported and not used (typecheck)

🪛 GitHub Actions: Lint
pkg/indexer/common/block_tracker_test.go

[error] 10-10: "github.com/stretchr/testify/assert" imported and not used (typecheck)

⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: Push Docker Images to GitHub Packages (xmtpd-cli)
  • GitHub Check: Push Docker Images to GitHub Packages (xmtpd)
  • GitHub Check: Upgrade Tests
  • GitHub Check: Test (Node)
  • GitHub Check: Code Review
🔇 Additional comments (9)
pkg/blockchain/rpcLogStreamer.go (2)

138-142: Code refactoring improves readability.

Organizing related variable declarations into a single var block improves code organization and readability.


148-148: Good addition of startup logging.

Adding this log statement provides better visibility into the watcher's initialization, making it easier to track when watchers start and their initial block numbers.

pkg/indexer/common/log_handler.go (2)

152-152: Added blockNumber to enhance error tracking.

Adding the block number parameter to the retry function signature is an important enhancement that will help with debugging and tracing errors.


185-188: Improved error logging with block context.

Including the block number in error logs provides essential context for troubleshooting issues, especially during retries in blockchain processing.

pkg/indexer/app_chain/contracts/group_message.go (2)

39-39: Added support for custom start block.

Adding the startBlock parameter enables specifying a custom starting block for the group message broadcaster, supporting the PR's main objective.


47-52: Updated NewBlockTracker call with client and startBlock.

The modified call passes the required parameters to initialize the block tracker with a specific start block, appropriately utilizing the new parameter.

pkg/indexer/app_chain/contracts/identity_update.go (2)

42-42: Added support for custom start block.

Adding the startBlock parameter enables specifying a custom starting block for the identity update broadcaster, supporting the PR's main objective.


51-57: Updated NewBlockTracker call with client and startBlock.

The modified call passes the required parameters to initialize the block tracker with a specific start block, appropriately utilizing the new parameter.

pkg/config/options.go (1)

34-41: Validate zero-value default for new start-block flags

Adding GroupMessageBroadcasterStartBlock and IdentityUpdateBroadcasterStartBlock with a default of 0 makes “genesis” the implicit start point. That is fine, but if the intent was “use the latest on-chain head when unset”, callers must remember to translate a 0 into “fetch latest”. Please double-check downstream assumptions (e.g. block_tracker.go always fetches startBlock even when zero).


macroscopeapp bot commented May 20, 2025

Replace HTTP RPC connections with WebSocket subscriptions and add configurable start block heights for blockchain event indexing

This pull request replaces HTTP-based blockchain connections with WebSocket subscriptions for real-time event monitoring and adds configurable start block heights for contract event indexing. The changes include:

  • WebSocket Migration: Updates RpcLogStreamer in pkg/blockchain/rpcLogStreamer.go to use WebSocket subscriptions with backfill logic, exponential backoff reconnection, and real-time event streaming
  • Configurable Start Blocks: Adds start block parameters to BlockTracker in pkg/indexer/common/block_tracker.go and contract initializers for GroupMessageBroadcaster, IdentityUpdateBroadcaster, PayerRegistry, and PayerReportManager
  • Configuration Updates: Adds WssURL fields and contract-specific start block options to pkg/config/options.go with WebSocket URL validation in pkg/config/validation.go
  • Protocol Changes: Updates all RPC URLs from http:// to ws:// across test files and environment configurations
  • Dependency Management: Promotes gorilla/websocket from indirect to direct dependency in go.mod

📍Where to Start

Start with the RpcLogStreamer struct and watchContract method in pkg/blockchain/rpcLogStreamer.go to understand the core WebSocket subscription implementation and backfill logic.


Macroscope summarized 66a6b70.


mkysel commented May 22, 2025

what's the motivation behind this? Why would we want to NOT index from 0?

GroupMessageBroadcasterAddress string `long:"group-message-broadcaster-address" env:"XMTPD_APP_CHAIN_GROUP_MESSAGE_BROADCAST_ADDRESS" description:"Group message broadcaster contract address"`
GroupMessageBroadcasterStartBlock uint64 `long:"group-message-broadcaster-start-block" env:"XMTPD_APP_CHAIN_GROUP_MESSAGE_BROADCAST_START_BLOCK" description:"Start block for the group message broadcaster" default:"0"`
IdentityUpdateBroadcasterAddress string `long:"identity-update-broadcaster-address" env:"XMTPD_APP_CHAIN_IDENTITY_UPDATE_BROADCAST_ADDRESS" description:"Identity update broadcaster contract address"`
IdentityUpdateBroadcasterStartBlock uint64 `long:"identity-update-broadcaster-start-block" env:"XMTPD_APP_CHAIN_IDENTITY_UPDATE_BROADCAST_START_BLOCK" description:"Start block for the identity update broadcaster" default:"0"`
Contributor

Where do we expect these block numbers to come from? It seems like the only sensible input would be the block numbers our contract was deployed at. I'd expect that would come from some deployment artifact JSON rather than an environment variable or command line arg.

Or is the idea that the deployment artifact would make its way into the environment variables somehow?

Collaborator Author

IMO ideally it's the deployment artifact setting those. I left this as an open implementation intentionally to modify it later, when the deployment artifact implementation is finished.


fbac commented May 26, 2025

> what's the motivation behind this? Why would we want to NOT index from 0?

We never want to index from 0. We only want to index from the block where the contract was deployed.


mkysel commented May 27, 2025

Ah, that makes sense! I guess it's an optimization, and if it is not set, it should be safe.


mkysel commented May 27, 2025

{
  "appChainDeploymentBlock": 0,
  "appChainId": 31337,
  "distributionManager": "0x73A4846B953EFcD9242A3e94666DEE3312EE8a5F",
  "groupMessageBroadcaster": "0x8c5908AFbd1a5C25590D78eC7Bb0422262BDE6a1",
  "identityUpdateBroadcaster": "0x2c7A0c3856ca0CC9bf339E19fE25ca4c1f57A567",
  "nodeRegistry": "0xB47363AfbeAf04a8Fbaf6bE085050A979d2a9794",
  "payerRegistry": "0x95E856F1542EB9Eb1BFB6019f0D438584b1652ed",
  "payerReportManager": "0x87cbA0310D1f1a2bF15408b54d987Bf3ec45B2a5",
  "rateRegistry": "0xe9Fb03945475587B03eA28b8118E2bc5B753E3E9",
  "settlementChainDeploymentBlock": 0,
  "settlementChainId": 31337,
  "settlementChainParameterRegistry": "0x866e7279B86a71093F2a601883b9c66EdB320ddD"
}

seems to be per chain. If you want it per contract, you should synch with @deluca-mike and the contracts repo


deluca-mike commented May 27, 2025

> {
>   "appChainDeploymentBlock": 0,
>   "appChainId": 31337,
>   "distributionManager": "0x73A4846B953EFcD9242A3e94666DEE3312EE8a5F",
>   "groupMessageBroadcaster": "0x8c5908AFbd1a5C25590D78eC7Bb0422262BDE6a1",
>   "identityUpdateBroadcaster": "0x2c7A0c3856ca0CC9bf339E19fE25ca4c1f57A567",
>   "nodeRegistry": "0xB47363AfbeAf04a8Fbaf6bE085050A979d2a9794",
>   "payerRegistry": "0x95E856F1542EB9Eb1BFB6019f0D438584b1652ed",
>   "payerReportManager": "0x87cbA0310D1f1a2bF15408b54d987Bf3ec45B2a5",
>   "rateRegistry": "0xe9Fb03945475587B03eA28b8118E2bc5B753E3E9",
>   "settlementChainDeploymentBlock": 0,
>   "settlementChainId": 31337,
>   "settlementChainParameterRegistry": "0x866e7279B86a71093F2a601883b9c66EdB320ddD"
> }
>
> seems to be per chain. If you want it per contract, you should synch with @deluca-mike and the contracts repo

This is per environment. A file like this can be obtained for anvil/local, testing, staging, and prod, and should be made available for each release in the smart-contracts repo, once they are actually deployed there.
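
If the start blocks were to come from such an artifact, the mapping might look like the sketch below. This is an assumption-laden illustration, not code from the PR: `deploymentArtifact`, `startBlocks`, and `startBlocksFromArtifact` are hypothetical names, the per-chain deployment block is simply reused for every contract on that chain, and the block numbers in the sample JSON are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// deploymentArtifact mirrors only the fields of the JSON file that matter
// for choosing start blocks.
type deploymentArtifact struct {
	AppChainDeploymentBlock        uint64 `json:"appChainDeploymentBlock"`
	SettlementChainDeploymentBlock uint64 `json:"settlementChainDeploymentBlock"`
}

// startBlocks is a hypothetical holder for the four per-contract options.
type startBlocks struct {
	GroupMessageBroadcaster   uint64
	IdentityUpdateBroadcaster uint64
	PayerRegistry             uint64
	PayerReportManager        uint64
}

// startBlocksFromArtifact derives per-contract start blocks from the
// per-chain deployment blocks (assumption: one deployment block per chain).
func startBlocksFromArtifact(raw []byte) (startBlocks, error) {
	var a deploymentArtifact
	if err := json.Unmarshal(raw, &a); err != nil {
		return startBlocks{}, fmt.Errorf("parsing deployment artifact: %w", err)
	}
	return startBlocks{
		GroupMessageBroadcaster:   a.AppChainDeploymentBlock,
		IdentityUpdateBroadcaster: a.AppChainDeploymentBlock,
		PayerRegistry:             a.SettlementChainDeploymentBlock,
		PayerReportManager:        a.SettlementChainDeploymentBlock,
	}, nil
}

func main() {
	// Illustrative values only; real artifacts come from the contracts repo.
	raw := []byte(`{"appChainDeploymentBlock": 120, "settlementChainDeploymentBlock": 345}`)
	blocks, err := startBlocksFromArtifact(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", blocks)
}
```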

### Implement WebSocket subscription support in RpcLogStreamer to enable real-time event indexing
* High Impact: Adds WebSocket subscription capabilities to `RpcLogStreamer` in [rpcLogStreamer.go](https://github.com/xmtp/xmtpd/pull/828/files#diff-7c09882fdc2ca1e5719483f613a8646adbf3750ccba69267c09948639d406ebd) with exponential backoff reconnection logic and implements dual-channel event handling in [log_handler.go](https://github.com/xmtp/xmtpd/pull/828/files#diff-a31c080b060abb8bd9dfa4d6092255d7f851497acd426d985f03ee9c70389d3c)
* Medium Impact: Introduces WebSocket URL configuration and validation in [options.go](https://github.com/xmtp/xmtpd/pull/828/files#diff-6731fb6f709392ce3e37d3b0c42074cddbce566dad2bab86af24ba7585eeb57c) and [validation.go](https://github.com/xmtp/xmtpd/pull/828/files#diff-3e0b3707cc5d712d06d1eeb6a67c10b45a7ba0b27e972924d71df3c8e539f23b)
* Low Impact: Updates test configurations and environment variables across multiple files to use WebSocket protocol instead of HTTP

#### 📍Where to Start
Start with the `RpcLogStreamer` implementation in [rpcLogStreamer.go](https://github.com/xmtp/xmtpd/pull/828/files#diff-7c09882fdc2ca1e5719483f613a8646adbf3750ccba69267c09948639d406ebd) which contains the core WebSocket subscription logic and handles both backfill and subscription channels.

----

_[Macroscope](https://app.macroscope.com) summarized a3b5dc4._

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
  - Added support for real-time subscription-based log streaming alongside existing backfill mechanisms, enabling near real-time event processing.
  - Introduced dedicated subscription channels for contract event handling and log indexing.
  - Added WebSocket URL fields to configuration with deprecation notices for HTTP URLs.

- **Refactor**
  - Unified event streaming into a single event channel per contract with robust subscription management including retry and backoff.
  - Updated event handling to use separate channels for event and reorg notifications with improved context cancellation and channel management.
  - Standardized naming conventions and improved logging for contract configuration fields.
  - Reorganized test setups to accommodate dual log channels and structured test state.
  - Enhanced log indexing to handle reorg events with clearer control flow and metrics emission.

- **Bug Fixes**
  - Improved validation for blockchain connection URLs by actively testing WebSocket connectivity.

- **Chores**
  - Updated environment and test configurations to use WebSocket URLs instead of HTTP for blockchain connections.
  - Declared the WebSocket package as a direct dependency.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🔭 Outside diff range comments (1)
pkg/indexer/settlement_chain/settlement_chain.go (1)

56-81: 💡 Verification agent

🧩 Analysis chain

Verify start block validation for contract initialization.

The start block parameters are passed directly to the constructors without visible validation. Ensure that invalid start blocks (e.g., blocks that haven't been mined yet) are properly handled either here or in the respective constructors.


🏁 Script executed:

#!/bin/bash
# Description: Check if start block validation exists in the PayerRegistry and PayerReportManager constructors

# Search for validation logic in the constructors
ast-grep --pattern 'func NewPayerRegistry($$$) {
  $$$
}'

ast-grep --pattern 'func NewPayerReportManager($$$) {
  $$$
}'

# Check for any validation of start blocks
rg -A 10 "startBlock|StartBlock" pkg/indexer/settlement_chain/contracts/

Length of output: 3576


Validate startBlock in both contract constructors

Neither NewPayerRegistry nor NewPayerReportManager checks that the provided startBlock is ≤ the chain’s current head. Passing a future block will cause the block tracker to error at runtime. Add explicit validation of startBlock before initializing the tracker and return a clear error if it’s out of range.

Locations to update:

  • pkg/indexer/settlement_chain/contracts/payer_registry.go → func NewPayerRegistry
  • pkg/indexer/settlement_chain/contracts/payer_report_manager.go → func NewPayerReportManager

Suggested diff inside each constructor, before calling c.NewBlockTracker(..., startBlock):

+   // ensure the start block isn’t ahead of the chain
+   latest, err := client.BlockNumber(ctx)
+   if err != nil {
+       return nil, fmt.Errorf("failed to fetch latest block: %w", err)
+   }
+   if startBlock > latest {
+       return nil, fmt.Errorf("startBlock %d is greater than latest block %d", startBlock, latest)
+   }

Also add or update unit tests to cover an out-of-range startBlock.

♻️ Duplicate comments (4)
pkg/config/validation.go (1)

259-259: ⚠️ Potential issue

Fix potential nil pointer dereference in connection cleanup.

The unguarded call to conn.Close() can result in a nil pointer dereference if the dial fails and conn is nil.

Apply this fix to prevent potential panics:

-    _ = conn.Close()
+    if conn != nil {
+        _ = conn.Close()
+    }
pkg/config/options.go (1)

43-45: Document the source and purpose of start block configurations.

As discussed in previous reviews, these start blocks should ideally come from deployment artifacts. Until that implementation is complete, consider:

  1. Adding comments explaining that these represent contract deployment blocks
  2. Documenting that 0 means "start from the beginning" vs actual deployment block
  3. Adding validation to ensure start blocks don't exceed the current chain height

Also applies to: 62-64

pkg/blockchain/rpcLogStreamer.go (2)

166-167: ⚠️ Potential issue

Critical: Implement log buffering to prevent potential log loss.

As noted in the retrieved learnings, the log buffer implementation is planned for a subsequent PR. However, without buffering, there's a risk of losing logs during the transition from backfill to subscription mode. The TODOs indicate awareness of this issue.

Consider implementing at least a basic buffer to prevent log loss, even if the full implementation comes later.

Also applies to: 233-234
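
To make the risk concrete: logs that arrive on the live subscription while backfill is still running are either dropped or delivered twice unless something holds them. The sketch below is one possible basic buffer, not the design planned for the follow-up PR. `Log`, `handoffBuffer`, and its methods are hypothetical, and it assumes backfill has delivered every log up to and including `lastBackfilled`.

```go
package main

import "fmt"

// Log is a simplified stand-in for go-ethereum's types.Log.
type Log struct {
	BlockNumber uint64
	Index       uint
}

// handoffBuffer holds subscription logs that arrive while backfill is still running.
type handoffBuffer struct {
	pending []Log
}

// Add stores a log received from the live subscription.
func (b *handoffBuffer) Add(l Log) {
	b.pending = append(b.pending, l)
}

// Flush returns the buffered logs above lastBackfilled in arrival order and
// empties the buffer. Logs at or below lastBackfilled are dropped because
// backfill is assumed to have delivered them already.
func (b *handoffBuffer) Flush(lastBackfilled uint64) []Log {
	out := make([]Log, 0, len(b.pending))
	for _, l := range b.pending {
		if l.BlockNumber > lastBackfilled {
			out = append(out, l)
		}
	}
	b.pending = nil
	return out
}

func main() {
	buf := &handoffBuffer{}
	buf.Add(Log{BlockNumber: 98})  // already covered by backfill
	buf.Add(Log{BlockNumber: 100}) // arrived during backfill, must not be lost
	buf.Add(Log{BlockNumber: 101})

	for _, l := range buf.Flush(99) {
		fmt.Println(l.BlockNumber)
	}
}
```

This version is unbounded. The recorded learning for this PR mentions a minimum size equal to lagFromHighestBlock, so a real implementation would also need a capacity policy.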


271-275: ⚠️ Potential issue

Return actual highestBlock value instead of 0.

When fromBlock > highestBlock, the function returns 0 for highestBlock, which could cause issues in downstream logic such as reorg distance calculations.

-		return []types.Log{}, nil, 0, nil
+		return []types.Log{}, nil, highestBlock, nil
🧹 Nitpick comments (6)
cmd/cli/cli_test.go (1)

13-13: Consider updating variable name to reflect WebSocket protocol.

The variable httpAddress now contains a WebSocket URL (ws://localhost:8545), which creates a naming inconsistency. Consider renaming to wsAddress or rpcAddress to better reflect the actual protocol being used.

-	httpAddress := "ws://localhost:8545"
+	wsAddress := "ws://localhost:8545"

Also consider whether the CLI flag name --http-address should be updated to reflect that it now expects WebSocket URLs, or if the flag should accept both HTTP and WebSocket URLs for backward compatibility.

pkg/server/server_test.go (1)

45-45: LGTM! Consider field naming consistency.

The change to WebSocket URL is correct and consistent with the broader migration to WebSocket-based RPC connections.

Note that the field name RpcURL now contains a WebSocket URL, which might be confusing. If this field name is part of the configuration struct in pkg/config/options.go, consider whether it should be renamed to WssURL or similar to better reflect the protocol change.

dev/local.env (1)

12-12: LGTM! Consider backward compatibility and naming.

The change to WebSocket URL is correct and aligns with the WebSocket migration across the codebase. This change affects multiple downstream variables (lines 15-16, 31) which is appropriate.

Consider the following improvements:

  1. Naming consistency: The variable RPC_URL now specifically contains a WebSocket URL. Consider renaming to WSS_URL or documenting that RPC_URL specifically refers to WebSocket URLs.

  2. Backward compatibility: If external systems or documentation reference this as an HTTP URL, ensure they are updated accordingly.

-# Local Anvil container RPC URL. Default to 7545, as it's the port exposed by docker-compose.
+# Local Anvil container WebSocket RPC URL. Default to 7545, as it's the port exposed by docker-compose.
pkg/config/options.go (1)

35-38: Consider adding migration timeline and validation for WssURL.

While the deprecation approach allows gradual migration, having both RpcURL and WssURL could cause confusion. Consider:

  1. Adding a clear timeline for when RpcURL will be removed
  2. Implementing validation for WssURL alongside RpcURL validation
  3. Logging warnings when RpcURL is used

Also applies to: 49-52
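
A sketch of how the fallback and the deprecation warning could fit together during the migration. The precedence shown here (WssURL wins, RpcURL is a deprecated fallback) is an assumption, and `resolveWebSocketURL` is a hypothetical helper rather than code from options.go.

```go
package main

import (
	"errors"
	"fmt"
)

// resolveWebSocketURL prefers the new field and falls back to the deprecated one.
// The second return value reports whether the deprecated field was used, so the
// caller can log a warning.
func resolveWebSocketURL(wssURL, rpcURL string) (string, bool, error) {
	switch {
	case wssURL != "":
		return wssURL, false, nil
	case rpcURL != "":
		return rpcURL, true, nil
	default:
		return "", false, errors.New("neither WssURL nor RpcURL is set")
	}
}

func main() {
	url, deprecated, err := resolveWebSocketURL("", "ws://localhost:8545")
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	if deprecated {
		fmt.Println("warning: RpcURL is deprecated, set WssURL instead")
	}
	fmt.Println("using", url)
}
```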

pkg/blockchain/rpcLogStreamer.go (2)

21-25: Consider making timing constants configurable per chain.

Different chains have different block times and characteristics. Consider making these values configurable through RpcLogStreamerOption to allow tuning per chain:

  • backfillBlocks: Different chains may handle different batch sizes
  • sleepTimeOnError/sleepTimeNoLogs: Faster chains might benefit from shorter sleep times

406-436: Consider consolidating filter query builders to reduce duplication.

The two filter query builder functions have significant overlap. Consider refactoring to reduce duplication:

-func buildFilterQuery(
-	contractConfig ContractConfig,
-	fromBlock uint64,
-	toBlock uint64,
-) ethereum.FilterQuery {
-	addresses := []common.Address{contractConfig.Address}
-	topics := [][]common.Hash{}
-	for _, topic := range contractConfig.Topics {
-		topics = append(topics, []common.Hash{topic})
-	}
-
-	return ethereum.FilterQuery{
-		FromBlock: new(big.Int).SetUint64(fromBlock),
-		ToBlock:   new(big.Int).SetUint64(toBlock),
-		Addresses: addresses,
-		Topics:    topics,
-	}
-}
-
-func buildSubscriptionFilterQuery(cfg ContractConfig) ethereum.FilterQuery {
-	addresses := []common.Address{cfg.Address}
-	topics := [][]common.Hash{}
-	for _, topic := range cfg.Topics {
-		topics = append(topics, []common.Hash{topic})
-	}
-
-	return ethereum.FilterQuery{
-		Addresses: addresses,
-		Topics:    topics,
-	}
-}
+func buildBaseFilterQuery(cfg ContractConfig) ethereum.FilterQuery {
+	addresses := []common.Address{cfg.Address}
+	topics := [][]common.Hash{}
+	for _, topic := range cfg.Topics {
+		topics = append(topics, []common.Hash{topic})
+	}
+
+	return ethereum.FilterQuery{
+		Addresses: addresses,
+		Topics:    topics,
+	}
+}
+
+func buildFilterQuery(
+	contractConfig ContractConfig,
+	fromBlock uint64,
+	toBlock uint64,
+) ethereum.FilterQuery {
+	query := buildBaseFilterQuery(contractConfig)
+	query.FromBlock = new(big.Int).SetUint64(fromBlock)
+	query.ToBlock = new(big.Int).SetUint64(toBlock)
+	return query
+}
+
+func buildSubscriptionFilterQuery(cfg ContractConfig) ethereum.FilterQuery {
+	return buildBaseFilterQuery(cfg)
+}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 17dc67e and ac51e0d.

📒 Files selected for processing (15)
  • cmd/cli/cli_test.go (1 hunks)
  • dev/local.env (1 hunks)
  • go.mod (1 hunks)
  • pkg/blockchain/rpcLogStreamer.go (8 hunks)
  • pkg/blockchain/rpcLogStreamer_test.go (2 hunks)
  • pkg/config/options.go (1 hunks)
  • pkg/config/validation.go (4 hunks)
  • pkg/indexer/app_chain/app_chain.go (8 hunks)
  • pkg/indexer/common/log_handler.go (2 hunks)
  • pkg/indexer/common/log_handler_test.go (5 hunks)
  • pkg/indexer/settlement_chain/contracts/payer_registry.go (2 hunks)
  • pkg/indexer/settlement_chain/contracts/payer_report_manager.go (2 hunks)
  • pkg/indexer/settlement_chain/settlement_chain.go (4 hunks)
  • pkg/server/server_test.go (1 hunks)
  • pkg/testutils/anvil/anvil.go (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • go.mod
🚧 Files skipped from review as they are similar to previous changes (2)
  • pkg/indexer/app_chain/app_chain.go
  • pkg/indexer/common/log_handler.go
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: fbac
PR: xmtp/xmtpd#828
File: pkg/blockchain/rpcLogStreamer.go:0-0
Timestamp: 2025-05-22T11:22:58.453Z
Learning: The log buffer for RpcLogStreamer will be implemented in a subsequent PR with a minimum size equal to lagFromHighestBlock to prevent potential log loss during transition from backfill to subscription mode.
pkg/blockchain/rpcLogStreamer.go (1)
Learnt from: fbac
PR: xmtp/xmtpd#800
File: pkg/blockchain/rpcLogStreamer.go:0-0
Timestamp: 2025-05-19T09:45:00.127Z
Learning: In the blockchain RpcLogStreamer implementation, backfillChannel and reorgChannel are meant to be internal implementation details that are managed by the streamer itself, not provided by external callers. This is why they are maintained as unexported fields with getter methods for access.
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: Test (Node)
  • GitHub Check: Upgrade Tests
  • GitHub Check: Code Review
🔇 Additional comments (14)
pkg/testutils/anvil/anvil.go (1)

103-103:


❓ Verification inconclusive

LGTM! Verify blockchain client WebSocket support.

The change to WebSocket URL format is correct and aligns with the broader migration to WebSocket-based RPC connections.

Please verify that blockchain.NewClient() properly supports WebSocket URLs by running this verification:


🏁 Script executed:

#!/bin/bash
# Description: Verify blockchain.NewClient supports WebSocket URLs
# Expected: Function should handle both ws:// and http:// schemes

ast-grep --pattern 'func NewClient($_, $url $_) {
  $$$
}'

# Also check if there are any URL scheme validations
rg -A 5 -B 5 "ws://" pkg/blockchain/

Length of output: 97


I’ll locate and inspect the NewClient implementation and any related dialing logic to confirm WebSocket support.


🏁 Script executed:

#!/bin/bash
# Locate and inspect NewClient in pkg/blockchain
grep -R "func NewClient" -n pkg/blockchain | while IFS=: read -r file line _; do
  echo "=== $file:$line ==="
  sed -n "$((line-5)),$((line+50))p" "$file"
done

# Search for underlying dial calls that handle URL schemes
grep -R "Dial(" -n pkg/blockchain

Length of output: 1433


Confirm WebSocket support in ethclient.DialContext

I’ve located your NewClient implementation; it simply delegates to:

func NewClient(ctx context.Context, rpcUrl string) (*ethclient.Client, error) {
    return ethclient.DialContext(ctx, rpcUrl)
}

ethclient.DialContext delegates to DialContext in go-ethereum’s rpc package, which selects the transport from the URL scheme and accepts ws:// and wss:// alongside http:// and https://. Please still verify this against the version of go-ethereum you’re importing.
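
Independently of the client library, a cheap scheme check can catch a mistyped URL before any dial is attempted. A minimal sketch using only the standard library; `requireWebSocketURL` is a hypothetical helper, not a function from this PR.

```go
package main

import (
	"fmt"
	"net/url"
)

// requireWebSocketURL accepts only ws:// and wss:// URLs that name a host.
func requireWebSocketURL(raw string) error {
	u, err := url.Parse(raw)
	if err != nil {
		return fmt.Errorf("invalid URL %q: %w", raw, err)
	}
	switch u.Scheme {
	case "ws", "wss":
		// Supported WebSocket schemes.
	default:
		return fmt.Errorf("URL %q has scheme %q, want ws or wss", raw, u.Scheme)
	}
	if u.Host == "" {
		return fmt.Errorf("URL %q has no host", raw)
	}
	return nil
}

func main() {
	for _, raw := range []string{"ws://localhost:8545", "http://localhost:8545"} {
		if err := requireWebSocketURL(raw); err != nil {
			fmt.Println("rejected:", err)
			continue
		}
		fmt.Println("accepted:", raw)
	}
}
```

This only validates the shape of the URL. It does not replace the connectivity check in pkg/config/validation.go.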

pkg/blockchain/rpcLogStreamer_test.go (2)

46-46: LGTM: Struct field rename aligned with API changes.

The rename from ContractAddress to Address in ContractConfig is consistent with the broader API refactoring.


59-59: LGTM: Test updated for new method signature.

The test correctly adapts to the updated GetNextPage method that now accepts a context parameter and returns an additional highest block number value.

pkg/indexer/settlement_chain/contracts/payer_registry.go (2)

43-43: LGTM: Start block parameter added for configurable indexing.

The addition of startBlock uint64 parameter enables starting contract event indexing from a specified block height, which aligns with the PR objectives for optimization.


50-56: LGTM: Block tracker initialization updated with client and start block.

The explicit passing of client and startBlock to NewBlockTracker correctly implements the enhanced block tracking functionality that can initialize from on-chain data when no DB record exists.

pkg/config/validation.go (2)

10-10: LGTM: Gorilla WebSocket dependency added for validation.

The addition of the Gorilla WebSocket package supports the new WebSocket URL validation functionality.


117-122: LGTM: Enhanced validation with WebSocket connectivity checks.

The replacement of simple field presence validation with active WebSocket connection validation ensures URLs are reachable before use, which improves configuration reliability.

Also applies to: 150-155

pkg/indexer/settlement_chain/contracts/payer_report_manager.go (2)

41-41: LGTM: Start block parameter added for configurable indexing.

The addition of startBlock uint64 parameter enables starting contract event indexing from a specified block height, consistent with the system-wide enhancement for optimized indexing.


48-54: LGTM: Block tracker initialization updated with client and start block.

The explicit passing of client and startBlock to NewBlockTracker correctly implements the enhanced block tracking functionality, consistent with similar changes in PayerRegistry.

pkg/indexer/settlement_chain/settlement_chain.go (1)

95-112: LGTM! Consistent field naming across the codebase.

The renaming from ContractAddress to Address is consistent with the refactoring in other files and improves code clarity.

pkg/config/options.go (1)

34-65: Well-structured configuration with clear separation of concerns.

The separation of AppChainOptions and SettlementChainOptions provides good modularity and makes the configuration intent clear.

pkg/indexer/common/log_handler_test.go (1)

22-55: Excellent test refactoring for improved maintainability.

The logHandlerTest struct provides better encapsulation and ensures consistent setup across tests. The proper cleanup of channels and context in defer blocks prevents resource leaks.

pkg/blockchain/rpcLogStreamer.go (2)

29-36: LGTM! Consistent naming improvements.

The field renaming improves clarity and consistency across the codebase. The channel buffer sizes are appropriate for event streaming.

Also applies to: 70-75


301-384: Excellent implementation of subscription with automatic retry.

The subscription handling with exponential backoff provides robust recovery from temporary connection issues. The use of MaxDisconnectTime as a circuit breaker is a good pattern.

@fbac fbac force-pushed the 05-20-block_tracker_startblock branch from 327e181 to 66a6b70 Compare May 28, 2025 12:35
@fbac fbac merged commit bf2da26 into main May 28, 2025
10 checks passed
@fbac fbac deleted the 05-20-block_tracker_startblock branch May 28, 2025 12:43
This was referenced May 28, 2025