Skip to content

Conversation

@fbac
Copy link
Collaborator

@fbac fbac commented May 15, 2025

Refactor blockchain event streaming to use modular AppChain component and options pattern in RpcLogStreamer

  • Introduces new AppChain component in app_chain.go that encapsulates chain-specific event streaming logic for group messages and identity updates
  • Refactors RpcLogStreamer in rpcLogStreamer.go to use options pattern, replacing builder pattern, with configurable lag and contract mapping
  • Simplifies Indexer in indexer.go by delegating streaming responsibilities to AppChain
  • Extracts log handling logic into dedicated logHandler.go
  • Reduces log polling interval from 1 second to 100 milliseconds in RpcLogStreamer

📍Where to Start

Start with the NewAppChain constructor in app_chain.go which shows how the new modular chain component is initialized and configured.


Macroscope summarized a600b32.

Summary by CodeRabbit

  • New Features

    • Introduced AppChain abstraction to manage blockchain event streaming and indexing for group messages and identity updates.
    • Added modular interfaces for log storage, block tracking, and reorg handling to enhance flexibility.
    • Implemented robust log indexing with reorg detection, retryable error handling, and metrics integration.
    • Added start/stop lifecycle management for blockchain event streamers with improved concurrency control.
  • Refactor

    • Simplified indexer by delegating blockchain interactions to AppChain.
    • Reorganized contract event watchers to be configurable by ID with functional options.
    • Migrated and renamed packages and interfaces for clearer separation and maintainability.
    • Unified error handling using a new retryable error interface and related types.
    • Updated validation logic for blockchain and prune options with improved modularity.
  • Tests

    • Refactored tests to use consolidated mocks and new error abstractions.
    • Removed obsolete builder pattern tests and updated construction of streamers in tests.
  • Chores

    • Updated mock generation configs; removed outdated mocks and added new mocks for interfaces.
    • Adjusted CI workflows and Docker configurations for consistency and new image versions.

@graphite-app
Copy link

graphite-app bot commented May 15, 2025

How to use the Graphite Merge Queue

Add either label to this PR to merge it via the merge queue:

  • Queue - adds this PR to the back of the merge queue
  • Hotfix - for urgent hot fixes, skip the queue and merge this PR next

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented May 15, 2025

Caution

Review failed

The pull request is closed.

Walkthrough

This change refactors blockchain event streaming and indexing by removing the builder pattern in favor of functional options, introducing new error handling abstractions, and modularizing contract event logic. It adds new interfaces for log storage, block tracking, and reorg handling, moves contract-specific logic into dedicated types, and updates tests and mocks to align with the new architecture.

Changes

File(s) Change Summary
pkg/blockchain/rpcLogStreamer.go, pkg/blockchain/rpcLogStreamer_test.go Removed builder pattern from RpcLogStreamer, replaced configuration with functional options, added watcher management by ID, lifecycle management (Stop), and updated tests accordingly.
pkg/indexer/indexer.go, pkg/server/server.go, pkg/indexer/e2e_test.go Refactored indexer to delegate blockchain event streaming and indexing to a new AppChain abstraction, simplified initialization and startup, updated usage in server and tests.
pkg/indexer/app_chain/app_chain.go, pkg/indexer/app_chain/contracts/group_message.go, pkg/indexer/app_chain/contracts/identity_update.go Introduced AppChain for managing contract event streaming, added dedicated broadcaster types for group messages and identity updates, encapsulating contract logic and event handling.
pkg/indexer/app_chain/contracts/group_message_storer.go, pkg/indexer/app_chain/contracts/identity_update_storer.go Changed package from storer to contracts, updated error handling to use new retryable error types, and interface compliance assertions.
pkg/indexer/app_chain/contracts/group_message_storer_test.go, pkg/indexer/app_chain/contracts/identity_update_storer_test.go Changed package declaration from storer to contracts in tests.
pkg/errors/errors.go Added new RetryableError interface and implementations for recoverable and non-recoverable errors.
pkg/indexer/common/interface.go, pkg/indexer/common/log_handler.go, pkg/indexer/common/reorg_handler.go Introduced new interfaces for log storage, block tracking, and reorg handling; added robust log indexing function with reorg and retry logic; moved and renamed reorg handler, updated error handling.
pkg/indexer/common/block_tracker.go, pkg/indexer/common/block_tracker_test.go, pkg/indexer/common/reorg_handler_test.go, pkg/indexer/common/log_handler_test.go Changed package from indexer to common, updated usage and tests to new interfaces and error types, added interface compliance assertions.
pkg/indexer/interface.go Deleted obsolete IBlockTracker interface (now in common).
.mockery.yaml Updated mock generation configuration to reflect new package structure and interfaces.
pkg/mocks/common/mock_IBlockTracker.go, pkg/mocks/common/mock_ILogStorer.go, pkg/mocks/common/mock_IReorgHandler.go, pkg/mocks/common/mock_IContract.go Added/updated mocks for new interfaces in common.
pkg/mocks/retryable_errors/mock_RetryableError.go Added mock for new RetryableError interface.
pkg/mocks/indexer/mock_ChainReorgHandler.go, pkg/mocks/storer/mock_LogStorer.go Deleted obsolete mocks for removed interfaces.

Sequence Diagram(s)

sequenceDiagram
    participant Server
    participant Indexer
    participant AppChain
    participant RpcLogStreamer
    participant ContractBroadcaster
    participant DB

    Server->>Indexer: NewIndexer(ctx, log, db, cfg, validationService)
    Indexer->>AppChain: NewAppChain(ctx, log, cfg.AppChain, db, validationService)
    AppChain->>RpcLogStreamer: NewRpcLogStreamer(...options)
    AppChain->>ContractBroadcaster: NewGroupMessageBroadcaster/NewIdentityUpdateBroadcaster(...)
    Server->>Indexer: StartIndexer()
    Indexer->>AppChain: Start()
    AppChain->>RpcLogStreamer: Start()
    RpcLogStreamer-->>AppChain: Emit contract event logs (channels)
    AppChain->>ContractBroadcaster: IndexLogs(eventChannel, reorgChannel)
    ContractBroadcaster->>DB: StoreLog(event)
    ContractBroadcaster->>ContractBroadcaster: Handle reorgs, update block tracker
    Server->>Indexer: Close()
    Indexer->>AppChain: Stop()
    AppChain->>RpcLogStreamer: Stop()
Loading

Possibly related PRs

  • xmtp/xmtpd#686: Updates ABI packages for group messages and identity updates, which are directly used by the new contract broadcaster types introduced here.
  • xmtp/xmtpd#411: Modifies rpcLogStreamer.go for reorg detection and handling, which is refactored and superseded by this PR's new approach.
  • xmtp/xmtpd#252: Adds context management and a Stop method to RpcLogStreamer, which aligns with the lifecycle management changes in this PR.

Suggested reviewers

  • mkysel

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 golangci-lint (1.64.8)

Error: you are using a configuration file for golangci-lint v2 with golangci-lint v1: please use golangci-lint v2
Failed executing command with error: you are using a configuration file for golangci-lint v2 with golangci-lint v1: please use golangci-lint v2


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dc2ab1b and 76de20c.

📒 Files selected for processing (14)
  • .github/workflows/release-from-tag.yml (2 hunks)
  • .github/workflows/test.yml (1 hunks)
  • dev/docker/docker-compose-register.yml (4 hunks)
  • dev/docker/docker-compose.yml (1 hunks)
  • dev/local.env (1 hunks)
  • pkg/blockchain/blockchainPublisher.go (1 hunks)
  • pkg/config/options.go (1 hunks)
  • pkg/config/validation.go (3 hunks)
  • pkg/errors/errors.go (1 hunks)
  • pkg/indexer/app_chain/app_chain.go (1 hunks)
  • pkg/indexer/app_chain/contracts/group_message.go (1 hunks)
  • pkg/indexer/app_chain/contracts/group_message_storer.go (6 hunks)
  • pkg/indexer/app_chain/contracts/identity_update_storer.go (13 hunks)
  • pkg/indexer/common/log_handler_test.go (5 hunks)
✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@fbac fbac force-pushed the 05-14-indexer branch from a600b32 to 4e793db Compare May 15, 2025 13:11
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments. If you are seeing this consistently it is likely a permissions issue. Please check "Moderation" -> "Code review limits" under your organization settings.

Actionable comments posted: 7

🧹 Nitpick comments (5)
pkg/indexer/logHandler.go (1)

52-55: Consider reacting to ctx.Done() to avoid goroutine leaks

indexLogs relies on the producer to close eventChannel when the parent context is cancelled.
If—for any reason—the producer fails to close the channel (e.g. a panic before the defer close() runs), this goroutine will block forever on <-eventChannel.

Adding a select { case <-ctx.Done(): return ... ; case event := <-eventChannel: … } guard provides a cheap safety-net.

pkg/indexer/indexer.go (1)

13-19: wg is now unused – remove to avoid confusion

The wait-group was previously used to track spawned goroutines, but the new AppChain abstraction manages its own lifecycle.
Leaving an unused sync.WaitGroup (and the i.wg.Wait() call in Close()) increases cognitive load.

-type Indexer struct {
-	ctx      context.Context
-	log      *zap.Logger
-	cancel   context.CancelFunc
-	wg       sync.WaitGroup
-	appChain *AppChain
-}
+type Indexer struct {
+	ctx      context.Context
+	log      *zap.Logger
+	cancel   context.CancelFunc
+	appChain *AppChain
+}

Remember to drop the i.wg.Wait() in Close().

pkg/blockchain/rpcLogStreamer.go (1)

51-62: Channel buffer sizes may be insufficient

BackfillChannel is buffered with 100, but heavy traffic (e.g. burst reorg back-fills) can easily exceed that, blocking the watcher.
Similarly, reorgChannel has capacity 1, which may dead-lock the producer (see comment in indexLogs).

Consider:

-backfillChannel := make(chan types.Log, 100)
-reorgChannel := make(chan uint64, 1)
+backfillChannel := make(chan types.Log, 1_000) // or configurable
+reorgChannel := make(chan uint64, 4)           // small, but >1
pkg/indexer/app_chain.go (2)

255-261: Duplicate helper (groupMessageBroadcasterAddress) appears unused
groupMessageBroadcasterAddress() and groupMessageBroadcasterName() return the exact same string, but only the Name variant is referenced in this file. Keeping dead code increases maintenance overhead.

Either delete the groupMessageBroadcasterAddress() helper or differentiate its purpose (e.g. return checksum address rather than ID).


