Partitioned Gateway Envelopes #1279
Conversation
Keep the Hot Path Fast by Handling Unusual Partition Errors Separately

In normal operation, all necessary partitions already exist, and the vast majority of inserts succeed immediately. Handling the “missing partition” case as a rare exception, instead of building defensive partition-creation logic into every insert, keeps the hot path as lean as possible:
- Avoids extra round-trips and locks.
- Optimizes for the common case.
- Isolates the slow, rare logic.
- Improves scalability under load.
- Maintains correctness and safety.
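A minimal sketch of that fast-path pattern, under stated assumptions: the helper names below are illustrative, not the PR's actual API (the real helpers are db.InsertGatewayEnvelopeWithChecksTransactional / db.InsertGatewayEnvelopeWithChecksStandalone), and the exact SQLSTATE for a missing partition depends on how the insert is written.

```go
package db

import (
	"context"
	"errors"

	"github.com/jackc/pgx/v5/pgconn"
)

// isMissingPartitionErr reports whether err looks like a missing-partition
// failure. The SQLSTATEs here are assumptions: 42P01 (undefined_table) if the
// insert names a partition directly, 23514 (check_violation) if the parent
// table finds no partition for the row.
func isMissingPartitionErr(err error) bool {
	var pgErr *pgconn.PgError
	if !errors.As(err, &pgErr) {
		return false
	}
	return pgErr.Code == "42P01" || pgErr.Code == "23514"
}

// insertWithPartitionFallback tries the insert optimistically and only pays
// for DDL plus one retry on the rare missing-partition path.
func insertWithPartitionFallback(
	ctx context.Context,
	insert func(context.Context) error, // the normal hot-path insert
	createPartition func(context.Context) error, // idempotent DDL
) error {
	err := insert(ctx)
	if err == nil || !isMissingPartitionErr(err) {
		return err // success, or an error this helper does not handle
	}
	if ddlErr := createPartition(ctx); ddlErr != nil {
		return ddlErr
	}
	return insert(ctx) // retry exactly once on the rare path
}
```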
There is no migration path. This assumes a DB wipe. Our testnet-dev DBs are unreadable and we would never be able to migrate them anyway.
// This function runs inside a managed transaction created by RunInTxWithResult().
//
// Steps:
// 1. Calls InsertGatewayEnvelopeWithChecksTransactional() to insert the envelope,
I thought we were trying to avoid mixing the DDL operations with DML workflows?
This at least handles the flow quite nicely, only paying a performance penalty when the partitions are missing, and it handles rollbacks cleanly. But still, it has a bit of an ick to it.
- Makes performance harder to reason about (some inserts take much longer than others)
- Scatters any errors in this flow across the logs of normal writes (maybe we can help address that by emitting a specific metric on these failures)
The alternative is to have some worker pre-creating the partitions, which has its own problems and complexities. So IDK
Yeah, perfect intuition here.
I don't like the background worker option. The worker then has to listen to the registry. It also has to run frequently enough to pre-fill it. And it's a nightmare for tests, since some of them create random originators. And there are special originators, such as 10-13, which are not even in the registry, and you have to remember they exist.
The error flow is not so bad. If an insert fails with "missing partition", it will create the partition and retry, without surfacing any errors or printing any logs.
We could of course at least log the fact that we did create a new partition for nodeid/seq-range.
If the DDL fails, then we might see the error in rather unexpected places. But the DDL is super simple, with IF NOT EXISTS, so it should be pretty safe.
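For concreteness, a sketch of what that idempotent DDL can look like. The table name, naming convention, and two-column range bounds are illustrative assumptions, not the PR's actual schema (which lives in the V2 migration):

```go
package db

import (
	"context"
	"database/sql"
	"fmt"
)

// createPartition issues the rare-path DDL for one node/sequence-range
// partition. IF NOT EXISTS makes a concurrent "someone else just created it"
// race harmless. Identifiers cannot be bound as query parameters, so the DDL
// is formatted from integer values only, which keeps it injection-free.
func createPartition(ctx context.Context, db *sql.DB, nodeID int32, seqStart, seqEnd int64) error {
	ddl := fmt.Sprintf(
		`CREATE TABLE IF NOT EXISTS gateway_envelopes_meta_n%d_s%d
		 PARTITION OF gateway_envelopes_meta
		 FOR VALUES FROM (%d, %d) TO (%d, %d)`,
		nodeID, seqStart,
		nodeID, seqStart,
		nodeID, seqEnd,
	)
	_, err := db.ExecContext(ctx, ddl)
	return err
}
```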
I'm alright with saying this is the least-bad option. Agree the worker would have its own ick. At least this should be consistent and reliable.
👍
neekolas
left a comment
This looks great. Let's start wiping things
Shard gateway envelopes and add transactional auto-partitioned inserts with SAVEPOINT retries across V2 schema and APIs
Introduce a V2 sharded gateway envelope schema with partitioned meta/blob tables and a joined view, replace legacy queries with V2 selectors, and add db.InsertGatewayEnvelopeWithChecksTransactional / db.InsertGatewayEnvelopeWithChecksStandalone to auto-create partitions and retry inserts using SAVEPOINTs; update services, workers, indexers, and tests to use V2 params, views, and a configurable publish retry sleep.

📍 Where to Start

Start with the insert flow in db.InsertGatewayEnvelopeAndIncrementUnsettledUsage and the new helpers db.InsertGatewayEnvelopeWithChecksTransactional and db.InsertGatewayEnvelopeWithChecksStandalone in gateway_envelope.go, then review the V2 schema and queries in 00021_sharded_gateway_envelopes.up.sql and envelopes_v2.sql.
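The SAVEPOINT detail matters because a failed INSERT poisons the enclosing transaction: every later statement fails with "current transaction is aborted" unless you first roll back to a savepoint. A minimal sketch of that retry shape, assuming an already-open *sql.Tx and reusing the illustrative isMissingPartitionErr from the sketch above (this is not the body of db.InsertGatewayEnvelopeWithChecksTransactional):

```go
package db

import (
	"context"
	"database/sql"
)

// insertWithSavepoint retries one insert inside an already-open transaction.
// The SAVEPOINT confines a failed INSERT so the partition DDL and the retry
// can still run in the same transaction.
func insertWithSavepoint(
	ctx context.Context,
	tx *sql.Tx,
	insert func(context.Context) error,
	createPartition func(context.Context) error,
) error {
	if _, err := tx.ExecContext(ctx, "SAVEPOINT before_insert"); err != nil {
		return err
	}
	err := insert(ctx)
	if err == nil {
		_, relErr := tx.ExecContext(ctx, "RELEASE SAVEPOINT before_insert")
		return relErr
	}
	if !isMissingPartitionErr(err) {
		return err
	}
	// Undo only the failed INSERT; the rest of the transaction stays alive.
	if _, rbErr := tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT before_insert"); rbErr != nil {
		return rbErr
	}
	if ddlErr := createPartition(ctx); ddlErr != nil {
		return ddlErr
	}
	return insert(ctx)
}
```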
Changes since #1279 opened

📊 Macroscope summarized 93bfb0f. 14 files reviewed, 29 issues evaluated, 27 issues filtered, 0 comments posted
🗂️ Filtered Issues
pkg/api/message/publish_worker.go — 0 comments posted, 4 evaluated, 4 filtered
- publishWorker.start does not observe the context. When p.ctx is canceled during publishing, publishStagedEnvelope returns false (it checks p.ctx.Err() and returns false), causing the outer loop to keep retrying indefinitely. The outer select on p.ctx.Done() is not reached until the inner loop exits, which never happens after cancellation. This results in a stuck worker that cannot terminate gracefully on shutdown. [ Out of scope ]
- sleepOnFailureTime can be zero. The change replaces a fixed time.Sleep(time.Second) with time.Sleep(p.sleepOnFailureTime). If sleepOnFailureTime is zero (or very small), the inner retry loop in publishWorker.start will spin with minimal or no delay upon repeated failures, causing excessive CPU usage and preventing backoff under error conditions. This is reachable because sleepOnFailureTime is supplied by callers and is not validated. [ Low confidence ]
- Errors in publishStagedEnvelope (e.g., topic parsing error, malformed payer envelope, signature recovery failure, fee calculation errors) cause the function to return false and the caller to retry indefinitely without changing any state. This leads to an infinite retry loop that permanently blocks processing of subsequent envelopes in the batch. [ Out of scope ]
- publishStagedEnvelope can cause the worker to never exit on cancellation. After the insert step, the function checks p.ctx.Err() and returns false (lines 217–219), which the caller interprets as a failure and retries indefinitely. Later, if the context is cancelled before or during the delete step, the function returns true (lines 229–231), signaling success and allowing progress. [ Out of scope ]
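Both loop concerns above are commonly addressed with a cancellation-aware sleep. A sketch, assuming a hypothetical floor value that the PR does not actually set:

```go
package message

import (
	"context"
	"time"
)

// sleepOrDone waits out a retry delay but wakes immediately on cancellation.
// It clamps zero/negative delays to a floor so the retry loop cannot spin.
func sleepOrDone(ctx context.Context, d time.Duration) bool {
	const minDelay = 50 * time.Millisecond // hypothetical floor
	if d < minDelay {
		d = minDelay
	}
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return false // shutting down; stop retrying
	case <-timer.C:
		return true // delay elapsed; caller may retry
	}
}
```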
pkg/api/message/service.go — 0 comments posted, 4 evaluated, 4 filtered
- NewReplicationAPIService starts the publish worker goroutine before attempting to start the subscribe worker, but if startSubscribeWorker fails, the function returns an error without stopping/cleaning up the already-started publish worker. This leaks the goroutine and any associated subscription resources, leaving background work running with no owner and potentially causing further side effects. To fix, ensure that on any subsequent failure after starting the publish worker, you stop/cancel the publish worker (and any resources it acquired) before returning. [ Previously rejected ]
- A query with both topics and originator_node_ids in message_api.EnvelopesQuery now silently ignores originator_node_ids whenever topics is non-empty. The new logic in Service.fetchEnvelopes prioritizes the topics branch (if len(query.GetTopics()) != 0 { ... return rows, nil }) and returns early, never applying the originator filter. Previously, a single combined SelectGatewayEnvelopes call accepted both filters. This is a contract change: callers that expect both filters to apply will receive envelopes filtered only by topics, which can lead to incorrect results. [ Already posted ]
- queries.New(s.store) is called with s.store across all branches of fetchEnvelopes. If s.store is nil at runtime, the resulting Queries will have a nil db and calling QueryContext inside the query methods will panic. There is no guard in fetchEnvelopes ensuring s.store is non-nil. [ Low confidence ]
- In fetchEnvelopes, uint32 values from EnvelopesQuery.GetOriginatorNodeIds() are converted to int32 and stored in params.OriginatorNodeIds. If any originator node ID exceeds math.MaxInt32, this will wrap to a negative number and cause incorrect filtering in SelectGatewayEnvelopesByOriginators. [ Low confidence ]
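The overflow-style issues above (and the similar ones in the files below) all reduce to the same guard; a sketch with an illustrative helper name:

```go
package message

import (
	"fmt"
	"math"
)

// safeInt32 narrows a uint32 ID for a signed SQL parameter, rejecting values
// that would wrap negative instead of silently corrupting the filter.
func safeInt32(v uint32) (int32, error) {
	if v > math.MaxInt32 {
		return 0, fmt.Errorf("id %d exceeds int32 range", v)
	}
	return int32(v), nil
}
```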
pkg/db/gateway_envelope.go — 0 comments posted, 1 evaluated, 1 filtered
- Using a shared transaction (sql.Tx) from multiple goroutines inside InsertGatewayEnvelopeAndIncrementUnsettledUsage is unsafe and can cause runtime errors or deadlocks. The function launches two goroutines that both call txQueries methods (IncrementUnsettledUsage and IncrementOriginatorCongestion) within the same transaction context. Per Go's database/sql contract, a sql.Tx is not safe for concurrent use across goroutines. This can lead to driver-level errors like "driver: bad connection", serialization failures, or blocked execution due to contention on the single pinned connection. [ Out of scope ]
pkg/db/types.go — 0 comments posted, 2 evaluated, 2 filtered
- The code converts uint32 node IDs and uint64 sequence IDs from the cursor to signed types used in SQL params. In SetVectorClockByTopics, SetVectorClockByOriginators, and SetVectorClockUnfiltered, nodeID is cast from uint32 to int32 and sequenceID from uint64 to int64. Similarly, in fetchEnvelopes, originator IDs are cast from uint32 to int32. If any nodeID > math.MaxInt32 or sequenceID > math.MaxInt64, these casts will wrap to negative or truncated values, causing incorrect filtering or vector clock behavior in queries. [ Previously rejected ]
- uint64 sequenceID values from the cursor are cast to int64 in SetVectorClockByTopics (lines 29–31), SetVectorClockByOriginators (lines 42–44), and SetVectorClockUnfiltered (lines 55–56). If a sequence ID exceeds math.MaxInt64, it will be truncated to a negative int64, corrupting the vector clock used in queries. [ Low confidence ]
pkg/indexer/app_chain/contracts/group_message_storer.go — 0 comments posted, 1 evaluated, 1 filtered
- StoreLog only validates that the client envelope payload type matches the topic kind via clientEnvelope.TopicMatchesPayload(), but it never verifies that the client envelope's target topic identifier (the bytes after the kind) matches the on-chain GroupId from the MessageSent event. The code constructs topicStruct from msgSent.GroupId[:] and later stores to that topic, regardless of what topic identifier the client envelope carries. This can lead to storing an envelope under a topic derived from the event even if the envelope's own target topic identifier differs. To preserve integrity, also check that clientEnvelope.TargetTopic().Bytes() (or identifier) matches topicStruct.Bytes() / msgSent.GroupId before storing; otherwise, reject the log. [ Low confidence ]
pkg/indexer/app_chain/contracts/identity_update_storer.go — 0 comments posted, 4 evaluated, 4 filtered
- In StoreLog, errors from querier.GetLatestSequenceId are wrapped with re.NewNonRecoverableError(ErrGetLatestSequenceID, err) (lines 106–112). If the error is a transient database issue, returning a non-recoverable error will prevent retry and may lead to dropped events. Consider classifying database operation errors as recoverable (or propagate raw errors to be wrapped as recoverable at the outer level), consistent with other DB operations in this function. [ Out of scope ]
- StoreLog wraps all errors from validateIdentityUpdate with re.NewNonRecoverableError(ErrValidateIdentityUpdate, err) (lines 136–145). validateIdentityUpdate performs a DB query (SelectGatewayEnvelopesByTopics) and may return errors due to transient database issues. Treating these as non-recoverable will prevent retry, potentially dropping events. Consider distinguishing between validation failures (non-recoverable) and underlying IO/DB errors (recoverable). [ Out of scope ]
- The code dereferences associationState.StateDiff.NewMembers and associationState.StateDiff.RemovedMembers without checking whether associationState or associationState.StateDiff are non-nil. Since mlsvalidate.AssociationStateResult.StateDiff is a pointer, it can be nil (e.g., if there are no changes or the validator returns a state without a diff). Accessing a field on a nil pointer will panic. Add a guard such as if associationState == nil || associationState.StateDiff == nil { ... } before dereferencing. [ Out of scope ]
- validateIdentityUpdate passes identityUpdate.IdentityUpdate to MLSValidationService.GetAssociationStateFromEnvelopes without checking for nil. While the outer type assertion ensures the payload is an identity-update wrapper, the inner IdentityUpdate pointer can still be nil in protobuf-generated types. Passing nil may cause downstream logic to panic or misbehave if the service implementation assumes a non-nil value. Add a check like if identityUpdate.IdentityUpdate == nil { return nil, fmt.Errorf("identity update is nil") }. [ Out of scope ]
pkg/migrator/writer.go — 0 comments posted, 2 evaluated, 2 filtered
- The code converts env.OriginatorSequenceID() from uint64 to int64 for multiple DB parameters. OriginatorSequenceID is derived from a uint64 (originator sequence), but the code passes it to the database as int64 for InsertGatewayEnvelopeParams.OriginatorSequenceID (line 60) and again for IncrementUnsettledUsage.SequenceID (line 81) and UpdateMigrationProgress.LastMigratedID (line 89). If the sequence ID exceeds math.MaxInt64, the conversion will wrap to a negative number silently, resulting in incorrect keys/progress and potential data corruption or failed lookups. [ Low confidence ]
- The expiry is likewise converted from uint64 to int64. Expiry is built from env.UnsignedOriginatorEnvelope.Proto().GetExpiryUnixtime() (returns uint64) and cast to int64 (lines 65–67). If expiry exceeds math.MaxInt64, conversion will wrap negative, producing invalid expiry timestamps in the database. [ Previously rejected ]
pkg/mlsvalidate/service.go — 0 comments posted, 1 evaluated, 1 filtered
- A nil newUpdate can be passed to GetAssociationState, yielding a gRPC request with a nil element in NewUpdates. In GetAssociationStateFromEnvelopes, newUpdate is forwarded without a nil check (line 110). Callers like IdentityUpdateStorer.validateIdentityUpdate do not verify identityUpdate.IdentityUpdate != nil, so a nil can be passed under realistic conditions if the client envelope has the oneof wrapper set but the inner IdentityUpdate is nil. This may cause marshalling errors or runtime panics in gRPC/protobuf when serializing a request containing a nil message. [ Low confidence ]
pkg/server/server.go — 0 comments posted, 1 evaluated, 1 filtered
- In startAPIServer, serviceRegistrationFunc creates and starts a CursorUpdater via metadata.NewCursorUpdater before constructing replicationService. If NewReplicationAPIService returns an error, the function returns that error without stopping the CursorUpdater. This leaks the cursor updater goroutine and its resources. Any subsequent failure in the registration function should clean up already-started background components (e.g., call CursorUpdater.Stop() or cancel its context) before returning. [ Previously rejected ]
pkg/sync/envelope_sink.go — 0 comments posted, 5 evaluated, 4 filtered
- originatorID := int32(env.OriginatorNodeID()) may overflow if the originator node ID exceeds math.MaxInt32. In that case, the resulting originatorID becomes negative, which will be propagated to unsettled usage/congestion accounting and persisted to the database. Add a bounds check (reject IDs > MaxInt32, or change types to int64/uint32 through to storage). [ Low confidence ]
- storeEnvelope converts expiry from uint64 (GetExpiryUnixtime()) to int64 with int64(expiry) and writes it to the database via queries.InsertGatewayEnvelopeParams{ Expiry: int64(expiry) }. If expiry exceeds math.MaxInt64, the conversion silently wraps to a negative int64. This can corrupt stored expiry and lead to incorrect behavior downstream. Add an upper-bound check (e.g., if expiry > math.MaxInt64, then clamp/reject) and decide policy for zero/negative values. [ Previously rejected ]
- Previously, storeEnvelope only persisted an expiry when expiry > 0 (writing a SQL NULL otherwise). The new code always persists an Expiry value, including 0. This changes the external contract/semantics from "no expiry stored" (NULL) to "expiry = 0", which many schemas or queries treat differently. If consumers differentiate NULL vs 0, this can cause incorrect behavior. To preserve parity, either keep NULL semantics for non-positive expiry or explicitly migrate downstream logic to treat 0 equivalently and document the change. [ Low confidence ]
- MinutesSinceEpoch returns an int32, and storeEnvelope passes it through as MinutesSinceEpoch: utils.MinutesSinceEpoch(originatorTime). If originatorTime is far in the future (or far past), the minute count can overflow int32, truncating to an incorrect value. Since OriginatorNs comes from the envelope, a malformed/malicious envelope could trigger this. Add bounds checks or clamp to a safe range, and consider rejecting envelopes with unreasonable timestamps. [ Previously rejected ]
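On the NULL-vs-0 expiry point above, preserving the old contract is a one-helper change; a sketch (illustrative name and signature, not the sink's actual code):

```go
package sync

import (
	"database/sql"
	"math"
)

// expiryParam keeps the pre-change semantics: SQL NULL when no expiry is
// set, a value only for positive, in-range expiries.
func expiryParam(expiry uint64) sql.NullInt64 {
	if expiry == 0 || expiry > math.MaxInt64 {
		return sql.NullInt64{} // stored as NULL, matching the old behavior
	}
	return sql.NullInt64{Int64: int64(expiry), Valid: true}
}
```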
"CREATE DATABASE " + dbNameand"DROP DATABASE " + dbNamewithout quoting or validatingdbName. IfdbNamecontains characters that are not valid in unquoted PostgreSQL identifiers (e.g., hyphen, space) or contains SQL metacharacters, this can cause runtime SQL errors or even SQL injection in tests. At minimum, the name should be validated to match allowed identifier characters or wrapped with proper identifier quoting (e.g., usingpgx.Identifier.Sanitize/pgx.Identifieror a helper to quote identifiers). This affects both the create and drop statements. [ Low confidence ]DROP DATABASEcleanup also lacksIF EXISTS, so if the database was never created or was already dropped (e.g., partial failures or external interference), the cleanup will error and, due torequire.NoErrorin the cleanup, abort remaining cleanups. Consider usingDROP DATABASE IF EXISTSto make cleanup idempotent and robust. [ Low confidence ]NewDBs, an empty slice is created with zero capacity and appended to in a loop. While not a functional bug, for largecountthis can cause avoidable reallocations. Preallocating withmake([]*sql.DB, 0, count)would prevent repeated allocations. Note: this is a performance micro-optimization and does not affect correctness. [ Code style ]