
Conversation


@fbac fbac commented Jul 8, 2025

Backfill page on subscription error by restructuring RPCLogStreamer error handling and reducing MaxChainDisconnectTime from 300s to 60s

  • Restructures the RPCLogStreamer.watchContract method in rpc_log_streamer.go to handle subscription errors within the main loop and restart the backfill process when errors occur (see the sketch after this list)
  • Reduces the default MaxChainDisconnectTime in AppChainOptions struct from 300s to 60s in options.go
  • Extracts subscription building logic into buildSubscriptionWithBackoff method and changes channel buffer size from hardcoded 100 to dynamic sizing based on expected logs per block
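To make the restructuring above concrete, here is a minimal sketch of the new loop shape. It is illustrative only: buildSubscriptionWithBackoff and backfillPage are named in this PR, while ContractConfig, ExpectedLogsPerBlock, StartBlock, processLog, and the logger field are assumptions made for the sake of a self-contained example, not the merged code.

import (
	"github.com/ethereum/go-ethereum/core/types"
	"go.uber.org/zap"
)

func (r *RPCLogStreamer) watchContract(cfg ContractConfig) {
	// Buffer sized from expected logs per block instead of a hardcoded 100.
	logsCh := make(chan types.Log, int(cfg.ExpectedLogsPerBlock)*int(r.backfillBlockPageSize))

	sub, err := r.buildSubscriptionWithBackoff(cfg, logsCh)
	if err != nil {
		r.logger.Fatal("failed to build subscription", zap.Error(err))
	}
	defer func() { sub.Unsubscribe() }()

	fromBlock := cfg.StartBlock // assumed: last block known to be indexed

	for {
		select {
		case <-r.ctx.Done():
			return

		case err := <-sub.Err():
			// Subscription errors are handled inside the main loop: re-run the
			// backfill for blocks we may have missed, then rebuild the
			// subscription, instead of tearing the watcher down.
			r.logger.Warn("subscription error, backfilling and resubscribing", zap.Error(err))
			sub.Unsubscribe()
			logs, berr := r.backfillPage(r.ctx, cfg, fromBlock)
			if berr != nil {
				r.logger.Fatal("failed to backfill page", zap.Error(berr))
			}
			for _, l := range logs {
				fromBlock = r.processLog(cfg, l) // assumed helper
			}
			if sub, err = r.buildSubscriptionWithBackoff(cfg, logsCh); err != nil {
				r.logger.Fatal("failed to rebuild subscription", zap.Error(err))
			}

		case l := <-logsCh:
			fromBlock = r.processLog(cfg, l)
		}
	}
}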

📍Where to Start

Start with the watchContract method in rpc_log_streamer.go to understand the restructured main loop and error handling logic.

Changes since #947 opened

  • Modified backfill error handling in RPCLogStreamer.watchContract method [25da832]

Macroscope summarized 25da832.

@fbac fbac requested a review from a team as a code owner July 8, 2025 14:57

graphite-app bot commented Jul 8, 2025

How to use the Graphite Merge Queue

Add either label to this PR to merge it via the merge queue:

  • Queue - adds this PR to the back of the merge queue
  • Hotfix - for urgent hot fixes, skip the queue and merge this PR next

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

@fbac fbac force-pushed the 07-08-backfill_on_sub_error branch from 6368621 to 4cd89cc on July 8, 2025 15:29
logs, err := r.backfillPage(r.ctx, cfg, backfillFromBlockNumber)
if err != nil {
	logger.Error("failed to backfill page, closing", zap.Error(err))
	return
}

Collaborator

If you terminate the goroutine here, what will restart the watcher? The same applies to all the returns in this error block.

Collaborator Author

Actually, this should be a Fatal.

If we can't guarantee that the page has been backfilled, or that the subscription has been recreated, we're exposed to a data integrity risk that could lead to gaps and undefined behavior when a client invokes the API.

Let me know what you think @mkysel
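
For reference, a minimal sketch of what the Fatal variant of the branch above could look like, reusing the names from the quoted snippet (illustrative only, not the merged code):

logs, err := r.backfillPage(r.ctx, cfg, backfillFromBlockNumber)
if err != nil {
	// Fatal logs the error and exits the process, so the node is restarted
	// into a known-good state rather than leaving a silently dead watcher.
	logger.Fatal("failed to backfill page", zap.Error(err))
}
// ... logs are then processed as before ...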

Comment on lines 189 to 193
case err, open := <-sub.Err():
	if !open {
		logger.Error("subscription channel closed, closing watcher")
		return
	}
Collaborator

When doing an initial backfill, the subscription might be cancelled quite a few times while we are still backfilling. We should totally do the right thing, but tearing everything down seems like overkill.

Collaborator Author

@fbac fbac Jul 14, 2025

I've been thinking about this one for a couple of days. I've come to believe this is overkill, but also unnecessary. I'll come back today with a different approach!

case err, open := <-sub.Err():
	if !open {
		logger.Error("subscription channel closed, closing watcher")
		return
	}
Collaborator

Does this mean we will stop indexing until the entire process gets restarted?

Collaborator Author

I've removed the check for whether the sub error channel is open. Instead, we'll rebuild the subscription when there's an actual error; the error channel itself is recreated by go-ethereum (its owner).
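
A minimal sketch of what a resubscribe-with-backoff helper along these lines could look like; buildSubscriptionWithBackoff is the name used in this PR, but the query fields, config type, retry cap, and the assumed r.client (*ethclient.Client) are illustrative, not the actual implementation:

import (
	"time"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/core/types"
	"go.uber.org/zap"
)

func (r *RPCLogStreamer) buildSubscriptionWithBackoff(
	cfg ContractConfig, // assumed config type
	logsCh chan types.Log,
) (ethereum.Subscription, error) {
	query := ethereum.FilterQuery{
		Addresses: cfg.Addresses, // assumed fields
		Topics:    cfg.Topics,
	}
	delay := time.Second
	for {
		sub, err := r.client.SubscribeFilterLogs(r.ctx, query, logsCh)
		if err == nil {
			return sub, nil
		}
		r.logger.Warn("failed to subscribe, retrying", zap.Error(err))
		select {
		case <-r.ctx.Done():
			return nil, r.ctx.Err()
		case <-time.After(delay):
			if delay < 30*time.Second {
				delay *= 2 // exponential backoff, capped
			}
		}
	}
}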

@fbac fbac requested a review from mkysel July 14, 2025 12:30
@fbac fbac force-pushed the 07-08-backfill_on_sub_error branch from 5e8727f to 26b1ce7 on July 14, 2025 12:32
@fbac fbac force-pushed the 07-08-backfill_on_sub_error branch from b87fc6d to 476bbed on July 15, 2025 13:33

Issue on line 1 in pkg/indexer/rpc_streamer/rpc_log_streamer.go:

The field backfillBlockPageSize on RPCLogStreamer is read by GetNextPage and written in the backfill error handling (r.backfillBlockPageSize = blockPageSize) across different watcher goroutines without synchronization. This can lead to data races and unpredictable behavior. Consider using atomic operations or a mutex to synchronize access to r.backfillBlockPageSize.

+import "sync/atomic"
@@
-                            r.backfillBlockPageSize = blockPageSize
+                            atomic.StoreUint64(&r.backfillBlockPageSize, blockPageSize)
@@
-    toBlock := min(fromBlockNumber+r.backfillBlockPageSize-1, highestBlock)
+    toBlock := min(fromBlockNumber+atomic.LoadUint64(&r.backfillBlockPageSize)-1, highestBlock)

🚀 Reply to ask Macroscope to explain or update this suggestion.


Collaborator Author

There's a watcher per blockchain process, and the indexer has N blockchain processes. The block page size is unique per blockchain.

It shouldn't matter which goroutine adjusts the block page size, as the value will always be relevant to the other watchers in the same blockchain process.


Thanks for the clarification on the architecture. While I understand that functionally all watchers in the same blockchain process should use the same block page size, this is still a data race from Go's memory model perspective - multiple goroutines accessing the same memory location where at least one is writing without synchronization.

Even if the race is benign in practice, it could:

  • Be flagged by go run -race
  • Lead to undefined behavior per the Go memory model
  • Cause issues with compiler optimizations

The atomic operations are lightweight and would eliminate the race condition entirely without affecting performance. Would you be open to that approach for correctness?

Collaborator Author

What proposal do you have?


I propose using atomic operations to fix the data race. Two changes would be needed:

  1. Change the field type in RPCLogStreamer from uint64 to atomic.Uint64 (or use sync/atomic functions)
  2. Update the two access locations:

Writing (in watchContract):

import "sync/atomic"

// Instead of:
r.backfillBlockPageSize = blockPageSize

// Use:
atomic.StoreUint64(&r.backfillBlockPageSize, blockPageSize)

Reading (in GetNextPage):

// Instead of:
toBlock := min(fromBlockNumber+r.backfillBlockPageSize-1, highestBlock)

// Use:
toBlock := min(fromBlockNumber+atomic.LoadUint64(&r.backfillBlockPageSize)-1, highestBlock)

This is a minimal change that eliminates the race condition while maintaining the same functionality.
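
For completeness, a sketch of the field-type variant from point 1 (atomic.Uint64 with Load/Store instead of the sync/atomic functions), under the same assumptions as the snippets above (illustrative, not the code that was merged):

import "sync/atomic"

type RPCLogStreamer struct {
	// ... other fields elided ...
	backfillBlockPageSize atomic.Uint64
}

// Writing (in watchContract):
r.backfillBlockPageSize.Store(blockPageSize)

// Reading (in GetNextPage):
toBlock := min(fromBlockNumber+r.backfillBlockPageSize.Load()-1, highestBlock)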

@fbac fbac merged commit 68be52a into main Jul 15, 2025
9 checks passed
@fbac fbac deleted the 07-08-backfill_on_sub_error branch July 15, 2025 19:11