28-40: Minor nit – consider embedding sync.WaitGroup directly
You already expose a wg field; embedding removes one level of indirection (AppChain.wg.Add vs AppChain.Add) and follows Go’s “anonymous field” idiom:

-   wg  sync.WaitGroup
+   sync.WaitGroup

Not essential, but improves ergonomics.

🛑 Comments failed to post (7)
pkg/blockchain/rpcLogStreamer_test.go (1)

37-37: 💡 Verification agent

🧩 Analysis chain

Fix the test to use the new RpcLogStreamer constructor API

The test updated field names from EventChannel to BackfillChannel correctly, but there's a pipeline failure because NewRpcLogStreamer has been refactored to use functional options pattern instead of direct ContractConfig slices.

Update the test to use the new constructor pattern, likely something like:

-return blockchain.NewRpcLogStreamer(
-    context.Background(),
-    client,
-    log,
-    []blockchain.ContractConfig{cfg},
-), channel
+return blockchain.NewRpcLogStreamer(
+    context.Background(),
+    client,
+    log,
+    blockchain.WithContractConfigs([]blockchain.ContractConfig{cfg}),
+), channel

Also applies to: 98-98


🏁 Script executed:

#!/bin/bash
# Check how NewRpcLogStreamer is used elsewhere in the codebase
rg -A 3 "NewRpcLogStreamer" --type go

Length of output: 663


🏁 Script executed:

#!/bin/bash
# Locate the NewRpcLogStreamer function signature and print its definition including option parameters
FILE="pkg/blockchain/rpcLogStreamer.go"
START_LINE=$(rg -n "func NewRpcLogStreamer" "$FILE" | cut -d: -f1)
# Print 50 lines from the signature to capture options pattern
sed -n "${START_LINE},$((START_LINE+50))p" "$FILE"

Length of output: 1381


🏁 Script executed:

#!/bin/bash
FILE="pkg/blockchain/rpcLogStreamer.go"
echo "Searching for RpcLogStreamerOption definition:"
rg -n "type RpcLogStreamerOption" "$FILE"
echo
echo "Printing context around RpcLogStreamerOption definition:"
rg -A 3 -B 3 "RpcLogStreamerOption" "$FILE"
echo
echo "Listing functions starting with With in rpcLogStreamer.go:"
rg -n "func With" "$FILE"

Length of output: 1457


🏁 Script executed:

#!/bin/bash
FILE="pkg/blockchain/rpcLogStreamer.go"
echo "Context for WithContractConfig:"
rg -A 5 -B 3 "func WithContractConfig" "$FILE"

Length of output: 338


🏁 Script executed:

#!/bin/bash
FILE="pkg/blockchain/rpcLogStreamer_test.go"
echo "Lines 1-60 (first NewRpcLogStreamer usage at line ~37):"
sed -n '1,60p' "$FILE"
echo
echo "Lines 80-140 (second NewRpcLogStreamer usage at line ~98):"
sed -n '80,140p' "$FILE"

Length of output: 2709


🏁 Script executed:

#!/bin/bash
echo "Searching for ContractConfig definition across pkg/blockchain:"
rg -n "type ContractConfig" -A 5 -B 3 pkg/blockchain

Length of output: 773


Update rpcLogStreamer tests to match the new functional-options constructor

The NewRpcLogStreamer signature no longer accepts a []ContractConfig. In your test helper, drop the slice argument (or switch to the WithContractConfig option). For example, in pkg/blockchain/rpcLogStreamer_test.go:

• Location: buildStreamer (around line 37)

 func buildStreamer(
     t *testing.T,
     client blockchain.ChainClient,
     fromBlock uint64,
     address common.Address,
     topic common.Hash,
 ) (*blockchain.RpcLogStreamer, chan types.Log) {
     log, err := zap.NewDevelopment()
     require.NoError(t, err)
     channel := make(chan types.Log)
     cfg := blockchain.ContractConfig{
         FromBlock:       fromBlock,
         ContractAddress: address,
         Topics:          []common.Hash{topic},
         BackfillChannel: channel,
     }
-    return blockchain.NewRpcLogStreamer(
-        context.Background(),
-        client,
-        log,
-        []blockchain.ContractConfig{cfg},
-    ), channel
+    return blockchain.NewRpcLogStreamer(
+        context.Background(),
+        client,
+        log,
+    ), channel
 }

If you really need the watcher wired up in other tests, you can instead pass:

blockchain.WithContractConfig(
    cfg.ID,
    cfg.FromBlock,
    cfg.ContractAddress,
    cfg.Topics,
    /* maxDisconnectTime */ time.Second,
)

but for your GetNextPage tests the zero-option call is sufficient.

🤖 Prompt for AI Agents
In pkg/blockchain/rpcLogStreamer_test.go around lines 37 and 98, the test calls
to NewRpcLogStreamer still use the old constructor pattern with a slice of
ContractConfig, which no longer matches the updated functional options pattern.
Update these calls to use the new constructor by removing the slice argument and
instead passing options like WithContractConfig with the appropriate parameters,
or use a zero-option call if the watcher is not needed. This will align the test
with the new NewRpcLogStreamer API and fix the pipeline failure.
pkg/indexer/logHandler.go (2)

139-140: ⚠️ Potential issue

reorgChannel <- reorgBlockNumber can dead-lock

reorgChannel is injected as chan<- uint64; the caller decides buffering.
WithContractConfig currently allocates it with capacity 1. Should two reorgs be detected before the watcher consumes the first value, this send will block indefinitely, stalling the entire log handler.

Either:

  1. Make the send non-blocking:
select {
case reorgChannel <- reorgBlockNumber:
default:
    logger.Warn("reorg channel full, dropping signal")
}
  1. Or document & enforce a larger buffer (e.g. make(chan uint64, 4)).
🤖 Prompt for AI Agents
In pkg/indexer/logHandler.go around lines 139 to 140, the send operation to
reorgChannel can deadlock because the channel has a buffer size of 1 and may
block if multiple reorgs occur before consumption. To fix this, either change
the send to be non-blocking using a select statement with a default case that
logs a warning when the channel is full, or increase the buffer size of
reorgChannel to at least 4 where it is created to prevent blocking.

145-151: 🛠️ Refactor suggestion

Non-retriable storage errors are silently swallowed

retry() returns a non-nil error only for non-retriable failures, yet the caller simply continues, neither surfacing the failure nor emitting metrics.
Consider at least logging the failure at Error level and/or aborting the loop so that an operator is alerted:

- if err != nil {
-     continue
- }
+ if err != nil {
+     logger.Error("non-retryable storage error, dropping event", zap.Error(err))
+     // optionally: return / break to fail fast
+     continue
+ }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

		err := retry(logger, 100*time.Millisecond, contractAddress, func() storer.LogStorageError {
			return logStorer.StoreLog(ctx, event)
		})
		if err != nil {
			logger.Error("non-retryable storage error, dropping event", zap.Error(err))
			// optionally: return / break to fail fast
			continue
		}
🤖 Prompt for AI Agents
In pkg/indexer/logHandler.go around lines 145 to 151, the code currently ignores
non-retriable errors returned by retry() by just continuing the loop without
logging or handling them. Modify the code to log these errors at Error level
using the logger and consider breaking or aborting the loop to alert operators
of the failure instead of silently swallowing the error.
pkg/blockchain/rpcLogStreamer.go (2)

119-123: ⚠️ Potential issue

Stop() should also close per-watcher channels

Stop() cancels the context and waits, but any goroutine still holding a reference to BackfillChannel / reorgChannel will block forever on send/receive.

Before r.wg.Wait() consider:

for _, w := range r.Watchers {
    close(w.BackfillChannel)
    close(w.reorgChannel)
}
🤖 Prompt for AI Agents
In pkg/blockchain/rpcLogStreamer.go around lines 119 to 123, the Stop() method
cancels the context and waits for goroutines but does not close the per-watcher
channels BackfillChannel and reorgChannel, causing potential goroutine blocking.
To fix this, before calling r.wg.Wait(), iterate over r.Watchers and close each
watcher's BackfillChannel and reorgChannel to ensure no goroutines remain
blocked on these channels.

201-203: ⚠️ Potential issue

Potential uint64 underflow when lag > highestBlock

If lagFromHighestBlock is greater than highestBlock, the subtraction underflows, yielding a huge highestBlockCanProcess.

var highestBlockCanProcess uint64
if r.lagFromHighestBlock > highestBlock {
    highestBlockCanProcess = 0
} else {
    highestBlockCanProcess = highestBlock - r.lagFromHighestBlock
}
🤖 Prompt for AI Agents
In pkg/blockchain/rpcLogStreamer.go around lines 201 to 203, the subtraction
highestBlock - r.lagFromHighestBlock can underflow if lagFromHighestBlock is
greater than highestBlock, causing an incorrect large value. Fix this by adding
a conditional check: if lagFromHighestBlock is greater than highestBlock, set
highestBlockCanProcess to 0; otherwise, perform the subtraction as usual.
pkg/indexer/app_chain.go (2)

142-145: 🛠️ Refactor suggestion

Graceful shutdown missing – wait for goroutines to exit
Stop() cancels the context but immediately returns; any in-flight goroutines spawned via tracing.GoPanicWrap may still be indexing. Add a wg.Wait() to ensure everything is flushed before the process terminates:

 func (s *AppChain) Stop() {
     s.streamer.Stop()
     s.cancel()
+    s.wg.Wait()
 }

This also prevents panics from accessing closed DB connections during teardown.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

func (s *AppChain) Stop() {
    s.streamer.Stop()
    s.cancel()
    s.wg.Wait()
}
🤖 Prompt for AI Agents
In pkg/indexer/app_chain.go at lines 142 to 145, the Stop() method cancels the
context and stops the streamer but returns immediately without waiting for any
spawned goroutines to finish. To fix this, add a wait group (wg) to track these
goroutines and call wg.Wait() in Stop() after canceling the context and stopping
the streamer. This ensures all goroutines complete gracefully before Stop()
returns, preventing panics from accessing closed resources.

136-140: ⚠️ Potential issue

Start() silently drops fatal errors – bubble them up or at least log them
Both indexGroupMessageBroadcasterLogs and indexIdentityUpdateBroadcasterLogs return an error, yet the caller discards the result.
If either contract binding fails (e.g. wrong ABI, RPC hiccup) the indexer continues running with half-configured state and no visibility of the failure.

Suggested fix – make the method return an error (preferred) or, at minimum, log and abort:

