KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets #14774

Merged
jolshan merged 26 commits into apache:trunk from jolshan:kafka-15784
Dec 14, 2023
Member

@jolshan jolshan commented Nov 16, 2023

Rewrote the verification flow to pass a callback to execute after verification completes.
For the TxnOffsetCommit, we will call doTxnCommitOffsets. This allows us to do offset validations post verification.
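
As a rough illustration of the callback-passing shape described above (not the code in this PR): verification runs first, and the commit logic, including offset validation, runs inside the callback it is handed. `verifyThenRun`, `Guard`, `Sentinel`, and the reduced `Error` type are hypothetical stand-ins for the real verification method, `VerificationGuard`, and `Errors`.

```scala
object VerificationFlowSketch {
  // Hypothetical stand-ins for Kafka's Errors and VerificationGuard types.
  sealed trait Error
  case object NoError extends Error
  case object InvalidTxnState extends Error

  final case class Guard(id: Long)
  val Sentinel: Guard = Guard(0L)

  // Hypothetical verifier: runs the check, then hands the outcome to the callback.
  // On failure the sentinel guard is passed; it must never be used for an append.
  def verifyThenRun(txnOngoing: Boolean)(callback: (Error, Guard) => String): String =
    if (txnOngoing) callback(NoError, Guard(42L))
    else callback(InvalidTxnState, Sentinel)

  // Offset validation and the commit itself happen inside the callback,
  // that is, only after verification has completed.
  def txnCommitOffsets(txnOngoing: Boolean, offset: Long): String =
    verifyThenRun(txnOngoing) { (error, guard) =>
      if (error != NoError) s"failed: $error"
      else if (offset < 0) "failed: invalid offset"
      else s"committed offset $offset with guard ${guard.id}"
    }
}
```

The point of the shape is ordering: nothing in `txnCommitOffsets` touches offsets until the verifier has reported its result.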

I've reorganized the verification code and group coordinator code to make these code paths clearer. The followup refactor (https://issues.apache.org/jira/browse/KAFKA-15987) will further clean up the produce verification code.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Comment thread core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala Outdated
@jolshan jolshan marked this pull request as ready for review December 4, 2023 18:02
@jolshan jolshan changed the title from "WIP KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets" to "KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets" Dec 4, 2023
Comment thread core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala Outdated
@jolshan jolshan requested a review from dajac December 4, 2023 19:43
Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
Comment thread core/src/main/scala/kafka/server/KafkaApis.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala Outdated
Contributor

junrao commented Dec 6, 2023

@jolshan : Thanks for the PR. Should we fix the append call in CoordinatorRuntime (#14705) too? There, a partition level lock in CoordinatorRuntime is held while checking/updating the coordinator state and calling append.

Member Author

jolshan commented Dec 6, 2023

@junrao Thanks for taking a look. I was just rewriting the code to make this clearer. I will take a look at @artemlivshits's and your questions about locking now.

I wasn't sure if this PR was trying to remove locks -- I think we want to address that as a follow-up?

(topicIdPartition, Errors.NOT_COORDINATOR)
}
responseCallback(commitStatus)
group.inLock {
Member Author

For the group.inLocks in this method -- we should always be holding the group lock when we call storeOffsets from doTxnCommitOffsets and doCommitOffsets. What do we think about removing these locks and stating that we should be holding the group lock on this method? (and/or just wrapping the method in a lock)

Contributor

I don't think we can just wrap this method in the lock to show the proper intent -- the intent is that the lock must already be held by the caller, because the caller does some validation under the lock as well, and the atomicity of that validation needs to be preserved across the local write. Note this is not a correctness issue (the lock is already taken outside, so any locking pattern here works); it's a comment about how to make the code easier to maintain. The atomicity requirement is absolutely non-obvious and took a lot of effort from multiple people to figure out, and I think this effort is not reflected in this change in any way -- the code got rearranged into a form that makes it work, but the underlying issue (unclear and confusing atomicity invariants) is not addressed.

So I'd do 3 things:

  1. Remove explicit locking.
  2. Add a comment in the Java doc stating a requirement that this function must be called under the lock.
  3. Add a comment near the appendForGroup call that we rely on it not returning until the local append is done to preserve atomicity protected by the lock.

If it wasn't soon-to-be-dead code I'd probably do more with naming conventions and asserts, but in this situation adding proper comments should be good and easy.
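
To make the three points concrete, here is a minimal sketch, assuming a heavily simplified `Group` and `storeOffsets` (both hypothetical here, not the real classes). The method takes no lock itself, documents that the caller must hold it, and adds the kind of assert mentioned above so a violation fails loudly.

```scala
import java.util.concurrent.locks.ReentrantLock

object LockInvariantSketch {
  // Hypothetical, simplified group: a lock plus in-memory offset state.
  final class Group {
    val lock = new ReentrantLock()
    var committed: Map[String, Long] = Map.empty
    def inLock[T](body: => T): T = {
      lock.lock()
      try body finally lock.unlock()
    }
  }

  /**
   * Must be called with group.lock held by the caller: the caller's validation
   * and this update have to happen atomically.
   */
  def storeOffsets(group: Group, partition: String, offset: Long): Unit = {
    require(group.lock.isHeldByCurrentThread, "caller must hold the group lock")
    // Stand-in for the in-memory update plus the local log append; both are
    // assumed to finish before this method returns, so they stay inside the
    // caller's critical section.
    group.committed = group.committed.updated(partition, offset)
  }

  // Caller: validate and store under a single lock acquisition.
  def commit(group: Group, partition: String, offset: Long): Boolean =
    group.inLock {
      val valid = offset >= group.committed.getOrElse(partition, 0L)
      if (valid) storeOffsets(group, partition, offset)
      valid
    }
}
```

Calling `storeOffsets` without the lock throws an `IllegalArgumentException` in this sketch, which is also what a unit test calling it bare would hit.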

Member Author

Ok makes sense.

The only thing that made me wonder about keeping the locking is that this method is used in unit tests without the locking on the outside. I wasn't sure if the best solution is to put locks around those calls or not. I would hope that the test doesn't rely on locking given there really should only be one thread running the tests and the ReplicaManager is mocked (so no async appends) but I would need to double check.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
Member

dajac commented Dec 7, 2023

@junrao @jolshan I work on implementing the transactional offset commits in the new world. I will take care of adding the verification steps there when this PR is done. We can replicate the same pattern there.

* @param verificationGuards the mapping from topic partition to verification guards if transaction verification is used
* @param preAppendErrors the mapping from topic partition to LogAppendResult for errors that occurred before appending
*/
def appendForGroup(timeout: Long,
Contributor

Can we generalize the name? I don't see any logic specific to offsets or groups.

Member Author

I planned to take this out after the refactor. It is only used for appendForGroup to minimize the diff.

Member Author

It will be unified with appendRecords in the refactor. I can leave a comment referencing https://issues.apache.org/jira/browse/KAFKA-15987 if that helps.

Contributor

Makes sense. But any harm having a more general name for now? It might be a week or two before the refactor gets checked in.

Member Author

I guess I just didn't see anything else using it before I refactored and wanted to make the usage clear.

I didn't want to name it appendRecords so as not to cause conflicts with that flow. What name were you thinking?

Contributor

We should also add a comment that would reflect 2 points:

  1. (For the maintainer of this code) -- this code must not return until the local write is done; it is an important invariant that the callers rely upon. Otherwise it looks like a generic async call that can return and continue asynchronously at any point. This way, if an additional async stage is required in this function before the local write is complete, the maintainer would know to hunt down all usages of this function and figure out the correct action.
  2. (For the caller of this code) -- a quick example of the full workflow of how the caller should use this method: call maybeStartTransactionVerificationForPartition with a callback that would call this method.

Member Author

@jolshan jolshan Dec 8, 2023

> call maybeStartTransactionVerificationForPartition with a callback that would call this method

This will change when I do the refactor since this will become appendRecords where we only call maybeStartTransactionVerificationForPartition(s) if the append requires transaction verification. I can add the comment now, but it will be changed in the refactor (https://issues.apache.org/jira/browse/KAFKA-15987)

Contributor

Adding comment in the refactor should be fine.

Contributor

If we keep the name, how about we add a check to ensure that the write is for __consumer_offsets? Also, we can drop the internalTopicsAllowed and appendOrigin arguments since they will be implicit.

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (),
requestLocal: RequestLocal = RequestLocal.NoCaching,
actionQueue: ActionQueue = null,
verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty,
Contributor

What is the difference between passing no verification guard and passing VerificationGuard.SENTINEL?

Member Author

@jolshan jolshan Dec 7, 2023

I believe checking if the map is empty is a shortcut for skipping verification. That doesn't really matter for the offset change but does for the produce flow.

Member Author

When we get to the log layer, if we don't have an entry in the map we do a getOrElse and return the sentinel.
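
Roughly, the lookup behaves like the sketch below. `Guard` and `Sentinel` are hypothetical stand-ins for `VerificationGuard` and `VerificationGuard.SENTINEL`, and partitions are plain strings to keep it self-contained.

```scala
object GuardLookupSketch {
  // Hypothetical stand-in for VerificationGuard; Sentinel means "no verification".
  final case class Guard(id: Long)
  val Sentinel: Guard = Guard(0L)

  // A missing entry and an explicit sentinel entry resolve to the same value.
  def guardFor(guards: Map[String, Guard], partition: String): Guard =
    guards.getOrElse(partition, Sentinel)
}
```

So at the point of this lookup, an empty map and a map holding the sentinel are indistinguishable; the difference only matters earlier, where an empty map is used as the shortcut for skipping verification.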

Comment thread core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala Outdated
}
}

appendForGroup(group, records, requestLocal, putCacheCallback, verificationGuards)
Contributor

What error do we expect if the guard check fails during write?

Member Author

We expect INVALID_TXN_STATE which is fatal. I think we previously discussed this and decided it was ok for old clients.

Contributor

@hachikuji hachikuji Dec 7, 2023

We have logic in the createPutCacheCallback to convert the error code returned in TxnOffsetCommit. Do we want to add a case for INVALID_TXN_STATE?

Member Author

@jolshan jolshan Dec 8, 2023

We considered this on the first PR but decided that the abortable/retriable errors were not specific enough.

From KIP-890

Return Abortable Error for TxnOffsetCommitRequests
Instead of INVALID_TXN_STATE and INVALID_PID_MAPPING we considered using UNKNOWN_MEMBER_ID which is abortable. However, this is not a clear message and is not guaranteed to be abortable on non-Java clients. Since we can't specify a message in the response, we thought it would be better to just send the actual (but fatal) errors.

Contributor

Ok. Mainly I was considering whether we should have an explicit case for this so that it is clearly intentional. What do you think?

Member Author

I can do that. I just thought that all the ones there were ones that were changed. There are also errors that are returned but not mapped (e.g., COORDINATOR_NOT_AVAILABLE).

We can also include InvalidPidMapping if we do want to map errors.

Contributor

We could just include a comment under the default case to emphasize that no mapping is expected for these error codes?
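
Something along these lines, as a sketch only. The `Err` type is a hypothetical, reduced error set, and the single conversion shown is illustrative rather than a copy of the mapping in createPutCacheCallback.

```scala
object ErrorMappingSketch {
  // Hypothetical, reduced error set standing in for Kafka's Errors enum.
  sealed trait Err
  case object NotLeaderOrFollower extends Err
  case object NotCoordinator extends Err
  case object InvalidTxnState extends Err
  case object InvalidPidMapping extends Err

  def toCommitError(appendError: Err): Err = appendError match {
    // Illustrative conversion of a log-level error to a coordinator-level one.
    case NotLeaderOrFollower => NotCoordinator
    // No mapping is expected for the remaining codes, including InvalidTxnState
    // and InvalidPidMapping: they are intentionally returned as-is (and are fatal).
    case other => other
  }
}
```

The comment on the default case is what makes the pass-through read as intentional rather than as an oversight.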

Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
}
}

groupManager.replicaManager.maybeStartTransactionVerificationForPartition(
Contributor

The access to replicaManager here suggests that we should probably be going through GroupMetadataManager. We could expose a wrapped maybeStartTransactionVerificationForPartition from GroupMetadataManager instead. That might also help us encapsulate the error conversion a little better.
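
A rough sketch of the wrapping idea, assuming simplified stand-ins: `Replicas` and `GroupManager` are hypothetical substitutes for ReplicaManager and GroupMetadataManager, and the error conversion shown is only illustrative.

```scala
object EncapsulationSketch {
  // Hypothetical, reduced error set.
  sealed trait Err
  case object NoError extends Err
  case object NotEnoughReplicas extends Err
  case object CoordinatorNotAvailable extends Err

  // Hypothetical stand-in for ReplicaManager: reports a log-level error.
  final class Replicas {
    def maybeStartVerification(ok: Boolean)(callback: Err => Unit): Unit =
      callback(if (ok) NoError else NotEnoughReplicas)
  }

  // Hypothetical stand-in for GroupMetadataManager: the coordinator calls this
  // wrapper and never reaches into Replicas directly.
  final class GroupManager(replicas: Replicas) {
    def maybeStartVerification(ok: Boolean)(callback: Err => Unit): Unit =
      replicas.maybeStartVerification(ok) { err =>
        // The error conversion lives in one place, next to the wrapped call.
        callback(err match {
          case NotEnoughReplicas => CoordinatorNotAvailable
          case other => other
        })
      }
  }
}
```

With this shape the coordinator only ever sees coordinator-level errors, which is the encapsulation benefit mentioned above.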

Member Author

I was just about to push my change before I saw this comment. I will address this comment tomorrow.

Comment thread core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala Outdated
Contributor

@hachikuji hachikuji left a comment

LGTM

Member Author

jolshan commented Dec 14, 2023

I took a look at the tests and the only ones that looked suspicious were the mirrorIntegration tests related to delete.retention.ms not being configured correctly. I saw some flakes in trunk and other PRs, so this seems unrelated to this change.

Note, although the last build had a failure for a version, the previous build succeeded and only included a minor code change. Given the nature of the change and the failure, I believe this is non-blocking.

I ran system tests and noticed an issue with GroupModeTransactionsTest when bouncing brokers. After investigating with @hachikuji we believe it is unrelated. He will file a JIRA as a followup. -- edit followup here: https://issues.apache.org/jira/browse/KAFKA-16012

@jolshan jolshan merged commit e4249b6 into apache:trunk Dec 14, 2023
jolshan added a commit that referenced this pull request Dec 14, 2023
…sactionally committing offsets (#14774)

Reviewers: Artem Livshits <alivshits@confluent.io>, Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>, Jun Rao <junrao@gmail.com>
Contributor

@junrao junrao left a comment

@jolshan : Thanks for the updated PR. Sorry for the late review. Just a few minor comments.

)
}

def maybeStartTransactionVerificationForPartitions(
Contributor

Could this be private?

Member Author

I am doing a refactor PR so I can address your comments there. https://issues.apache.org/jira/browse/KAFKA-15987

* This method should not return until the write to the local log is completed because updating offsets requires updating
* the in-memory and persisted state under a lock together.
*
* Noted that all pending delayed check operations are stored in a queue. All callers to ReplicaManager.appendRecords()
Contributor

It's weird to refer to ReplicaManager.appendRecords() here since this method is appendForGroup.

Member Author

This will be fixed in the refactor. I plan to get rid of appendForGroup and unify in a single appendRecords method.

// Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe.
val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional)
transactionalBatches.foreach(batch => transactionalProducerIds.add(batch.producerId))
private def sendInvalidRequiredAcksResponse(entries: Map[TopicPartition, MemoryRecords],
Contributor

Should we reuse this method in appendRecords?

Member Author

Another change for the refactor. Jason asked me to keep the diff here to a minimum, but I plan to unify the code in the refactor. (He asked me to revert these changes for this PR)

if (delayedProduceRequestRequired(requiredAcks, allEntries, allResults)) {
debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

val allResults = localProduceResults
Contributor

Could we just get rid of allResults and just use localProduceResults?

}
}

private def buildProducePartitionStatus(
Contributor

Could we reuse buildProducePartitionStatus in appendEntries?

Member Author

All of these will be covered in the refactor. I didn't touch the produce flow to minimize the diff and cause minimal confusion when reviewing.

)
}

private def maybeAddDelayedProduce(
Contributor

Could we reuse maybeAddDelayedProduce in appendEntries?

*
* When the verification returns, the callback will be supplied the error if it exists or Errors.NONE.
* If the verification guard exists, it will also be supplied. Otherwise the SENTINEL verification guard will be returned.
* This guard can not be used for verification and any appends that attenpt to use it will fail.
Contributor

typo attenpt

requestLocal
)

addPartitionsToTxnManager.get.verifyTransaction(
Contributor

Hmm, addPartitionsToTxnManager could be empty in tests. Should we change addPartitionsToTxnManager.get to addPartitionsToTxnManager.foreach like in the original code?
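
For reference, the difference between the two forms on an empty Option, sketched with a hypothetical `TxnManager` standing in for the real manager class:

```scala
object OptionalManagerSketch {
  // Hypothetical stand-in for the optional AddPartitionsToTxnManager.
  final class TxnManager {
    var verified: List[String] = Nil
    def verifyTransaction(txnId: String): Unit = verified = txnId :: verified
  }

  // .get throws NoSuchElementException when the manager is absent (e.g. in tests).
  def verifyWithGet(manager: Option[TxnManager], txnId: String): Unit =
    manager.get.verifyTransaction(txnId)

  // .foreach is a no-op for None and calls through for Some.
  def verifyWithForeach(manager: Option[TxnManager], txnId: String): Unit =
    manager.foreach(_.verifyTransaction(txnId))
}
```

One caveat with `foreach`: when the manager is absent nothing runs at all, so any caller waiting on a verification callback would need that case handled separately.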

Member Author

Thanks for pointing this out. I will fix this in the followup.

gaurav-narula pushed a commit to gaurav-narula/kafka that referenced this pull request Jan 24, 2024
…sactionally committing offsets (apache#14774)
jolshan added a commit that referenced this pull request Jan 26, 2024
#15087)

I originally did some refactors in #14774, but we decided to keep the changes minimal since the ticket was a blocker. Here are those refactors:

* Removed separate append paths so that produce, group coordinator, and other append paths all call appendRecords
* AppendRecords has been simplified
* Removed unneeded error conversions in verification code since group coordinator and produce path convert errors differently, removed test for that
* Fixed incorrect capital param name in KafkaRequestHandler
* Updated ReplicaManager test to handle produce appends separately when transactions are used.

Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
yyu1993 pushed a commit to yyu1993/kafka that referenced this pull request Feb 15, 2024
…sactionally committing offsets (apache#14774)
yyu1993 pushed a commit to yyu1993/kafka that referenced this pull request Feb 15, 2024
apache#15087)
AnatolyPopov pushed a commit to aiven/kafka that referenced this pull request Feb 16, 2024
…sactionally committing offsets (apache#14774)
clolov pushed a commit to clolov/kafka that referenced this pull request Apr 5, 2024
…sactionally committing offsets (apache#14774)
clolov pushed a commit to clolov/kafka that referenced this pull request Apr 5, 2024
apache#15087)
Phuc-Hong-Tran pushed a commit to Phuc-Hong-Tran/kafka that referenced this pull request Jun 6, 2024
apache#15087)