KAFKA-8768: DeleteRecords request/response automated protocol#7957
Conversation
94ae971 to
9512f79
Compare
|
@cmccabe are you able to review this? |
|
I'm going to be on vacation the next two weeks, but I'll certainly review after that. Sorry for the delays |
|
@tombentley Can you rebase on trunk? |
9512f79 to
18c4330
Compare
|
@mimaison done |
mimaison
left a comment
There was a problem hiding this comment.
Thanks @tombentley
It looks good overall, I've left a few comments
…tocol Also add version 2 to make use of flexible versions, per KIP-482.
18c4330 to
8861774
Compare
|
@mimaison fixed, if you want to take another look. |
|
ok to test |
mimaison
left a comment
There was a problem hiding this comment.
Thanks for the update. It looks good overall.
I've left a few suggestions/questions for minor issues
| if (node != null) { | ||
| leaders.computeIfAbsent(node, key -> new HashMap<>()).put(entry.getKey(), | ||
| entry.getValue().beforeOffset()); | ||
| if (!leaders.containsKey(node)) |
There was a problem hiding this comment.
I think we can computeIfAbsent() here. It will replace containsKey, put and get.
| if (!leaders.containsKey(node)) | ||
| leaders.put(node, new HashMap<>()); | ||
| Map<String, DeleteRecordsTopic> deletionsForLeader = leaders.get(node); | ||
| DeleteRecordsTopic deleteRecords = deletionsForLeader.get(topicPartition.topic()); |
There was a problem hiding this comment.
Same here, I think computeIfAbsent would simplify the logic
| partitionDeleteOffsets.keySet().stream().map(futures::get); | ||
| partitionDeleteOffsets.values().stream().flatMap( | ||
| recordsToDelete1 -> { | ||
| Stream<TopicPartition> topicPartitionStream = recordsToDelete1.partitions().stream().map(partitionsToDelete -> |
There was a problem hiding this comment.
Can we return that directly without creating a variable?
|
|
||
| private final int throttleTimeMs; | ||
| private final Map<TopicPartition, PartitionResponse> responses; | ||
| public static final long INVALID_LOW_WATERMARK = -1; |
| deleteRecordsRequest.partitionOffsets.asScala.toSeq.map(_._1.topic)) | ||
| for ((topicPartition, offset) <- deleteRecordsRequest.partitionOffsets.asScala) { | ||
| deleteRecordsRequest.data.topics.asScala.map(_.name)) | ||
| for ((topicPartition, offset) <- deleteRecordsRequest.data.topics.asScala.flatMap(deleteTopic => { |
There was a problem hiding this comment.
I wonder if it's more readable having 2 nested for loops. What do you think?
There was a problem hiding this comment.
To me what makes it less readable is that the expression for the for loop is the flatMap & map. I think the simplest was to improve that would just be to factor out a val for that expression. Does that work for you?
| ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => Errors.forCode(resp.data.responses.find(topic).errorCode())), | ||
| ApiKeys.DELETE_RECORDS -> ((resp: requests.DeleteRecordsResponse) => resp.responses.get(tp).error), | ||
| ApiKeys.DELETE_RECORDS -> ((resp: requests.DeleteRecordsResponse) => Errors.forCode(resp.data.topics | ||
| .find(tp.topic).partitions().find(tp.partition).errorCode)), |
There was a problem hiding this comment.
No need for brackets after partitions
|
@mimaison I've pushed fixes and tested locally. |
mimaison
left a comment
There was a problem hiding this comment.
LGTM, thanks for the quick update
|
All tests passed, merging to trunk |
* apache-github/trunk: (39 commits) MINOR: cleanup and add tests to StateDirectoryTest (apache#8304) HOTFIX: StateDirectoryTest should use Set instead of List (apache#8305) MINOR: Fix build and JavaDoc warnings (apache#8291) MINOR: Fix kafka.server.RequestQuotaTest missing new ApiKeys. (apache#8302) KAFKA-9712: Catch and handle exception thrown by reflections scanner (apache#8289) KAFKA-9670; Reduce allocations in Metadata Response preparation (apache#8236) MINOR: fix Scala 2.13 build error introduced in apache#8083 (apache#8301) MINOR: enforce non-negative invariant for checkpointed offsets (apache#8297) MINOR: comment apikey types in generated switch (apache#8201) MINOR: Fix typo in CreateTopicsResponse.json (apache#8300) KIP-546: Implement describeClientQuotas and alterClientQuotas. (apache#8083) KAFKA-6647: Do note delete the lock file while holding the lock (apache#8267) KAFKA-9677: Fix consumer fetch with small consume bandwidth quotas (apache#8290) KAFKA-9533: Fix JavaDocs of KStream.transformValues (apache#8298) MINOR: reuse pseudo-topic in FKJoin (apache#8296) KAFKA-6145: Pt 2. Include offset sums in subscription (apache#8246) KAFKA-9714; Eliminate unused reference to IBP in `TransactionStateManager` (apache#8293) KAFKA-9718; Don't log passwords for AlterConfigs in request logs (apache#8294) KAFKA-8768: DeleteRecords request/response automated protocol (apache#7957) KAFKA-9685: Solve Set concatenation perf issue in AclAuthorizer ...
Also add version 2 to make use of flexible versions, per KIP-482.
Committer Checklist (excluded from commit message)