Revamped reorg detection #858
Walkthrough
This change introduces explicit blockchain reorganization (reorg) detection and handling by verifying block parent hashes during log streaming.
Sequence Diagram(s)
sequenceDiagram
participant AppChain/SettlementChain
participant RpcLogStreamer
participant BlockchainNode
AppChain/SettlementChain->>RpcLogStreamer: Start streaming (provides FromBlockNumber, FromBlockHash)
loop For each page
RpcLogStreamer->>BlockchainNode: Fetch block at fromBlockNumber + 1
BlockchainNode-->>RpcLogStreamer: Returns block (number, hash, parent hash)
RpcLogStreamer->>RpcLogStreamer: Compare parent hash with FromBlockHash
alt Hashes match
RpcLogStreamer->>BlockchainNode: Fetch logs for block
BlockchainNode-->>RpcLogStreamer: Returns logs
RpcLogStreamer-->>AppChain/SettlementChain: Deliver logs, update FromBlockNumber/Hash
else Reorg detected
RpcLogStreamer-->>AppChain/SettlementChain: Return ErrReorg, previous block number/hash
end
end
Replace channel-based reorg detection with hash-based tracking in RpcLogStreamer and implement contract-specific reorg handlers
This PR replaces the existing channel-based blockchain reorganization detection system with a hash-based tracking approach.
Macroscope summarized e1eed50.
Actionable comments posted: 3
🔭 Outside diff range comments (3)
pkg/indexer/common/log_handler.go (1)
35-41: Address incomplete reorg handling and ignored return value.
The delegation to contract.HandleLog(ctx, event) is correct, but there are two concerns:
- The TODO comment suggests this implementation is incomplete for future work
- The return value from HandleLog is ignored, potentially missing retryable errors
Consider handling the return value appropriately:
     if event.Removed {
         contract.Logger().Debug("detected reorged event", zap.Any("log", event))
    -    _ = contract.HandleLog(ctx, event)
    +    if err := contract.HandleLog(ctx, event); err != nil {
    +        contract.Logger().Error("error handling reorged event", zap.Error(err))
    +        if err.ShouldRetry() {
    +            // Consider retry logic for reorg handling
    +        }
    +    }
     }
🏁 Script executed:
    #!/bin/bash
    # Description: Verify RpcLogStreamer correctly marks reorged events as Removed
    # Expected: Events should be marked as Removed when reorgs are detected
    echo "=== Checking where event.Removed is set ==="
    rg "\.Removed\s*=" --type go -A 3 -B 3
    echo "=== Checking reorg detection in RpcLogStreamer ==="
    rg "reorg" --type go -A 5 -B 5 pkg/blockchain/
Length of output: 3157
Implement complete reorg-removal flow and surface HandleLog errors
Our verification shows that pkg/blockchain/rpcLogStreamer.go never sets log.Removed = true, so the if event.Removed { … } branch in pkg/indexer/common/log_handler.go is never exercised. The lingering TODO is therefore still valid, and we also continue to ignore any errors from HandleLog.
• In pkg/blockchain/rpcLogStreamer.go – upon detecting a reorg (the parent-hash mismatch), iterate over buffered logs from the dropped blocks, mark each with Removed = true, and send them on cfg.eventChannel.
• In pkg/indexer/common/log_handler.go – replace _ = contract.HandleLog(...) with error handling and optional retry logic.
Suggested diffs:
    --- a/pkg/blockchain/rpcLogStreamer.go
    +++ b/pkg/blockchain/rpcLogStreamer.go
    @@ -XXX,6 +XXX,20 @@ func (r *rpcLogStreamer) streamLogs(cfg ContractConfig) {
         if len(fromBlockHash) == 32 && !bytes.Equal(fromBlockHash, block.ParentHash().Bytes()) {
             r.logger.Error("blockchain reorg detected", …)
    +        // Emit removal events for any logs in the buffer for blocks ≥ fromBlockNumber
    +        for _, removedLog := range r.bufferedLogs[ /* blocks ≥ fromBlockNumber */ ] {
    +            removedLog.Removed = true
    +            cfg.eventChannel <- removedLog
    +        }
    +        // Optionally: clear or rebase the buffer
             return ErrReorg
         }
     }
    --- a/pkg/indexer/common/log_handler.go
    +++ b/pkg/indexer/common/log_handler.go
    @@ -35,7 +35,16 @@ func IndexLogs(…){
         if event.Removed {
             contract.Logger().Debug("detected reorged event", zap.Any("log", event))
    -        _ = contract.HandleLog(ctx, event)
    +        if err := contract.HandleLog(ctx, event); err != nil {
    +            contract.Logger().Error("failed handling reorged event", zap.Error(err))
    +            if retryable, ok := err.(interface{ ShouldRetry() bool }); ok && retryable.ShouldRetry() {
    +                // TODO: implement retry/backoff strategy
    +            }
    +        }
         }
       }
     }
pkg/indexer/settlement_chain/contracts/payer_report_manager_reorg_handler.go (1)
1-26: 🛠️ Refactor suggestion
Significant code duplication across reorg handlers.
This file is nearly identical to payer_registry_reorg_handler.go, differing only in the struct name. Consider refactoring to reduce duplication.
Option 1: Generic reorg handler
    +// GenericReorgHandler handles blockchain reorganization events by logging them
    +type GenericReorgHandler struct {
    +    logger *zap.Logger
    +    name   string
    +}
    +
    +func NewGenericReorgHandler(logger *zap.Logger, name string) *GenericReorgHandler {
    +    return &GenericReorgHandler{
    +        logger: logger.Named("reorg-handler"),
    +        name:   name,
    +    }
    +}
    +
    +func (h *GenericReorgHandler) HandleLog(ctx context.Context, event types.Log) re.RetryableError {
    +    h.logger.Info("handling reorged event",
    +        zap.String("handler", h.name),
    +        zap.Any("log", event))
    +    return nil
    +}
Option 2: Embedded common handler
    +type BaseReorgHandler struct {
    +    logger *zap.Logger
    +}
    +
    +func (h *BaseReorgHandler) HandleLog(ctx context.Context, event types.Log) re.RetryableError {
    +    h.logger.Info("handling reorged event", zap.Any("log", event))
    +    return nil
    +}
    +
    +type PayerReportManagerReorgHandler struct {
    +    BaseReorgHandler
    +}
    +
    +func NewPayerReportManagerReorgHandler(logger *zap.Logger) *PayerReportManagerReorgHandler {
    +    return &PayerReportManagerReorgHandler{
    +        BaseReorgHandler: BaseReorgHandler{logger: logger.Named("reorg-handler")},
    +    }
    +}
pkg/indexer/app_chain/contracts/group_message_reorg_handler.go (1)
1-26: Verify adequacy of simplified reorg handling approach.
This completes a consistent pattern across all reorg handlers, but the current implementation only logs events without any actual reorg handling logic. For blockchain applications, reorgs typically require:
- Reverting database changes from reorged blocks
- Reprocessing canonical chain events
- Updating state consistency
Please verify this logging-only approach is sufficient for the system's reorg handling requirements.
If more comprehensive reorg handling is needed, I can help design the logic for state reversion and canonical chain reprocessing.
🏁 Script executed:
    #!/bin/bash
    # Check if there's additional reorg handling logic elsewhere
    rg -A 10 -B 5 "reorg.*detect|detect.*reorg|HandleLog.*reorg" --type go
    # Look for database rollback or state reversion logic
    rg -A 10 "rollback|revert.*block|undo.*transaction" --type go
Length of output: 3217
Implement comprehensive reorg handling instead of no-op logging
The current pattern simply logs and returns nil in every IReorgHandler (e.g. pkg/indexer/common/log_handler.go has a TODO, and pkg/indexer/app_chain/contracts/group_message_reorg_handler.go only logs the removed event). We found no database rollback or state-reversion logic:
• pkg/indexer/common/log_handler.go
  – TODO: “Handle reorged event in future PR”
  – Only calls contract.HandleLog when event.Removed == true
• pkg/indexer/app_chain/contracts/group_message_reorg_handler.go
  – HandleLog just logs and returns nil
No occurrences of rollback, revert.*block, or undo.*transaction were found anywhere in the repo.
Reorgs typically require:
- Rolling back or deleting database entries created by now-orphaned blocks
- Re-indexing or replaying events along the new canonical chain
- Ensuring state consistency (e.g., updating highest processed block height)
Please expand each handler to perform these steps (or delegate to shared utilities), remove the “TODO,” and add tests to verify reorg workflows.
🧹 Nitpick comments (3)
pkg/indexer/common/log_handler.go (1)
14-17: Update comment to reflect current implementation status.
The comments accurately describe the simplified behavior, but consider updating to clarify that reorg detection is now handled upstream by the RpcLogStreamer.
     /*
    -- IndexLogs will run until the eventChannel is closed, passing each event to the logStorer.
    -- If an event fails to be stored, and the error is retryable, it will sleep for 100ms and try again.
    -- The only non-retriable errors should be things like malformed events or failed validations.
    +- IndexLogs processes events from the eventChannel until it's closed.
    +- Reorg detection is handled upstream by RpcLogStreamer; removed events are delegated to HandleLog.
    +- Retryable storage errors result in 100ms sleep before retry.
    +- Non-retryable errors include malformed events or validation failures.
     */
pkg/indexer/settlement_chain/contracts/payer_registry_reorg_handler.go (1)
11-13: Consider adding documentation for the struct.
The PayerRegistryReorgHandler struct would benefit from documentation explaining its purpose in the new reorg detection system.
    +// PayerRegistryReorgHandler handles blockchain reorganization events for payer registry contracts
    +// by logging the events. This is part of the simplified reorg handling system.
     type PayerRegistryReorgHandler struct {
         logger *zap.Logger
     }
pkg/indexer/app_chain/contracts/group_message_reorg_handler.go (1)
23-24: Consider structured logging for better observability.
The current logging could be enhanced with more structured information for better debugging and monitoring of reorg events.
    -    h.logger.Info("handling reorged event", zap.Any("log", event))
    +    h.logger.Info("handling reorged event",
    +        zap.Uint64("blockNumber", event.BlockNumber),
    +        zap.String("blockHash", event.BlockHash.Hex()),
    +        zap.String("txHash", event.TxHash.Hex()),
    +        zap.Uint("logIndex", event.Index),
    +        zap.String("address", event.Address.Hex()),
    +    )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (21)
- pkg/blockchain/rpcLogStreamer.go (7 hunks)
- pkg/blockchain/rpcLogStreamer_test.go (2 hunks)
- pkg/indexer/app_chain/app_chain.go (4 hunks)
- pkg/indexer/app_chain/contracts/group_message.go (1 hunks)
- pkg/indexer/app_chain/contracts/group_message_reorg_handler.go (1 hunks)
- pkg/indexer/app_chain/contracts/group_message_storer.go (0 hunks)
- pkg/indexer/app_chain/contracts/identity_update.go (1 hunks)
- pkg/indexer/app_chain/contracts/identity_update_reorg_handler.go (1 hunks)
- pkg/indexer/app_chain/contracts/identity_update_storer.go (0 hunks)
- pkg/indexer/common/interface.go (1 hunks)
- pkg/indexer/common/log_handler.go (2 hunks)
- pkg/indexer/common/log_handler_test.go (0 hunks)
- pkg/indexer/common/reorg_handler.go (0 hunks)
- pkg/indexer/common/reorg_handler_test.go (0 hunks)
- pkg/indexer/settlement_chain/contracts/payer_registry.go (1 hunks)
- pkg/indexer/settlement_chain/contracts/payer_registry_reorg_handler.go (1 hunks)
- pkg/indexer/settlement_chain/contracts/payer_report_manager.go (1 hunks)
- pkg/indexer/settlement_chain/contracts/payer_report_manager_reorg_handler.go (1 hunks)
- pkg/indexer/settlement_chain/settlement_chain.go (4 hunks)
- pkg/mocks/common/mock_IContract.go (1 hunks)
- pkg/mocks/common/mock_IReorgHandler.go (2 hunks)
💤 Files with no reviewable changes (5)
- pkg/indexer/app_chain/contracts/group_message_storer.go
- pkg/indexer/app_chain/contracts/identity_update_storer.go
- pkg/indexer/common/log_handler_test.go
- pkg/indexer/common/reorg_handler_test.go
- pkg/indexer/common/reorg_handler.go
🧰 Additional context used
🧠 Learnings (2)
pkg/indexer/app_chain/contracts/group_message.go (1)
Learnt from: fbac
PR: xmtp/xmtpd#800
File: pkg/indexer/app_chain/contracts/group_message.go:0-0
Timestamp: 2025-05-19T09:46:53.942Z
Learning: The GroupMessageBroadcaster implementation in pkg/indexer/app_chain/contracts/group_message.go uses common.Address type for contract addresses which are already validated by pkg/config/validation.go, making additional address validation unnecessary.
pkg/indexer/app_chain/app_chain.go (1)
Learnt from: fbac
PR: xmtp/xmtpd#800
File: pkg/indexer/app_chain/app_chain.go:174-180
Timestamp: 2025-05-19T09:19:31.673Z
Learning: In the AppChain implementation, the reorg channels returned by methods like GroupMessageBroadcasterReorgChannel() need to remain bidirectional (chan uint64) because they're passed to the IndexLogs function which writes to them when detecting blockchain reorganizations.
🧬 Code Graph Analysis (12)
pkg/indexer/app_chain/contracts/identity_update.go (1)
pkg/indexer/app_chain/contracts/identity_update_reorg_handler.go (1)
NewIdentityUpdateReorgHandler(15-17)
pkg/indexer/settlement_chain/contracts/payer_registry.go (1)
pkg/indexer/settlement_chain/contracts/payer_registry_reorg_handler.go (1)
NewPayerRegistryReorgHandler(15-17)
pkg/indexer/settlement_chain/contracts/payer_report_manager.go (1)
pkg/indexer/settlement_chain/contracts/payer_report_manager_reorg_handler.go (1)
NewPayerReportManagerReorgHandler(15-17)
pkg/indexer/common/interface.go (1)
pkg/errors/errors.go (1)
RetryableError(5-8)
pkg/indexer/app_chain/contracts/group_message_reorg_handler.go (1)
pkg/errors/errors.go (1)
RetryableError(5-8)
pkg/indexer/app_chain/contracts/identity_update_reorg_handler.go (1)
pkg/errors/errors.go (1)
RetryableError(5-8)
pkg/indexer/app_chain/contracts/group_message.go (1)
pkg/indexer/app_chain/contracts/group_message_reorg_handler.go (1)
NewGroupMessageReorgHandler(15-17)
pkg/indexer/settlement_chain/contracts/payer_report_manager_reorg_handler.go (1)
pkg/errors/errors.go (1)
RetryableError(5-8)
pkg/indexer/settlement_chain/contracts/payer_registry_reorg_handler.go (1)
pkg/errors/errors.go (1)
RetryableError(5-8)
pkg/blockchain/rpcLogStreamer.go (1)
pkg/metrics/indexer.go (5)
EmitIndexerMaxBlock(84-87)EmitIndexerCurrentBlockLag(94-97)MeasureGetLogs(104-113)EmitIndexerCurrentBlock(79-82)EmitIndexerNumLogsFound(74-77)
pkg/mocks/common/mock_IContract.go (1)
pkg/errors/errors.go (1)
RetryableError(5-8)
pkg/mocks/common/mock_IReorgHandler.go (1)
pkg/errors/errors.go (1)
RetryableError(5-8)
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: Code Review
- GitHub Check: Push Docker Images to GitHub Packages (xmtpd-cli)
- GitHub Check: Test (Node)
- GitHub Check: Push Docker Images to GitHub Packages (xmtpd)
- GitHub Check: Upgrade Tests
🔇 Additional comments (15)
pkg/blockchain/rpcLogStreamer_test.go (2)
43-49: Test correctly updated to match new API.
The field rename from FromBlock to FromBlockNumber properly reflects the updated struct definition.
59-63: GetNextPage call properly updated for reorg detection.
The method call correctly includes the additional block hash parameter (nil for initial call) and handles the extra return values, aligning with the new reorg detection mechanism.
pkg/indexer/settlement_chain/contracts/payer_report_manager.go (1)
72-72: Reorg handler correctly simplified to contract-specific implementation.
The change from generic NewChainReorgHandler to NewPayerReportManagerReorgHandler reduces dependencies and aligns with the new logger-based reorg handling approach.
pkg/indexer/settlement_chain/contracts/payer_registry.go (1)
74-74: Consistent refactoring to contract-specific reorg handler.
The change follows the same pattern as other contracts, properly implementing the simplified reorg handler approach.
pkg/indexer/app_chain/contracts/group_message.go (1)
67-67: App chain reorg handler properly updated.
The change maintains consistency with the settlement chain contracts, correctly implementing the new contract-specific reorg handler pattern.
pkg/indexer/app_chain/contracts/identity_update.go (1)
72-72: ✅ Verification successful
Verify consistent reorg handler migration across all contracts.
The simplification from c.NewChainReorgHandler(ctx, client, querier) to NewIdentityUpdateReorgHandler(logger) aligns with the new HandleLog interface approach. However, ensure all contract reorg handlers have been migrated consistently.
🏁 Script executed:
    #!/bin/bash
    # Description: Verify all contracts use the new reorg handler pattern consistently
    # Expected: All NewXXXReorgHandler calls should only take logger parameter
    echo "=== Searching for old reorg handler pattern ==="
    rg "NewChainReorgHandler" --type go
    echo "=== Searching for new reorg handler pattern ==="
    rg "New.*ReorgHandler\(" --type go -A 2
    echo "=== Checking HandleLog implementations ==="
    ast-grep --pattern 'func ($_, $_) HandleLog($_, $_) $_ { $$$ }'
Length of output: 3150
All reorg handlers migrated consistently
The switch to NewXReorgHandler(logger) has been applied uniformly across all contracts (identity_update, group_message, payer_report_manager, payer_registry), and no references to the old NewChainReorgHandler remain.
pkg/indexer/settlement_chain/settlement_chain.go (2)
98-99: LGTM! ContractConfig properly separates block number and hash.
The update to use separate FromBlockNumber and FromBlockHash fields instead of a single FromBlock field aligns with the enhanced reorg detection approach.
Also applies to: 108-109
71-71: ❓ Verification inconclusive
Verify GetLatestBlock() returns both block number and hash correctly.
The destructuring assignment correctly captures both values from GetLatestBlock(). However, ensure this method signature change is implemented consistently across all block trackers.
Also applies to: 88-88
🏁 Script executed:
    #!/bin/bash
    # Description: Verify GetLatestBlock implementations return (uint64, []byte)
    # Expected: All implementations should return both block number and hash
    echo "=== Checking GetLatestBlock method signatures ==="
    ast-grep --pattern 'func ($_, $_) GetLatestBlock() ($_, $_) { $$$ }'
    echo "=== Checking GetLatestBlock interface definition ==="
    rg "GetLatestBlock\(\)" --type go -A 2 -B 2
Length of output: 8307
Verify NewRpcLogStreamer signature and downstream usage
- ✅ Confirmed: GetLatestBlock() (uint64, []byte) is implemented consistently across
  – pkg/indexer/common/interface.go
  – pkg/indexer/common/block_tracker.go
  – all mocks and tests
- 🔍 Next steps:
  • Check that the NewRpcLogStreamer constructor has been updated to accept both fromBlockNumber uint64 and fromBlockHash []byte, and that its signature matches the calls in:
    pkg/indexer/settlement_chain/settlement_chain.go
    pkg/indexer/app_chain/app_chain.go
  • Ensure there are no remaining references to reorg/consensus channels and that no downstream code relies on them
Suggested quick check:
    rg "func NewRpcLogStreamer" --type go -A5
pkg/indexer/settlement_chain/contracts/payer_registry_reorg_handler.go (1)
19-25: ❓ Verification inconclusive
Unused context parameter and minimal implementation.
The HandleLog method has an unused ctx parameter and currently only logs events without performing actual reorg handling logic. Consider if this implementation is sufficient for the new reorg detection system.
Please verify this simplified approach is intentional and complete:
🏁 Script executed:
    #!/bin/bash
    # Search for other HandleLog implementations to verify this pattern is consistent
    ast-grep --pattern $'func ($$$) HandleLog( ctx context.Context, event types.Log, ) re.RetryableError { $$$ }'
Length of output: 2885
Consistent no-op stub across all ReorgHandlers: confirm intended design
We’ve confirmed that every HandleLog implementation in your codebase (payer_report_manager, payer_registry, identity_update, group_message) currently only logs the event and returns nil, with an unused ctx parameter:
• pkg/indexer/settlement_chain/contracts/payer_report_manager_reorg_handler.go
• pkg/indexer/settlement_chain/contracts/payer_registry_reorg_handler.go
• pkg/indexer/app_chain/contracts/identity_update_reorg_handler.go
• pkg/indexer/app_chain/contracts/group_message_reorg_handler.go
If this is meant to be a temporary placeholder for future reorg-specific logic, please document that intent (e.g., add a TODO) and/or remove the unused ctx parameter to avoid confusion. Otherwise, implement the actual reorg handling here.
pkg/indexer/app_chain/app_chain.go (1)
79-80: LGTM! Clean integration with the new reorg detection approach.
The changes correctly retrieve and pass both block number and hash from the contract's latest block to the RpcLogStreamer configuration, enabling hash-based reorg detection.
Also applies to: 97-98, 107-108, 117-118
pkg/blockchain/rpcLogStreamer.go (1)
265-341: Excellent implementation of hash-based reorg detection!
The refactored GetNextPage method properly implements blockchain reorg detection by:
- Verifying parent hash continuity between blocks
- Correctly handling the genesis block edge case
- Returning appropriate recovery information on reorg detection
- Maintaining backward compatibility with metrics emission
The implementation is robust and aligns well with the overall reorg detection strategy.
pkg/mocks/common/mock_IContract.go (1)
79-183: Mock updates correctly reflect the interface changes.
The auto-generated mock properly implements the new GetLatestBlock() and HandleLog() methods, aligning with the refactored reorg detection approach.
pkg/mocks/common/mock_IReorgHandler.go (3)
5-12: LGTM: Imports updated for new interface signature.
The import additions correctly support the new HandleLog method signature, including context, retryable_errors, and Ethereum types packages.
47-74: LGTM: Mock helper methods correctly updated.
The expecter, call structures, and helper methods (Run, Return, RunAndReturn) are all properly updated to match the new HandleLog method signature. The implementation follows standard mockery patterns.
27-45: ✅ Verification successful
Verify the interface change aligns with the reorg detection refactoring.
The method signature change from FindReorgPoint(detectedAt uint64) (uint64, []byte, error) to HandleLog(ctx context.Context, event types.Log) retryable_errors.RetryableError represents a fundamental shift in reorg detection strategy. The mock implementation correctly handles the new signature and return type.
Please verify that this interface change is consistently implemented across all reorg handlers in the codebase:
🏁 Script executed:
    #!/bin/bash
    # Description: Verify all IReorgHandler implementations use the new HandleLog method
    # Search for IReorgHandler interface definition
    echo "=== IReorgHandler interface definition ==="
    ast-grep --pattern 'type IReorgHandler interface { $$$ }'
    # Search for HandleLog implementations
    echo "=== HandleLog method implementations ==="
    rg -A 5 'func.*HandleLog.*context\.Context.*types\.Log.*retryable_errors\.RetryableError'
    # Search for any remaining FindReorgPoint references
    echo "=== Checking for any remaining FindReorgPoint references ==="
    rg 'FindReorgPoint'
Length of output: 2634
Interface change verified and consistent across codebase
- Verified IReorgHandler interface in pkg/indexer/common/interface.go defines HandleLog(ctx context.Context, event types.Log) retryable_errors.RetryableError.
- Confirmed all mock implementations (pkg/mocks/common/mock_IReorgHandler.go, pkg/mocks/common/mock_IContract.go) use the new HandleLog signature.
- No remaining FindReorgPoint references found.
All reorg handlers align with the refactoring.
pkg/blockchain/rpcLogStreamer.go (Outdated)
        cfg ContractConfig,
        fromBlockNumber uint64,
        fromBlockHash []byte,
    ) (logs []types.Log, nextBlockNumber *uint64, nextBlockHash []byte, highestBlock uint64, err error) {
this is one yucky return tuple!
Agreed, I'm thinking about how to group it into understandable types.
A return struct should be OK: return (BlockReturnType, error). Just with a better name :D
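One possible shape for that struct, as a minimal sketch. NextPage, Log, and the canned getNextPage below are hypothetical stand-ins for illustration, not the names or logic used in this PR; the fields simply mirror the values in the tuple above, with the error kept as a separate return.

```go
package main

import "fmt"

// Log is a simplified stand-in for go-ethereum's types.Log.
type Log struct {
	BlockNumber uint64
}

// NextPage is a hypothetical name for the grouped return values.
type NextPage struct {
	Logs            []Log
	NextBlockNumber *uint64 // nil when there is no further block to read yet
	NextBlockHash   []byte
	HighestBlock    uint64
}

// getNextPage is a hypothetical stand-in that returns canned data; the real
// method would query the node for logs and block headers.
func getNextPage(fromBlockNumber, highestBlock uint64) (NextPage, error) {
	if fromBlockNumber > highestBlock {
		// Nothing to read yet: report only the chain head.
		return NextPage{HighestBlock: highestBlock}, nil
	}
	next := highestBlock + 1
	return NextPage{
		Logs:            []Log{{BlockNumber: fromBlockNumber}},
		NextBlockNumber: &next,
		NextBlockHash:   []byte{0x01},
		HighestBlock:    highestBlock,
	}, nil
}

func main() {
	page, err := getNextPage(5, 10)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(page.Logs), *page.NextBlockNumber, page.HighestBlock)
}
```

Callers then read named fields instead of five positional values, and adding a field later does not change every call site.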
    metrics.EmitIndexerMaxBlock(contractAddress, highestBlock)

    if fromBlock > highestBlock {
    if fromBlockNumber > highestBlock {
this check makes sense, but how can it happen?
Edited: Actually, this can happen due to a misconfiguration.
    metrics.EmitIndexerNumLogsFound(contractAddress, len(logs))

    nextBlockNumber := toBlock + 1
    nextBlock, err := r.client.BlockByNumber(ctx, big.NewInt(int64(toBlock+1)))
we call the chain here to ask it for block+1?
    contractAddress := cfg.Address.Hex()

    if fromBlockNumber > 0 {
        block, err := r.client.BlockByNumber(ctx, big.NewInt(int64(fromBlockNumber+1)))
I am not sure how all the code connects. We return "current-block+1" as the next block down below. And here we also want to start at +1. I can't quite convince myself that there isn't some off-by-one situation here
I'll include some clarifying comments; otherwise this knowledge may be lost over time.
The whole thing is:
- We only know about the current block and its hash, which is what we stored previously. On the first iteration we get it from the block tracker; on subsequent iterations we have to get it from the chain (the last step in GetNextPage).
- On each iteration, we check the next block's parent hash against our locally stored hash. They have to be the same; otherwise there has been a reorg at some point and the hash has been recomputed.
- If everything goes well, we fetch the block (number and hash) that follows the last one processed in the batch, so we know where to start in the next iteration.
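The check described in the steps above can be sketched as follows. This is a simplified illustration rather than the PR's code: Header, checkContinuity, and hash32 are hypothetical names, and the guards follow the fromBlockNumber > 0 and 32-byte hash conditions mentioned elsewhere in this review.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// ErrReorg is redefined here so the sketch is self-contained.
var ErrReorg = errors.New("blockchain reorg detected")

// Header is a simplified stand-in for a block header fetched from the node.
type Header struct {
	Number     uint64
	Hash       []byte
	ParentHash []byte
}

// checkContinuity returns ErrReorg when the next block does not build on the
// block whose hash we stored locally.
func checkContinuity(storedNumber uint64, storedHash []byte, next Header) error {
	// Without a usable stored hash there is nothing to compare against.
	if storedNumber == 0 || len(storedHash) != 32 {
		return nil
	}
	if !bytes.Equal(storedHash, next.ParentHash) {
		return ErrReorg
	}
	return nil
}

// hash32 builds a 32-byte test hash filled with a single byte value.
func hash32(b byte) []byte { return bytes.Repeat([]byte{b}, 32) }

func main() {
	stored := hash32(0xaa) // hash of block 100, as stored by the indexer

	canonical := Header{Number: 101, Hash: hash32(0xbb), ParentHash: hash32(0xaa)}
	forked := Header{Number: 101, Hash: hash32(0xcc), ParentHash: hash32(0xdd)}

	fmt.Println(checkContinuity(100, stored, canonical))
	fmt.Println(checkContinuity(100, stored, forked))
}
```

The stored pair is always (N, hash of N) and the fetched block is always N+1, so comparing the fetched block's parent hash with the stored hash is what keeps the two "+1" steps aligned.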
50ea3f9 to c676f39
Actionable comments posted: 1
♻️ Duplicate comments (1)
pkg/blockchain/rpcLogStreamer.go (1)
255-255: Address the complex return tuple.
The return tuple has become quite complex with 5 return values. This echoes the previous reviewer's concern about the "yucky return tuple."
Consider creating a structured return type to improve readability:
    +type BackfillResult struct {
    +    Logs            []types.Log
    +    NextBlockNumber *uint64
    +    NextBlockHash   []byte
    +    HighestBlock    uint64
    +    Err             error
    +}
    -func (r *RpcLogStreamer) GetNextPage(
    -    ctx context.Context,
    -    cfg ContractConfig,
    -    fromBlockNumber uint64,
    -    fromBlockHash []byte,
    -) (logs []types.Log, nextBlockNumber *uint64, nextBlockHash []byte, highestBlock uint64, err error) {
    +func (r *RpcLogStreamer) GetNextPage(
    +    ctx context.Context,
    +    cfg ContractConfig,
    +    fromBlockNumber uint64,
    +    fromBlockHash []byte,
    +) BackfillResult {
🧹 Nitpick comments (1)
pkg/blockchain/rpcLogStreamer.go (1)
273-281: Consider improving reorg recovery logic.
When a reorg is detected, the code fetches fromBlockNumber-1 to get the previous block's information. While this provides a rollback point, consider if this is the optimal reorg recovery strategy.
For a more robust reorg recovery, consider implementing a binary search to find the common ancestor block rather than just stepping back one block. This would be more efficient for handling deeper reorgs.
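A rough sketch of the binary-search idea, under a strong assumption this PR does not make: that the indexer keeps a hash for every block it has processed, not only the latest one. The hashAt callbacks and findCommonAncestor are hypothetical names; in practice canonical would wrap an RPC call and local a database lookup.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// hashAt returns the hash recorded for a block number. Both callbacks used
// below are hypothetical: local reads the indexer's own records, canonical
// asks the node for the current chain.
type hashAt func(number uint64) []byte

// findCommonAncestor returns the highest block in [low, high] whose local hash
// still matches the canonical chain. It relies on the property that once the
// chains diverge, every later block differs too. The boolean is false when
// even the lowest block mismatches, so the search window was too small.
func findCommonAncestor(low, high uint64, local, canonical hashAt) (uint64, bool) {
	n := int(high-low) + 1
	// sort.Search returns the smallest index for which the predicate is true,
	// or n if it is never true.
	firstMismatch := sort.Search(n, func(i int) bool {
		num := low + uint64(i)
		return !bytes.Equal(local(num), canonical(num))
	})
	if firstMismatch == 0 {
		return 0, false
	}
	return low + uint64(firstMismatch) - 1, true
}

func main() {
	const fork = 6 // last block shared by both chains in this toy example

	local := func(n uint64) []byte { return []byte{byte(n)} }
	canonical := func(n uint64) []byte {
		if n <= fork {
			return []byte{byte(n)}
		}
		return []byte{byte(n), 0xff} // different hash after the fork
	}

	ancestor, ok := findCommonAncestor(0, 10, local, canonical)
	fmt.Println(ancestor, ok)
}
```

Each probe costs one block lookup on the node, so a window of a few thousand blocks needs roughly a dozen calls instead of one call per block when walking backwards.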
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- pkg/blockchain/rpcLogStreamer.go (7 hunks)
- pkg/blockchain/rpcLogStreamer_test.go (2 hunks)
- pkg/config/validation.go (1 hunks)
- pkg/indexer/app_chain/contracts/group_message_reorg_handler.go (1 hunks)
- pkg/indexer/app_chain/contracts/identity_update_reorg_handler.go (1 hunks)
- pkg/indexer/common/log_handler.go (2 hunks)
- pkg/indexer/settlement_chain/contracts/payer_registry_reorg_handler.go (1 hunks)
- pkg/indexer/settlement_chain/contracts/payer_report_manager_reorg_handler.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
- pkg/indexer/settlement_chain/contracts/payer_registry_reorg_handler.go
- pkg/indexer/common/log_handler.go
- pkg/indexer/app_chain/contracts/identity_update_reorg_handler.go
- pkg/indexer/app_chain/contracts/group_message_reorg_handler.go
- pkg/indexer/settlement_chain/contracts/payer_report_manager_reorg_handler.go
- pkg/blockchain/rpcLogStreamer_test.go
🧰 Additional context used
🧠 Learnings (1)
pkg/blockchain/rpcLogStreamer.go (1)
Learnt from: fbac
PR: xmtp/xmtpd#800
File: pkg/blockchain/rpcLogStreamer.go:0-0
Timestamp: 2025-05-19T09:45:00.127Z
Learning: In the blockchain RpcLogStreamer implementation, backfillChannel and reorgChannel are meant to be internal implementation details that are managed by the streamer itself, not provided by external callers. This is why they are maintained as unexported fields with getter methods for access.
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: Code Review
- GitHub Check: Test (Node)
- GitHub Check: Upgrade Tests
🔇 Additional comments (6)
pkg/blockchain/rpcLogStreamer.go (5)
4-7: LGTM: Appropriate imports for new functionality.
The new imports support the reorg detection features: bytes for hash comparison, encoding/hex for logging, and errors for the new ErrReorg.
30-30: LGTM: Well-defined error type for reorg detection.
The ErrReorg error provides a clear way to signal blockchain reorganizations, enabling proper error handling upstream.
35-36: LGTM: Enhanced ContractConfig for reorg detection.
The separation of FromBlockNumber and FromBlockHash enables proper tracking of blockchain state for reorg detection.
317-325: Verify the next block fetching logic.
The logic fetches toBlock+1 to get the next block's information. This appears correct as toBlock represents the last processed block, so toBlock+1 is indeed the next block to process.
258-283: ✅ Verification successful
Verify the reorg detection logic.
The reorg detection logic appears sound but needs verification of edge cases:
- Block number boundary: The check `fromBlockNumber > 0` prevents reorg detection on the genesis block, which is correct.
- Hash validation: The 32-byte hash length check is appropriate for Ethereum block hashes.
- Parent hash comparison: Correctly compares the stored hash with the parent hash of the next block.
However, there's a potential edge case when `fromBlockNumber` is 0 and `fromBlockHash` is provided.
🏁 Script executed:
#!/bin/bash
# Description: Check if there are any callers that pass a hash when fromBlockNumber is 0
# Expected: No occurrences of passing hash with block number 0
echo "Searching for GetNextPage calls with potential zero block number and hash..."
rg -A 10 -B 5 "GetNextPage.*0.*\[\]byte" --type go
echo "Searching for ContractConfig initialization with zero block number and hash..."
ast-grep --pattern 'ContractConfig{ $$$ FromBlockNumber: 0, FromBlockHash: $hash, $$$ }'
Length of output: 481
No callers pass a hash when `fromBlockNumber` is 0; reorg logic is safe
I verified via ripgrep and ast-grep that there are no invocations of `GetNextPage` or `ContractConfig` initializations supplying a non-nil `FromBlockHash` alongside `FromBlockNumber: 0`. Therefore, the edge case you flagged cannot occur with the current callers, and no changes are needed.
pkg/config/validation.go (1)
314-316: LGTM: Defensive programming improvement. The nil check before closing the WebSocket connection prevents potential nil pointer dereference errors. This is a good defensive programming practice, especially since `dialer.Dial()` can return a nil connection in certain error scenarios.
Actionable comments posted: 1
♻️ Duplicate comments (2)
pkg/blockchain/rpcLogStreamer.go (2)
307-311: Consider removing the problematic validation check. This validation check `fromBlockNumber > highestBlock` may be problematic in edge cases where the chain tip moves between the initial `BlockNumber()` call and when this check executes, potentially causing false positives.
This relates to a previous reviewer's question about how this condition can occur. Consider either removing this check or making it more robust to handle chain tip movement.
340-352: LGTM: Next block fetching for continued processing. The logic correctly fetches the next block (`toBlock+1`) and returns its number and hash for the next iteration, maintaining the chain of trust for reorg detection.
This addresses previous concerns about off-by-one errors by clearly fetching the block after the last processed block.
🧹 Nitpick comments (1)
pkg/blockchain/rpcLogStreamer.go (1)
24-34: Consider making constants configurable and verify error handling. The renamed constants improve clarity, but the hardcoded values may not be suitable for all deployment environments.
Consider making `sleepTimeOnError` and `sleepTimeNoLogs` configurable via options, as different chains may have different optimal retry intervals.
The new error variables are well-defined and will improve error handling throughout the codebase.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- pkg/blockchain/rpcLogStreamer.go (8 hunks)
- pkg/blockchain/rpcLogStreamer_test.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/blockchain/rpcLogStreamer_test.go
🧰 Additional context used
🧠 Learnings (1)
pkg/blockchain/rpcLogStreamer.go (1)
Learnt from: fbac
PR: xmtp/xmtpd#800
File: pkg/blockchain/rpcLogStreamer.go:0-0
Timestamp: 2025-05-19T09:45:00.127Z
Learning: In the blockchain RpcLogStreamer implementation, backfillChannel and reorgChannel are meant to be internal implementation details that are managed by the streamer itself, not provided by external callers. This is why they are maintained as unexported fields with getter methods for access.
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: Test (Node)
- GitHub Check: Push Docker Images to GitHub Packages (xmtpd-cli)
- GitHub Check: Push Docker Images to GitHub Packages (xmtpd)
- GitHub Check: Upgrade Tests
- GitHub Check: Code Review
🔇 Additional comments (12)
pkg/blockchain/rpcLogStreamer.go (12)
4-6: LGTM: Import additions support the new functionality. The new imports (`bytes`, `encoding/hex`, `errors`) are appropriately added to support the reorg detection logic with hash comparisons and error definitions.
78-79: LGTM: ContractConfig initialization updated correctly. The struct initialization properly uses the new `FromBlockNumber` and `FromBlockHash` fields, maintaining consistency with the struct definition changes.
161-162: LGTM: Backfill state variables properly initialized. The initialization of `backfillFromBlockNumber` and `backfillFromBlockHash` from the contract configuration correctly sets up the starting state for the backfill process.
181-181: LGTM: GetNextPage call updated to new signature. The function call correctly passes both block number and hash parameters as required by the new signature.
183-205: Excellent error handling improvement with structured approach. The switch-based error handling is much cleaner than the previous implementation and properly handles the three distinct cases: end of backfill, reorg detection, and general errors.
The specific handling for each error type ensures appropriate logging and flow control. The reorg case correctly logs the rollback information for debugging.
207-210: LGTM: State updates properly maintain block progression. The conditional update of both `backfillFromBlockNumber` and `backfillFromBlockHash` ensures consistent state progression when `NextBlockNumber` is available.
254-258: Excellent structured response design. The `GetNextPageResponse` struct provides a clean interface that encapsulates all necessary return values, making the function signature much more maintainable than returning multiple values.
This addresses the previous reviewer's concern about "yucky return tuples" in a clean and type-safe manner.
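As a rough illustration of the response shape and the conditional state update, consider the sketch below. The struct and field names `GetNextPageResponse`, `NextBlockNumber`, and `NextBlockHash` follow the review; the `Log` type, the `Logs` field, and the `advance` helper are hypothetical stand-ins, and the real struct may carry different fields.

```go
package main

import "fmt"

// Log is a hypothetical stand-in for the chain library's log type.
type Log struct {
	BlockNumber uint64
}

// GetNextPageResponse bundles the values one page fetch produces.
// A nil NextBlockNumber signals that there is no further block to fetch yet.
type GetNextPageResponse struct {
	Logs            []Log
	NextBlockNumber *uint64
	NextBlockHash   []byte
}

// advance returns the cursor for the next iteration. Number and hash are
// only replaced together, so they can never describe different blocks.
func advance(curNumber uint64, curHash []byte, resp GetNextPageResponse) (uint64, []byte) {
	if resp.NextBlockNumber == nil {
		return curNumber, curHash
	}
	return *resp.NextBlockNumber, resp.NextBlockHash
}

func main() {
	next := uint64(101)
	num, hash := advance(100, []byte{0x01}, GetNextPageResponse{
		NextBlockNumber: &next,
		NextBlockHash:   []byte{0x02},
	})
	fmt.Println(num, hash)
}
```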
260-265: LGTM: Function signature clearly expresses reorg detection requirements. The updated signature explicitly requires both block number and hash, making the reorg detection capability clear to callers.
268-271: LGTM: Early error handling for chain connectivity. Fetching the highest block early and handling errors immediately prevents unnecessary processing when the chain is unreachable.
273-303: Excellent reorg detection logic with proper edge case handling. The reorg detection implementation is well-designed:
- Correctly skips reorg checking at block 0 (genesis)
- Validates hash length before comparison
- Uses parent hash verification which is the standard approach
- Properly handles the reorg case by rolling back one block
- Provides detailed logging for debugging
The logic fetches `fromBlockNumber+1` to verify its parent hash matches the stored hash of `fromBlockNumber`, which is the correct approach for detecting chain reorganizations.
332-338: LGTM: End of backfill detection and handling. The logic correctly identifies when backfill is complete (`toBlock+1 > highestBlock`) and returns the appropriate error with empty next block information.
405-405: LGTM: Constant rename improves clarity. The rename from the previous constant to `maxSubReconnectionRetries` makes the purpose more explicit and readable.
Actionable comments posted: 0
🧹 Nitpick comments (1)
pkg/blockchain/rpcLogStreamer.go (1)
273-306: Solid reorg detection implementation with good edge case handling. The reorg detection logic is well-implemented:
- Correctly skips genesis block (no parent to verify)
- Proper hash comparison using `bytes.Equal`
- Smart rollback strategy by moving to `fromBlockNumber-1`
- Good logging for debugging reorg events
However, consider adding the clarifying comment mentioned in past reviews about the off-by-one logic:
+ // Fetch the next block (fromBlockNumber+1) to verify its parent hash
+ // matches our stored hash of fromBlockNumber, detecting any reorgs
  block, err := r.client.BlockByNumber(ctx, big.NewInt(int64(fromBlockNumber+1)))
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- pkg/blockchain/rpcLogStreamer.go (8 hunks)
🧰 Additional context used
🧠 Learnings (1)
pkg/blockchain/rpcLogStreamer.go (3)
Learnt from: fbac
PR: xmtp/xmtpd#800
File: pkg/blockchain/rpcLogStreamer.go:0-0
Timestamp: 2025-05-19T09:45:00.127Z
Learning: In the blockchain RpcLogStreamer implementation, backfillChannel and reorgChannel are meant to be internal implementation details that are managed by the streamer itself, not provided by external callers. This is why they are maintained as unexported fields with getter methods for access.
Learnt from: fbac
PR: xmtp/xmtpd#858
File: pkg/blockchain/rpcLogStreamer.go:39-40
Timestamp: 2025-06-03T11:01:03.276Z
Learning: In pkg/stress/watcher.go, the FromBlock field usage is part of ethereum.FilterQuery struct, not the ContractConfig struct, so it's not affected by changes to ContractConfig field names.
Learnt from: fbac
PR: xmtp/xmtpd#858
File: pkg/blockchain/rpcLogStreamer.go:39-40
Timestamp: 2025-06-03T11:01:03.276Z
Learning: In pkg/stress/watcher.go, the FromBlock field usage is part of ethereum.FilterQuery struct, not the ContractConfig struct, so it's not affected by changes to ContractConfig field names.
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: Push Docker Images to GitHub Packages (xmtpd-cli)
- GitHub Check: Push Docker Images to GitHub Packages (xmtpd)
- GitHub Check: Upgrade Tests
- GitHub Check: Test (Node)
- GitHub Check: Code Review
🔇 Additional comments (10)
pkg/blockchain/rpcLogStreamer.go (10)
4-7: LGTM! New imports support reorg detection functionality. The added imports are appropriate for the new reorg detection features:
- `bytes` for hash comparison operations
- `encoding/hex` for readable hash logging
- `errors` for defining specific error types
23-34: LGTM! Constants and error variables well-defined. The constant renames improve clarity, and the new error variables provide specific error types for different backfill conditions. The sleep times and retry limits are reasonable for blockchain operations.
39-40: LGTM! Struct changes enable reorg detection. The transition from `FromBlock` to `FromBlockNumber` and `FromBlockHash` is a necessary breaking change that enables explicit reorg detection by tracking both block numbers and their corresponding hashes.
254-258: Excellent improvement! Addresses previous feedback on return tuple. This structured response replaces the "yucky return tuple" mentioned in past reviews. The design is clean:
- Pointer for `NextBlockNumber` elegantly handles end-of-backfill (nil)
- `NextBlockHash` enables continued reorg detection
- Much more readable than multiple return values
181-227: Excellent simplification of backfill loop logic. The refactored error handling is much cleaner:
- Specific error types make the flow clear
- Reorg detection is now explicit and handled by `GetNextPage`
- Proper progression of `backfillFromBlockNumber` and `backfillFromBlockHash`
- Clean separation between backfill completion and subscription mode
337-357: Correct end-of-backfill detection and next block preparation. The logic properly handles the transition from backfill to real-time streaming:
- Accurate detection when caught up to chain head (`toBlock+1 > highestBlock`)
- Appropriate nil returns for end-of-backfill case
- Proper next block fetching for continued processing
268-271: Comprehensive error handling throughout the method. All blockchain client calls have proper error handling, and the use of specific error types (`ErrReorg`, `ErrEndOfBackfill`) makes the calling code much cleaner and more explicit about different failure modes.
308-335: Good metrics instrumentation for observability. The metrics provide comprehensive visibility into the backfill process:
- Chain height tracking
- Lag measurement
- Performance timing of `FilterLogs` calls
- Progress tracking with current block and log counts
310-316: Good edge case handling with clear error messages. The validation correctly handles the case where the starting block is beyond the current chain head, providing a clear error message for debugging.
260-358: Excellent architectural improvement for reorg detection. This refactor successfully moves reorg detection into the streaming layer with several key benefits:
- Explicit hash-based reorg detection - More reliable than previous approaches
- Simplified calling code - Structured responses eliminate complex return tuples
- Better error semantics - Specific error types make handling clearer
- Comprehensive edge case handling - Genesis block, chain head, hash validation
- Good observability - Metrics and logging for debugging reorgs
The implementation follows blockchain best practices and addresses previous reviewer feedback about return value complexity.
### Add ILogStreamer interface abstraction to decouple AppChain and SettlementChain from concrete RpcLogStreamer implementation

Creates a new `ILogStreamer` interface in [pkg/indexer/common/interface.go](https://github.com/xmtp/xmtpd/pull/862/files#diff-5cfd089dbe44085371c83acdae4a3de5a7ae9dbda09816f29adf96eafd6f4e34) and moves `RpcLogStreamer` from the blockchain package to a dedicated [pkg/indexer/rpc_streamer](https://github.com/xmtp/xmtpd/pull/862/files#diff-fd861225751f668ffeaae06058a690386c363ad41c0f309dc4778b967455a3e2) package. Updates `AppChain` and `SettlementChain` structs to depend on the `ILogStreamer` interface rather than the concrete `RpcLogStreamer` type, and modifies the `GetEventChannel` method to return a receive-only channel `<-chan types.Log`.

#### 📍Where to Start

Start with the new `ILogStreamer` interface definition in [pkg/indexer/common/interface.go](https://github.com/xmtp/xmtpd/pull/862/files#diff-5cfd089dbe44085371c83acdae4a3de5a7ae9dbda09816f29adf96eafd6f4e34) to understand the abstraction, then review how it's implemented in [pkg/indexer/rpc_streamer/rpc_log_streamer.go](https://github.com/xmtp/xmtpd/pull/862/files#diff-10e2ad066301e371b4055118a8d04f20723857f2c6158609bf3c33b5237e5b4d).

----

_[Macroscope](https://app.macroscope.com) summarized 7aba090._

## Summary by CodeRabbit

- **Refactor**
  - Improved modularity and abstraction in blockchain event streaming components.
  - Updated internal interfaces for event streamers to enhance flexibility and maintainability.
- **Tests**
  - Updated and reorganized test files to align with refactored components.