-func (s *AppChain) Start(db *sql.DB, validationService mlsvalidate.MLSValidationService) {
-    s.streamer.Start()
-    s.indexGroupMessageBroadcasterLogs(s.client, s.messagesTracker, db)
-    s.indexIdentityUpdateBroadcasterLogs(s.client, s.identityUpdatesTracker, validationService, db)
-}
+func (s *AppChain) Start(db *sql.DB, validationService mlsvalidate.MLSValidationService) error {
+    s.streamer.Start()
+
+    if err := s.indexGroupMessageBroadcasterLogs(s.client, s.messagesTracker, db); err != nil {
+        return fmt.Errorf("group-message indexer init failed: %w", err)
+    }
+    if err := s.indexIdentityUpdateBroadcasterLogs(s.client, s.identityUpdatesTracker, validationService, db); err != nil {
+        return fmt.Errorf("identity-update indexer init failed: %w", err)
+    }
+    return nil
+}

Call-sites (e.g. server.go) will need to handle the returned error.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

func (s *AppChain) Start(db *sql.DB, validationService mlsvalidate.MLSValidationService) error {
    s.streamer.Start()

    if err := s.indexGroupMessageBroadcasterLogs(s.client, s.messagesTracker, db); err != nil {
        return fmt.Errorf("group-message indexer init failed: %w", err)
    }
    if err := s.indexIdentityUpdateBroadcasterLogs(s.client, s.identityUpdatesTracker, validationService, db); err != nil {
        return fmt.Errorf("identity-update indexer init failed: %w", err)
    }
    return nil
}
🤖 Prompt for AI Agents
In pkg/indexer/app_chain.go around lines 136 to 140, the Start() method calls
indexGroupMessageBroadcasterLogs and indexIdentityUpdateBroadcasterLogs but
ignores their returned errors, which can cause silent failures. Modify Start()
to return an error and capture the errors from these calls. If an error occurs,
return it immediately to propagate the failure. Update all call-sites of Start()
to handle the returned error accordingly.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (5)
pkg/indexer/logHandler.go (1)

83-88: Consider optimizing block tracker updates.

The code retrieves the latest block from the tracker on every block change, which could be optimized for high-frequency chains.

Consider caching the latest block values and updating them only when the block tracker is updated, rather than retrieving them on every block change.

-		if lastBlockSeen > 0 && lastBlockSeen != event.BlockNumber {
-			storedBlockNumber, storedBlockHash = blockTracker.GetLatestBlock()
-		}
+		if lastBlockSeen > 0 && lastBlockSeen != event.BlockNumber {
+			// Only update from tracker if the stored values have been modified
+			if event.BlockNumber > storedBlockNumber + 1 {
+				storedBlockNumber, storedBlockHash = blockTracker.GetLatestBlock()
+			}
+		}
pkg/blockchain/rpcLogStreamer.go (1)

243-249: Function name inconsistency with field name.

The function GetEventChannel returns the BackfillChannel field, which could lead to confusion.

Consider renaming the function to match the field name for consistency:

