poc: new indexer #777
Conversation
Walkthrough

This set of changes introduces a modular, multi-chain blockchain event indexer framework with robust contract abstractions, network management, concurrent task processing, and storage interfaces. The implementation includes detailed contract event handling, reorg detection and recovery, in-memory and persistent storage, and comprehensive configuration refactoring to support both application and settlement chains. Extensive documentation and new SQL queries for envelope expiry management are also provided.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant Indexer
    participant Network
    participant Contract
    participant Storage
    User->>Indexer: Initialize Indexer with config
    Indexer->>Network: Configure networks (AppChain, SettlementChain)
    User->>Indexer: Add contracts (generic or specialized)
    Indexer->>Contract: Register contract(s)
    User->>Indexer: Run()
    Indexer->>Network: Start polling latest block
    Indexer->>Contract: For each contract, create task
    loop For each task
        Indexer->>Network: Get latest block number
        Indexer->>Network: Get logs for contract (batch)
        Network->>Contract: Return logs
        Contract->>Storage: ProcessLogs(ctx, logs)
        alt Reorg detected
            Contract->>Storage: HandleReorg(ctx, reorgedBlock)
            Storage->>Indexer: DeleteFromBlock(...)
        end
        Storage->>Indexer: SaveState(...)
    end
    User->>Indexer: Close()
    Indexer->>Network: Stop polling
```
Warning: There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it is a critical failure.

🔧 golangci-lint (1.64.8): you are using a configuration file for golangci-lint v2 with golangci-lint v1; please use golangci-lint v2.
```go
	// If the stored state is for a higher block, revert it
	if state.BlockNumber >= blockNumber {
		state.BlockNumber = blockNumber - 1
```
In the DeleteFromBlock method, subtracting 1 from blockNumber could result in an underflow when blockNumber is 0, leading to the state being set to a large value. Consider checking if blockNumber is greater than 0 before subtracting 1, to ensure that the arithmetic remains within expected bounds.
```diff
-		state.BlockNumber = blockNumber - 1
+		if blockNumber > 0 {
+			state.BlockNumber = blockNumber - 1
+		} else {
+			// Avoid underflow: if blockNumber is 0, set BlockNumber to 0
+			state.BlockNumber = blockNumber
+		}
```
```go
	err error
}

func (e UnrecoverableLogStorageError) Error() string {
```
The Error() method on UnrecoverableLogStorageError doesn't check if e.err is nil before calling e.err.Error(), which might lead to a nil pointer dereference at runtime. Consider adding a nil check so that if e.err is nil, an appropriate default string is returned instead.
```go
func (e UnrecoverableLogStorageError) Error() string {
	if e.err == nil {
		return ""
	}
	return e.err.Error()
}
```
```go
}

	if state.ChainID != src.GetChainID() {
		return nil, fmt.Errorf("chain ID mismatch for %s: stored %d, current %d",
```
In the getOrCreateTask function, the chain ID mismatch check uses state.ChainID instead of task.state.ChainID. Since state remains nil when db.GetState returns a sql.ErrNoRows error (with task.state being initialized separately), this could lead to a nil pointer dereference. Consider using task.state for a more reliable check.
```diff
-	if state.ChainID != src.GetChainID() {
+	if task.state.ChainID != src.GetChainID() {
```
Actionable comments posted: 13
♻️ Duplicate comments (1)
pkg/indexerpoc/contract/group_message_broadcaster/groupMessage.go (1)
46-50: This will panic if `abi` is nil – see constructor comment above
Duplicate of the constructor issue.
🧹 Nitpick comments (18)
pkg/indexerpoc/network.go (1)
13-13: Consider naming consistency for acronyms.

In Go, initialisms are conventionally kept in a single case, so consider renaming `RpcURL` to `RPCURL` to follow standard Go style.

```diff
-	RpcURL string // RPC endpoint URL
+	RPCURL string // RPC endpoint URL
```

Remember to update the field reference in the constructor function as well.
pkg/indexerpoc/filter.go (1)
1-9: Document the expected input format for hex strings.

The function lacks documentation about the expected format of the input strings. Add comments explaining whether addresses and topics should include the "0x" prefix and any other format requirements.

```diff
 // Filter contains filtering options for blockchain data retrieval
 type Filter struct {
 	Addresses []common.Address
 	Topics    [][]common.Hash
 }

-// NewFilter creates a filter for specific addresses and topics
+// NewFilter creates a filter for specific addresses and topics.
+// Input addresses should be hex strings (with or without "0x" prefix).
+// Input topics should be 32-byte hex strings (with or without "0x" prefix).
+// Returns an error if any address or topic is invalid.
```

pkg/indexerpoc/storage_mock.go (2)
90-114: Implement proper `Rollback` functionality for `MemoryTransaction`.

The current `Rollback` method is a no-op, which may not be clear to users expecting real transaction semantics. Consider either:

- Implementing a real rollback mechanism
- Improving documentation to clearly indicate that rollback doesn't actually do anything

```diff
 // MemoryTransaction implements a simple in-memory transaction
+// Note: This is a simplified implementation with limited transaction semantics.
+// Rollback operations don't actually revert changes made during the transaction.
 type MemoryTransaction struct {
 	storage   *MemoryStorage
 	committed bool
+	// Consider adding a map to track changes during the transaction
+	// changes map[string]*taskState
 }

 // Begin starts a new transaction
 func (s *MemoryStorage) Begin(ctx context.Context) (Transaction, error) {
 	return &MemoryTransaction{
 		storage:   s,
 		committed: false,
+		// changes: make(map[string]*taskState),
 	}, nil
 }

 // Commit commits the transaction
 func (t *MemoryTransaction) Commit() error {
 	t.committed = true
 	return nil
 }

 // Rollback rolls back the transaction
 func (t *MemoryTransaction) Rollback() error {
-	// Nothing to do for this simple implementation
+	// In a real implementation, this would restore the original state
+	// For example: iterate through changes map and restore original values
+	// Since this is a mock implementation, we simply mark the transaction as not committed
+	t.committed = false
 	return nil
 }
```
9-21: Consider adding a size limit to prevent unbounded memory usage.

The `MemoryStorage` has no mechanism to limit its size, which could lead to unbounded memory growth in long-running applications. Consider adding configuration options to limit the number of states stored.

```diff
 // MemoryStorage provides an in-memory implementation of the Storage interface
 // Useful for testing or simple applications
 type MemoryStorage struct {
 	mu     sync.RWMutex
 	states map[string]*taskState // Key is contractName:network
+	// Optional configuration for memory limits
+	maxEntries int // Maximum number of states to store, 0 means unlimited
 }

 // NewMemoryStorage creates a new in-memory storage
-func NewMemoryStorage() *MemoryStorage {
+func NewMemoryStorage(maxEntries int) *MemoryStorage {
 	return &MemoryStorage{
 		states: make(map[string]*taskState),
+		maxEntries: maxEntries,
 	}
 }
```

Then, update the `SaveState` method to enforce this limit:

```diff
 // SaveState saves the indexing state for a contract
 func (s *MemoryStorage) SaveState(ctx context.Context, state *taskState) error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+	// Check if we need to enforce size limits
+	if s.maxEntries > 0 && len(s.states) >= s.maxEntries {
+		// Simple approach: block new entries when full
+		key := makeKey(state.ContractName, state.NetworkName)
+		if _, exists := s.states[key]; !exists {
+			return fmt.Errorf("memory storage capacity reached (%d entries)", s.maxEntries)
+		}
+	}
 	// Make a copy to prevent external modifications
 	stateCopy := *state
 	key := makeKey(state.ContractName, state.NetworkName)
 	s.states[key] = &stateCopy
 	return nil
 }
```

pkg/indexerpoc/README.md (4)
32-34: Replace placeholder API keys with obviously fake placeholders.

Using `YOUR_API_KEY` as a placeholder might lead users to accidentally commit real API keys when substituting. Consider using clearly fake values with a format comment.

```diff
- "https://mainnet.infura.io/v3/YOUR_API_KEY",
+ "https://mainnet.infura.io/v3/00000000000000000000000000000000", // Replace with your actual Infura API key
```

151-152: Replace placeholder API keys with obviously fake placeholders.

Same issue as above: using `YOUR_API_KEY` as a placeholder might lead users to accidentally commit real API keys when substituting.

```diff
- "https://mainnet.infura.io/v3/YOUR_API_KEY",
+ "https://mainnet.infura.io/v3/00000000000000000000000000000000", // Replace with your actual Infura API key
```

171-172: Replace placeholder database connection string.

Using `your_db_connection_string_here` as a placeholder might lead users to accidentally commit real connection strings. Consider using a clearly fake value with a format comment.

```diff
- db, err := sql.Open("postgres", "your_db_connection_string_here")
+ db, err := sql.Open("postgres", "postgres://user:password@localhost:5432/dbname?sslmode=disable") // Replace with your actual connection string
```

187-189: Replace placeholder contract address with a properly formatted example.

The example contract address should follow the standard Ethereum address format for clarity.

```diff
- "0x1234567890abcdef1234567890abcdef12345678", // Contract address
+ "0x1234567890123456789012345678901234567890", // Replace with the actual contract address
```

pkg/indexerpoc/contract/group_message_broadcaster/error.go (1)
3-6: Expose root cause with `Unwrap()` so callers can leverage `errors.Is` / `errors.As`.

Your custom error types wrap an underlying `error`, but without an `Unwrap()` method the standard library helpers (`errors.Is`, `errors.As`, etc.) cannot traverse the chain. A tiny addition makes the types play nicely with idiomatic Go error handling.

```diff
 type UnrecoverableLogStorageError struct {
 	err error
 }

+// Unwrap allows errors.Is / errors.As to inspect the wrapped error.
+func (e UnrecoverableLogStorageError) Unwrap() error { return e.err }
+
 ...
 type RetryableLogStorageError struct {
 	err error
 }

+func (e RetryableLogStorageError) Unwrap() error { return e.err }
```

pkg/indexerpoc/contract/group_message_broadcaster/group_message.go (2)
94-102: Fail-fast aborts the entire batch on the first bad log.

The current loop stops processing once the first `StoreLog` call returns an error. This means a single malformed / transiently-failing log blocks every subsequent log in the same batch.

Consider continuing after retryable errors (or collecting all errors) so that good logs are still persisted:

```diff
 for _, log := range logs {
-	err := c.storer.StoreLog(ctx, log)
-	if err != nil {
-		return err
-	}
+	if err := c.storer.StoreLog(ctx, log); err != nil {
+		// Retryable? Mark & continue, else abort.
+		if lsErr, ok := err.(LogStorageError); ok && lsErr.ShouldRetry() {
+			return err // bubble up so the task can retry the whole batch
+		}
+		c.storer.logger.Warn("skipping unrecoverable log",
+			zap.Uint64("blockNumber", log.BlockNumber),
+			zap.Error(err))
+		// continue to next log
+	}
 }
```

This keeps healthy logs flowing while still allowing the task to retry when appropriate.
106-109: Use structured logging instead of `fmt.Printf`.

`fmt.Printf` writes to stdout with no context or log level. Prefer the existing Zap logger to keep logs uniform and machine-parseable.

```diff
- fmt.Printf("Handling reorg for group messages from block %d\n", fromBlock)
+ c.storer.logger.Info("handling reorg for group messages",
+ 	zap.Uint64("fromBlock", fromBlock))
```

pkg/indexerpoc/indexer.go (1)
138-158: Potential goroutine leak if `task.run()` blocks forever.

`Run` waits on the `WaitGroup` after spawning tasks, but there is no mechanism to cancel the context (e.g., on SIGINT) or to stop tasks gracefully. If one task hangs indefinitely, `Run` will block permanently.

Consider accepting a `context.Context` in `run()` and propagating cancellation, or include a shutdown method on the `Indexer`.

pkg/indexerpoc/contract/group_message_broadcaster/groupMessage.go (2)
20-23: Hard-coded originator ID & constant naming.

`GROUP_MESSAGE_ORIGINATOR_ID` is hard-coded to `0` and the all-caps style violates Go naming guidelines for non-exported constants. If multiple contracts are indexed concurrently, collisions are likely. Consider:

- Renaming to `groupMessageOriginatorID`.
- Passing the ID in via config so each contract gets a unique value.
119-137: Block timestamp preferable to `time.Now()`.

`OriginatorNs` is derived from the local wall-clock. Using the block's timestamp (`event.Time` / `BlockTime`) would provide deterministic ordering across indexers and avoid drift between machines.

```diff
- OriginatorNs: time.Now().UnixNano(),
+ OriginatorNs: int64(eventTimeNs), // derive from block header
```

Where `eventTimeNs` is obtained from the header's `Time` field (seconds) converted to ns.

pkg/indexerpoc/source.go (1)
140-145: `cacheHits` reset should start at 0, not 1.

Immediately after refreshing the cache you set `cacheHits = 1`, meaning the very next `GetLatestBlockNumber` call counts as the second hit. Resetting to `0` is clearer.

```diff
- s.cacheHits = 1
+ s.cacheHits = 0
```

pkg/indexerpoc/types.go (1)
33-41: Minor doc & wording tweaks.

The comment "Source defines the interface a blockchain." is missing a word; consider "…defines the interface for a blockchain source." Small polish but helps readability.

pkg/indexerpoc/task.go (2)
408-446: Switch on specific error values can be simplified with a tagged `switch`.

golangci-lint (staticcheck QF1002) flagged this area. Tagging the `switch` directly on `err` improves readability and avoids repeated comparisons:

```diff
- err := t.nextState()
- switch {
- case err == nil:
+ switch err := t.nextState(); {
+ case err == nil:
```

(or `switch err := t.nextState(); err { … }` with explicit cases).

Not critical, but adopting the idiomatic form will silence the linter and keep the loop concise.
🧰 Tools

🪛 GitHub Check: Lint-Go

[failure] 409-409: QF1002: could use tagged switch on err (staticcheck)

🪛 GitHub Actions: Lint

[error] 409-409: golangci-lint staticcheck: could use tagged switch on err (QF1002)
328-355: Edge case: possible worker starvation when `concurrency > totalBlocks`.

`blocksPerWorker` becomes 0 when `totalBlocks < uint64(t.concurrency)`, then you bump it to 1 (lines 341-343). That's okay, but it means the early workers will "walk past" the `endBlock` and be skipped, while later workers still get launched and immediately skip. The overhead is tiny, yet we can avoid spawning goroutines that do nothing:

```go
for i := 0; i < t.concurrency && workerStart <= endBlock; i++ {
```

Purely optional micro-optimisation.
📜 Review details

- Configuration used: CodeRabbit UI
- Review profile: CHILL
- Plan: Pro

📒 Files selected for processing (12)

- pkg/indexerpoc/README.md (1 hunks)
- pkg/indexerpoc/contract/contract.go (1 hunks)
- pkg/indexerpoc/contract/group_message_broadcaster/error.go (1 hunks)
- pkg/indexerpoc/contract/group_message_broadcaster/groupMessage.go (1 hunks)
- pkg/indexerpoc/contract/group_message_broadcaster/group_message.go (1 hunks)
- pkg/indexerpoc/filter.go (1 hunks)
- pkg/indexerpoc/indexer.go (1 hunks)
- pkg/indexerpoc/network.go (1 hunks)
- pkg/indexerpoc/source.go (1 hunks)
- pkg/indexerpoc/storage_mock.go (1 hunks)
- pkg/indexerpoc/task.go (1 hunks)
- pkg/indexerpoc/types.go (1 hunks)
🧰 Additional context used

🧬 Code Graph Analysis (5)

pkg/indexerpoc/storage_mock.go (1)
- pkg/indexerpoc/types.go (1): Transaction (57-60)

pkg/indexerpoc/contract/group_message_broadcaster/group_message.go (4)
- pkg/indexerpoc/contract/contract.go (3): Contract (16-24), ReorgProcessor (13-13), LogProcessor (10-10)
- pkg/indexerpoc/types.go (3): Contract (23-31), ReorgProcessor (20-20), LogProcessor (17-17)
- pkg/indexerpoc/contract/group_message_broadcaster/groupMessage.go (2): GroupMessageStorer (25-29), NewGroupMessageStorer (31-39)
- pkg/db/queries/db.go (1): Queries (23-25)

pkg/indexerpoc/contract/group_message_broadcaster/groupMessage.go (6)
- pkg/abi/groupmessagebroadcaster/GroupMessageBroadcaster.go (1): GroupMessageBroadcaster (64-68)
- pkg/indexerpoc/contract/group_message_broadcaster/error.go (3): LogStorageError (3-6), NewUnrecoverableLogStorageError (20-22), NewRetryableLogStorageError (36-38)
- pkg/topic/topic.go (2): NewTopic (43-48), TOPIC_KIND_GROUP_MESSAGES_V1 (11-11)
- pkg/envelopes/client.go (1): NewClientEnvelopeFromBytes (38-44)
- pkg/db/queries/envelopes.sql.go (1): InsertGatewayEnvelopeParams (97-105)
- pkg/db/queries/indexer.sql.go (1): InsertBlockchainMessageParams (93-99)

pkg/indexerpoc/types.go (3)
- pkg/indexerpoc/filter.go (1): Filter (6-9)
- pkg/indexer/blockTracker.go (1): Block (28-31)
- pkg/merkle/hash.go (1): Hash (18-22)

pkg/indexerpoc/contract/contract.go (1)
- pkg/indexerpoc/types.go (2): ReorgProcessor (20-20), Contract (23-31)
🪛 GitHub Check: Lint-Go

pkg/indexerpoc/task.go

[failure] 409-409: QF1002: could use tagged switch on err (staticcheck)

🪛 GitHub Actions: Lint

pkg/indexerpoc/task.go

[error] 409-409: golangci-lint staticcheck: could use tagged switch on err (QF1002)
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: Code Review
- GitHub Check: Push Docker Images to GitHub Packages (xmtpd-cli)
- GitHub Check: Upgrade Tests
- GitHub Check: Test (Node)
- GitHub Check: Push Docker Images to GitHub Packages (xmtpd)
🔇 Additional comments (2)
pkg/indexerpoc/contract/group_message_broadcaster/groupMessage.go (1)
95-103: `GatewayTime` / `Expiry` omitted – check DB NOT NULL constraints.

`InsertGatewayEnvelopeParams` (see envelopes.sql.go) contains `GatewayTime` and `Expiry`. You're supplying neither, relying on the zero value `nil`. If the underlying SQL column is `NOT NULL` or has a default that should be set explicitly (e.g., `NOW()`), this will fail at runtime and surface here as a retryable error, causing an infinite retry loop.

Verify the schema and populate the fields as needed.

pkg/indexerpoc/source.go (1)
88-96: Validate `PollInterval` to avoid zero-duration ticker panic.

A zero or negative `PollInterval` will make `time.NewTicker` panic. Add validation in `NewGethSource`:

```go
if network.PollInterval <= 0 {
	return nil, fmt.Errorf("invalid poll interval: %s", network.PollInterval)
}
```
```go
func NewFilter(addressStrings []string, topicStrings [][]string) *Filter {
	addresses := make([]common.Address, 0, len(addressStrings))
	for _, addr := range addressStrings {
		addresses = append(addresses, common.HexToAddress(addr))
	}

	topics := make([][]common.Hash, 0, len(topicStrings))
	for _, topicList := range topicStrings {
		topicHashes := make([]common.Hash, 0, len(topicList))
		for _, topic := range topicList {
			topicHashes = append(topicHashes, common.HexToHash(topic))
		}
		topics = append(topics, topicHashes)
	}

	return &Filter{
		Addresses: addresses,
		Topics:    topics,
	}
}
```
🛠️ Refactor suggestion
Add validation for hex string inputs.
The function NewFilter doesn't validate that the input strings are valid hex addresses or hashes. If malformed hex is provided, it might cause unexpected behavior without any error indication.
```diff
 func NewFilter(addressStrings []string, topicStrings [][]string) *Filter {
 	addresses := make([]common.Address, 0, len(addressStrings))
 	for _, addr := range addressStrings {
+		// Ensure address has 0x prefix
+		if !strings.HasPrefix(addr, "0x") {
+			addr = "0x" + addr
+		}
+		// Validate address format
+		if !common.IsHexAddress(addr) {
+			return nil, fmt.Errorf("invalid address format: %s", addr)
+		}
 		addresses = append(addresses, common.HexToAddress(addr))
 	}
 	topics := make([][]common.Hash, 0, len(topicStrings))
 	for _, topicList := range topicStrings {
 		topicHashes := make([]common.Hash, 0, len(topicList))
 		for _, topic := range topicList {
+			// Ensure topic has 0x prefix
+			if !strings.HasPrefix(topic, "0x") {
+				topic = "0x" + topic
+			}
+			// Validate topic format (32 bytes)
+			if len(topic) != 66 { // "0x" + 64 hex chars
+				return nil, fmt.Errorf("invalid topic format: %s", topic)
+			}
 			topicHashes = append(topicHashes, common.HexToHash(topic))
 		}
 		topics = append(topics, topicHashes)
 	}
-	return &Filter{
+	return &Filter{
 		Addresses: addresses,
 		Topics:    topics,
-	}
+	}, nil
 }
```

Don't forget to update the function signature to return an error:
```diff
-func NewFilter(addressStrings []string, topicStrings [][]string) *Filter {
+func NewFilter(addressStrings []string, topicStrings [][]string) (*Filter, error) {
```

This change will also require updating all function callers.
```go
// DeleteFromBlock deletes all data for a contract from the specified block number onwards
func (s *MemoryStorage) DeleteFromBlock(
	ctx context.Context,
	contractName string,
	network string,
	blockNumber uint64,
) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	key := makeKey(contractName, network)
	state, ok := s.states[key]
	if !ok {
		return nil // Nothing to delete
	}

	// If the stored state is for a higher block, revert it
	if state.BlockNumber >= blockNumber {
		state.BlockNumber = blockNumber - 1
		// BlockHash would be updated separately
	}

	return nil
}
```
🛠️ Refactor suggestion
Clarify the DeleteFromBlock method implementation.
The DeleteFromBlock method doesn't actually delete data; it simply updates the block number. This behavior may be confusing to users who expect all data to be completely removed. Consider either:
- Renaming the method to better reflect what it actually does
- Updating the implementation to truly delete data
- Improving documentation to explain the actual behavior
```diff
 // DeleteFromBlock deletes all data for a contract from the specified block number onwards
+// This method doesn't actually delete data but rather resets the block number
+// to indicate that blocks from the specified number onwards need to be reprocessed
 func (s *MemoryStorage) DeleteFromBlock(
 	ctx context.Context,
 	contractName string,
 	network string,
 	blockNumber uint64,
 ) error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	key := makeKey(contractName, network)
 	state, ok := s.states[key]
 	if !ok {
 		return nil // Nothing to delete
 	}
 	// If the stored state is for a higher block, revert it
 	if state.BlockNumber >= blockNumber {
 		state.BlockNumber = blockNumber - 1
 		// BlockHash would be updated separately
+		state.BlockHash = common.Hash{} // Reset block hash when rolling back
 	}
 	return nil
 }
```
```go
log, _ := zap.NewProduction()
defer log.Sync()
```
🛠️ Refactor suggestion
Add error handling for the logger initialization.
The example creates a new logger without checking for potential initialization errors. Add error handling for proper demonstration.
```diff
-log, _ := zap.NewProduction()
+log, err := zap.NewProduction()
+if err != nil {
+	fmt.Printf("Failed to initialize logger: %v\n", err)
+	return
+}
 defer log.Sync()
```

Don't forget to add `fmt` to the imports.
```go
defer db.Close()

// Create querier
querier := queries.New(db)
```
🛠️ Refactor suggestion
Add error handling for the database querier initialization.
The example creates a new querier without checking for potential initialization errors. Add error handling for proper demonstration.
```diff
-// Create querier
-querier := queries.New(db)
+// Create and validate querier
+querier := queries.New(db)
+
+// Verify database connection works
+if err := db.PingContext(ctx); err != nil {
+	log.Error("Failed to connect to database", zap.Error(err))
+	return
+}
```
```go
func NewGroupMessageStorer(
	queries *queries.Queries,
	logger *zap.Logger,
) *GroupMessageStorer {
	return &GroupMessageStorer{
		queries: queries,
		logger:  logger.Named("GroupMessageStorer"),
	}
}
```
Constructor never assigns ABI pointer → nil-pointer dereference ahead
NewGroupMessageStorer builds the storer without populating the abi field.
StoreLog() immediately calls s.abi.ParseMessageSent, which will panic at runtime because s.abi == nil. Pass the generated contract binding into the constructor (or instantiate it inside) and store it.
-func NewGroupMessageStorer(
-	queries *queries.Queries,
-	logger *zap.Logger,
-) *GroupMessageStorer {
+func NewGroupMessageStorer(
+	abi *gm.GroupMessageBroadcaster,
+	queries *queries.Queries,
+	logger *zap.Logger,
+) *GroupMessageStorer {
 	return &GroupMessageStorer{
+		abi:     abi,
 		queries: queries,
 		logger:  logger.Named("GroupMessageStorer"),
 	}
 }

Call-sites that create the storer must now supply the ABI pointer.
This avoids a hard crash and makes the dependency explicit.
pkg/indexerpoc/source.go
Outdated
// Start a background goroutine to keep track of the latest block
go source.pollLatestBlock(context.Background())

return source, nil
}
🛠️ Refactor suggestion
Background poller cannot be stopped – leakable goroutine
pollLatestBlock is launched with context.Background(), so it runs forever even after the source is no longer needed, preventing clean shutdowns and tests from terminating.
Provide a cancellable context (e.g., return a Stop() func or expose Close() on GethSource) and honour it in the ticker loop.
- go source.pollLatestBlock(context.Background())
+ ctx, cancel := context.WithCancel(context.Background())
+ source.stop = cancel // add field: stop func()
+ go source.pollLatestBlock(ctx)

Committable suggestion skipped: line range outside the PR's diff.
// Load existing state or create new state.
// TODO: This would be replaced with a block/task tracker.
state, err := db.GetState(ctx, contract.GetName(), fmt.Sprintf("%d", contract.GetChainID()))
if err != nil {
	if errors.Is(err, sql.ErrNoRows) {
		// If no state exists, start from the contract-defined block.
		// If state exists, we must force the starting block to prevent missing any data.
		initialBlockNum := contract.GetStartBlock()

		if initialBlockNum > 0 {
			initialBlockNum--
		}

		// Get hash for initial block if not starting from genesis.
		var initialHash common.Hash
		if initialBlockNum > 0 {
			initialHash, err = src.GetBlockHash(ctx, initialBlockNum)
			if err != nil {
				return nil, fmt.Errorf("getting hash for initial block on chain %d: %w",
					src.GetChainID(), err)
			}
		}

		task.state = &taskState{
			ContractName: contract.GetName(),
			NetworkName:  src.GetNetworkName(),
			ChainID:      src.GetChainID(),
			BlockNumber:  initialBlockNum,
			BlockHash:    initialHash,
		}
	} else {
		return nil, fmt.Errorf("getting state for %s on chain %d: %w",
			contract.GetName(), src.GetChainID(), err)
	}
} else {
	task.state = state
}

if state.ChainID != src.GetChainID() {
	return nil, fmt.Errorf("chain ID mismatch for %s: stored %d, current %d",
		contract.GetName(), state.ChainID, src.GetChainID())
}
Potential nil-pointer dereference after fresh state bootstrap
When db.GetState returns sql.ErrNoRows, you create t.state inside the first if block (lines 113-119).
However, the outer-scoped variable state (from state, err := db.GetState(...)) remains nil, yet it is referenced later at line 128:
if state.ChainID != src.GetChainID() {

If we entered the "no rows" path, state is nil, so this comparison panics.

- if state.ChainID != src.GetChainID() {
+ if t.state.ChainID != src.GetChainID() {

or, if you prefer to keep a local variable, assign it:

state = t.state

Be sure to add a unit test that boots an empty storage and calls getOrCreateTask; it should not panic.
defer func() {
	if err := tx.Rollback(); err != nil {
		slog.ErrorContext(ctx, "error rolling back transaction",
			"contract", t.contract.GetName(),
			"network", t.state.NetworkName,
			"error", err,
		)
	}
}()
🛠️ Refactor suggestion
Rollback is executed after a successful Commit – noisy logs / masking errors
The deferred tx.Rollback() runs unconditionally. When Commit() succeeds, Rollback() returns sql.ErrTxDone, triggering an error log even though the transaction was already committed (and making the log inside the defer misleading).
Typical pattern:
+ committed := false
  defer func() {
-     if err := tx.Rollback(); err != nil {
+     if !committed {
+         if err := tx.Rollback(); err != nil && !errors.Is(err, sql.ErrTxDone) {
              slog.ErrorContext(ctx, "error rolling back transaction", /* … */)
+         }
      }
  }()
  …
  if err := tx.Commit(); err != nil {
      return fmt.Errorf("committing transaction: %w", err)
  }
+ committed = true

This avoids spurious error messages and makes real rollback failures visible.
Actionable comments posted: 3
🧹 Nitpick comments (3)
pkg/indexerpoc/contract/group_message_broadcaster/group_message.go (1)
87-91: Use structured logging instead of fmt.Printf. Printing directly to stdout bypasses the project-wide logging infrastructure and loses metadata such as timestamp and log level.

- fmt.Printf("Handling reorg for group messages from block %d\n", reorgedBlock)
+ c.logger.Info("Handling reorg for group messages",
+     zap.Uint64("reorgedBlock", reorgedBlock))

pkg/indexerpoc/contract/group_message_broadcaster/group_message_storer.go (2)

46-50: Nil check missing before ParseMessageSent. Even after initialisation, defensive programming helps:

if s.abi == nil {
    return NewUnrecoverableLogStorageError(errors.New("ABI binding is nil"))
}

95-104: Database writes should be wrapped in a single transaction.
InsertGatewayEnvelope and InsertBlockchainMessage are executed independently; if the first succeeds and the second fails you end up with an envelope without a corresponding blockchain record (or vice-versa). Provide the caller with a transaction-aware version or expose a StoreLog variant that manages its own sql.Tx.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- pkg/indexerpoc/contract/contract.go (1 hunks)
- pkg/indexerpoc/contract/group_message_broadcaster/group_message.go (1 hunks)
- pkg/indexerpoc/contract/group_message_broadcaster/group_message_storer.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/indexerpoc/contract/contract.go
🧰 Additional context used
🧬 Code Graph Analysis (1)
pkg/indexerpoc/contract/group_message_broadcaster/group_message_storer.go (5)
pkg/abi/groupmessagebroadcaster/GroupMessageBroadcaster.go (1)
- GroupMessageBroadcaster (64-68)

pkg/topic/topic.go (2)
- NewTopic (43-48)
- TOPIC_KIND_GROUP_MESSAGES_V1 (11-11)

pkg/envelopes/client.go (1)
- NewClientEnvelopeFromBytes (38-44)

pkg/db/queries/envelopes.sql.go (1)
- InsertGatewayEnvelopeParams (97-105)

pkg/db/queries/indexer.sql.go (1)
- InsertBlockchainMessageParams (93-99)
🔇 Additional comments (1)
pkg/indexerpoc/contract/group_message_broadcaster/group_message_storer.go (1)
134-135: Envelope timestamp should use block time, not wall-clock
time.Now() records the node's local time, which can differ from the block's timestamp and is non-deterministic in reorg scenarios. Prefer event.Time (if available) or fetch the block header and use header.Time.
func (c *GroupMessageContract) ProcessLogs(ctx context.Context, logs []types.Log) error {
	// TODO: This requires more logic.
	for _, log := range logs {
		err := c.storer.StoreLog(ctx, log)
		if err != nil {
			return err
		}
	}
🛠️ Refactor suggestion
Fail-fast strategy drops the whole batch – iterate with per-log error handling
ProcessLogs aborts on the first error returned by StoreLog.
• A single malformed log will prevent later logs in the same block from being persisted.
• Retryable vs. unrecoverable storage errors are not differentiated.
Consider continuing the loop, collecting retryable errors, and returning an aggregated error (or nil if only unrecoverable errors were hit and successfully skipped).
contract: &contract.Contract{
	Name:       name,
	ChainID:    chainID,
	Address:    address,
	StartBlock: startBlock,
	Topics:     []string{"MessageSent"},
},
Event topic is probably incorrect – use the event signature hash, not the plain name
Topics: []string{"MessageSent"} is unlikely to match anything on-chain.
go-ethereum filters expect the Keccak-256 hash of the full event signature (MessageSent(bytes32,uint256,bytes,…), etc.), not the readable name. Using the literal string means the indexer will silently receive zero logs.
- Topics: []string{"MessageSent"},
+ // keccak256("MessageSent(bytes32,uint256,bytes,uint256)")
+ Topics: []string{"0x1234deadbeef…"}, // TODO: replace with real topic hash
func NewGroupMessageStorer(
	queries *queries.Queries,
	logger *zap.Logger,
) *GroupMessageStorer {
	return &GroupMessageStorer{
		queries: queries,
		logger:  logger.Named("group-message-broadcaster-storer"),
	}
abi field is never initialised – nil-pointer panic at first log
NewGroupMessageStorer does not assign abi; therefore s.abi.ParseMessageSent will dereference nil.
-func NewGroupMessageStorer(
-	queries *queries.Queries,
-	logger *zap.Logger,
-) *GroupMessageStorer {
-	return &GroupMessageStorer{
-		queries: queries,
-		logger:  logger.Named("group-message-broadcaster-storer"),
-	}
+func NewGroupMessageStorer(
+	abi *gm.GroupMessageBroadcaster,
+	queries *queries.Queries,
+	logger *zap.Logger,
+) *GroupMessageStorer {
+	return &GroupMessageStorer{
+		abi:     abi,
+		queries: queries,
+		logger:  logger.Named("group-message-broadcaster-storer"),
+	}
 }

NewGroupMessageContract must now construct the binding (e.g. gm.NewGroupMessageBroadcaster(common.Address{}, backend)) and pass it in.
pkg/indexerpoc/source.go
Outdated
}

// pollLatestBlock periodically polls for the latest block
func (s *GethSource) pollLatestBlock(ctx context.Context) {
do we still need a polling indexer? we should be using the Alchemy WSS interface
	"go.uber.org/zap"
)

func main() {
I wonder whether there is an easy way to fmt readmes :D
)

// Create indexer.
manager, err := indexerpoc.NewIndexer(
if we are writing things from scratch I would prefer to use the golang Option Struct Pattern. I've been meaning to introduce it in a few other places, but this might be the best time to start
@@ -0,0 +1,114 @@
package indexerpoc
did you write this by hand? We have mockery that generates a bunch of stuff from interface definitions. Is this a different type of mock?
pkg/indexerpoc/task.go
Outdated
continue
case err == ErrNothingNew:
	reorgCount = 0
	time.Sleep(defaultWaitTime)
I generally prefer a ticker over manually calling sleep. Especially since sleep won't actually wake up for a ctx.done() properly. I understand that it is sometimes difficult to write code that way, if you sometimes want to retry immediately and sometimes you want to wait.
For example, why does err == nil not wait?
wg.Wait()
close(errCh)

// Check for errors
@coderabbitai is this the right way to find the errors? Is it correct when using a closed channel?
// getLogs retrieves logs in parallel for better performance with large batches.
// This is likely going to be used only when backfilling.
func (t *task) getLogs(
I know that the indexer sometimes takes a bit, but the multi-threading code is more complicated. The sort at the end is somewhat expensive. Do we know whether it is really worth the maintenance?
pkg/indexerpoc/source.go
Outdated
}

// Start a background goroutine to keep track of the latest block
go source.pollLatestBlock(context.Background())
probably not Background(). There needs to be a way to shut it down cleanly. We shut down the background context absolutely last.
pkg/indexerpoc/source.go
Outdated
return latestNumber, nil
}

s.cacheMu.Unlock()
this should be using a check;lock;re-check pattern
	return fmt.Errorf("beginning transaction: %w", err)
}
defer func() {
	if err := tx.Rollback(); err != nil {
technically you can call rollback on a committed transaction and it should not implode. But it might be cleaner if you check whether a rollback should be called (for example, err is not nil)
(#769)

### Implement variable message retention periods by propagating payer expiration from client through to XMTPD database

* Adds support for configurable message retention periods (2-365 days or infinite) in [publishWorker.go](https://github.com/xmtp/xmtpd/pull/769/files#diff-cd2a58705d7eacdaf32fc6b4148beb71638e391186b9682bd0c678a91362ce7b) and [service.go](https://github.com/xmtp/xmtpd/pull/769/files#diff-1f687c3f8e69fec5867ded411d50bda02bef3661181fa7422b9dc66b7b57f3c2)
* Modifies `IFeeCalculator` interface and implementations to use `uint32` for storage duration in [calculator.go](https://github.com/xmtp/xmtpd/pull/769/files#diff-dcbc979a59bb75cb06de154c51e7220380369505752e8ec3f67b9cb12af678c6)
* Sets infinite retention (MaxInt64) for blockchain-sourced messages in [groupMessage.go](https://github.com/xmtp/xmtpd/pull/769/files#diff-efeef048b7789cf1f5a86eff464134023ebb19f01c92c49b76771ce65bdbc1f9) and [identityUpdate.go](https://github.com/xmtp/xmtpd/pull/769/files#diff-cff55c051234a8ef9c37deba5fd2ef50bedea6820902226931fb01086a036c3f)
* Updates `SignStagedEnvelope` in [registrant.go](https://github.com/xmtp/xmtpd/pull/769/files#diff-c3c27ab27b89dbdb44f1423d70834048d35c9c1ace0a16922e0c1c730b83ac38) to calculate explicit expiry timestamps
* Adds comprehensive test coverage for variable retention periods in [publish_test.go](https://github.com/xmtp/xmtpd/pull/769/files#diff-b6d92459b9ec3f2503f1ccb187101664984e631b23275bd4a716b694de10408c)
I expect to make a few changes as we progress such as dry-run and a limit to the number of cycles it takes.

## Summary by CodeRabbit

- **New Features**
  - Introduced configurable batch pruning of expired envelopes with options for maximum cycles and dry run mode.
- **Enhancements**
  - Added detailed logging of expired envelope counts and pruning progress during execution.
- **Chores**
  - Improved internal expired data management with efficient batch deletion and concurrency-safe locking.
- Nightly Proto Update

Auto-generated by [create-pull-request][1]

[1]: https://github.com/peter-evans/create-pull-request

Co-authored-by: mkysel <7513948+mkysel@users.noreply.github.com>
### Restructure contract configuration to support separate application and settlement chain configurations in the blockchain client

Reorganizes contract configuration by introducing a nested structure with `AppChain` and `SettlementChain` properties in [options.go](https://github.com/xmtp/xmtpd/pull/755/files#diff-6731fb6f709392ce3e37d3b0c42074cddbce566dad2bab86af24ba7585eeb57c). The changes include:

* Creates new `AppChainOptions` and `SettlementChainOptions` structs to separate chain-specific configurations
* Renames contract addresses to be more descriptive (e.g., `MessagesContractAddress` to `GroupMessageBroadcasterAddress`)
* Updates all blockchain client components to use the new configuration structure
* Modifies validation logic in [validation.go](https://github.com/xmtp/xmtpd/pull/755/files#diff-3e0b3707cc5d712d06d1eeb6a67c10b45a7ba0b27e972924d71df3c8e539f23b) to handle the new nested configuration format
* Updates test utilities and configuration generators to support the new structure

#### 📍Where to Start

Start with the configuration structure changes in [options.go](https://github.com/xmtp/xmtpd/pull/755/files#diff-6731fb6f709392ce3e37d3b0c42074cddbce566dad2bab86af24ba7585eeb57c) which defines the new `AppChainOptions` and `SettlementChainOptions` structs that form the foundation for all other changes.

_[Macroscope](https://app.macroscope.com) summarized 384b3a8._

## Summary by CodeRabbit

- **Refactor**
  - Separated blockchain settings into distinct "AppChain" and "SettlementChain" configuration sections.
  - Reorganized contract addresses, chain IDs, RPC URLs, and refresh intervals under nested configuration fields.
  - Updated environment variables and scripts to support the new multi-chain configuration structure.
- **Bug Fixes**
  - Enhanced validation and initialization to correctly handle the distinct AppChain and SettlementChain configurations.
- **Documentation**
  - Revised onboarding and deployment guides to reflect local development environment setup and updated CLI usage.
- **Tests**
  - Updated all tests to align with the new nested configuration format for blockchain settings.
Macroscope summarized 0ac2737.
Actionable comments posted: 10
🧹 Nitpick comments (15)
pkg/indexer/storer/groupMessage.go (1)
100-100: Consider using the helper function for NullInt64. Instead of directly constructing sql.NullInt64, consider using the provided helper function from pkg/db/types.go for better maintainability.

- Expiry: sql.NullInt64{Int64: math.MaxInt64, Valid: true},
+ Expiry: db.NullInt64(math.MaxInt64),

Also, consider adding a comment explaining why math.MaxInt64 is being used as the expiry value. This represents "never expire" for group messages.

pkg/indexer/storer/identityUpdate.go (1)

185-185: Consider using the helper function for NullInt64. For consistency and maintainability, consider using the provided helper function from pkg/db/types.go instead of directly constructing sql.NullInt64.

- Expiry: sql.NullInt64{Int64: math.MaxInt64, Valid: true},
+ Expiry: db.NullInt64(math.MaxInt64),

Also, consider adding a comment explaining why math.MaxInt64 is being used as the expiry value. This represents "never expire" for identity updates.

pkg/fees/calculator.go (1)

25-25: Type change aligns with broader standardization effort. The parameter type for storageDurationDays has been changed from int64 to uint32. This is part of a coordinated update across multiple packages to standardize message retention durations and storage duration parameters as uint32 values.

Consider updating the error message on lines 31-34 to specifically mention that the value must be greater than zero, rather than implying it could be negative:

- if storageDurationDays <= 0 {
-     return 0, fmt.Errorf(
-         "storageDurationDays must be greater than 0, got %d",
-         storageDurationDays,
-     )
+ if storageDurationDays == 0 {
+     return 0, fmt.Errorf(
+         "storageDurationDays must be greater than 0, got %d",
+         storageDurationDays,
+     )

pkg/testutils/config.go (1)
24-28: Unset contract addresses may lead to nil/zero address panics
SettlementChainOptions omits NodeRegistryAddress and RateRegistryAddress. Several code paths (nodeRegistry.Refresh and rateRegistry.Refresh) assume these fields are non-empty and will panic or revert when given a zero address in tests.

If these contracts are not relevant to the particular unit tests, consider supplying a well-formed dummy address (e.g. 0x000000000000000000000000000000000000dEaD) to avoid surprises, or update the consuming code to tolerate empty values.

pkg/prune/prune.go (3)
38-46: Open connection vs. pooled Tx – consider using an explicit transaction
queries.New(e.writerDB) receives a pooled *sql.DB. Each call to DeleteExpiredEnvelopesBatch will therefore run in its own implicit transaction, possibly on a different connection, which is fine but loses atomicity between the SELECT … FOR UPDATE SKIP LOCKED CTE and the subsequent DELETE if other pruning workers (or foreign keys) are in play.

Wrapping each pruning cycle in an explicit serialisable transaction would give you deterministic behaviour and the ability to roll back on failure.

74-77: Debug log uses fmt.Sprintf unnecessarily. Zap can format the struct more efficiently:

- e.log.Debug(fmt.Sprintf("Pruning expired envelopes batch row: %v", row))
+ e.log.Debug("pruned envelope", zap.Any("row", row))

Removes an allocation and retains structured logging.

80-88: Metric naming in log field. Using spaces in key names ("pruned count") breaks many log aggregators' parsers. Prefer snake- or kebab-case.

- zap.Int("pruned count", totalDeletionCount),
+ zap.Int("pruned_count", totalDeletionCount),

doc/deploy.md (2)
83-108: Improve JSON example formatting. The JSON example contains hard tabs that should be replaced with spaces for consistent formatting across different viewers. The corrected block, indented with spaces:

{
  "level": "INFO",
  "time": "2025-05-06T16:39:35.737+0200",
  "message": "got nodes",
  "size": 2,
  "nodes": [
    {
      "node_id": 100,
      "owner_address": "0x70997970C51812dc3A010C7d01b50e0d17dc79C8",
      "signing_key_pub": "0x02ba5734d8f7091719471e7f7ed6b9df170dc70cc661ca05e688601ad984f068b0",
      "http_address": "http://localhost:5050",
      "min_monthly_fee_micro_dollars": 0,
      "in_canonical_network": true
    },

🧰 Tools
🪛 markdownlint-cli2 (0.17.2)
85-106: Hard tabs, Column: 1 (MD010, no-hard-tabs)
17-18: Format bare URLs as proper markdown links. The table contains bare URLs that should be formatted as proper markdown links for better readability and functionality.

-| https://grpc.testnet.xmtp.network | US-EAST-2 | 0x03e5442c5d1fe2f02b6b9a1a386383a7766860b40a6079a0223994ffa2ce10512c |
-| https://grpc2.testnet.xmtp.network | EU-NORTH-1 | 0x02fc261d43a0153539a4c64c29763cb0e7e377c0eac2910c3d4bedb2235ac70371 |
+| [https://grpc.testnet.xmtp.network](https://grpc.testnet.xmtp.network) | US-EAST-2 | 0x03e5442c5d1fe2f02b6b9a1a386383a7766860b40a6079a0223994ffa2ce10512c |
+| [https://grpc2.testnet.xmtp.network](https://grpc2.testnet.xmtp.network) | EU-NORTH-1 | 0x02fc261d43a0153539a4c64c29763cb0e7e377c0eac2910c3d4bedb2235ac70371 |

🧰 Tools
🪛 markdownlint-cli2 (0.17.2)
17-18: Bare URL used (MD034, no-bare-urls)
pkg/db/queries/prune.sql.go (1)
55-66: Duplicate rows.Close() call. rows.Close() is deferred at line 55 and called again at line 64. The second call is redundant and slightly obscures intent. (It is safe but unnecessary.)

@@
- if err := rows.Close(); err != nil {
-     return nil, err
- }

pkg/config/validation.go (2)
122-125: isMultiChainDeployment heuristic misses non-RPC settings. The check only looks at RpcURLs. A config that sets distinct ChainIDs or addresses but leaves both URLs empty would be treated as single-chain and later "normalized," producing misleading results.

Consider extending the predicate to look at any non-zero field under AppChain or SettlementChain.

171-174: Zero-duration may be legitimate. time.Duration(0) is treated as an error, yet some callers use 0 to disable polling/refresh. If 0 is a valid sentinel, change the comparison to < 0 or add a dedicated "disable" flag.
49-51: make(map[int64]*Network, 0): zero initial capacity is the default. The second argument is unnecessary; make(map[int64]*Network) is clearer.

- networks: make(map[int64]*Network, 0),
+ networks: make(map[int64]*Network),

pkg/indexerpoc/network.go (2)
25-38: Provide a Close() helper to release resources. ethclient.Client holds open WebSocket/HTTP connections. Without an explicit Close() the application may leak file descriptors during tests or long-running processes.

type Network struct {
@@
	maxCacheHits int
}
+
+// Close shuts down the underlying RPC client.
+func (s *Network) Close() error {
+	return s.client.Close()
+}

Callers (e.g. tests) can then defer net.Close().

196-201: Micro-optimisation: reuse the filter.Topics slice. The manual deep-copy allocates a new slice every call. If the caller does not mutate filter.Topics after the call (typical case) you can simply reference it:

- if len(filter.Topics) > 0 {
-     query.Topics = make([][]common.Hash, len(filter.Topics))
-     for i, topicList := range filter.Topics {
-         query.Topics[i] = make([]common.Hash, len(topicList))
-         copy(query.Topics[i], topicList)
-     }
- }
+ query.Topics = filter.Topics

This reduces heap churn inside tight indexing loops.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
pkg/proto/device_sync/message_backup.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
📒 Files selected for processing (52)
- cmd/cli/main.go (14 hunks)
- cmd/prune/main.go (1 hunks)
- cmd/replication/main.go (2 hunks)
- dev/local.env (1 hunks)
- dev/up (1 hunks)
- doc/deploy.md (1 hunks)
- doc/onboarding.md (2 hunks)
- pkg/api/message/publishWorker.go (3 hunks)
- pkg/api/message/publish_test.go (2 hunks)
- pkg/api/message/service.go (3 hunks)
- pkg/api/payer/publish_test.go (2 hunks)
- pkg/api/payer/service.go (2 hunks)
- pkg/blockchain/blockchainPublisher.go (2 hunks)
- pkg/blockchain/blockchainPublisher_test.go (1 hunks)
- pkg/blockchain/migrator/migrator_test.go (1 hunks)
- pkg/blockchain/ratesAdmin.go (1 hunks)
- pkg/blockchain/ratesAdmin_test.go (1 hunks)
- pkg/blockchain/registryAdmin.go (1 hunks)
- pkg/blockchain/registryAdmin_test.go (1 hunks)
- pkg/blockchain/registryCaller.go (1 hunks)
- pkg/config/options.go (3 hunks)
- pkg/config/pruneOptions.go (1 hunks)
- pkg/config/validation.go (2 hunks)
- pkg/db/queries/prune.sql.go (1 hunks)
- pkg/db/sqlc/prune.sql (1 hunks)
- pkg/envelopes/payer.go (1 hunks)
- pkg/fees/calculator.go (1 hunks)
- pkg/fees/calculator_test.go (2 hunks)
- pkg/fees/contractRates.go (2 hunks)
- pkg/fees/interface.go (1 hunks)
- pkg/indexer/e2e_test.go (2 hunks)
- pkg/indexer/indexer.go (8 hunks)
- pkg/indexer/storer/groupMessage.go (2 hunks)
- pkg/indexer/storer/groupMessage_test.go (1 hunks)
- pkg/indexer/storer/identityUpdate.go (2 hunks)
- pkg/indexer/storer/identityUpdate_test.go (1 hunks)
- pkg/indexerpoc/indexer.go (1 hunks)
- pkg/indexerpoc/network.go (1 hunks)
- pkg/indexerpoc/task.go (1 hunks)
- pkg/indexerpoc/types.go (1 hunks)
- pkg/prune/prune.go (2 hunks)
- pkg/registrant/registrant.go (3 hunks)
- pkg/registrant/registrant_test.go (2 hunks)
- pkg/registry/contractRegistry.go (2 hunks)
- pkg/registry/contractRegistry_test.go (3 hunks)
- pkg/server/server.go (2 hunks)
- pkg/server/server_test.go (1 hunks)
- pkg/sync/syncWorker.go (1 hunks)
- pkg/testutils/config.go (1 hunks)
- pkg/testutils/envelopes/envelopes.go (3 hunks)
- pkg/upgrade/scripts/load_env.sh (1 hunks)
- pkg/utils/namespace.go (1 hunks)
✅ Files skipped from review due to trivial changes (8)
- pkg/blockchain/registryAdmin.go
- dev/up
- pkg/registry/contractRegistry.go
- pkg/registry/contractRegistry_test.go
- pkg/envelopes/payer.go
- pkg/server/server_test.go
- pkg/indexerpoc/types.go
- dev/local.env
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/indexerpoc/task.go
🧰 Additional context used
🧬 Code Graph Analysis (21)
pkg/indexer/storer/groupMessage.go (1)
- pkg/db/types.go (1): NullInt64 (18-20)

pkg/indexer/storer/identityUpdate.go (1)
- pkg/db/types.go (1): NullInt64 (18-20)

pkg/config/pruneOptions.go (1)
- pkg/config/options.go (2): DbOptions (49-57), LogOptions (100-103)

pkg/api/message/publishWorker.go (4)
- pkg/envelopes/payer.go (2): NewPayerEnvelopeFromBytes (36-42), PayerEnvelope (14-18)
- pkg/db/types.go (1): NullInt64 (18-20)
- pkg/envelopes/unsignedOriginator.go (1): UnsignedOriginatorEnvelope (12-15)
- pkg/currency/currency.go (1): PicoDollar (10-10)

pkg/api/payer/service.go (2)
- pkg/constants/constants.go (1): DEFAULT_STORAGE_DURATION_DAYS (10-10)
- pkg/envelopes/payer.go (1): PayerEnvelope (14-18)

pkg/registrant/registrant_test.go (1)
- pkg/constants/constants.go (1): DEFAULT_STORAGE_DURATION_DAYS (10-10)

pkg/blockchain/ratesAdmin_test.go (3)
- pkg/testutils/contracts.go (2): DeployRatesRegistryContract (190-192), LOCAL_PRIVATE_KEY (26-26)
- pkg/blockchain/signer.go (1): NewPrivateKeySigner (26-52)
- pkg/blockchain/client.go (1): NewClient (17-19)

cmd/prune/main.go (2)
- pkg/prune/prune.go (1): NewPruneExecutor (23-35)
- pkg/config/pruneOptions.go (1): PruneConfig (3-6)

pkg/blockchain/blockchainPublisher_test.go (4)
- pkg/testutils/contracts.go (3): DeployNodesContract (178-180), DeployGroupMessagesContract (182-184), DeployIdentityUpdatesContract (186-188)
- pkg/blockchain/signer.go (1): NewPrivateKeySigner (26-52)
- pkg/testutils/config.go (1): GetPayerOptions (32-36)
- pkg/blockchain/client.go (1): NewClient (17-19)

pkg/blockchain/blockchainPublisher.go (1)
- pkg/abi/identityupdatebroadcaster/IdentityUpdateBroadcaster.go (1): NewIdentityUpdateBroadcaster (123-129)

pkg/indexer/storer/groupMessage_test.go (3)
- pkg/testutils/contracts.go (1): DeployGroupMessagesContract (182-184)
- pkg/blockchain/client.go (1): NewClient (17-19)
- pkg/abi/groupmessagebroadcaster/GroupMessageBroadcaster.go (1): NewGroupMessageBroadcaster (123-129)

pkg/api/payer/publish_test.go (3)
- pkg/constants/constants.go (1): DEFAULT_STORAGE_DURATION_DAYS (10-10)
- pkg/envelopes/unsignedOriginator.go (1): UnsignedOriginatorEnvelope (12-15)
- pkg/envelopes/payer.go (1): PayerEnvelope (14-18)

pkg/sync/syncWorker.go (2)
- pkg/envelopes/unsignedOriginator.go (1): UnsignedOriginatorEnvelope (12-15)
- pkg/envelopes/payer.go (1): PayerEnvelope (14-18)

pkg/indexer/storer/identityUpdate_test.go (3)
- pkg/testutils/contracts.go (1): DeployIdentityUpdatesContract (186-188)
- pkg/blockchain/client.go (1): NewClient (17-19)
- pkg/abi/identityupdatebroadcaster/IdentityUpdateBroadcaster.go (1): NewIdentityUpdateBroadcaster (123-129)

pkg/prune/prune.go (2)
- pkg/config/pruneOptions.go (1): PruneConfig (3-6)
- pkg/db/queries/db.go (1): New (19-21)

pkg/testutils/envelopes/envelopes.go (2)
- pkg/envelopes/payer.go (1): PayerEnvelope (14-18)
- pkg/constants/constants.go (1): DEFAULT_STORAGE_DURATION_DAYS (10-10)

pkg/server/server.go (1)
- pkg/blockchain/client.go (1): NewClient (17-19)

pkg/testutils/config.go (1)
- pkg/config/options.go (2): AppChainOptions (30-37), SettlementChainOptions (39-47)

pkg/blockchain/registryAdmin_test.go (4)
- pkg/testutils/contracts.go (1): DeployNodesContract (178-180)
- pkg/blockchain/signer.go (1): NewPrivateKeySigner (26-52)
- pkg/testutils/config.go (1): GetPayerOptions (32-36)
- pkg/blockchain/client.go (1): NewClient (17-19)

pkg/db/queries/prune.sql.go (1)
- pkg/db/queries/db.go (1): Queries (23-25)

pkg/indexerpoc/network.go (1)
- pkg/indexerpoc/filter.go (1): Filter (6-9)
🪛 markdownlint-cli2 (0.17.2)
doc/deploy.md
17-17: Bare URL used
null
(MD034, no-bare-urls)
18-18: Bare URL used
null
(MD034, no-bare-urls)
85-104: Hard tabs
Column: 1
(MD010, no-hard-tabs), reported for each of lines 85 through 104
🔇 Additional comments (68)
pkg/registrant/registrant_test.go (1)

9-10: Update correctly implements the storage duration parameter

The addition of the constants import and passing of DEFAULT_STORAGE_DURATION_DAYS to the SignStagedEnvelope method correctly aligns with the corresponding changes in the registrant implementation. This ensures tests properly validate the new retention-days functionality.

Also applies to: 261-261

pkg/fees/calculator_test.go (1)

61-65: Type cast correctly accommodates the API change

The change from int64 to uint32 properly aligns with the updated function signature in the calculator implementation.

pkg/upgrade/scripts/load_env.sh (1)

28-29: Environment variables correctly updated for multi-chain architecture

The script now properly requires separate RPC URLs for application and settlement chains, which aligns with the broader architectural changes to support multi-chain deployments.

pkg/blockchain/registryCaller.go (1)

33-33: Contract address correctly updated to use settlement chain configuration

The change properly references the node registry contract address from the settlement chain configuration, aligning with the multi-chain architecture changes.

pkg/indexer/storer/groupMessage.go (1)

5-7: New imports added for database and math packages.

These imports are now required to support the explicit Expiry field setting with math.MaxInt64 in the InsertGatewayEnvelope parameters.

pkg/indexer/storer/identityUpdate.go (1)

8-8: Math package imported for MaxInt64 value.

The import is now required to support the explicit Expiry field setting with math.MaxInt64 in the InsertGatewayEnvelope parameters.

pkg/utils/namespace.go (1)
13-13: ✅ Verification successful

Updated to use nested SettlementChain configuration.

This change aligns with the broader refactoring to support multi-chain deployments by using the new nested configuration structure.

🏁 Script executed:

#!/bin/bash
# Verify that all references to NodeRegistryAddress have been updated consistently
# Find all occurrences of NodeRegistryAddress to ensure consistent usage
rg --type go "NodeRegistryAddress" --no-heading
# Look for any potential references to the old NodesContractAddress pattern
rg --type go "NodesContractAddress" --no-heading

Length of output: 1836
Use nested SettlementChain.NodeRegistryAddress consistently

Verified that all code now references the new nested field, with the legacy NodesContractAddress retained only in pkg/config/options.go and in pkg/config/validation.go as a backward-compatibility assignment.

Key locations updated:

- pkg/utils/namespace.go
- pkg/registry/contractRegistry.go
- pkg/config/validation.go (usage + fallback from NodesContractAddress)
- pkg/config/options.go
- pkg/blockchain/registryAdmin.go & registryCaller.go
- pkg/blockchain/registryAdmin_test.go
- pkg/blockchain/blockchainPublisher_test.go
- pkg/blockchain/migrator/migrator_test.go
This aligns with the multi-chain refactoring and supports both new and legacy configurations.
pkg/sync/syncWorker.go (1)
542-542: ❓ Verification inconclusive

Updated to use dynamic retention days instead of a constant.

This change aligns fee calculation with the envelope-specified retention period rather than a static default, which is consistent with the broader changes for dynamic message retention handling.

Make sure that the RetentionDays() method always returns a valid and reasonable value for fee calculation. Let's verify how RetentionDays is implemented:
🏁 Script executed:
#!/bin/bash
# Find the implementation of RetentionDays method
# Search for the RetentionDays method in the PayerEnvelope type
rg --type go -A 5 -B 2 "func \(.*PayerEnvelope\) RetentionDays\(\)"

Length of output: 275
🏁 Script executed:
#!/bin/bash
# 1. Locate the PayerEnvelope struct definition
echo "=== Struct Definition ==="
rg --type go -C 5 "type PayerEnvelope" pkg/envelopes/payer.go
# 2. Inspect how MessageRetentionDays is set during PayerEnvelope construction
echo "=== Envelope Constructors ==="
rg --type go -n "NewPayerEnvelope" -C 3 pkg/envelopes
rg --type go -n "PayerEnvelope" -C 3 pkg/envelopes | grep -E "New|From|Parse"
# 3. Show the first 200 lines of pkg/envelopes/payer.go for context
echo "=== Top of payer.go ==="
sed -n '1,200p' pkg/envelopes/payer.go

Length of output: 6779
🏁 Script executed:
#!/bin/bash
# Locate the .proto file defining PayerEnvelope
proto_file=$(rg --files-with-matches "message PayerEnvelope" -n pkg/proto/xmtpv4/envelopes)
echo "=== PayerEnvelope proto file: $proto_file ==="
# Show the PayerEnvelope definition including MessageRetentionDays field
sed -n '1,200p' "$proto_file" | sed -n '/message PayerEnvelope {/,/}/p'

Length of output: 345
🏁 Script executed:
#!/bin/bash
# 1. List all .proto files under the proto directory
echo "=== Proto files ==="
find pkg/proto -type f -name "*.proto"
# 2. Search for PayerEnvelope references in all proto files
echo "=== Search for PayerEnvelope in proto files ==="
rg -n "PayerEnvelope" pkg/proto
# 3. For each matching file, display the message definition
matches=$(rg --files-with-matches "message PayerEnvelope" pkg/proto)
for file in $matches; do
  echo "=== Definition in $file ==="
  sed -n '/message PayerEnvelope {/,/}/p' "$file"
done

Length of output: 443
🏁 Script executed:
#!/bin/bash
# 1. List proto files under pkg/proto/xmtpv4/envelopes
echo "=== Proto files in envelopes directory ==="
find pkg/proto/xmtpv4/envelopes -type f -name '*.proto'
# 2. Search for the MessageRetentionDays field in those proto files
echo "=== Search for MessageRetentionDays ==="
rg -n "MessageRetentionDays" -C3 pkg/proto/xmtpv4/envelopes
# 3. Display the field definition context for MessageRetentionDays
proto_file=$(find pkg/proto/xmtpv4/envelopes -type f -name '*.proto' | head -n1)
echo "=== Definition in $proto_file ==="
sed -n '/MessageRetentionDays/,/;/p' "$proto_file"

Length of output: 718
🏁 Script executed:
#!/bin/bash
# 1. List all .proto files in the repository
echo "=== Listing all .proto files ==="
find . -type f -name "*.proto"
# 2. Search for the PayerEnvelope message definition
echo "=== Searching for message PayerEnvelope ==="
rg -n "message PayerEnvelope" -C7 --glob "*.proto"
# 3. Search for the MessageRetentionDays field
echo "=== Searching for MessageRetentionDays ==="
rg -n "MessageRetentionDays" -C5 --glob "*.proto"

Length of output: 700
Validate dynamic retention days

Ensure that RetentionDays() — which returns p.proto.MessageRetentionDays — always yields a sensible value for fee calculation. If the field can ever be unset, zero, or out of expected bounds, you should either:

- Add validation in pkg/envelopes/payer.go (e.g., clamp to a min/max or fall back to the previous constant).
- Confirm in your message-generation code or .proto schema that MessageRetentionDays is always set to a valid default and cannot be abused.

Relevant location:

- pkg/envelopes/payer.go: func (p *PayerEnvelope) RetentionDays() uint32 { return p.proto.MessageRetentionDays }

pkg/blockchain/ratesAdmin.go (1)
34-34: Configuration structure refactoring for multi-chain support.

The rate registry address is now accessed from the nested SettlementChain struct, which is part of a broader refactoring to support multi-chain deployments. This change improves clarity by separating settlement chain and application chain parameters.

pkg/api/payer/publish_test.go (1)
219-236: Good addition: Verifying envelope expiry metadata.

These assertions ensure that the expiry-related properties of published envelopes are correctly set. The test verifies both that the retention days match the expected constant and that the expiry timestamp is calculated correctly relative to the current time.

The use of InDelta for comparing timestamps is appropriate since it accounts for small timing variations between envelope creation and testing.

pkg/blockchain/migrator/migrator_test.go (3)
24-24: Configuration structure refactoring for multi-chain support.

The node registry address is now accessed from the nested SettlementChain struct, which is part of a broader refactoring to support multi-chain deployments. This change improves clarity by separating settlement chain and application chain parameters.

28-28: Accessing chain ID from the nested configuration structure.

Chain ID is now accessed from the settlement chain configuration struct, consistent with the multi-chain architecture refactoring.

32-32: Accessing RPC URL from the nested configuration structure.

RPC URL is now accessed from the settlement chain configuration struct, consistent with the multi-chain architecture refactoring.
pkg/server/server.go (1)
241-241: Correctly refactored to use AppChain configuration.

The code has been properly updated to use the nested AppChain configuration structure instead of the previous flat contract options. This change aligns with the multi-chain architecture being implemented across the codebase.
Also applies to: 247-247, 257-257
cmd/replication/main.go (2)
50-50: Good practice: Passing options by reference.

Passing options by reference to the validation function avoids unnecessary copying of the entire structure and allows for potential in-place modifications.

102-105: Properly updated to use SettlementChain configuration.

The code has been correctly refactored to use the settlement chain for registry-related operations. This is consistent with the multi-chain architecture where node registration and discovery happen on the settlement chain.
Also applies to: 112-112
cmd/prune/main.go (1)
61-61: Enhanced configurability for pruning operations.

The change correctly passes the prune configuration to the executor, allowing for more controlled pruning behavior with options like MaxCycles and DryRun mode.
pkg/indexer/storer/identityUpdate_test.go (1)
31-34: Test properly updated to use AppChain configuration.

The test setup has been correctly modified to align with the multi-chain architecture, using the nested AppChain configuration for the identity update broadcaster contract and RPC URL.
Also applies to: 36-36, 39-39
pkg/fees/interface.go (1)
30-30: Change from int64 to uint32 type looks appropriate.

This type change makes sense as storage duration should never be negative, and uint32 provides more than enough range for representing days (can hold over 11 million years). This appears to be part of a broader refactoring for dynamic message retention handling.
cmd/cli/main.go (1)
220-220: Multi-chain configuration refactoring looks well-implemented.

All references to the generic contract configuration fields have been consistently updated to use the new nested SettlementChain structure. This change supports the multi-chain deployment model introduced in this PR, clearly separating settlement chain configuration from application chain configuration.

Also applies to: 251-254, 288-288, 315-315, 342-342, 361-361, 383-383, 402-403, 409-409, 445-445, 484-484, 507-507, 536-536, 571-571, 725-728
pkg/blockchain/blockchainPublisher.go (1)
52-53: Contract address references correctly updated for multi-chain support.

The blockchain publisher now properly uses the nested AppChain configuration structure to access contract addresses for GroupMessageBroadcaster and IdentityUpdateBroadcaster. This aligns with the architectural changes for multi-chain support while maintaining the same functionality.

Also applies to: 59-60, 83-83
pkg/indexer/e2e_test.go (1)
33-40: Test configuration properly updated for multi-chain model.

The E2E tests have been refactored to use the new nested AppChain configuration structure for contract addresses, chain ID, and RPC URL. This ensures that tests are aligned with the new multi-chain configuration model introduced in this PR.

Also applies to: 63-66, 69-69
pkg/indexer/storer/groupMessage_test.go (3)
27-30: The configuration path update looks good.

The update correctly reflects the new nested configuration structure that separates application chain and settlement chain details, consistent with the multi-chain architecture.

32-32: Properly updated RPC URL access.

The code correctly uses the new nested configuration structure to access the RPC URL from the AppChain component.

35-35: Consistent update to GroupMessageBroadcaster address path.

The contract address is now correctly accessed from the nested AppChain configuration, maintaining consistency with other similar changes across the codebase.
pkg/config/pruneOptions.go (2)
3-6: Well-structured configuration with sensible defaults.

The PruneConfig struct is well-defined with appropriate field tags for CLI flags and environment variables, and includes a reasonable default of 10 for MaxCycles.

7-11: Good organization of configuration options.

The PruneOptions struct properly nests the new PruneConfig alongside existing database and logging options, making the configuration hierarchy clear and well-organized.
pkg/fees/contractRates.go (2)
60-60: Updated to use the settlement chain configuration.

The code appropriately uses the nested SettlementChain configuration structure to access the rate registry contract address.

71-71: Properly updated refresh interval configuration path.

The code correctly accesses the rate registry refresh interval from the nested SettlementChain configuration.
pkg/api/payer/service.go (3)
6-6: Added math import for MaxUint32.

Appropriate import added for using math.MaxUint32 in the message retention logic.

501-505: Good implementation of conditional message retention logic.

The code sets appropriate retention days based on whether the message is a commit message. For commit messages, it uses MaxUint32 (effectively "never expire"), while non-commit messages use the default storage duration.

512-513: Correctly added retention days to the PayerEnvelope.

The MessageRetentionDays field is now included in the PayerEnvelope, ensuring this information is properly propagated downstream for storage and expiration handling.
pkg/blockchain/registryAdmin_test.go (3)
23-23: Update to use nested settlement chain configuration.

The code correctly updates to use the new multi-chain configuration structure by setting the node registry address in the SettlementChain nested struct.

27-27: Chain ID reference updated for multi-chain support.

The signer now correctly uses the chain ID from the settlement chain configuration.

31-31: RPC URL updated to use settlement chain configuration.

The client initialization now properly uses the RPC URL from the settlement chain configuration.
pkg/blockchain/ratesAdmin_test.go (3)
24-27: Rate registry address updated to use settlement chain configuration.

The test now correctly deploys and assigns the rate registry contract address to the settlement chain configuration.

31-31: Chain ID reference updated to use app chain configuration.

The signer now correctly uses the chain ID from the application chain configuration instead of a top-level field.

35-35: RPC URL updated to use app chain configuration.

The client initialization now properly uses the RPC URL from the application chain configuration.
pkg/api/message/publish_test.go (2)
5-5: Added math package import for MaxUint32.

Correctly imported the math package to access the MaxUint32 constant used for infinite expiry tests.

352-432: Well-structured test for envelope expiration scenarios.

This comprehensive table-driven test properly validates multiple expiration retention day values:
- Tests invalid cases (0, 1, and 5-year expiry)
- Tests valid cases (2, 7, 30, 90 days, and infinite expiry)
- Verifies proper error messages for invalid cases
The test ensures the system enforces expiration policies correctly throughout the publishing flow.
pkg/registrant/registrant.go (3)
9-9: Added time package import.

Required for timestamp calculations to set envelope expiry in the updated SignStagedEnvelope method.

84-84: Added retention days parameter to SignStagedEnvelope.

The function signature has been updated to accept a retention days parameter that determines how long the envelope should be stored.

93-93: Implemented expiry timestamp calculation.

The code correctly calculates the expiry timestamp by adding the retention days to the current time. This timestamp will be used by the system to determine when envelopes should be expired and removed.
pkg/api/message/publishWorker.go (4)
111-118: Well-implemented dynamic retention period extraction.

The code now correctly extracts the retention period from the payer envelope instead of using a fixed constant, which enables flexible message expiry management.

119-119: Properly propagated retention days to fee calculation.

The extracted retention days parameter is now correctly passed to the fee calculator, ensuring accurate pricing based on storage duration.

125-130: Retention period now properly propagated to envelope signing.

The code correctly forwards the dynamic retention period to the registrant for signature generation, maintaining consistency throughout the message publishing flow.

176-178: Dynamic expiry timestamp correctly stored in database.

The code now properly extracts and stores the expiry timestamp from the validated originator envelope, ensuring messages can be expired based on their individual retention settings.
doc/onboarding.md (2)
1-1: Clear environment specification added.

Good addition clarifying that these instructions are specifically for the testnet environment.

55-58: Updated CLI flags using nested notation.

The CLI flag format has been updated to reflect the nested configuration structure with dot notation (e.g., --admin.private-key), aligning with the code changes implementing multi-chain support.

pkg/indexer/indexer.go (2)
73-73: Updated client initialization to use AppChain RPC URL.

The code now correctly uses the nested AppChain configuration to initialize the blockchain client, aligning with the multi-chain architecture.

97-97: Consistently updated all contract references to use AppChain configuration.

All contract addresses and configuration parameters have been systematically updated to use the nested AppChain structure, maintaining consistency throughout the indexer implementation.
Also applies to: 108-108, 123-123, 138-138, 170-171, 180-182, 192-194, 202-204, 410-411, 420-421
pkg/blockchain/blockchainPublisher_test.go (2)
21-29: Clear separation of contract addresses between chains.

The test setup now correctly assigns contract addresses to their appropriate chains: node registry to the settlement chain and broadcaster contracts to the application chain. This reflects the architectural decision to separate node management (settlement chain) from message/identity operations (application chain).

33-33: Updated chain parameters in test setup for multi-chain support.

The test properly uses the AppChain's chainID for the signer and the SettlementChain's RPC URL for the client, maintaining consistency with the production code's multi-chain architecture.
Also applies to: 37-37
pkg/db/sqlc/prune.sql (1)
8-16: Possible lock escalation & starvation when many workers run in parallel

The CTE locks a batch with FOR UPDATE SKIP LOCKED, which is great, but when several pruning workers are running concurrently, the ordered LIMIT 1000 combined with ORDER BY can starve newer rows: the first worker will always grab the oldest 1000 rows, and the rest will take progressively newer rows. If the rate of insertion is higher than the pruning throughput, the newest rows may never be reached.

Consider randomising or round-robinning the selection (or running without ORDER BY) when intensive parallel pruning is expected, or run a single worker per DB.

pkg/testutils/envelopes/envelopes.go (2)
87-114: Good refactoring to support configurable message expiration in tests.

The renamed function CreatePayerEnvelopeWithExpiration now accepts an explicit expirationDays parameter, allowing tests to create payer envelopes with variable retention periods. This will be useful for testing boundary conditions and expiration-related logic.

116-126: Well-designed backward compatibility wrapper.

This new wrapper function maintains the original API while leveraging the new implementation with configurable expiration. Using the constant constants.DEFAULT_STORAGE_DURATION_DAYS as the default value ensures consistency across the codebase.

pkg/api/message/service.go (4)
381-384: Properly propagates retention days to fee calculation.

The message retention period is now dynamically extracted from the payer envelope rather than using a fixed value, ensuring that fees are calculated accurately based on the actual storage duration requested.

389-394: Consistent propagation of retention days to envelope signing.

The retention days are correctly passed to the registrant's SignStagedEnvelope method, ensuring that the expiry timestamp on the signed envelope matches the requested retention period.

513-516: Good integration of expiry validation.

The expiry validation is now properly integrated into the existing validation pipeline for payer envelopes.

521-533: Well-designed validation logic for message retention period.

The validation enforces reasonable constraints:

- Minimum of 2 days prevents messages from expiring too quickly
- Maximum of either 365 days or math.MaxUint32 (for infinite retention) prevents unreasonably long retention

Error messages are clear and descriptive, helping clients understand the requirements.
pkg/config/options.go (3)
13-14: Good practice to mark deprecated fields.

Properly documenting deprecated fields with a TODO for removal helps guide future maintenance and prevents confusion during the transition to the new multi-chain configuration model.

25-47: Well-structured multi-chain configuration options.

The new nested structures for AppChainOptions and SettlementChainOptions provide a clean separation of concerns between application and settlement chains. The environment variables follow a consistent naming pattern, and sensible defaults are provided where appropriate.

This architecture change will support more complex deployment scenarios with distinct chains for different purposes.

61-63: Good addition of indexer performance tuning parameters.

The new BatchSize and Concurrency options provide important controls for tuning the indexer's performance. The default values (batch size of 1000 and concurrency of 10) are reasonable starting points.

doc/deploy.md (4)
5-21: Clear information about the XMTP Sepolia testnet environment.

The documentation provides valuable context about the current testnet environment and the nodes managed by Ephemera, including their DNS names, locations, and public keys.
🧰 Tools
🪛 markdownlint-cli2 (0.17.2)
17-17: Bare URL used
null
(MD034, no-bare-urls)
18-18: Bare URL used
null
(MD034, no-bare-urls)
22-41: Well-structured local development setup instructions.

The documentation clearly outlines two methods for setting up a local development environment:
- Using the XMTP Contracts Image
- Using the dev/up automation script
This provides flexibility for different developer workflows.
49-63: Updated CLI commands reflect the new configuration structure.

The environment variables and CLI flags are updated to match the new multi-chain configuration model, with clear examples for node registration.

72-79: Consistent updates to verification commands.

The environment variables for verifying node registration are correctly updated to use the settlement chain configuration.
pkg/indexerpoc/indexer.go (1)
63-71: Missing validation for zero ChainID when building network

If AppChain.ChainID is 0, the network is still created and stored under key 0, leading to later failures when contracts validate ChainID > 0.

Add an explicit check before NewNetwork or propagate the validation from ValidateServerOptions.
_, err := calculator.CalculateBaseFee(
	messageTime,
	int64(messageSize),
-	int64(storageDurationDays),
+	uint32(storageDurationDays),
)
🛠️ Refactor suggestion
Overflow test may not be fully testing the intended case
While the type cast accommodates the API change, casting math.MaxInt64 to uint32 results in truncation to 4294967295 (max uint32 value), which means the test is no longer testing with the original large value.
Consider updating the test to verify both truncation and overflow scenarios:
- messageSize := math.MaxInt64
- storageDurationDays := math.MaxInt64
+ // Test overflow in message size
+ messageSize := math.MaxInt64
+ storageDurationDays := uint32(30)
+ _, err := calculator.CalculateBaseFee(
+ messageTime,
+ int64(messageSize),
+ storageDurationDays,
+ )
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "storage fee calculation overflow")
- _, err := calculator.CalculateBaseFee(
- messageTime,
- int64(messageSize),
- uint32(storageDurationDays),
- )
- require.Error(t, err)
- require.Contains(t, err.Error(), "storage fee calculation overflow")
+ // Test overflow when converting from large int64 to uint32
+ messageSize = 100
+ _, err = calculator.CalculateBaseFee(
+ messageTime,
+ int64(messageSize),
+ math.MaxUint32,
+ )
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "storage fee calculation overflow")Committable suggestion skipped: line range outside the PR's diff.
export XMTPD_SETTLEMENT_CHAIN_RPC_URL="http://localhost:7545/"
export XMTPD_SETTLEMENT_CHAIN_CHAIN_ID=31337
export XMTPD_SETTLEMENT_CHAIN_NODE_REGISTRY_ADDRESS="0xDEADBEEF"
export ADMIN_PRIVATE_KEY="0xDEADBEEF"
export NODE_HTTP_ADDRESS="https://grpc.example.com"
export NODE_OWNER_ADDRESS="0xDEADBEEF"
export NODE_SIGNING_KEY_PUB="0xDEADBEEF"
💡 Verification agent
🧩 Analysis chain
Environment variables updated for multi-chain architecture, but app chain variables missing.
The documentation has been updated to reflect the settlement chain configuration, but there are no app chain environment variables mentioned, which would be needed for a complete multi-chain setup.
Consider adding the app chain environment variables to provide a complete picture of the multi-chain configuration requirements:
export XMTPD_SETTLEMENT_CHAIN_RPC_URL="http://localhost:7545/"
export XMTPD_SETTLEMENT_CHAIN_CHAIN_ID=31337
export XMTPD_SETTLEMENT_CHAIN_NODE_REGISTRY_ADDRESS="0xDEADBEEF"
+export XMTPD_APP_CHAIN_RPC_URL="http://localhost:8545/"
+export XMTPD_APP_CHAIN_CHAIN_ID=31337
+export XMTPD_APP_CHAIN_GROUP_MESSAGE_BROADCASTER_ADDRESS="0xDEADBEEF"
+export XMTPD_APP_CHAIN_IDENTITY_UPDATE_BROADCASTER_ADDRESS="0xDEADBEEF"
export ADMIN_PRIVATE_KEY="0xDEADBEEF"
export NODE_HTTP_ADDRESS="https://grpc.example.com"
export NODE_OWNER_ADDRESS="0xDEADBEEF"
export NODE_SIGNING_KEY_PUB="0xDEADBEEF"🏁 Script executed:
#!/bin/bash
# Check if this is intentional by examining how the CLI parses variables
grep -r "AppChain" ./cmd/cli/ --include="*.go"

Length of output: 46
🏁 Script executed:
```shell
#!/bin/bash
set -e
# Search for explicit AppChain env vars in the codebase
rg -n "XMTPD_APP_CHAIN_" .
# Confirm existing SettlementChain env var usage
rg -n "XMTPD_SETTLEMENT_CHAIN_" .
# Inspect code that loads environment variables in cmd/cli
rg -n "os.Getenv" cmd/cli
rg -n "viper.Get" cmd/cli
```

Length of output: 4605
Add missing App Chain environment variables to onboarding.md
To complete the multi-chain setup, the App Chain vars defined in pkg/config/options.go and load_env.sh should also be documented in doc/onboarding.md around lines 46–53:
Missing variables:
- `XMTPD_APP_CHAIN_RPC_URL`
- `XMTPD_APP_CHAIN_CHAIN_ID`
- `XMTPD_APP_CHAIN_POLL_INTERVAL`
- `XMTPD_APP_CHAIN_MAX_CHAIN_DISCONNECT_TIME`
- `XMTPD_APP_CHAIN_GROUP_MESSAGE_BROADCAST_ADDRESS`
- `XMTPD_APP_CHAIN_IDENTITY_UPDATE_BROADCAST_ADDRESS`
Suggested diff:

```diff
--- doc/onboarding.md
+++ doc/onboarding.md
@@ lines 46-53
 export XMTPD_SETTLEMENT_CHAIN_RPC_URL="http://localhost:7545/"
 export XMTPD_SETTLEMENT_CHAIN_CHAIN_ID=31337
 export XMTPD_SETTLEMENT_CHAIN_NODE_REGISTRY_ADDRESS="0xDEADBEEF"
+export XMTPD_APP_CHAIN_RPC_URL="http://localhost:8545/"
+export XMTPD_APP_CHAIN_CHAIN_ID=31337
+export XMTPD_APP_CHAIN_POLL_INTERVAL="250ms"
+export XMTPD_APP_CHAIN_MAX_CHAIN_DISCONNECT_TIME="300s"
+export XMTPD_APP_CHAIN_GROUP_MESSAGE_BROADCAST_ADDRESS="0xD5b7B43B0e31112fF99Bd5d5C4f6b828259bedDE"
+export XMTPD_APP_CHAIN_IDENTITY_UPDATE_BROADCAST_ADDRESS="0xe67104BC93003192ab78B797d120DBA6e9Ff4928"
 export ADMIN_PRIVATE_KEY="0xDEADBEEF"
 export NODE_HTTP_ADDRESS="https://grpc.example.com"
 export NODE_OWNER_ADDRESS="0xDEADBEEF"
 export NODE_SIGNING_KEY_PUB="0xDEADBEEF"
```
```sql
WHERE expiry IS NOT NULL
  AND expiry < EXTRACT(EPOCH FROM now())::bigint;
```
🛠️ Refactor suggestion
Add an index on expiry to avoid sequential scans during counts & deletes
Both CountExpiredEnvelopes and DeleteExpiredEnvelopesBatch filter on
expiry < now(). Unless the table already has an index covering expiry, the
queries will devolve into a full-table scan, which will quickly become a
bottleneck as gateway_envelopes grows.
```sql
-- Recommended migration (example)
CREATE INDEX IF NOT EXISTS gateway_envelopes_expiry_idx
    ON public.gateway_envelopes (expiry);
```

```go
AppChain: config.AppChainOptions{
    RpcURL:                 rpcUrl,
    ChainID:                31337,
    MaxChainDisconnectTime: 10 * time.Second,
},
```
🛠️ Refactor suggestion
Zero PollInterval disables back-off in tests
AppChainOptions.PollInterval is left at the Go zero value (0s).
Downstream polling loops that rely on this field will spin continuously
instead of backing off at the intended 250ms default, creating noisy logs and wasting CPU
during tests. Unless the test harness overrides the value later, please set an
explicit interval:
```diff
-    RpcURL:                 rpcUrl,
-    ChainID:                31337,
-    MaxChainDisconnectTime: 10 * time.Second,
+    RpcURL:                 rpcUrl,
+    ChainID:                31337,
+    MaxChainDisconnectTime: 10 * time.Second,
+    PollInterval:           250 * time.Millisecond, // keep parity with prod default
```
```go
for {
    if cyclesCompleted >= e.config.MaxCycles {
        e.log.Warn("Reached maximum pruning cycles", zap.Int("maxCycles", e.config.MaxCycles))
        break
    }
```
🛠️ Refactor suggestion
Potentially leaving rows behind when MaxCycles is reached
If MaxCycles is hit while there are still >0 expired rows, pruning stops
silently. That’s fine for tests, but in production you probably want to either
(a) warn with the remaining count or (b) loop until the table is empty when
MaxCycles <= 0.
```diff
-    if cyclesCompleted >= e.config.MaxCycles {
+    if e.config.MaxCycles > 0 && cyclesCompleted >= e.config.MaxCycles {
```

This keeps the safeguard while allowing "run-to-completion" by configuring
MaxCycles = 0.
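The semantics of the suggested guard can be sketched in isolation. This is a minimal illustration, not the actual pruner code; `shouldStop` is a hypothetical helper name:

```go
package main

import "fmt"

// shouldStop mirrors the suggested guard: a non-positive MaxCycles
// means "no cap", so pruning runs until no expired rows remain.
func shouldStop(cyclesCompleted, maxCycles int) bool {
	return maxCycles > 0 && cyclesCompleted >= maxCycles
}

func main() {
	fmt.Println(shouldStop(5, 5))   // true: cap reached
	fmt.Println(shouldStop(100, 0)) // false: MaxCycles = 0 never stops on cycle count
}
```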
```go
const deleteExpiredEnvelopesBatch = `-- name: DeleteExpiredEnvelopesBatch :many
WITH to_delete AS (
    SELECT originator_node_id, originator_sequence_id
    FROM gateway_envelopes
    WHERE expiry IS NOT NULL
      AND expiry < EXTRACT(EPOCH FROM now())::bigint
    ORDER BY expiry
    LIMIT 1000
    FOR UPDATE SKIP LOCKED
```
FOR UPDATE SKIP LOCKED is ineffective without an explicit transaction
DeleteExpiredEnvelopesBatch relies on row-level locking (FOR UPDATE … SKIP LOCKED) to avoid concurrent deletions, but q.db.QueryContext is executed outside any explicit transaction.
Because the PostgreSQL driver starts and commits an implicit transaction for each statement when no transaction is supplied, the row locks are released immediately after the WITH query finishes; the subsequent DELETE therefore provides no protection against a second worker selecting the same rows in a concurrent call.
If you intend to run multiple pruners in parallel, wrap the whole statement in a single explicit transaction:
```go
tx, err := q.db.BeginTx(ctx, nil)
if err != nil { … }
rows, err := tx.QueryContext(ctx, deleteExpiredEnvelopesBatch)
…
if err := tx.Commit(); err != nil { … }
```

Otherwise remove FOR UPDATE SKIP LOCKED to avoid the false sense of safety.
```go
func validateField(value interface{}, fieldName string, set map[string]struct{}) {
    switch v := value.(type) {
    case string:
        if v == "" {
            set[fmt.Sprintf("--%s", fieldName)] = struct{}{}
        }
    case int:
        if v <= 0 {
            set[fmt.Sprintf("--%s must be greater than 0", fieldName)] = struct{}{}
        }
    case time.Duration:
        if v <= 0 {
            set[fmt.Sprintf("--%s must be greater than 0", fieldName)] = struct{}{}
        }
    }
}
```
validateField ignores int64/uint64 → required numeric checks silently skipped
ChainID, refresh intervals, etc. are int64/uint64 in most of the codebase, but the type-switch only handles int.
As a result, passing a zero ChainID bypasses validation and the server starts with an invalid configuration.
Proposed fix:
```diff
 case int, int64, uint64:
-    if v <= 0 {
+    if toInt64(v) <= 0 {
         set[fmt.Sprintf("--%s must be greater than 0", fieldName)] = struct{}{}
     }
```

or add separate cases for int64 and uint64.
Make sure helper converts the unsigned value safely.
Committable suggestion skipped: line range outside the PR's diff.
```go
tracing.GoPanicWrap(
    i.ctx,
    &i.wg,
    fmt.Sprintf("indexer-task-%v", task.contract.GetName()),
    func(ctx context.Context) {
        task.run()
    })
```
Task does not receive cancellation context – graceful shutdown impossible
tracing.GoPanicWrap hands a derived ctx, but task.run() is invoked with no argument, so the task cannot observe cancellation and will continue after Indexer.Close().
```diff
     func(ctx context.Context) {
-        task.run()
+        task.run(ctx)
     })
```

You will also need to update the task type accordingly.
```go
if chainID.Int64() != int64(cfg.ChainID) {
    return nil, fmt.Errorf("chain ID mismatch for %s: expected %d, got %d",
        cfg.Name, cfg.ChainID, chainID.Int64())
}
```
🛠️ Refactor suggestion
Avoid lossy int64 cast when comparing chain IDs
chainID.Int64() will truncate or overflow when the ID exceeds int64 (some private/L2 chains already do). A safer comparison avoids the cast altogether:
```diff
-    if chainID.Int64() != int64(cfg.ChainID) {
+    if chainID.Cmp(big.NewInt(cfg.ChainID)) != 0 {
```

This keeps the code future-proof for larger IDs.
```go
func NewNetwork(
    ctx context.Context,
    cfg NetworkConfig,
    log *zap.Logger,
) (*Network, error) {
    networkLogger := log.Named(fmt.Sprintf("network-%s", cfg.Name))

    rpcClient, err := rpc.Dial(cfg.RpcURL)
    if err != nil {
        return nil, fmt.Errorf("connecting to %s node: %w", cfg.Name, err)
    }

    ethClient := ethclient.NewClient(rpcClient)

    chainID, err := ethClient.ChainID(ctx)
    if err != nil {
        return nil, fmt.Errorf("getting chain ID from %s: %w", cfg.Name, err)
    }

    if chainID.Int64() != int64(cfg.ChainID) {
        return nil, fmt.Errorf("chain ID mismatch for %s: expected %d, got %d",
            cfg.Name, cfg.ChainID, chainID.Int64())
    }

    source := &Network{
        ctx:          ctx,
        client:       ethClient,
        network:      cfg,
        log:          networkLogger,
        maxCacheHits: 20,
    }

    return source, nil
}
```
🛠️ Refactor suggestion
Add sanity-checks and auto-defaults in NewNetwork
PollInterval is forwarded verbatim to time.NewTicker later on.
If the caller forgets to set the field (zero value) the program will panic (NewTicker forbids 0 or negative durations). Likewise, the background cache warmer (start) is never launched, so the cached-header logic is effectively dead code right now.
```diff
 func NewNetwork(
@@
-    networkLogger := log.Named(fmt.Sprintf("network-%s", cfg.Name))
+    if cfg.PollInterval <= 0 {
+        cfg.PollInterval = 15 * time.Second // sensible default
+    }
+    networkLogger := log.Named(fmt.Sprintf("network-%s", cfg.Name))
@@
-    source := &Network{
+    n := &Network{
         ctx: ctx,
@@
-    }
-
-    return source, nil
+    }
+
+    // start cache warmer - runs until ctx is cancelled
+    go n.start(ctx)
+
+    return n, nil
 }
```

Benefits:
• Prevents runtime panic on bad config.
• Activates the intended caching path without requiring the caller to remember an extra step.
Implement new indexer package for processing blockchain events with support for ERC20 tokens and group messages

Introduces a new `indexerpoc` package that provides blockchain event indexing capabilities:

- `GroupMessageContract` implementation in group_message.go handles group message events
- `GethSource` in source.go with caching and parallel processing

📍 Where to Start

Start with the README examples in README.md to understand the usage patterns, then review the main indexer implementation in indexer.go which coordinates the entire system.
Macroscope summarized ddd79a9.
Summary by CodeRabbit
New Features
Improvements
Bug Fixes
Chores
Documentation