KAFKA-14367; Add TxnOffsetCommit to the new GroupCoordinator interface (#12901)

dajac merged 6 commits into apache:trunk

Conversation
reminder to rebase here :)

also a minor nit in the title: `TnxOffsetCommit` should be `TxnOffsetCommit`, i.e. "Add TxnOffsetCommit to the new GroupCoordinator interface".
Force-pushed from 10503c9 to a8d5423.
@jolshan I have updated the PR.
      sendResponse(responseBuilder.build())
      CompletableFuture.completedFuture[Unit](())
    } else {
      val txnOffsetCommitRequestData = new TxnOffsetCommitRequestData()
Are we only replacing the topics here? There's a lot of code just to move the majority of one request's data into a new one.
Right. It is basically a copy of the original request but with the authorized/existing topics.
I agree, it doesn't seem ideal. What's the reason for not reusing the original request data and applying setTopics()?
We cannot mutate the original request. In case of an error resulting in calling txnOffsetCommitRequest.getErrorResponse(exception), the error response would only include the authorised topics and not all of the original one.
I agree that it is not ideal. I considered a few alternatives:
- We could use `.duplicate().setTopics(...)`, but `duplicate` also copies the original topics, which is not necessary;
- We could change the GroupCoordinator interface to take individual arguments, but I thought that keeping the interface consistent is better.
In the end, I went with the manual copy.
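The trade-off above can be sketched with hypothetical, simplified stand-ins for the request classes (`RequestData` and `Topic` here are illustrative, not the real Kafka types): the copy keeps only the authorized topics, while the original request stays untouched, so an error response built from it still reports every topic the client asked about.

```scala
// Hypothetical stand-ins for the Kafka request data classes.
case class Topic(name: String)

class RequestData(initial: List[Topic]) {
  private var topics: List[Topic] = initial
  def getTopics: List[Topic] = topics
  def setTopics(t: List[Topic]): RequestData = { topics = t; this }
}

object AuthorizedCopy {
  // Build a fresh request containing only the authorized topics.
  // The original is never mutated, so getErrorResponse-style logic
  // can still see the full set of requested topics.
  def withAuthorizedTopics(original: RequestData, authorized: Set[String]): RequestData =
    new RequestData(original.getTopics.filter(t => authorized.contains(t.name)))
}
```

Mutating the original via `setTopics` would lose the unauthorized topics from any later error response, which is exactly the problem described above.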
    partitions += tp -> new OffsetAndMetadata(
      offset = partition.committedOffset,
      leaderEpoch = partition.committedLeaderEpoch match {
        case RecordBatch.NO_PARTITION_LEADER_EPOCH => Optional.empty[Integer]
Did we previously not handle -1 correctly here? In KafkaApis 2544 we just keep -1.
Oh didn't see this. Thanks for sharing 😅 I like that both are handled in the same place now.
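A minimal sketch of the sentinel conversion being discussed, using a hypothetical `LeaderEpoch` helper (the constant mirrors `RecordBatch.NO_PARTITION_LEADER_EPOCH`, but the helper itself is illustrative): the wire format uses -1 to mean "no epoch", and converting it to an empty `Optional` in one place means downstream code never has to special-case the magic value.

```scala
import java.util.Optional

object LeaderEpoch {
  // Mirrors RecordBatch.NO_PARTITION_LEADER_EPOCH in Kafka.
  val NoPartitionLeaderEpoch: Int = -1

  // Map the -1 sentinel to an empty Optional; any other value is a real epoch.
  def toOptional(epoch: Int): Optional[Integer] =
    if (epoch == NoPartitionLeaderEpoch) Optional.empty[Integer]
    else Optional.of(Integer.valueOf(epoch))
}
```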
      def testJoinGroup(version: Short): Unit = {
        val groupCoordinator = mock(classOf[GroupCoordinator])
    -   val adapter = new GroupCoordinatorAdapter(groupCoordinator)
    +   val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
Curious if we wanted to mock time here. Doesn't seem to be a big difference one way or the other for most of these since they don't use a time component anyway, but it could be good practice to have a MockTime. (Looks like we do use it for the test added 😄 )
Why? I usually use MockTime only when I need it. Otherwise, I default to Time.SYSTEM.
Maybe I'm just recalling some recent tests where I used system time because it was already there and it led to flakiness.
I suppose here though it is easier to switch so maybe not a problem.
Yeah. In this case, it does not matter because the test does not depend on time at all.
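For reference, the idea behind a mock clock can be sketched in a few lines (this is an illustrative stand-in, not Kafka's actual `MockTime` from `org.apache.kafka.common.utils`): the test advances time explicitly, so assertions never depend on wall-clock timing and cannot go flaky under slow CI machines.

```scala
// Minimal mock clock: time only moves when the test says so.
class MockTime(startMs: Long = 0L) {
  private var nowMs: Long = startMs
  def milliseconds(): Long = nowMs
  // "Sleeping" advances the clock instantly instead of blocking the thread.
  def sleep(ms: Long): Unit = nowMs += ms
}
```

With `Time.SYSTEM` the equivalent test would have to really sleep and then hope the elapsed time lands in the expected window, which is the flakiness mentioned above.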
      requestLocal.bufferSupplier
    ).handle[Unit] { (response, exception) =>
      if (exception != null) {
        sendResponse(txnOffsetCommitRequest.getErrorResponse(exception))
Did we still want to send the existing responseBuilder response at all, or mark the whole request as failed?
Ah -- I misunderstood this error. This is how we handle errors that would have previously been thrown back to KafkaApis.
I guess my confusion is about when we hit this line vs. the handleError above.
Yeah, the idea is to catch unexpected errors here; otherwise, the future would return a response. Note that handleError handles errors thrown inside the handle itself. It is basically a safety net.
Ah -- so we typically don't expect handleError to see anything since this handle should get most of the errors.
That's right. handleError is more for unexpected errors.
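The handle-as-safety-net pattern described here can be sketched with `java.util.concurrent.CompletableFuture` (the `sendWithFallback` helper and its string responses are hypothetical simplifications of the actual request plumbing): the coordinator future normally completes with a response, and `handle` converts any exceptional completion into an error response instead of letting it escape to the outer layer.

```scala
import java.util.concurrent.CompletableFuture

object HandlePattern {
  // If the future completed normally, send the response; if it completed
  // exceptionally, build and send an error response instead. Either way the
  // returned future completes normally, so errors do not propagate further.
  def sendWithFallback(future: CompletableFuture[String],
                       errorResponse: Throwable => String,
                       send: String => Unit): CompletableFuture[Void] =
    future.handle[Void] { (response, exception) =>
      if (exception != null) send(errorResponse(exception))
      else send(response)
      null
    }
}
```

Anything thrown inside the `handle` callback itself would still surface on the returned future, which is where an outer handleError-style catch earns its keep.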
    }

    @Test
    def testHandleTxnOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = {
This test is super thorough! Thanks for adding it.
jolshan left a comment:
I've left a few comments. Looks pretty good so far.
@jolshan @jeffkbkim Updated the PR.
      topic = new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic().setName(tp.topic)
      byTopics.put(tp.topic, topic)
      response.topics.add(topic)
      val topic = byTopics.get(tp.topic) match {
This seems like it could use the getOrElseUpdate method, but I'm not sure if we typically have side effects like adding to the response. Not a huge deal either way.
Yeah, I didn’t use it because of the side effect.
The downside is a double lookup (one for the get and another for the insert).
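For illustration, a `getOrElseUpdate` variant of the pattern under discussion might look like this (the map and buffer types are simplified stand-ins for the real response structures): the default expression, side effect included, is evaluated only when the key is missing, so the topic is registered in the response exactly once.

```scala
import scala.collection.mutable

object GetOrElseUpdateExample {
  // Return the per-topic entry, creating it on first access. The creation
  // path has a side effect: it also appends the topic to responseTopics.
  def topicFor(byTopics: mutable.Map[String, List[Int]],
               responseTopics: mutable.ListBuffer[String],
               name: String): List[Int] =
    byTopics.getOrElseUpdate(name, {
      responseTopics += name // side effect runs only on the first lookup
      List.empty[Int]
    })
}
```

Burying a side effect inside the default argument is exactly the readability concern raised above, which is a fair reason to prefer the explicit match even at the cost of an extra lookup.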
jolshan left a comment:
Thanks David. Failures look unrelated.
jeffkbkim left a comment:
LGTM, left a small comment.
Failed tests are not related.
…master

* apache-github/trunk: (23 commits)
  - MINOR: Include the inner exception stack trace when re-throwing an exception (apache#12229)
  - MINOR: Fix docs to state that sendfile is implemented in `TransferableRecords` instead of `MessageSet` (apache#13109)
  - Update ProducerConfig.java (apache#13115)
  - KAFKA-14618; Fix off by one error in snapshot id (apache#13108)
  - KAFKA-13709 (follow-up): Avoid mention of 'exactly-once delivery' or 'delivery guarantees' in Connect (apache#13106)
  - KAFKA-14367; Add `TxnOffsetCommit` to the new `GroupCoordinator` interface (apache#12901)
  - KAFKA-14568: Move FetchDataInfo and related to storage module (apache#13085)
  - KAFKA-14612: Make sure to write a new topic's ConfigRecords to metadata log iff the topic is created (apache#13104)
  - KAFKA-14601: Improve exception handling in KafkaEventQueue (apache#13089)
  - KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface (apache#12886)
  - KAFKA-14530: Check state updater more often (apache#13017)
  - KAFKA-14304: Use boolean for ZK migrating brokers in RPC/record (apache#13103)
  - KAFKA-14003: Kafka Streams JUnit4 to JUnit5 migration part 2 (apache#12301)
  - KAFKA-14607: Move Scheduler/KafkaScheduler to server-common (apache#13092)
  - KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface (apache#12870)
  - KAFKA-14557; Lock metadata log dir (apache#13058)
  - MINOR: Implement toString method for TopicAssignment and PartitionAssignment (apache#13101)
  - KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (apache#11818)
  - KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer (apache#13032)
  - KAFKA-14600: Reduce flakiness in ProducerIdExpirationTest (apache#13087)
  - ...
…rface (apache#12901)

This patch adds `TxnOffsetCommit` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>

Committer Checklist (excluded from commit message)