-func (r *RpcLogStreamer) GetEventChannel(id string) chan types.Log {
+func (r *RpcLogStreamer) GetBackfillChannel(id string) chan types.Log {

And update all callers accordingly.

pkg/indexer/app_chain.go (3)

24-26: Consider making lagFromHighestBlock configurable.

The constant lagFromHighestBlock is hardcoded to 0, but it could be beneficial to make it configurable.

Consider accepting this value as a parameter in the NewAppChain constructor or as part of the configuration struct:

-const (
-	lagFromHighestBlock = 0
-)

func NewAppChain(
	ctxwc context.Context,
	log *zap.Logger,
	cfg config.ContractsOptions,
	db *sql.DB,
+	lagFromHighestBlock uint64,
) (*AppChain, error) {

98-119: Consider reusing common code in contract configuration.

The contract configuration code for both contracts is very similar and could be refactored to reduce duplication.

Extract common patterns into helper functions to reduce redundancy:

func createContractConfig(
    name string,
    fromBlock uint64,
    address common.Address,
    topics []common.Hash,
    maxDisconnectTime time.Duration,
) blockchain.RpcLogStreamerOption {
    return blockchain.WithContractConfig(
        name,
        fromBlock,
        address,
        topics,
        maxDisconnectTime,
    )
}

255-261: Redundant function implementations.

The functions groupMessageBroadcasterAddress and groupMessageBroadcasterName have identical implementations.

Consider removing one of the redundant functions:

-func groupMessageBroadcasterAddress(chainID int) string {
-	return fmt.Sprintf("groupMessageBroadcaster-%v", chainID)
-}

And use groupMessageBroadcasterName consistently throughout the codebase.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a600b32 and 4e793db.

📒 Files selected for processing (8)
  • pkg/blockchain/rpcLogStreamer.go (6 hunks)
  • pkg/blockchain/rpcLogStreamer_test.go (2 hunks)
  • pkg/indexer/app_chain.go (1 hunks)
  • pkg/indexer/indexer.go (1 hunks)
  • pkg/indexer/logHandler.go (1 hunks)
  • pkg/indexer/storer/groupMessage.go (1 hunks)
  • pkg/indexer/storer/identityUpdate.go (1 hunks)
  • pkg/server/server.go (1 hunks)
✅ Files skipped from review due to trivial changes (2)
  • pkg/indexer/storer/groupMessage.go
  • pkg/indexer/storer/identityUpdate.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • pkg/server/server.go
  • pkg/blockchain/rpcLogStreamer_test.go
🧰 Additional context used
🧬 Code Graph Analysis (2)
pkg/indexer/logHandler.go (6)
pkg/blockchain/interface.go (1)
  • ChainClient (28-33)
pkg/indexer/storer/interface.go (1)
  • LogStorer (10-12)
pkg/indexer/interface.go (1)
  • IBlockTracker (7-10)
pkg/indexer/reorgHandler.go (2)
  • ChainReorgHandler (15-17)
  • ErrNoBlocksFound (26-26)
pkg/indexer/storer/error.go (1)
  • LogStorageError (3-6)
pkg/metrics/indexer.go (2)
  • EmitIndexerLogProcessingTime (115-117)
  • EmitIndexerRetryableStorageError (99-102)
pkg/blockchain/rpcLogStreamer.go (1)
pkg/blockchain/interface.go (1)
  • ChainClient (28-33)
🔇 Additional comments (19)
pkg/indexer/indexer.go (5)

18-18: Good implementation of composition.

Using composition by embedding the AppChain in the Indexer is a clean approach to delegate blockchain functionality.


25-28: Function signature improvement.

The removal of the validationService parameter from NewIndexer and its addition to StartIndexer is a good design decision, providing better separation between initialization and operation.


31-40: Well-structured error handling.

Good error handling with proper cleanup by calling cancel() on error paths before returning. This prevents context leaks.


50-61: Clean shutdown implementation.

The Close method properly handles shutdown in the correct order: stopping the AppChain, canceling the context, and waiting for goroutines to complete.


63-70: Simplified interface with proper delegation.

The StartIndexer method has been simplified to delegate to the AppChain, removing duplicate logic and improving maintainability.

pkg/indexer/logHandler.go (6)

18-24: Well-documented function purpose.

The documentation clearly explains the function's purpose and error handling behavior, aiding in maintainability.


36-39: Helpful comment with justification for reorg check interval.

Excellent comment explaining the technical reasoning behind the 60-block reorg check interval, including calculations of potential impact.


52-73: Robust reorg detection and handling.

The code properly handles the detection of reorgs and manages the state transitions. The use of continuous variables for tracking the reorg state is well-designed.


94-143: Well-structured reorg verification logic.

The reorg verification logic is comprehensive and handles error cases gracefully. The block hash comparison and finding the reorg point are well-implemented.


145-157: Good use of retry mechanism and metrics.

The code properly uses a retry mechanism for retryable errors and emits metrics for performance monitoring. The block tracker is updated after successful log storage.


161-179: Efficient retry implementation.

The retry function is well-implemented with a clear distinction between retryable and non-retryable errors. The use of metrics for retryable errors is a good practice for monitoring.

pkg/blockchain/rpcLogStreamer.go (5)

19-23: Performance improvement with reduced sleep time.

Reducing the NO_LOGS_SLEEP_TIME from 1 second to 100 milliseconds will improve responsiveness for new events.


25-34: Good addition of ID field for contract identification.

Adding an ID field to ContractConfig enables better organization and access to watchers by name rather than address.


36-64: Well-implemented functional options pattern.

The switch from builder pattern to functional options is a good improvement. The implementation is clean and provides good flexibility for configuration.


79-80: Improved data structure for watchers.

Storing watchers in a map keyed by ID rather than a slice enables direct access and improves efficiency.


119-122: Proper resource cleanup with Stop method.

The addition of a Stop method ensures proper cleanup of resources by canceling the context and waiting for goroutines to complete.

pkg/indexer/app_chain.go (3)

28-40: Well-structured AppChain struct with clear responsibilities.

The AppChain struct has a clear set of responsibilities and maintains all necessary state for blockchain interaction.


136-140: Simple and clear Start method.

The Start method clearly organizes the initialization sequence: starting the streamer and then setting up indexing for each contract.


234-309: Well-organized contract-specific utility functions.

The organization of contract-specific utility functions is clean and promotes code reuse. The separation between GroupMessageBroadcaster and IdentityUpdateBroadcaster functions is clear.

@fbac fbac changed the title Modular indexer Modular multi-chain indexer May 15, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🧹 Nitpick comments (7)
pkg/indexer/reorg_handler/reorgHandler.go (2)

30-34: Constant style is un-idiomatic Go

BLOCK_RANGE_SIZE is package-private yet screams like it’s exported.
Consider:

-const BLOCK_RANGE_SIZE uint64 = 600
+const blockRangeSize uint64 = 600

and update the single reference in BlockRange.


171-179: Exported function lacks doc comment

BlockRange is now exported but has no accompanying comment, which triggers golint/staticcheck warnings.

// BlockRange returns the inclusive [start, end] range of blocks whose oldest
// block could contain the re-org point when a re-org was detected at `from`.

Adding the comment helps downstream users and tooling.

pkg/indexer/indexer.go (1)

14-20: Unused wait-group – dead code / latent bug

Indexer embeds a sync.WaitGroup but never increments it.
Either:

  1. Drop the field, or
  2. Pass it to appChain.Start / increment where goroutines are spawned so Close() actually waits for work to finish.

Current behaviour is equivalent to time.Sleep(0), which is misleading.

pkg/indexer/app_chain/logHandler.go (1)

169-181: Minor: unexpected busy-loop risk

Even after adding context awareness, consider exponential back-off (or retry count) to avoid hammering the DB/service under extended outages.

pkg/indexer/app_chain/group_message_broadcaster.go (1)

62-64: Avoid receiver/package-name shadowing
Using gm as the method receiver shadows the imported alias gm from the ABI package, making that package inaccessible inside receiver methods and harming readability. Consider a clearer receiver name (gmb, b, etc.).

pkg/indexer/app_chain/app_chain.go (1)

41-49: Shadowed variable hides the original context
Reassigning ctxwc returned by context.WithCancel shadows the incoming parameter.
Although legal, it obscures intent and is easy to misuse later. Prefer a new name such as ctx or ctxWithCancel.

pkg/indexer/app_chain/identity_update_broadcaster.go (1)

62-64: Receiver/package shadowing
The receiver iu masks the imported ABI alias iu, mirroring the issue noted in the group-message broadcaster.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4e793db and 4e5a0bf.

📒 Files selected for processing (12)
  • pkg/indexer/app_chain/app_chain.go (1 hunks)
  • pkg/indexer/app_chain/group_message_broadcaster.go (1 hunks)
  • pkg/indexer/app_chain/identity_update_broadcaster.go (1 hunks)
  • pkg/indexer/app_chain/logHandler.go (1 hunks)
  • pkg/indexer/app_chain/storer/groupMessage.go (1 hunks)
  • pkg/indexer/app_chain/storer/identityUpdate.go (1 hunks)
  • pkg/indexer/block_tracker/blockTracker.go (1 hunks)
  • pkg/indexer/block_tracker/blockTracker_test.go (8 hunks)
  • pkg/indexer/indexer.go (2 hunks)
  • pkg/indexer/indexer_test.go (1 hunks)
  • pkg/indexer/reorg_handler/reorgHandler.go (5 hunks)
  • pkg/indexer/reorg_handler/reorgHandler_test.go (2 hunks)
✅ Files skipped from review due to trivial changes (5)
  • pkg/indexer/block_tracker/blockTracker.go
  • pkg/indexer/app_chain/storer/groupMessage.go
  • pkg/indexer/app_chain/storer/identityUpdate.go
  • pkg/indexer/indexer_test.go
  • pkg/indexer/reorg_handler/reorgHandler_test.go
🧰 Additional context used
🧬 Code Graph Analysis (2)
pkg/indexer/app_chain/logHandler.go (6)
pkg/blockchain/interface.go (1)
  • ChainClient (28-33)
pkg/indexer/app_chain/storer/interface.go (1)
  • LogStorer (10-12)
pkg/indexer/block_tracker/blockTracker.go (1)
  • BlockTracker (21-26)
pkg/indexer/reorg_handler/reorgHandler.go (2)
  • ChainReorgHandler (15-17)
  • ErrNoBlocksFound (26-26)
pkg/indexer/app_chain/storer/error.go (1)
  • LogStorageError (3-6)
pkg/metrics/indexer.go (2)
  • EmitIndexerLogProcessingTime (115-117)
  • EmitIndexerRetryableStorageError (99-102)
pkg/indexer/block_tracker/blockTracker_test.go (1)
pkg/indexer/block_tracker/blockTracker.go (1)
  • NewBlockTracker (36-53)
🪛 GitHub Check: Test (Node)
pkg/indexer/block_tracker/blockTracker_test.go

[failure] 8-8:
no required module provides package github.com/xmtp/xmtpd/pkg/indexer/blocktracker; to add it:

⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Upgrade Tests
🔇 Additional comments (3)
pkg/indexer/block_tracker/blockTracker_test.go (1)

24-25: Follow-up: adjust constructor calls once the import path is fixed

After correcting the import, the existing calls to blocktracker.NewBlockTracker are fine – no further changes required.

Also applies to: 39-40, 67-68, 80-82, 115-117, 131-134

pkg/indexer/indexer.go (1)

51-60: Cancellation order – stop child before closing context

appChain.Stop() is called before cancel().
If Stop() relies on the parent context’s cancellation to exit goroutines cleanly, swap the order:

-	if i.appChain != nil {
-		i.appChain.Stop()
-	}
-	i.cancel()
+	i.cancel()
+	if i.appChain != nil {
+		i.appChain.Stop()
+	}

Double-check AppChain.Stop() expectations.

pkg/indexer/app_chain/logHandler.go (1)

54-76: Re-org discard logic may drop legitimate logs

event.BlockNumber >= reorgDetectedAt discards the detection block itself, which could contain the canonical replacement log after the streamer restarts.
Should the comparison be against reorgBeginsAt instead?

Please double-check the intended semantics and add a comment / unit test.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🔭 Outside diff range comments (1)
pkg/indexer/common/logHandler_test.go (1)

140-149: ⚠️ Potential issue

Function signature mismatch

The signature of IndexLogs at line 140 doesn't match the one in the first test case or the actual implementation. The call includes logStorer, blockTracker, reorgHandler, and testContract, but these should now be part of the contract interface.

go c.IndexLogs(
    ctx,
    mockClient,
    channel,
    reorgChannel,
    contract,
-   logStorer,
-   blockTracker,
-   reorgHandler,
-   "testContract",
)
♻️ Duplicate comments (2)
pkg/indexer/app_chain/app_chain.go (2)

70-70: ⚠️ Potential issue

Handle GetLatestBlock errors – zero silently backfills from genesis.

Both GetLatestBlock calls drop the error value. When the tracker table is empty or the DB is unavailable, the method returns (0, err). Passing 0 to the log-streamer will trigger a backfill from block 0, which can choke the node and hit Infura/Alchemy rate limits.

-	groupMessageLatestBlockNumber, _ := groupMessageBroadcaster.GetLatestBlock()
+	groupMessageLatestBlockNumber, err := groupMessageBroadcaster.GetLatestBlock()
+	if err != nil {
+		cancel()
+		return nil, fmt.Errorf("failed to fetch latest block for group message broadcaster: %w", err)
+	}

86-86: ⚠️ Potential issue

Apply the same GetLatestBlock error handling here.

This line has the same issue as the previous GetLatestBlock call - the error is being ignored.

-	identityUpdateLatestBlockNumber, _ := identityUpdateBroadcaster.GetLatestBlock()
+	identityUpdateLatestBlockNumber, err = identityUpdateBroadcaster.GetLatestBlock()
+	if err != nil {
+		cancel()
+		return nil, fmt.Errorf("failed to fetch latest block for identity update broadcaster: %w", err)
+	}
🧹 Nitpick comments (3)
pkg/errors/errors.go (1)

20-22: Function name doesn't match new struct name

The constructor function name NewUnrecoverableLogStorageError still uses the old naming convention while returning the new NonRetryableError type. Consider updating the function name for consistency.

-func NewUnrecoverableLogStorageError(err error) NonRetryableError {
+func NewNonRetryableError(err error) NonRetryableError {
    return NonRetryableError{err: err}
}
pkg/indexer/common/logHandler.go (1)

164-182: Consider adding maximum retry count

The retry function will continue retrying indefinitely as long as ShouldRetry() returns true. Consider adding a maximum retry count or timeout to prevent excessive retries in case of persistent issues.

func retry(
    logger *zap.Logger,
    sleep time.Duration,
    address string,
    fn func() re.RetryableError,
+   maxRetries int,
) error {
+   retryCount := 0
    for {
+       if maxRetries > 0 && retryCount >= maxRetries {
+           logger.Error("max retries exceeded", zap.Int("maxRetries", maxRetries))
+           return errors.New("max retries exceeded")
+       }
        if err := fn(); err != nil {
            logger.Error("error storing log", zap.Error(err))
            if err.ShouldRetry() {
                metrics.EmitIndexerRetryableStorageError(address)
                time.Sleep(sleep)
+               retryCount++
                continue
            }
            return err
        }
        return nil
    }
}

Then update the call site:

err := retry(
    contract.Logger(),
    100*time.Millisecond,
    contract.Address().Hex(),
    func() re.RetryableError {
        return contract.StoreLog(ctx, event)
    },
+   5, // Add a reasonable maximum retry count
)
pkg/blockchain/rpcLogStreamer.go (1)

241-263: Robust accessor methods with null-safety

The new getter methods provide a clean API for accessing watcher-specific data by ID, with proper null-safety checks. These methods support the modular architecture by allowing components to access only the watchers they need.

Small suggestion: Consider adding type-specific error returns instead of zero values for better error handling:

-func (r *RpcLogStreamer) GetContractAddress(id string) common.Address {
+func (r *RpcLogStreamer) GetContractAddress(id string) (common.Address, bool) {
  if _, ok := r.watchers[id]; !ok {
-    return common.Address{}
+    return common.Address{}, false
  }

-  return r.watchers[id].ContractAddress
+  return r.watchers[id].ContractAddress, true
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4e5a0bf and 440ba3c.

📒 Files selected for processing (30)
  • .mockery.yaml (1 hunks)
  • pkg/blockchain/blockchainPublisher.go (1 hunks)
  • pkg/blockchain/rpcLogStreamer.go (6 hunks)
  • pkg/blockchain/rpcLogStreamer_test.go (3 hunks)
  • pkg/errors/errors.go (1 hunks)
  • pkg/indexer/app_chain/app_chain.go (1 hunks)
  • pkg/indexer/app_chain/contracts/group_message.go (1 hunks)
  • pkg/indexer/app_chain/contracts/group_message_storer.go (8 hunks)
  • pkg/indexer/app_chain/contracts/group_message_storer_test.go (1 hunks)
  • pkg/indexer/app_chain/contracts/identity_update.go (1 hunks)
  • pkg/indexer/app_chain/contracts/identity_update_storer.go (13 hunks)
  • pkg/indexer/app_chain/contracts/identity_update_storer_test.go (1 hunks)
  • pkg/indexer/common/blockTracker.go (1 hunks)
  • pkg/indexer/common/blockTracker_test.go (8 hunks)
  • pkg/indexer/common/interface.go (1 hunks)
  • pkg/indexer/common/logHandler.go (1 hunks)
  • pkg/indexer/common/logHandler_test.go (5 hunks)
  • pkg/indexer/common/reorgHandler.go (5 hunks)
  • pkg/indexer/common/reorgHandler_test.go (2 hunks)
  • pkg/indexer/e2e_test.go (1 hunks)
  • pkg/indexer/indexer.go (2 hunks)
  • pkg/indexer/interface.go (0 hunks)
  • pkg/mocks/common/mock_IBlockTracker.go (1 hunks)
  • pkg/mocks/common/mock_IContract.go (1 hunks)
  • pkg/mocks/common/mock_ILogStorer.go (1 hunks)
  • pkg/mocks/common/mock_IReorgHandler.go (1 hunks)
  • pkg/mocks/indexer/mock_ChainReorgHandler.go (0 hunks)
  • pkg/mocks/retryable_errors/mock_RetryableError.go (1 hunks)
  • pkg/mocks/storer/mock_LogStorer.go (0 hunks)
  • pkg/server/server.go (2 hunks)
💤 Files with no reviewable changes (3)
  • pkg/indexer/interface.go
  • pkg/mocks/indexer/mock_ChainReorgHandler.go
  • pkg/mocks/storer/mock_LogStorer.go
✅ Files skipped from review due to trivial changes (12)
  • pkg/indexer/common/blockTracker.go
  • pkg/indexer/app_chain/contracts/group_message_storer_test.go
  • pkg/mocks/common/mock_IBlockTracker.go
  • pkg/indexer/app_chain/contracts/identity_update_storer_test.go
  • pkg/blockchain/blockchainPublisher.go
  • pkg/indexer/common/blockTracker_test.go
  • pkg/indexer/common/reorgHandler_test.go
  • pkg/mocks/retryable_errors/mock_RetryableError.go
  • .mockery.yaml
  • pkg/mocks/common/mock_ILogStorer.go
  • pkg/mocks/common/mock_IReorgHandler.go
  • pkg/mocks/common/mock_IContract.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • pkg/server/server.go
  • pkg/blockchain/rpcLogStreamer_test.go
🧰 Additional context used
🧬 Code Graph Analysis (4)
pkg/indexer/common/logHandler_test.go (2)
pkg/indexer/common/logHandler.go (1)
  • IndexLogs (25-162)
pkg/mocks/retryable_errors/mock_RetryableError.go (2)
  • MockRetryableError (8-10)
  • NewMockRetryableError (112-122)
pkg/indexer/common/interface.go (1)
pkg/errors/errors.go (1)
  • RetryableError (3-6)
pkg/indexer/e2e_test.go (1)
pkg/indexer/indexer.go (1)
  • NewIndexer (22-51)
pkg/indexer/common/logHandler.go (5)
pkg/blockchain/interface.go (1)
  • ChainClient (28-33)
pkg/indexer/common/interface.go (1)
  • IContract (28-35)
pkg/indexer/common/reorgHandler.go (1)
  • ErrNoBlocksFound (22-22)
pkg/errors/errors.go (1)
  • RetryableError (3-6)
pkg/metrics/indexer.go (2)
  • EmitIndexerLogProcessingTime (115-117)
  • EmitIndexerRetryableStorageError (99-102)
🔇 Additional comments (54)
pkg/indexer/common/reorgHandler.go (2)

1-1: Package name change aligns with new modular architecture.

The package has been appropriately renamed from indexer to common to support modular multi-chain indexing.


47-47: Function exported for cross-package access.

The blockRange function has been renamed to BlockRange and exported, with all internal references updated. This supports the modular architecture by making this utility accessible to other packages.

Also applies to: 66-66, 102-102, 167-167

pkg/indexer/app_chain/contracts/group_message_storer.go (3)

1-1: Package name change and error handling centralized.

The package has been renamed from storer to contracts and now imports the centralized error package with alias re. This is a good enhancement that improves modularity and error handling consistency.

Also applies to: 13-13


37-37: Logger name simplified.

The logger name has been simplified from "GroupMessageStorer" to "storer", which is cleaner but still provides context within the contracts package.


46-46: Error handling standardized.

Return type and error constructors have been updated to use the centralized error types from the re package. This maintains the same error handling semantics while improving consistency across the codebase.

Also applies to: 49-49, 57-57, 68-68, 76-76, 85-85, 91-91, 104-104, 115-115

pkg/indexer/e2e_test.go (1)

45-47: Indexer initialization pattern updated.

The test has been updated to use the new indexer API, which now:

  1. Returns both an indexer and an error from NewIndexer
  2. Uses a parameterless StartIndexer() method

This aligns with the new architecture where the indexer delegates blockchain interactions to the AppChain abstraction.

pkg/indexer/app_chain/contracts/identity_update_storer.go (4)

1-1: Package renamed and error handling centralized.

The package has been renamed from storer to contracts and now imports the centralized error package. This improves consistency with the new modular architecture.

Also applies to: 18-18


48-48: Logger name simplified.

The logger name has been simplified from "IdentityUpdateStorer" to "storer", consistent with other storer implementations.


58-58: Error handling standardized throughout StoreLog method.

All error constructors have been updated to use the centralized error types from the re package, maintaining the same error handling logic while standardizing error types.

Also applies to: 61-61, 70-70, 92-92, 103-103, 120-120, 148-148, 163-163, 172-172, 178-178, 189-189, 200-200


207-213: Error type assertion updated.

The error type assertion has been updated to use the new re.RetryableError type, ensuring proper error handling when transactions fail.

pkg/indexer/common/logHandler_test.go (1)

1-2: ⚠️ Potential issue

Package declaration needs to be fixed

The file starts with package common but then immediately declares package indexer. The package should be consistently set to common since this is in the common directory.

-package common
+// package common - commented out

package indexer

Likely an incorrect or invalid review comment.

pkg/errors/errors.go (3)

1-6: Good package and interface renaming

The package has been appropriately renamed from storer to retryable_errors, and the interface has been renamed from LogStorageError to RetryableError. This makes the purpose of the interface more clear and aligns with the modular design.


8-22: Consistent renaming of error types

The error struct has been renamed from UnrecoverableLogStorageError to NonRetryableError, which is clearer and more concise while maintaining backward compatibility with existing code that checks the ShouldRetry() method.


24-38: Retain RetryableLogStorageError type

Maintaining the RetryableLogStorageError struct alongside the new interface provides backward compatibility. This is a good approach during refactoring to minimize disruption.

pkg/indexer/common/interface.go (5)

1-11: Good package structure and imports

Creating a dedicated common package with clear imports is a good way to organize the modular architecture. The imports are clean and the package alias for errors (re) is helpful.


12-15: Well-documented interface with clear purpose

The ILogStorer interface is appropriately documented and has a clear, concise method signature using the new RetryableError interface from the errors package.


17-21: Clean block tracking interface

The IBlockTracker interface has clear methods for getting and updating the latest block, which is essential for blockchain synchronization and reorg handling.


23-25: Reorg handler interface

The IReorgHandler interface provides a clean method for finding reorganization points in the blockchain, which is essential for proper event processing during chain reorganizations.


27-35: Comprehensive contract interface with embedded interfaces

The IContract interface efficiently combines the other interfaces through embedding and adds methods for address, topics, and logging. This promotes code reuse and ensures all contracts implement consistent functionality.

pkg/indexer/common/logHandler.go (7)

18-24: Well-documented function with clear purpose

The comment for IndexLogs clearly explains the function's purpose, operation, and error handling strategy. This is excellent documentation that will help maintainers understand the function's behavior.


32-35: Detailed explanation of reorg check interval

The comment explaining the reorg check interval provides valuable context about the chain's characteristics and the potential impact of reorgs, which is excellent documentation.


37-46: Well-organized state variables

The state variables are well-organized and clearly named, making it easy to understand the reorg detection and handling logic. The use of descriptive variable names like reorgInProgress and reorgDetectedAt enhances readability.


48-69: Clear reorg detection and handling

The reorg detection and handling logic is well-structured with clear comments explaining each step of the process. The detailed logging at each stage will be helpful for debugging.


85-140: Comprehensive reorg checking logic with recovery

The reorg checking logic is thorough and includes proper error handling, periodic checks, and recovery mechanisms. The code is well-commented and logs appropriate information at each step.


142-152: Effective retry mechanism with dependency injection

The retry mechanism is well-implemented, taking advantage of the RetryableError interface to determine whether to retry operations. The use of dependency injection for the logger and address is a good design decision.


122-122:

Details

❌ Incorrect review comment

Missing ErrNoBlocksFound import or definition

The code references ErrNoBlocksFound but this error is not defined in this file. Make sure it's imported or defined appropriately.


🏁 Script executed:

#!/bin/bash
# Search for ErrNoBlocksFound definition in the codebase
rg "ErrNoBlocksFound.*=.*errors\.New" --type go

Length of output: 134


No missing import—ErrNoBlocksFound is defined in the same package
The error is declared in pkg/indexer/common/reorgHandler.go (same common package), so it’s available in logHandler.go without any additional import. No changes needed here.

Likely an incorrect or invalid review comment.

pkg/indexer/indexer.go (6)

9-9: Good modularization using the new app_chain package.

The import of the new app_chain package creates a clear separation of concerns, where blockchain-specific logic is now encapsulated in a dedicated module.


19-19: Clean architectural change with the appChain property.

The Indexer now delegates blockchain interaction to the AppChain abstraction, which follows the single responsibility principle and makes the code more maintainable.


33-43: Proper error handling in the AppChain initialization.

The code correctly propagates errors from AppChain initialization and ensures the context is canceled if an error occurs.


45-50: Simplified Indexer struct initialization.

The Indexer now only maintains minimal properties needed for its operation, delegating complex blockchain interaction to the AppChain component.


53-64: Clean resource management in Close method.

The Close method properly stops the AppChain before canceling the context and waiting for goroutines to complete, ensuring orderly shutdown.


66-68: Simplified StartIndexer method.

The StartIndexer method is now much cleaner, delegating the actual startup logic to the AppChain component.

pkg/indexer/app_chain/contracts/identity_update.go (5)

23-30: Well-structured contract broadcaster with composition.

The IdentityUpdateBroadcaster follows a good design pattern by embedding interfaces (IBlockTracker, IReorgHandler, ILogStorer) to provide the required functionality while keeping the code modular and maintainable.


32-78: Good constructor with proper initialization and error handling.

The NewIdentityUpdateBroadcaster constructor follows a clean pattern:

  1. Initializes all required components (contract binding, block tracker, topics, storer, reorg handler)
  2. Returns early with proper error propagation at each step
  3. Creates a well-structured logger with contextual information

This makes the code robust and easier to debug.


80-90: Clear accessor methods for contract information.

The methods to access Address(), Topics(), and Logger() provide a clean interface for the AppChain to interact with this broadcaster.


102-104: Good practice using unique identifier for contract per chain.

The IdentityUpdateBroadcasterName function creates a unique identifier for the contract by combining the base name with the chain ID, which is important for multi-chain support.


106-112: Clean utility function for extracting event topics.

The function efficiently extracts the event topic hash from the contract ABI, properly handling errors.

pkg/indexer/app_chain/app_chain.go (5)

24-35: Well-structured AppChain type with clear responsibilities.

The AppChain type effectively encapsulates all components needed for blockchain event streaming and indexing, with clear field names and organization.


88-108: Good use of functional options pattern for streamer configuration.

The code uses the new functional options pattern for the RpcLogStreamer, which makes the configuration more flexible and readable compared to the previous builder pattern.


122-155: Clean concurrent indexing implementation.

The Start method correctly:

  1. Starts the streamer first
  2. Launches separate goroutines for each broadcaster using tracing.GoPanicWrap
  3. Properly tracks goroutines with a WaitGroup for clean shutdown
  4. Uses the common.IndexLogs utility function for consistent log handling

This approach enables efficient concurrent processing of events from multiple contracts.


157-168: Proper goroutine cleanup in Stop method.

The Stop method now correctly:

  1. Stops the streamer
  2. Cancels the context
  3. Waits for all spawned goroutines to finish via WaitGroup

This ensures clean shutdown and prevents goroutine leaks.


170-184: Clean accessor methods for event channels.

The methods to access event and reorg channels provide a clean interface to interact with the underlying streamer, with proper naming that reflects the contract and channel type.

pkg/indexer/app_chain/contracts/group_message.go (4)

21-28: Consistent structure with IdentityUpdateBroadcaster.

The GroupMessageBroadcaster follows the same pattern as IdentityUpdateBroadcaster, embedding the same interfaces (IBlockTracker, IReorgHandler, ILogStorer) which creates a consistent design across different contract types.


30-72: Well-structured constructor with proper initialization and error handling.

The constructor follows a clean pattern of initializing components and properly handling errors at each step. The structured logger with contextual information will aid in debugging.


74-84: Consistent accessor methods across broadcaster types.

The accessor methods for Address(), Topics(), and Logger() follow the same pattern as in IdentityUpdateBroadcaster, which creates a consistent interface for the AppChain to work with.


96-98: Good practice with unique identifier per chain.

Similar to IdentityUpdateBroadcaster, this creates a unique identifier by combining the base name with chain ID, enabling multi-chain support.

pkg/blockchain/rpcLogStreamer.go (8)

25-34: LGTM: Strong improvement to ContractConfig structure

Adding the ID field to uniquely identify each contract watcher and renaming EventChannel to the more descriptive backfillChannel enhances the code's readability and supports the modular multi-chain architecture.


36-64: Great implementation of functional options pattern

The functional options pattern is a significant improvement over the previous builder pattern. It makes the API more intuitive, improves readability, and allows for future extensibility without breaking the API.


73-81: LGTM: Improved lifecycle management and configuration

Converting watchers from a slice to a map keyed by ID enables direct access by ID, supporting the modular architecture. The addition of context, cancel, and waitgroup fields provides proper goroutine lifecycle management.


83-110: Well-structured constructor with chainID awareness

The constructor now properly sets up the logger with chainID, improving observability in a multi-chain environment. The functional options application is clean and follows Go idioms.


124-127: Essential addition of Stop method

The new Stop method properly cancels the context and waits for goroutines to complete, preventing resource leaks. This is a critical improvement for proper lifecycle management.


189-191: LGTM: Updated channel usage

The code now correctly sends logs to the renamed backfillChannel instead of the previous EventChannel.


207-207: LGTM: Configurable lag implementation

The hard-coded constant has been replaced with the configurable lagFromHighestBlock field, making the system more flexible.


19-22:

Details

❓ Verification inconclusive

Improved responsiveness with reduced polling interval

The polling interval reduction from 1 second to 100 milliseconds (NO_LOGS_SLEEP_TIME) will make the log streamer more responsive, aligning with the PR objectives. However, this 10x increase in polling frequency could potentially increase load on the RPC endpoint.


🏁 Script executed:

#!/bin/bash
# Check the RPC endpoint documentation or configuration for rate limiting
rg -A 3 "rate.?limit|throttle" --type go

Length of output: 311


🏁 Script executed:

#!/bin/bash
# Show constants and surrounding implementation in rpcLogStreamer.go
rg -n "const \(" pkg/blockchain/rpcLogStreamer.go -B 5 -A 20

# Check for any rate-limiting or semaphore usage within rpcLogStreamer.go
rg -n "limiter" pkg/blockchain/rpcLogStreamer.go

Length of output: 924


Assess RPC endpoint capacity for increased polling

Reducing NO_LOGS_SLEEP_TIME from 1 s to 100 ms (lines 20–22) will indeed make the log streamer more responsive but also increases RPC calls by 10×. There’s no built-in rate limiter in rpcLogStreamer.go, so please:

  • Confirm your RPC provider can handle ~10× more requests without throttling or additional costs.
  • Consider reusing the nonceManager’s semaphore or introducing a simple rate-limit/jitter/backoff in RpcLogStreamer to prevent overwhelming the endpoint.

pkg/blockchain/rpcLogStreamer.go
Lines 19–23:

const (
    BACKFILL_BLOCKS    = uint64(1000)
    ERROR_SLEEP_TIME   = 100 * time.Millisecond
    NO_LOGS_SLEEP_TIME = 100 * time.Millisecond
)

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 440ba3c and 99f89b2.

📒 Files selected for processing (4)
  • pkg/blockchain/rpcLogStreamer.go (8 hunks)
  • pkg/blockchain/rpcLogStreamer_test.go (2 hunks)
  • pkg/indexer/app_chain/app_chain.go (1 hunks)
  • pkg/indexer/common/logHandler_test.go (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • pkg/indexer/common/logHandler_test.go
  • pkg/blockchain/rpcLogStreamer_test.go
  • pkg/indexer/app_chain/app_chain.go
🔇 Additional comments (7)
pkg/blockchain/rpcLogStreamer.go (7)

19-23: Performance improvement with reduced sleep times

The sleep times for error handling and no-logs cases have been reduced from 1 second to 100 milliseconds. This change will make the log streamer more responsive to new events, allowing faster event processing.


26-34: Good enhancement to ContractConfig with ID field

Adding an ID field to ContractConfig is a key improvement that supports the modular architecture by allowing contracts to be referenced by unique identifiers rather than by position in a slice. The renamed channel fields also provide better clarity on their purpose.


36-60: Clean implementation of functional options pattern

The switch from a builder pattern to functional options is a good architectural change. The pattern provides:

  1. Better flexibility for configuring the streamer
  2. Cleaner API for adding new configuration options
  3. Good encapsulation of the initialization of buffered channels

The use of buffered channels (size 100 for backfill, size 1 for reorg) should help prevent blocking in high-throughput scenarios.


69-77: Improved RpcLogStreamer design with better lifecycle management

The RpcLogStreamer struct has been well-refactored with:

  1. Change from slice to map for watchers, enabling direct access by ID
  2. Proper context handling for lifecycle management
  3. Configurability through lagFromHighestBlock parameter

The constructor now properly initializes all fields, accepts a chain ID for logging context, and applies functional options in a clean way.

Also applies to: 79-106


120-123: Good addition of explicit Stop method

Adding an explicit Stop method that cancels the context and waits for goroutines to finish is a significant improvement for resource management. This ensures proper cleanup when the streamer is no longer needed.


128-132: Consistent timer usage in watchContract

The timer usage has been improved to be more consistent. Proper deferred closure of channels and timer stopping have been added. The timer reset calls have been standardized to use the MaxDisconnectTime from the watcher config.

Also applies to: 154-154, 168-168


237-259: Well-designed getter methods for contract access by ID

The new getter methods provide a clean interface for accessing contract addresses and channels by ID. The null checks and returning of zero values or nil when an ID is not found is a good defensive programming practice that prevents potential nil pointer dereferences.

@fbac fbac force-pushed the 05-14-indexer branch 2 times, most recently from 2d899c0 to 65a7623 Compare May 16, 2025 11:56
@fbac fbac force-pushed the 05-14-indexer branch from 65a7623 to 3a917df Compare May 16, 2025 11:58
err error
}

func (e NonRecoverableError) Error() string {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the Error method of NonRecoverableError, directly calling e.err.Error() without confirming that e.err is not nil might trigger a runtime panic if e.err is nil. Consider adding a nil-check to handle the nil case by returning a default error message such as "non recoverable error occurred", unless the nil is intended and documented.

+	if e.err == nil {
+		return "non recoverable error occurred"
+	}

🚀 Reply to ask Macroscope to explain or update this suggestion.

👍 Helpful? React to give us feedback.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

♻️ Duplicate comments (1)
pkg/errors/errors.go (1)

12-14: ⚠️ Potential issue

Nil-safety missing in Error()

e.err can legitimately be nil; calling e.err.Error() will panic.
This was raised previously and is still unaddressed.
Consider a guard clause:

 func (e NonRecoverableError) Error() string {
-	return e.err.Error()
+	if e.err == nil {
+		return "non recoverable error occurred"
+	}
+	return e.err.Error()
 }
🧹 Nitpick comments (10)
pkg/indexer/common/reorg_handler.go (3)

27-30: Error var names shadow stdlib errors

Local sentinel errors are fine, but keep names package-specific to avoid confusion when inspecting stack traces.
Example:

-var (
-	ErrNoBlocksFound = errors.New("no blocks found")
-	ErrGetBlock      = errors.New("failed to get block")
-)
+var (
+	ErrReorgNoBlocks   = errors.New("reorg handler: no blocks found")
+	ErrReorgGetBlock   = errors.New("reorg handler: failed to get block")
+)

49-71: Potential tight loop & DB hammering when DB is empty

If the database has no blocks at all, the loop:

startBlock, endBlock := BlockRange(detectedAt)
for {
    storedBlocks, err := r.queries.GetBlocksInRange(...)

will immediately drop into the len(storedBlocks)==0 branch, call BlockRange(startBlock) (which returns the same tuple when startBlock == 0), and then exit with ErrNoBlocksFound.

That’s OK, but if startBlock > 0 yet there are still no rows in that range (e.g., cold-start on a partially-synced DB) the loop will step back by exactly BLOCK_RANGE_SIZE each iteration and hammer the DB 600-block window at a time until it reaches 0.

Consider adding a rate-limit / back-off or collapsing to bigger windows when consecutive misses are detected to avoid thousands of pointless queries on a fresh deployment.


169-176: BlockRange off-by-one semantics are ambiguous

Returning (start,end) where end == from and start = end - BLOCK_RANGE_SIZE implies a range inclusive on both ends.
Document this explicitly to prevent callers from mistakenly using endBlock-1 and missing the tail block.

Minor: BLOCK_RANGE_SIZE is constant but BlockRange depends on it; prefer injecting it or renaming to DefaultBlockRange() for clarity and testability.

pkg/indexer/common/log_handler_test.go (4)

14-18: Package name vs directory name mismatch

The test imports the code via:

errorMocks "github.com/xmtp/xmtpd/pkg/errors"

but the actual package declared in that directory is retryable_errors.
Go allows differing import-path/package-name, but this indirection is confusing; future readers will search for package errors and not find it.

Rename the directory (pkg/retryable_errors) or change the package declaration to errors to align with the path.


22-40: setup() returns many loosely-related values

Returning seven positional values makes the call sites error-prone.
Prefer:

type testCtx struct {
   channel       chan types.Log
   reorgChannel  chan uint64
   ctx           context.Context
   cancel        context.CancelFunc
   log           types.Log
   number        uint64
   hash          common.Hash
}

and return a struct.


111-112: Comment vs logic mismatch

wg.Add(2) // Expecting two calls: StoreLog and UpdateLatestBlock

In this test you only call wg.Done() inside StoreLog, twice (one per retry).
The comment misleads future maintainers; update the comment or the WaitGroup count.


129-138: Return type in mock function should be retryable_errors.RetryableError

The closure signature currently returns errorMocks.RetryableError, which is an interface.
It is better to return *RecoverableError / *NonRecoverableError to avoid losing methods when type-asserting later.

RunAndReturn(func(ctx context.Context, log types.Log) retryable_errors.RetryableError {
    ...
})

Minor, but keeps mocks closer to real usage.

pkg/indexer/common/log_handler.go (1)

142-150: retry should respect context cancellation to avoid infinite loops

retry loops forever until a non-retryable error occurs; when the parent
context is cancelled the handler will continue retrying and block shutdown.

Consider changing the signature to accept the context and abort when
ctx.Done() fires.

pkg/indexer/app_chain/contracts/group_message.go (1)

32-40: Prefer common.Address over raw string to catch invalid input at compile-time

Passing the contract address around as string invites subtle bugs
(missing 0x, checksum errors, etc.). Accept a common.Address and move
the hex conversion to call-sites once.

-func NewGroupMessageBroadcaster(
-
-    address string,
+func NewGroupMessageBroadcaster(
+
+    address common.Address,-) (*GroupMessageBroadcaster, error) {
-    contract, err := groupMessageBroadcasterContract(address, client)
+ ) (*GroupMessageBroadcaster, error) {
+    contract, err := groupMessageBroadcasterContract(address.Hex(), client)

This change eliminates many runtime panics seen in production when an
accidentally empty string was supplied.

pkg/blockchain/rpcLogStreamer.go (1)

186-187: Back-pressure risk: unblock writer when consumer stalls

watcher.backfillChannel is buffered with length 100, but under heavy
traffic 100 logs can be filled in < 100 ms.
Wrap the send in a select with timeout or enlarge / make size
configurable to avoid blocking the entire watcher goroutine.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 65a7623 and dc2ab1b.

📒 Files selected for processing (14)
  • pkg/blockchain/rpcLogStreamer.go (8 hunks)
  • pkg/blockchain/rpcLogStreamer_test.go (2 hunks)
  • pkg/errors/errors.go (1 hunks)
  • pkg/indexer/app_chain/app_chain.go (1 hunks)
  • pkg/indexer/app_chain/contracts/group_message.go (1 hunks)
  • pkg/indexer/app_chain/contracts/group_message_storer.go (9 hunks)
  • pkg/indexer/app_chain/contracts/identity_update.go (1 hunks)
  • pkg/indexer/app_chain/contracts/identity_update_storer.go (14 hunks)
  • pkg/indexer/common/block_tracker.go (2 hunks)
  • pkg/indexer/common/block_tracker_test.go (8 hunks)
  • pkg/indexer/common/log_handler.go (1 hunks)
  • pkg/indexer/common/log_handler_test.go (5 hunks)
  • pkg/indexer/common/reorg_handler.go (6 hunks)
  • pkg/indexer/common/reorg_handler_test.go (2 hunks)
✅ Files skipped from review due to trivial changes (3)
  • pkg/indexer/common/block_tracker_test.go
  • pkg/indexer/common/block_tracker.go
  • pkg/indexer/common/reorg_handler_test.go
🚧 Files skipped from review as they are similar to previous changes (5)
  • pkg/blockchain/rpcLogStreamer_test.go
  • pkg/indexer/app_chain/contracts/identity_update.go
  • pkg/indexer/app_chain/contracts/identity_update_storer.go
  • pkg/indexer/app_chain/contracts/group_message_storer.go
  • pkg/indexer/app_chain/app_chain.go
🧰 Additional context used
🧬 Code Graph Analysis (3)
pkg/indexer/common/reorg_handler.go (2)
pkg/db/queries/db.go (1)
  • New (19-21)
pkg/indexer/common/interface.go (1)
  • IReorgHandler (23-25)
pkg/indexer/common/log_handler.go (5)
pkg/blockchain/interface.go (1)
  • ChainClient (28-33)
pkg/indexer/common/interface.go (1)
  • IContract (28-35)
pkg/indexer/common/reorg_handler.go (1)
  • ErrNoBlocksFound (28-28)
pkg/errors/errors.go (1)
  • RetryableError (3-6)
pkg/metrics/indexer.go (2)
  • EmitIndexerLogProcessingTime (115-117)
  • EmitIndexerRetryableStorageError (99-102)
pkg/blockchain/rpcLogStreamer.go (1)
pkg/blockchain/interface.go (1)
  • ChainClient (28-33)
🔇 Additional comments (4)
pkg/errors/errors.go (2)

8-22: Use pointer receiver & expose Unwrap for compatibility

  1. Both NonRecoverableError and the factory NewNonRecoverableError return values rather than pointers.
    Because the struct holds only one field, value copies are cheap, but using pointer receivers avoids accidental copies that break errors.Is/As identity checks.
  2. Error wrappers should implement Unwrap() error so that upstream callers can leverage errors.Is/As.
-type NonRecoverableError struct {
-	err error
-}
+type NonRecoverableError struct {
+	err error
+}
+
+func (e *NonRecoverableError) Unwrap() error { return e.err }

-func NewNonRecoverableError(err error) NonRecoverableError {
-	return NonRecoverableError{err: err}
+func NewNonRecoverableError(err error) *NonRecoverableError {
+	return &NonRecoverableError{err: err}
 }

Apply the same changes to RecoverableError.
[ suggest_essential_refactor ]


24-38: Symmetric issues in RecoverableError

Mirror the pointer-receiver, Unwrap, and constructor changes suggested above to keep the two error types consistent and to avoid copy semantics pitfalls.
[ suggest_essential_refactor ]

pkg/indexer/common/reorg_handler.go (1)

98-107: Edge case: risk of infinite loop on ancient reorg

When the oldest stored block mismatches the chain, you step back one window:

startBlock, endBlock = BlockRange(startBlock)

If the DB contains only a single non-matching block (e.g., corruption) startBlock will keep shrinking by BLOCK_RANGE_SIZE, eventually hit 0, and then return ErrNoBlocksFound. That’s acceptable, but you break all reorg recovery in that case.

Consider a guard: if the loop has iterated N times without finding any canonical block, surface a distinct error so the caller can decide to wipe the table or trigger a full resync.

pkg/indexer/common/log_handler_test.go (1)

56-65: Mock expectations: missing assertion for UpdateLatestBlock

Comment says we expect two calls (StoreLog + UpdateLatestBlock) and wg.Add(2).
If the production code changes and never invokes UpdateLatestBlock, the test will still pass because both wg.Done() calls are triggered from StoreLog expectations.

Either:

  1. Add a wg.Done() in the UpdateLatestBlock mock, or
  2. Change wg.Add(1) and the explanatory comment.

Keeping them in sync avoids false positives.

Comment on lines +131 to +138
if trackerErr := contract.UpdateLatestBlock(ctx, reorgBlockNumber, reorgBlockHash); trackerErr != nil {
contract.Logger().
Error("error updating block tracker", zap.Error(trackerErr))
}

reorgChannel <- reorgBlockNumber
continue
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Non-blocking send avoids dead-locks when reorg events pile up

reorgChannel has buffer = 1.
If a downstream consumer is slow (or crashed) the send at L136 will block forever, halting the whole indexer.

-	reorgChannel <- reorgBlockNumber
+	select {
+	case reorgChannel <- reorgBlockNumber:
+	default:
+		contract.Logger().Warn("reorgChannel full – dropping signal",
+			zap.Uint64("reorgBlockNumber", reorgBlockNumber))
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if trackerErr := contract.UpdateLatestBlock(ctx, reorgBlockNumber, reorgBlockHash); trackerErr != nil {
contract.Logger().
Error("error updating block tracker", zap.Error(trackerErr))
}
reorgChannel <- reorgBlockNumber
continue
}
if trackerErr := contract.UpdateLatestBlock(ctx, reorgBlockNumber, reorgBlockHash); trackerErr != nil {
contract.Logger().
Error("error updating block tracker", zap.Error(trackerErr))
}
select {
case reorgChannel <- reorgBlockNumber:
default:
contract.Logger().Warn("reorgChannel full – dropping signal",
zap.Uint64("reorgBlockNumber", reorgBlockNumber))
}
continue
}
🤖 Prompt for AI Agents
In pkg/indexer/common/log_handler.go around lines 131 to 138, the send operation
to reorgChannel can block indefinitely because the channel buffer size is 1 and
the consumer might be slow or crashed. To fix this, change the send to a
non-blocking send by using a select statement with a default case, so that if
the channel is full, the send is skipped instead of blocking, preventing
deadlocks when reorg events pile up.

Comment on lines +95 to +104
onchainBlock, err := client.BlockByNumber(ctx, big.NewInt(int64(storedBlockNumber)))
if err != nil {
contract.Logger().
Warn("error querying block from the blockchain, proceeding with event processing",
zap.Uint64("blockNumber", storedBlockNumber),
zap.Error(err),
)
skipReorgHandling = true
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Add a bounded-timeout when querying the chain to prevent goroutine leaks

client.BlockByNumber can hang indefinitely if the remote node becomes un-responsive.
Wrap the call in a short-lived context (e.g. 2-3 s) so the handler keeps making progress and surfaces the error early.

-	onchainBlock, err := client.BlockByNumber(ctx, big.NewInt(int64(storedBlockNumber)))
+	ctxBlock, cancel := context.WithTimeout(ctx, 3*time.Second)
+	defer cancel()
+	onchainBlock, err := client.BlockByNumber(ctxBlock, big.NewInt(int64(storedBlockNumber)))

#!/bin/bash
# Locate the Go file and inspect its imports and relevant lines
LOG_HANDLER_FILE=$(fd -e go log_handler.go)
echo "File path: $LOG_HANDLER_FILE"
echo "---- Imports ----"
sed -n '1,50p' "$LOG_HANDLER_FILE"
echo "---- Lines 80 to 120 ----"
sed -n '80,120p' "$LOG_HANDLER_FILE"

🏁 Script executed:

#!/bin/bash
# Find log_handler.go and display imports and relevant lines
LOG_HANDLER_FILE=$(fd -e go log_handler.go)
echo "File path: $LOG_HANDLER_FILE"
echo "---- Imports (first 50 lines) ----"
sed -n '1,50p' "$LOG_HANDLER_FILE"
echo "---- Code around line 95 ----"
sed -n '85,115p' "$LOG_HANDLER_FILE"

Length of output: 3116


Enforce timeout on BlockByNumber to avoid hanging RPC calls
Wrap the chain query in a short-lived context (e.g. 3s) so that an unresponsive node doesn’t block the handler.

  • File: pkg/indexer/common/log_handler.go (around line 95)
-	onchainBlock, err := client.BlockByNumber(ctx, big.NewInt(int64(storedBlockNumber)))
+	ctxBlock, cancel := context.WithTimeout(ctx, 3*time.Second)
+	onchainBlock, err := client.BlockByNumber(ctxBlock, big.NewInt(int64(storedBlockNumber)))
+	cancel()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
onchainBlock, err := client.BlockByNumber(ctx, big.NewInt(int64(storedBlockNumber)))
if err != nil {
contract.Logger().
Warn("error querying block from the blockchain, proceeding with event processing",
zap.Uint64("blockNumber", storedBlockNumber),
zap.Error(err),
)
skipReorgHandling = true
}
ctxBlock, cancel := context.WithTimeout(ctx, 3*time.Second)
onchainBlock, err := client.BlockByNumber(ctxBlock, big.NewInt(int64(storedBlockNumber)))
cancel()
if err != nil {
contract.Logger().
Warn("error querying block from the blockchain, proceeding with event processing",
zap.Uint64("blockNumber", storedBlockNumber),
zap.Error(err),
)
skipReorgHandling = true
}
🤖 Prompt for AI Agents
In pkg/indexer/common/log_handler.go around lines 95 to 104, the call to
client.BlockByNumber uses the passed context directly, which can hang
indefinitely if the remote node is unresponsive. To fix this, create a new
context with a timeout of about 3 seconds using context.WithTimeout before
calling BlockByNumber, and ensure to cancel the context after the call to avoid
resource leaks. Replace the original context with this timeout context in the
BlockByNumber call to enforce a bounded wait time.

Comment on lines +203 to 205
highestBlockCanProcess := highestBlock - r.lagFromHighestBlock
if fromBlock > highestBlockCanProcess {
metrics.EmitIndexerCurrentBlockLag(contractAddress, 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle highestBlock < lag to avoid uint64 under-flow

When the chain is very young or the operator configures an excessive lag,
highestBlock - r.lagFromHighestBlock will wrap to a huge value,
causing the streamer to pull an enormous range of blocks.

-	highestBlockCanProcess := highestBlock - r.lagFromHighestBlock
+	var highestBlockCanProcess uint64
+	if highestBlock <= r.lagFromHighestBlock {
+		highestBlockCanProcess = 0
+	} else {
+		highestBlockCanProcess = highestBlock - r.lagFromHighestBlock
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
highestBlockCanProcess := highestBlock - r.lagFromHighestBlock
if fromBlock > highestBlockCanProcess {
metrics.EmitIndexerCurrentBlockLag(contractAddress, 0)
var highestBlockCanProcess uint64
if highestBlock <= r.lagFromHighestBlock {
highestBlockCanProcess = 0
} else {
highestBlockCanProcess = highestBlock - r.lagFromHighestBlock
}
if fromBlock > highestBlockCanProcess {
metrics.EmitIndexerCurrentBlockLag(contractAddress, 0)
🤖 Prompt for AI Agents
In pkg/blockchain/rpcLogStreamer.go around lines 203 to 205, the calculation of
highestBlockCanProcess subtracts lagFromHighestBlock from highestBlock without
checking if highestBlock is smaller than lagFromHighestBlock, which can cause
uint64 underflow. Fix this by adding a condition to check if highestBlock is
less than lagFromHighestBlock; if so, set highestBlockCanProcess to zero or
handle it appropriately to prevent underflow and avoid pulling an excessive
range of blocks.

mkysel and others added 6 commits May 16, 2025 15:57
### Add `xmtpd-prune` Docker image build support to GitHub Actions
release workflow
The GitHub Actions workflow in
[release-from-tag.yml](https://github.com/xmtp/xmtpd/pull/791/files#diff-cee4eadd46d2585fd337f530116877504920be9ec452bdbfb915b95857438208)
is updated to build and push a new `xmtpd-prune` Docker image using
`Dockerfile-prune`. The workflow matrix now includes three images:
`xmtpd`, `xmtpd-cli`, and `xmtpd-prune`.
### Update Datadog JUnit upload GitHub action from v1 to v2 to fix
deprecation warning
Updates the GitHub Actions workflow configuration in
[test.yml](https://github.com/xmtp/xmtpd/pull/790/files#diff-faff1af3d8ff408964a57b2e475f69a6b7c7b71c9978cccc8f471798caac2c88)
with:
* Upgrades `datadog/junit-upload-github-action` from `@v1` to `@v2`
* Updates parameter name from `api-key` to `api_key` in the Datadog
JUnit upload action configuration
### Update xmtpd-cli Docker image to multichain version sha-d4e697c and
refactor Docker Compose configurations using YAML anchors
* Updates the `xmtpd-cli` Docker image from `sha-a96bf04` to
`sha-d4e697c` in
[docker-compose-register.yml](https://github.com/xmtp/xmtpd/pull/793/files#diff-a1e7ac7430bed8c9bdd26624200229b35a1eeb5363eb27128ae4eb71bad7f83c)
* Introduces YAML anchors for image definitions in both
[docker-compose-register.yml](https://github.com/xmtp/xmtpd/pull/793/files#diff-a1e7ac7430bed8c9bdd26624200229b35a1eeb5363eb27128ae4eb71bad7f83c)
and
[docker-compose.yml](https://github.com/xmtp/xmtpd/pull/793/files#diff-6a25daaa729d7279e0cda8eb92399f1a82d4a91f961e1955bb5745e9c99f3c6e)
* Removes legacy environment variables from
[local.env](https://github.com/xmtp/xmtpd/pull/793/files#diff-8f5c74fc1c72e37751a8a758aad091605c7771b2bb5d5a772d51c97836d85161)
that were previously required for backward compatibility

#### 📍Where to Start
Start with the image version update and YAML anchor definition in
[docker-compose-register.yml](https://github.com/xmtp/xmtpd/pull/793/files#diff-a1e7ac7430bed8c9bdd26624200229b35a1eeb5363eb27128ae4eb71bad7f83c),
as this contains the primary functional change of updating the
`xmtpd-cli` image version.

----

_[Macroscope](https://app.macroscope.com) summarized 37bb42b._

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **Chores**
- Centralized Docker image version references in Docker Compose files
for easier maintenance.
- Updated service definitions to use shared image anchors for
consistency.
- Reorganized and annotated environment variables in local
configuration, adding a new contract address variable.
- Improved configuration validation with modular checks and added
validation for pruning options.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
configuration for replication and indexer features
* Adds validation checks in
[validation.go](https://github.com/xmtp/xmtpd/pull/798/files#diff-3e0b3707cc5d712d06d1eeb6a67c10b45a7ba0b27e972924d71df3c8e539f23b)
to ensure `options.MlsValidation.GrpcAddress` is configured when
replication or indexer features are enabled
* Improves error handling in
[error.go](https://github.com/xmtp/xmtpd/pull/798/files#diff-3fef35bc4a242f3ad3624f2db23219de359d977803b44d982b0b80050c05fe79)
by adding message fields to `UnrecoverableLogStorageError` and
`RetryableLogStorageError` types
* Standardizes error messaging in
[groupMessage.go](https://github.com/xmtp/xmtpd/pull/798/files#diff-efeef048b7789cf1f5a86eff464134023ebb19f01c92c49b76771ce65bdbc1f9)
and
[identityUpdate.go](https://github.com/xmtp/xmtpd/pull/798/files#diff-cff55c051234a8ef9c37deba5fd2ef50bedea6820902226931fb01086a036c3f)
using package-level constants
* Updates test cases in
[indexer_test.go](https://github.com/xmtp/xmtpd/pull/798/files#diff-1f631679d1a8473f15a81e169b9a43c253c389daa02ee1242973f6e6660a3fb6)
to accommodate new error constructor signatures

Start with the validation logic changes in
[validation.go](https://github.com/xmtp/xmtpd/pull/798/files#diff-3e0b3707cc5d712d06d1eeb6a67c10b45a7ba0b27e972924d71df3c8e539f23b)
which implements the core configuration validation requirements for the
MLS validation service.

----

_[Macroscope](https://app.macroscope.com) summarized 48a4614._

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

- **Bug Fixes**
- Improved validation to ensure required configuration fields are
present when specific features are enabled.
- **Refactor**
- Standardized error messages across log processing and identity update
features for clearer and more consistent error reporting.
- Enhanced error types to include descriptive messages, improving error
context in logs and error handling.
- **Tests**
- Updated tests to align with new error message handling and constructor
changes.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
@fbac fbac closed this May 16, 2025
}

func (e RecoverableError) Error() string {
return fmt.Sprintf("%s: %s", e.msg, e.err.Error())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the Error() method, calling e.err.Error() without checking if e.err is nil may result in a nil pointer dereference panic. Consider adding a nil check before invoking e.err.Error(), so that if e.err is nil, you return just e.msg.

-	return fmt.Sprintf("%s: %s", e.msg, e.err.Error())
+	if e.err != nil {
+		return fmt.Sprintf("%s: %s", e.msg, e.err.Error())
+	}
+	return e.msg

🚀 Reply to ask Macroscope to explain or update this suggestion.

👍 Helpful? React to give us feedback.

@fbac fbac deleted the 05-14-indexer branch May 16, 2025 16:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants