Index via websocket subscription #828
""" WalkthroughThis change transitions blockchain connection protocols from HTTP to WebSocket across configuration files, tests, and code. It introduces WebSocket URL fields, updates validation to test WebSocket connectivity, and adds live subscription-based log streaming to the log streamer. Log indexing is refactored to handle context cancellation and channel closure explicitly, with corresponding test and interface updates. Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Config
    participant Validator
    participant BlockchainClient
    participant RpcLogStreamer
    participant Indexer
    Config->>Validator: Provide WssURL
    Validator->>Validator: validateWebsocketURL(WssURL)
    Validator->>BlockchainClient: Attempt WebSocket dial
    BlockchainClient-->>Validator: Success/Failure
    Indexer->>RpcLogStreamer: Start log streaming (backfill + subscription)
    RpcLogStreamer->>BlockchainClient: Subscribe to logs (WebSocket)
    BlockchainClient-->>RpcLogStreamer: Log events (live)
    RpcLogStreamer->>Indexer: Send logs on eventChannel (merged backfill + subscription)
    Indexer->>Indexer: IndexLogs(eventChannel, reorgChannel)
    Note over Indexer: Process events from channel, handle reorgs, context cancellation
```
Actionable comments posted: 6
🔭 Outside diff range comments (1)
pkg/blockchain/rpcLogStreamer.go (1)
170-180: ⚠️ Potential issue

`time.Timer` is reset without first draining or stopping – data-race risk

Calling `timer.Reset()` while the timer has already expired but its value has not yet been read races with the pending send on `timer.C` and can lead to spurious or stale firings. The two `Reset(cfg.MaxDisconnectTime)` invocations should follow the safe pattern:

```go
if !timer.Stop() {
	<-timer.C // drain if needed
}
timer.Reset(cfg.MaxDisconnectTime)
```

Apply this pattern before each `Reset` (lines ~179 and ~210).

Also applies to: 208-214
🧹 Nitpick comments (4)
cmd/cli/cli_test.go (1)
13-13: URL scheme updated but flag name remains inconsistent

The URL scheme has been correctly changed from HTTP to WebSocket to align with the transition to WebSocket-based blockchain communication. However, the flag name `--http-address` no longer matches the protocol being used, which could be confusing for users.

Consider renaming the flag to `--ws-address` or a more protocol-agnostic name like `--rpc-address` to better reflect its actual purpose. You would need to update both the flag name in the code and the error message on line 44.

pkg/indexer/common/log_handler.go (1)
185-204: Consider deduplicating identical storage logic for backfill & subscription

The retry-and-store block is duplicated verbatim in both branches. A small helper (e.g. `processLog(event types.Log) error`) would trim ~30 duplicated lines and make future changes less error-prone.
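To make the suggestion concrete, here is a minimal sketch of such a helper. The storer interface, retry count, and delay are illustrative placeholders, not the actual types or retryable-error semantics used in log_handler.go:

```go
package common

import (
	"context"
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/core/types"
)

// LogStorer is a stand-in for whatever storage dependency log_handler.go uses.
type LogStorer interface {
	StoreLog(ctx context.Context, event types.Log) error
}

// processLog consolidates the duplicated retry-and-store logic for one log.
// Retry count and delay are illustrative values, not taken from this PR.
func processLog(ctx context.Context, storer LogStorer, event types.Log) error {
	const (
		maxAttempts = 3
		retryDelay  = 250 * time.Millisecond
	)

	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = storer.StoreLog(ctx, event); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(retryDelay):
			// retry
		}
	}
	return fmt.Errorf("storing log from block %d after %d attempts: %w",
		event.BlockNumber, maxAttempts, err)
}
```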
pkg/blockchain/rpcLogStreamer.go (2)

405-416: Duplicate filter-query builders – consolidate into one helper

`buildFilterQuery` and `buildSubscriptionFilterQuery` differ only in the optional `FromBlock`/`ToBlock` fields. Duplicating code invites drift.

Consider a single function that takes optional `from`/`to` arguments:

```go
func buildFilterQueryBase(addr common.Address, topics []common.Hash, from, to *big.Int) ethereum.FilterQuery { … }
```

and reuse it from both call-sites. This improves maintainability and removes ~15 lines of duplication.
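A possible shape for that shared builder, assuming the existing builders filter on a single address and one topic set (the exact nesting of `Topics` in the real code may differ):

```go
package blockchain

import (
	"math/big"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
)

// buildFilterQueryBase serves both the backfill and the subscription paths:
// passing nil for from/to yields an open-ended query suitable for a live
// subscription, while concrete values bound a backfill batch.
func buildFilterQueryBase(
	addr common.Address,
	topics []common.Hash,
	from, to *big.Int,
) ethereum.FilterQuery {
	return ethereum.FilterQuery{
		FromBlock: from,
		ToBlock:   to,
		Addresses: []common.Address{addr},
		Topics:    [][]common.Hash{topics},
	}
}
```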
63-66: Channel buffer size is arbitrary – expose as a parameter or constant

The hard-coded buffer of `100` may be either too small (risking back-pressure) or too large (wasted memory) depending on chain activity. Expose the size as:

```go
const defaultChanBuf = 256
```

or accept it via an option so operators can tune it.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
- cmd/cli/cli_test.go (1 hunks)
- dev/local.env (1 hunks)
- go.mod (1 hunks)
- pkg/blockchain/rpcLogStreamer.go (13 hunks)
- pkg/blockchain/rpcLogStreamer_test.go (1 hunks)
- pkg/config/options.go (2 hunks)
- pkg/config/validation.go (4 hunks)
- pkg/indexer/app_chain/app_chain.go (6 hunks)
- pkg/indexer/common/log_handler.go (4 hunks)
- pkg/indexer/common/log_handler_test.go (5 hunks)
- pkg/server/server_test.go (1 hunks)
- pkg/testutils/anvil/anvil.go (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
pkg/indexer/app_chain/app_chain.go (2)
Learnt from: fbac
PR: xmtp/xmtpd#800
File: pkg/indexer/app_chain/contracts/group_message.go:0-0
Timestamp: 2025-05-19T09:46:53.916Z
Learning: The GroupMessageBroadcaster implementation in pkg/indexer/app_chain/contracts/group_message.go uses common.Address type for contract addresses which are already validated by pkg/config/validation.go, making additional address validation unnecessary.
Learnt from: fbac
PR: xmtp/xmtpd#800
File: pkg/indexer/app_chain/app_chain.go:174-180
Timestamp: 2025-05-19T09:19:31.648Z
Learning: In the AppChain implementation, the reorg channels returned by methods like GroupMessageBroadcasterReorgChannel() need to remain bidirectional (chan uint64) because they're passed to the IndexLogs function which writes to them when detecting blockchain reorganizations.
🧬 Code Graph Analysis (2)
pkg/indexer/app_chain/app_chain.go (2)
pkg/indexer/app_chain/contracts/group_message.go (1)
- GroupMessageBroadcasterName (101-103)

pkg/indexer/app_chain/contracts/identity_update.go (1)
- IdentityUpdateBroadcasterName (106-108)

pkg/indexer/common/log_handler.go (3)

pkg/indexer/common/reorg_handler.go (1)
- ErrNoBlocksFound (28-28)

pkg/errors/errors.go (1)
- RetryableError (5-8)

pkg/metrics/indexer.go (1)
- EmitIndexerLogProcessingTime (115-117)
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: Code Review
- GitHub Check: Push Docker Images to GitHub Packages (xmtpd-cli)
- GitHub Check: Push Docker Images to GitHub Packages (xmtpd)
- GitHub Check: Upgrade Tests
- GitHub Check: Test (Node)
🔇 Additional comments (10)
go.mod (1)
12-12: LGTM! WebSocket package now used as direct dependency

Moving the Gorilla WebSocket package from indirect to direct dependencies properly reflects its new usage in the codebase for WebSocket connections to blockchain nodes.
pkg/testutils/anvil/anvil.go (1)
103-103: Details

✅ Verification successful

WebSocket URL scheme adopted for Anvil container

The URL scheme has been correctly updated from HTTP to WebSocket, consistent with the project-wide transition to WebSocket-based blockchain communication.

Let's verify that the `blockchain.NewClient` function supports WebSocket URLs:

🏁 Script executed:

```bash
#!/bin/bash
# Check if the blockchain.NewClient function properly handles WebSocket URLs
rg -A 10 "func NewClient" pkg/blockchain/
```

Length of output: 655

Approve WebSocket URL scheme in Anvil container

The WebSocket URL in pkg/testutils/anvil/anvil.go (line 103) is correct. Under the hood, `blockchain.NewClient` calls `ethclient.DialContext`, which fully supports `ws://` (and `wss://`) endpoints. No further changes needed.

dev/local.env (1)
12-12: LGTM: WebSocket protocol update aligns with subscription requirements.

The change from HTTP to WebSocket protocol for the RPC_URL environment variable is appropriate for enabling real-time subscription-based log streaming.

pkg/server/server_test.go (1)

45-45: LGTM: Test server configuration updated to use WebSocket URL.

This change correctly aligns the test server configuration with the broader transition to WebSocket connections.

pkg/config/options.go (2)

35-38: Appropriate deprecation notices and new field added.

The deprecation notice for `RpcURL` and introduction of `WssURL` provides a clear migration path while maintaining backward compatibility.

48-51: Consistent approach for SettlementChainOptions.

The same deprecation pattern and new field addition is correctly applied to the SettlementChainOptions struct, maintaining consistency across the codebase.

pkg/config/validation.go (2)

117-122: LGTM: WebSocket validation for AppChain RPC URL.

Validating the WebSocket connection is a good enhancement over simply checking for non-empty strings.

150-155: LGTM: WebSocket validation for SettlementChain RPC URL.

Consistent validation approach applied to the SettlementChain configuration.
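For readers who want a feel for what such a validator can look like, here is a hedged sketch. It is not the actual pkg/config/validation.go code; it assumes a Gorilla WebSocket dial (the package go.mod now lists as a direct dependency) and shows the connection being closed with its error checked, which is the errcheck finding reported later in this thread:

```go
package config

import (
	"fmt"
	"strings"
	"time"

	"github.com/gorilla/websocket"
)

// validateWebsocketURL checks the URL scheme and performs a short-lived dial
// to prove the endpoint actually speaks WebSocket. The timeout is illustrative.
func validateWebsocketURL(url string) error {
	if !strings.HasPrefix(url, "ws://") && !strings.HasPrefix(url, "wss://") {
		return fmt.Errorf("url %q is not a websocket endpoint", url)
	}

	dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second}
	conn, resp, err := dialer.Dial(url, nil)
	if resp != nil && resp.Body != nil {
		defer func() { _ = resp.Body.Close() }()
	}
	if err != nil {
		return fmt.Errorf("failed to dial websocket endpoint %q: %w", url, err)
	}

	// Check the Close error so errcheck is satisfied.
	if err := conn.Close(); err != nil {
		return fmt.Errorf("failed to close validation connection: %w", err)
	}
	return nil
}
```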
pkg/indexer/app_chain/app_chain.go (1)
151-158: Ensure `GetSubscriptionChannel()` never returns `nil` to avoid a hard block in `IndexLogs`

`IndexLogs` performs a blocking receive on the subscription channel. If `RpcLogStreamer.GetSubscriptionChannel()` returns `nil` (e.g. when the WS subscription could not be established), the goroutine will block forever, because receiving from a `nil` channel blocks indefinitely. Please make sure `RpcLogStreamer` always initialises the channel (even when the underlying subscription cannot be created) or change `IndexLogs` to guard against a `nil` channel.

Also applies to: 167-174
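A minimal sketch of the second option, guarding against a nil channel inside the indexing loop. The real IndexLogs signature and reorg handling in pkg/indexer/common/log_handler.go differ, so treat this purely as an illustration:

```go
package common

import (
	"context"
	"errors"

	"github.com/ethereum/go-ethereum/core/types"
)

// indexLogsSketch shows the guard: a nil event channel would block the select
// forever, so fail fast instead of silently hanging the goroutine.
func indexLogsSketch(
	ctx context.Context,
	eventChannel <-chan types.Log,
	reorgChannel chan uint64,
	handle func(types.Log) error,
) error {
	if eventChannel == nil {
		return errors.New("indexer: event channel is nil, subscription was never established")
	}

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case log, open := <-eventChannel:
			if !open {
				return nil
			}
			if err := handle(log); err != nil {
				return err
			}
		case reorgBlock := <-reorgChannel:
			_ = reorgBlock // reorg handling elided in this sketch
		}
	}
}
```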
pkg/indexer/common/log_handler_test.go (1)
123-129: Close `subChannel` in the retryable-error test to avoid goroutine leaks

Unlike `TestIndexLogsSuccess`, the `defer` block here does not close `cfg.subChannel`. As a result, the `IndexLogs` goroutine may stay blocked on that channel after the test completes, leading to leaked goroutines.

```diff
-	close(cfg.reorgChannel)
+	close(cfg.reorgChannel)
+	close(cfg.subChannel)
```
Actionable comments posted: 5
🧹 Nitpick comments (4)
pkg/blockchain/rpcLogStreamer.go (4)
300-303: Add a small sleep when logs are found to prevent a tight loop.

When logs are found during backfill, the loop continues immediately without delay, which could hammer the node with requests if there are many consecutive blocks with logs.

```diff
 if len(logs) == 0 {
 	time.Sleep(sleepTimeNoLogs)
 	continue
 }
+// Add a small sleep to prevent a tight loop when many blocks have logs
+time.Sleep(10 * time.Millisecond)
 logger.Debug(
 	"Got logs",
 	zap.Int("numLogs", len(logs)),
 	zap.Uint64("fromBlock", fromBlock),
 	zap.Time("time", time.Now()),
 )
```
394-423: Add short delay before retry after subscription error.

The current implementation immediately attempts to rebuild the subscription after an error. Adding a small delay before the retry attempt can help avoid hammering the server if there's a persistent issue.

```diff
 case err := <-sub.Err():
 	if err == nil {
 		continue
 	}
 	logger.Error("subscription error, rebuilding", zap.Error(err))
+	// Add a small delay before attempting to rebuild
+	time.Sleep(100 * time.Millisecond)
 	rebuildOperation := func() (ethereum.Subscription, error) {
 		sub, err = r.buildSubscription(query, innerSubCh)
 		return sub, err
 	}
```
144-147: Add a state variable to track the operational mode.

Currently, `isSubEnabled` controls whether subscription logs are processed. Making this a defined state enum would improve code readability and maintainability.

Define a state enum at the top of the file:

```go
type watcherState int

const (
	stateBackfill watcherState = iota
	stateSubscription
	statePolling // For future use if subscription fails
)
```

Then update the code to use this state:

```diff
 var (
 	logger    = r.logger.With(zap.String("contractAddress", cfg.Address.Hex()))
 	subLogger = logger.Named("subscription").
 			With(zap.String("contractAddress", cfg.Address.Hex()))
 	timer                = time.NewTimer(cfg.MaxDisconnectTime)
 	fromBlock            = cfg.FromBlock
 	highestBackfillBlock = fromBlock - 1
-	isSubEnabled         = false
+	state                = stateBackfill
 )
```

And when checking/updating the state:

```diff
-	if !isSubEnabled {
+	if state == stateBackfill {
 		continue
 	}
```

```diff
-	isSubEnabled = true
+	state = stateSubscription
```

This makes the code more self-documenting and easier to extend with additional states in the future.
Also applies to: 229-252
386-424: Consider adding metrics for subscription status.

You have metrics for backfill progress, but no metrics for subscription status. Adding metrics would help with monitoring the health of the subscription.

Consider adding these metrics:

```diff
+// At the top of the file, add constants for metric names
+const (
+	// ... existing constants
+	metricSubscriptionRebuildAttempts = "subscription_rebuild_attempts"
+	metricSubscriptionErrors          = "subscription_errors"
+)

 logger.Info("Subscribed to contract")

+// Initialize counters
+var (
+	rebuildAttempts int
+	errorCount      int
+)

 for {
 	select {
 	case <-ctx.Done():
 		logger.Debug("subscription context cancelled, stopping")
 		return
 	case err := <-sub.Err():
 		if err == nil {
 			continue
 		}
 		logger.Error("subscription error, rebuilding", zap.Error(err))
+		errorCount++
+		metrics.EmitCounter(fmt.Sprintf("%s_%s", contractAddress, metricSubscriptionErrors), 1)

 		// ... existing code ...

+		rebuildAttempts++
+		metrics.EmitCounter(fmt.Sprintf("%s_%s", contractAddress, metricSubscriptionRebuildAttempts), 1)
 		sub, err = backoff.Retry[ethereum.Subscription](
 			// ... existing code ...
 		)
 		if err != nil {
 			logger.Error(
 				"failed to rebuild subscription, closing",
 				zap.Error(err),
 			)
 			return
 		}
+
+		// Reset error count on successful rebuild
+		errorCount = 0
 	}
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- pkg/blockchain/rpcLogStreamer.go (11 hunks)
- pkg/config/validation.go (4 hunks)
🧰 Additional context used
🪛 GitHub Check: Lint-Go
pkg/config/validation.go
[failure] 244-244:
Error return value of conn.Close is not checked (errcheck)
🪛 GitHub Actions: Lint
pkg/config/validation.go
[error] 244-244: golangci-lint: Error return value of conn.Close is not checked (errcheck)
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: Push Docker Images to GitHub Packages (xmtpd-cli)
- GitHub Check: Test (Node)
- GitHub Check: Push Docker Images to GitHub Packages (xmtpd)
- GitHub Check: Upgrade Tests
- GitHub Check: Code Review
🔇 Additional comments (1)
pkg/blockchain/rpcLogStreamer.go (1)
221-222: Details
❓ Verification inconclusive
Address TODO: Evaluate timer in subscription context.
There's a TODO about whether the timer makes sense while using a subscription with keepalive. This should be resolved before considering the code complete.
WebSocket connections typically have their own keepalive mechanisms. The timer here is used to detect if we've gone too long without receiving any logs, which could indicate a problem.
Let's verify if the WebSocket implementation you're using (via the go-ethereum client) has built-in keepalive:
🏁 Script executed:
```bash
#!/bin/bash
# Check if go-ethereum's WebSocket implementation has keepalive
rg -A 5 "keepalive|KeepAlive|keep-alive" --type go
```

Length of output: 3923
Clarify subscription keepalive and timer usage
We need to confirm whether go-ethereum’s WebSocket subscription implements any ping/pong or read-deadline keepalive. If it doesn’t, this inactivity timer is required to detect stalled connections; otherwise the timer can be removed or repurposed.
• Location to update:
pkg/blockchain/rpcLogStreamer.go:221–222
• Suggested diff:

```diff
-	// TODO: Does the timer makes sense while using a subscription with keepalive?
+	// inactivity timer: detect stalled WS subscriptions (go-ethereum RPC does not send ping/pong by default)
 	timer.Reset(cfg.MaxDisconnectTime)
```

Please verify the underlying RPC/WebSocket client's keepalive behavior and then either update the comment accordingly or remove this timer logic.
Actionable comments posted: 1
🔭 Outside diff range comments (1)
pkg/blockchain/rpcLogStreamer.go (1)
322-330: ⚠️ Potential issue

Unsigned under-flow when `lagFromHighestBlock` > `highestBlock`

`highestBlockCanProcess := highestBlock - uint64(r.lagFromHighestBlock)` will wrap around to a huge value if the chain is very young (e.g., in tests or private networks). This leads to an out-of-range filter query.

```diff
-	highestBlockCanProcess := highestBlock - uint64(r.lagFromHighestBlock)
+	var highestBlockCanProcess uint64
+	if uint64(r.lagFromHighestBlock) > highestBlock {
+		highestBlockCanProcess = 0
+	} else {
+		highestBlockCanProcess = highestBlock - uint64(r.lagFromHighestBlock)
+	}
```

This guards against overflow and guarantees `highestBlockCanProcess ≤ highestBlock`.
♻️ Duplicate comments (1)
pkg/blockchain/rpcLogStreamer.go (1)
404-410: ⚠️ Potential issue

Compile-time failure – missing type argument for `backoff.Retry`

`backoff.Retry` in v5 is a generic function. Without specifying the type parameter the compiler cannot infer `T`, so this will not build.

```diff
-	sub, err = backoff.Retry(
+	sub, err = backoff.Retry[ethereum.Subscription]( // specify the concrete type
 		r.ctx,
 		rebuildOperation,
 		backoff.WithBackOff(expBackoff),
 		backoff.WithMaxTries(maxSubRetries),
 		backoff.WithMaxElapsedTime(cfg.MaxDisconnectTime),
 	)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- pkg/blockchain/rpcLogStreamer.go (10 hunks)
- pkg/blockchain/rpcLogStreamer_test.go (2 hunks)
- pkg/indexer/app_chain/app_chain.go (2 hunks)
- pkg/indexer/common/log_handler.go (3 hunks)
- pkg/indexer/common/log_handler_test.go (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- pkg/indexer/app_chain/app_chain.go
- pkg/blockchain/rpcLogStreamer_test.go
- pkg/indexer/common/log_handler.go
- pkg/indexer/common/log_handler_test.go
🧰 Additional context used
🧠 Learnings (1)
pkg/blockchain/rpcLogStreamer.go (1)
Learnt from: fbac
PR: xmtp/xmtpd#800
File: pkg/blockchain/rpcLogStreamer.go:0-0
Timestamp: 2025-05-19T09:45:00.105Z
Learning: In the blockchain RpcLogStreamer implementation, backfillChannel and reorgChannel are meant to be internal implementation details that are managed by the streamer itself, not provided by external callers. This is why they are maintained as unexported fields with getter methods for access.
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: Push Docker Images to GitHub Packages (xmtpd-cli)
- GitHub Check: Upgrade Tests
- GitHub Check: Push Docker Images to GitHub Packages (xmtpd)
- GitHub Check: Test (Node)
- GitHub Check: Code Review
Actionable comments posted: 0
♻️ Duplicate comments (3)
pkg/blockchain/rpcLogStreamer.go (3)
405-411: ⚠️ Potential issue

Fix the generic type parameter for backoff.Retry

The `backoff.Retry` function in v5 is a generic function that requires a type parameter. Without it, compilation will fail.

Apply this diff to add the generic type parameter:

```diff
-	sub, err = backoff.Retry(
+	sub, err = backoff.Retry[ethereum.Subscription](
 		r.ctx,
 		rebuildOperation,
 		backoff.WithBackOff(expBackoff),
 		backoff.WithMaxTries(maxSubRetries),
 		backoff.WithMaxElapsedTime(cfg.MaxDisconnectTime),
 	)
```
184-188: 🛠️ Refactor suggestion

Consider updating highestBackfillBlock after a reorg

When handling a reorg, you're updating `backfillStartBlock` but not `highestBackfillBlock`. This could lead to inconsistent state during backfill.

Apply this diff to update `highestBackfillBlock` consistently:

```diff
 backfillStartBlock = reorgBlock
+if backfillStartBlock > 0 {
+	highestBackfillBlock = backfillStartBlock - 1
+} else {
+	highestBackfillBlock = 0
+}
```
483-494: 🛠️ Refactor suggestion

Consider setting FromBlock in subscription filter query

The subscription filter query doesn't specify a `FromBlock`, which could lead to duplicated logs if the WebSocket client is ahead of the backfill process.

Consider one of these approaches:

- Move the subscription initialization after backfill is complete
- Set an explicit `FromBlock` in the subscription filter:

```diff
 return ethereum.FilterQuery{
+	FromBlock: new(big.Int).SetInt64(-1), // Latest block
 	Addresses: addresses,
 	Topics:    topics,
 }
```
🧹 Nitpick comments (1)
pkg/blockchain/rpcLogStreamer.go (1)
159-169: Consider handling subscription setup failure gracefully

If the subscription setup fails, the entire watcher stops. Consider a more graceful approach where backfill continues even if subscription fails.

```diff
 innerSubCh, err := r.buildSubscriptionChannel(cfg, subLogger)
 if err != nil {
-	logger.Error("failed to subscribe to contract", zap.Error(err))
-	return
+	logger.Error("failed to subscribe to contract, continuing with backfill only", zap.Error(err))
+	// Create an empty channel that will never yield values
+	innerSubCh = make(chan types.Log)
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- pkg/blockchain/rpcLogStreamer.go (10 hunks)
🧰 Additional context used
🧠 Learnings (1)
pkg/blockchain/rpcLogStreamer.go (2)
Learnt from: fbac
PR: xmtp/xmtpd#828
File: pkg/blockchain/rpcLogStreamer.go:0-0
Timestamp: 2025-05-22T11:22:58.413Z
Learning: The log buffer for RpcLogStreamer will be implemented in a subsequent PR with a minimum size equal to lagFromHighestBlock to prevent potential log loss during transition from backfill to subscription mode.
Learnt from: fbac
PR: xmtp/xmtpd#800
File: pkg/blockchain/rpcLogStreamer.go:0-0
Timestamp: 2025-05-19T09:45:00.105Z
Learning: In the blockchain RpcLogStreamer implementation, backfillChannel and reorgChannel are meant to be internal implementation details that are managed by the streamer itself, not provided by external callers. This is why they are maintained as unexported fields with getter methods for access.
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: Push Docker Images to GitHub Packages (xmtpd-prune)
- GitHub Check: Push Docker Images to GitHub Packages (xmtpd-cli)
- GitHub Check: Test (Node)
- GitHub Check: Push Docker Images to GitHub Packages (xmtpd)
- GitHub Check: Upgrade Tests
- GitHub Check: Code Review
🔇 Additional comments (5)
pkg/blockchain/rpcLogStreamer.go (5)
203-205: Plan for implementing log buffer

Noted your TODO for implementing a log buffer in a subsequent PR. This is important to prevent log loss during the transition from backfill to subscription mode.
Using the provided learning: "The log buffer for RpcLogStreamer will be implemented in a subsequent PR with a minimum size equal to lagFromHighestBlock to prevent potential log loss."
217-219: Good fix for CPU spin with closed channel

Setting `innerBackfillCh = nil` when the channel is closed is an excellent approach to prevent CPU spin in the select statement.
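For readers unfamiliar with the idiom: a nil channel is never ready in a select, so nilling the variable disables that case instead of letting a closed channel fire on every iteration. A small, self-contained illustration (not code from this PR):

```go
package main

import "fmt"

func main() {
	backfill := make(chan int, 2)
	live := make(chan int, 2)
	backfill <- 1
	backfill <- 2
	close(backfill)
	live <- 10
	live <- 20
	close(live)

	for backfill != nil || live != nil {
		select {
		case v, open := <-backfill:
			if !open {
				backfill = nil // disable this case; a closed channel would otherwise fire forever
				continue
			}
			fmt.Println("backfill:", v)
		case v, open := <-live:
			if !open {
				live = nil
				continue
			}
			fmt.Println("live:", v)
		}
	}
}
```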
359-426: Robust subscription handling with backoff

The implementation of WebSocket subscription with exponential backoff and reconnection logic is well-designed. The error handling is comprehensive, with appropriate logging and recovery mechanisms.

190-215: Robust subscription log handling

The way you handle subscription logs is robust, especially updating the `backfillEndBlock` based on incoming logs to ensure proper synchronization between backfill and subscription.

235-247: Well-designed transition from backfill to subscription

The transition from backfill to subscription mode is well-designed, ensuring that the backfill is complete before enabling subscription, and cancelling the backfill goroutine once it's no longer needed.
Replace HTTP polling with WebSocket subscriptions for blockchain event monitoring in
pkg/blockchain/rpcLogStreamer.go (outdated)

```go
case log, open := <-innerSubCh:
	if !open {
		subLogger.Fatal("subscription channel closed, closing watcher")
```
does fatal call panic?
This also has to close all channels. Or the better idea might be to call defer on the cancel functions after creation.
Good catch as well, switching to logger.Error + return.
```go
	// TODO: Next PR to introduce a log buffer.
	// The buffer has to be of size lagFromHighestBlock, at minimum.

	continue
```
@coderabbitai what is the precedence of select if two or more events are available? It seems that in this case, the select statement might keep selecting the continue branch and never select the innerBackfillCh channel
@mkysel You've raised an excellent question about Go's select statement behavior. In Go, when multiple cases in a select statement are ready simultaneously, one is chosen at random with uniform probability. There is no fixed precedence order.
This means that if both innerSubCh and innerBackfillCh have events available at the same time, the Go runtime will randomly choose which case to execute. Your concern is valid - if the subscription channel consistently receives more events than the backfill channel, there's a possibility that the backfill events could be processed more slowly or even starved.
In the current implementation, when isSubEnabled is false during the backfill phase, subscription events are only used to update backfillEndBlock before continuing the loop without further processing. If random selection repeatedly favors the subscription channel case, it could indeed delay processing of the backfill logs.
A potential improvement would be to implement a prioritization mechanism or a buffering strategy to ensure backfill logs are processed in a timely manner regardless of subscription event frequency.
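One common way to express such a prioritization in Go is a nested select with a default case, so the other channel is only consulted when no backfill log is ready. A sketch with illustrative names and element types (not the PR's):

```go
// drainBackfillFirst biases processing toward the backfill channel and only
// services the subscription channel when no backfill value is immediately ready.
func drainBackfillFirst(backfillCh, subCh <-chan int, handle func(int)) {
	for {
		select {
		case v, open := <-backfillCh:
			if !open {
				return
			}
			handle(v)
		default:
			// No backfill value ready right now; wait on either channel.
			select {
			case v, open := <-backfillCh:
				if !open {
					return
				}
				handle(v)
			case v, open := <-subCh:
				if !open {
					return
				}
				handle(v)
			}
		}
	}
}
```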
✏️ Learnings added
Learnt from: mkysel
PR: xmtp/xmtpd#828
File: pkg/blockchain/rpcLogStreamer.go:206-206
Timestamp: 2025-05-22T12:37:08.598Z
Learning: In Go's select statement, when multiple cases are ready simultaneously, one is chosen at random with uniform probability. There is no fixed precedence order, which can potentially lead to starvation of some channels if other channels consistently have more events.
pkg/blockchain/rpcLogStreamer.go (outdated)

```go
if log.BlockNumber > highestBackfillBlock {
	highestBackfillBlock = log.BlockNumber
}
```
Can the backfill be out of order? Why would `log.BlockNumber` ever be lower than the one before?
Good catch, it's always ordered; I added this check defensively out of habit. Removing it.
```diff
 }

-func (r *RpcLogStreamer) watchContract(watcher ContractConfig) {
+func (r *RpcLogStreamer) watchContract(cfg ContractConfig) {
```
I think that I can break this algorithm.

`client.BlockNumber` and channel creation are not atomic:

- Let's say that `backfillEndBlock` gives us block 100
- 10 blocks get inserted concurrently
- We construct the backfill at 90 and want to go to 100
- The subscription constructor takes a long time to start. It would start at 110, but it doesn't give us that data in a channel yet
- `innerBackfillCh` gives us 90-100 as designed
- The backfill reaches `highestBackfillBlock` (100) >= `backfillEndBlock` (100)
- We stop the backfill and enable the sub
- The sub finally opens a channel and starts at 110
the order of operations has to be:
- open the sub, guarantee via a waitgroup that the channel has been opened
- get the current highest block in case there are no new events coming from the sub
- open backfill
```go
func (r *RpcLogStreamer) watchContract(cfg ContractConfig) {
	var (
		logger    = r.logger.With(zap.String("contractAddress", cfg.Address.Hex()))
		subLogger = logger.Named("subscription").
				With(zap.String("contractAddress", cfg.Address.Hex()))
		backfillStartBlock = cfg.FromBlock
	)

	defer close(cfg.eventChannel)
	defer close(cfg.reorgChannel)

	innerSubCh, err := r.buildSubscriptionChannel(cfg, subLogger)
	if err != nil {
		logger.Error("failed to subscribe to contract", zap.Error(err))
		return
	}

	// backfillEndBlock is inclusive from backfill
	backfillEndBlock, err := r.client.BlockNumber(r.ctx)
	if err != nil {
		logger.Error("failed to get highest block", zap.Error(err))
		return
	}

	innerBackfillCh, cancelBackfill, err := r.buildBackfillChannel(cfg, backfillStartBlock)
	if err != nil {
		logger.Error("failed to retrieve historical logs", zap.Error(err))
		return
	}

	logger.Info("Starting watcher")

	// 1) fully consume the backfill until we reach the best known state after the sub was created
watchLoop:
	for {
		select {
		case <-r.ctx.Done():
			logger.Debug("Context cancelled, stopping watcher")
			return
		case _, _ = <-cfg.reorgChannel:
			// TODO
			return
		case log, open := <-innerBackfillCh:
			if !open {
				logger.Fatal("backfill channel closed, closing watcher")
				return
			}
			cfg.eventChannel <- log

			// Check if subscription has to be enabled only after processing all logs in the last batch.
			// Duplicated logs are not a problem, lost logs are.
			if log.BlockNumber >= backfillEndBlock {
				logger.Debug(
					"Backfill complete, enabling subscription mode",
					zap.Uint64("blockNumber", log.BlockNumber),
				)
				cancelBackfill()
				break watchLoop
			}
		}
	}

	// 2) now we can safely use the sub
	for {
		select {
		case <-r.ctx.Done():
			logger.Debug("Context cancelled, stopping watcher")
			return
		case _, _ = <-cfg.reorgChannel:
			// TODO
			return
		case log, open := <-innerSubCh:
			if !open {
				subLogger.Fatal("subscription channel closed, closing watcher")
				return
			}
			// already processed by backfill
			if log.BlockNumber <= backfillEndBlock {
				subLogger.Debug(
					"Log already processed",
					zap.Uint64("fromBlock", log.BlockNumber),
				)
				continue
			}
			subLogger.Debug(
				"Sending log to subscription channel",
				zap.Uint64("fromBlock", log.BlockNumber),
			)
			cfg.eventChannel <- log
		}
	}
}
```
^^^ it does not account for reorgs, but is much more readable to me
Initially I had a similar approach, but discarded it. I don't think it would work, especially during long backfills.

As I see it:

- We set `backfillEndBlock` once, which is the latest block at the time `r.client.BlockNumber()` is fired.
- It would never be updated afterwards; the process backfills from start to end.
- In the meanwhile the blockchain keeps producing blocks, so in practice the real `backfillEndBlock` has increased, and we'll miss those logs. There is also no way to signal the subscription to keep those logs in a buffer.

By the time we switch to the subscription channel it would be advanced, and we'd be receiving logs from the initial `backfillEndBlock` + N blocks ahead. N can be huge if the backfill process took long enough (and it will), which is possible for new nodes syncing one year of historical data (126,230,400 blocks at 250 ms per block, worst case scenario).

That's the whole reason for moving the backfill and sub channels into the same select, and having `backfillEndBlock` as a moving target thanks to the logs coming to the sub channel.
This gave me an idea though, which might be better than the current approach and combines both.
Theoretically we could "weight" the channels, like:
```go
backfillLoop:
	for {
		// Subscription channel updates backfillEndBlock to the latest log seen.
		select {
		case log := <-subChan:
			// Update backfillEndBlock.
		default:
			// There was no log in subChan, backfill.
		}

		// Next backfill batch.
		select {
		case log := <-backfillChan:
			// Process logs.
			// if log.BlockNumber >= backfillEndBlock
			//     cancel backfill and break backfillLoop
		}
	}

	// Switch to subscription-only.
	for {
		select {
		case log := <-subChan:
			// Handle log.
		}
	}
```
pkg/blockchain/rpcLogStreamer.go (outdated)

```go
// Check if subscription has to be enabled only after processing all logs in the last batch.
// Duplicated logs are not a problem, lost logs are.
if highestBackfillBlock+uint64(r.lagFromHighestBlock) >= backfillEndBlock {
```
this looks dangerous. we set the lag to 0. But if it is not 0, it seems that the backfill will never catch up with the sub
You're right, and after some testing I believe the problem is deeper.

We currently backfill with the configurable lag, in `GetNextPage`:

```go
highestBlockCanProcess := highestBlock - uint64(r.lagFromHighestBlock)
if fromBlock > highestBlockCanProcess {
	metrics.EmitIndexerCurrentBlockLag(contractAddress, 0)
	return []types.Log{}, nil, nil
}
```

This isn't compatible with a websocket subscription, which will always get the latest canonical and reorged events (field `Removed = true|false`). It's also not compatible with the app chain, where we always want 0 lag (it's hardcoded).

So I believe we should always backfill with 0 lag for every chain. Then, when we switch over to a sub, after receiving an event on a "lagged" chain we can (a sketch of this idea follows below):

- hold it in a timelock and send it over the channel after the safe lag has passed (lag_blocks * time_per_block)
- if before that the indexer learns that the log was reorged (the same event, with `Removed = true`), we cancel it.
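A minimal sketch of that timelock idea, assuming a single forwarder that keys pending deliveries by block hash, transaction hash, and log index; the names, keying scheme, and mutex-based bookkeeping are illustrative, not part of this PR:

```go
package blockchain

import (
	"fmt"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/core/types"
)

// delayedForwarder holds each live log for the safe lag window and drops it if
// a matching Removed=true event arrives before the window elapses.
type delayedForwarder struct {
	mu      sync.Mutex
	delay   time.Duration // lagBlocks * timePerBlock
	out     chan<- types.Log
	pending map[string]*time.Timer
}

func logKey(l types.Log) string {
	return fmt.Sprintf("%s-%s-%d", l.BlockHash.Hex(), l.TxHash.Hex(), l.Index)
}

func (f *delayedForwarder) handle(l types.Log) {
	key := logKey(l)
	f.mu.Lock()
	defer f.mu.Unlock()

	if l.Removed {
		// The block was reorged out before the lag window passed: cancel delivery.
		if t, ok := f.pending[key]; ok {
			t.Stop()
			delete(f.pending, key)
		}
		return
	}

	f.pending[key] = time.AfterFunc(f.delay, func() {
		f.mu.Lock()
		delete(f.pending, key)
		f.mu.Unlock()
		f.out <- l // safe lag elapsed, forward downstream
	})
}
```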
Actionable comments posted: 0
♻️ Duplicate comments (1)
pkg/blockchain/rpcLogStreamer.go (1)
336-342: ⚠️ Potential issue

Critical: Missing type parameter for backoff.Retry.

The `backoff.Retry` call is missing the required type parameter for backoff v5. This will cause compilation failure with "cannot infer T" errors.

Apply this fix:

```diff
-	sub, err = backoff.Retry(
+	sub, err = backoff.Retry[ethereum.Subscription](
 		r.ctx,
 		rebuildOperation,
 		backoff.WithBackOff(expBackoff),
 		backoff.WithMaxTries(maxSubRetries),
 		backoff.WithMaxElapsedTime(cfg.MaxDisconnectTime),
 	)
```
🧹 Nitpick comments (1)
pkg/blockchain/rpcLogStreamer.go (1)
197-209: Incomplete unsafe reorg distance implementation.

The logic for handling unsafe reorg distance appears incomplete: it logs reaching the threshold but doesn't actually implement the buffer management mentioned in the TODO comments.
Consider implementing a placeholder or clearer documentation about what will happen in this section when the buffer is implemented.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- pkg/blockchain/rpcLogStreamer.go (8 hunks)
- pkg/indexer/app_chain/app_chain.go (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/indexer/app_chain/app_chain.go
🧰 Additional context used
🧠 Learnings (1)
pkg/blockchain/rpcLogStreamer.go (3)
Learnt from: fbac
PR: xmtp/xmtpd#828
File: pkg/blockchain/rpcLogStreamer.go:0-0
Timestamp: 2025-05-22T11:22:58.453Z
Learning: The log buffer for RpcLogStreamer will be implemented in a subsequent PR with a minimum size equal to lagFromHighestBlock to prevent potential log loss during transition from backfill to subscription mode.
Learnt from: mkysel
PR: xmtp/xmtpd#828
File: pkg/blockchain/rpcLogStreamer.go:206-206
Timestamp: 2025-05-22T12:37:08.622Z
Learning: In Go's select statement, when multiple cases are ready simultaneously, one is chosen at random with uniform probability. There is no fixed precedence order, which can potentially lead to starvation of some channels if other channels consistently have more events.
Learnt from: fbac
PR: xmtp/xmtpd#800
File: pkg/blockchain/rpcLogStreamer.go:0-0
Timestamp: 2025-05-19T09:45:00.127Z
Learning: In the blockchain RpcLogStreamer implementation, backfillChannel and reorgChannel are meant to be internal implementation details that are managed by the streamer itself, not provided by external callers. This is why they are maintained as unexported fields with getter methods for access.
🧬 Code Graph Analysis (1)
pkg/blockchain/rpcLogStreamer.go (2)
pkg/metrics/indexer.go (5)
- EmitIndexerMaxBlock (84-87)
- EmitIndexerCurrentBlockLag (94-97)
- MeasureGetLogs (104-113)
- EmitIndexerCurrentBlock (79-82)
- EmitIndexerNumLogsFound (74-77)

pkg/tracing/tracing.go (1)
- GoPanicWrap (113-128)
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: Push Docker Images to GitHub Packages (xmtpd-cli)
- GitHub Check: Push Docker Images to GitHub Packages (xmtpd)
- GitHub Check: Test (Node)
- GitHub Check: Upgrade Tests
- GitHub Check: Code Review
🔇 Additional comments (7)
pkg/blockchain/rpcLogStreamer.go (7)
10-10: LGTM! Well-structured imports and constants for WebSocket functionality.

The addition of the backoff library and the defined constants provide appropriate defaults for subscription management and error handling.

Also applies to: 21-25

31-31: LGTM! Improved struct design with unified event channel.

The field rename from `ContractAddress` to `Address` creates a cleaner API, and the unified `eventChannel` simplifies event handling by combining backfill and subscription events.

Also applies to: 34-34

59-59: LGTM! Consistent updates to option function.

The changes properly use the new field names and create appropriately buffered channels for the unified event streaming approach.

Also applies to: 62-63

149-153: Potential log loss during backfill phase.

Starting the subscription early (before backfill completes) could result in missing events that arrive between subscription start and backfill completion, as noted in the TODO comment on line 155.

Based on the retrieved learnings, this will be addressed with a log buffer in a subsequent PR. However, ensure that the buffer size will be sufficient to handle the gap between subscription start and backfill completion.

249-252: LGTM! Improved method signature with better context support.

The addition of a context parameter enables proper cancellation handling, and returning the highestBlock provides valuable information for the calling code. The simplified error handling makes the method more predictable.

Also applies to: 254-257

359-373: LGTM! Clean subscription creation helper.

The method provides a simple, focused interface for creating subscriptions with appropriate error handling.

414-425: LGTM! Appropriate separation of subscription and backfill queries.

The function correctly creates subscription-specific filter queries without block ranges, which is appropriate for live event streaming.
Force-pushed from f9d849a to 4280ebc.
Implement WebSocket subscription support in RpcLogStreamer to enable real-time event indexing
Updates `RpcLogStreamer` in rpcLogStreamer.go with exponential backoff reconnection logic and implements dual-channel event handling in log_handler.go.

📍 Where to Start

Start with the `RpcLogStreamer` implementation in rpcLogStreamer.go, which contains the core WebSocket subscription logic and handles both the backfill and subscription channels.

Macroscope summarized a3b5dc4.
Chores