KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh#10858
Conversation
|
PING @hachikuji @apovzner (as I saw you on KAFKA-9507) |
|
Regarding tests:
|
|
@IgnacioAcunaF Thanks for the PR! I am a bit on the fence with regarding to modifying the admin api here. Users might expect 'null' in this case. Did you consider handling this in the command line tool? Also, it would be to add a unit test for the bug. |
|
@dajac Thanks David for your comment. a) Fix locally on the ConsumerGroupCommand I went by the second approach because thought it could be better for downstreams methods (like the one on ConsumerGroupCommand) to have it globally solved and not delegate the null's case handling to them. I have coded the locally approach on ConsumerGroupCommand also so I am going to update de PR. |
7746364 to
fc42b11
Compare
|
@dajac |
dajac
left a comment
There was a problem hiding this comment.
@IgnacioAcunaF Thanks for the update. Left a comment.
fc42b11 to
75580a9
Compare
|
@dajac first of all, thanks again for guidance. I could code the unit test, which also made me realize that there was another case which could potentially lead to a NullPointerException:
It was the same as for the assigned partitions: if the offset for the partition is a negative integer, it translates to null, so Some(offsets.offset) gets equal to Some(null.offset) and an exception is raised. I approached it with a ternary operator. Added the fix for it and the unit test in the last commit of the PR |
dajac
left a comment
There was a problem hiding this comment.
@IgnacioAcunaF Thanks for the update. I have left few comments.
I think that we should review the other usages in the tool as I suspect that we might have other cases where we don't handle null correctly. We could also do it as a follow-up of this PR.
…nterException at describing consumer groups, and unit test for consumer groups with negative offsets
69ce292 to
b7f857a
Compare
|
Thanks @dajac for your review and comments. Agree, there could be more potentially cases leading to a null pointer exception in the tool for similar reasons. Will take a look also if I encounter some as a follow-up for the PR. |
dajac
left a comment
There was a problem hiding this comment.
@IgnacioAcunaF Thanks for the update. We are getting closer :). I have left few more comments.
…to avoid NullPointerException. Improve unit test for negative offsets at Consumer Group.
|
@dajac Thanks again for comments and support! :)
Also modified a little bit the getPartitionOffsets to adapt it to being able to call directly to collectConsumerAssignment, as suggested. PD: As I posted on the comments earlier I encountered that the sibling test, testAdminRequestsForDescribeOffsets, lacks the validation for assigned topic partitions (it is currently only doing the validation against unassigned topic partitions, basically because there is not an topic's assigment to the testing consumer group at its initialization). Solved at the new test testAdminRequestsForDescribeNegativeOffsets, and I think that I could complement the former one. What do you think? Is it worth to open a new PR to approach that separatly? |
|
@dajac Updated the PR. Passed directly the param to collectConsumerAssignment as suggested, which makes the code cleaner. Also added a commit to fix an error I encountered on ARM compilation from the new unit test (from mockito). |
dajac
left a comment
There was a problem hiding this comment.
@IgnacioAcunaF Please excuse me for the delay. I did not have a chance to look at it before today. I took a look at the test and I left a few comments. Please take a look and let me know what you think.
|
Hi @dajac, thanks again for your comments. |
dajac
left a comment
There was a problem hiding this comment.
@IgnacioAcunaF I took another look at the test and I left a few comments below.
| val commitedOffsets = Map( | ||
| testTopicPartition1 -> new OffsetAndMetadata(100), | ||
| testTopicPartition2 -> null, | ||
| testTopicPartition4 -> new OffsetAndMetadata(100), | ||
| testTopicPartition5 -> null, | ||
| ).asJava |
There was a problem hiding this comment.
Actually, it seems that we should always have null or an OffsetAndMetadata here for each partition as the API always provided an answer for the requested partitions.
commitedOffsets -> committedOffsets
There was a problem hiding this comment.
Actually, put that case based on what I saw on that consumer-group:
This was the consumer-group state:
(groupId=order-validations, isSimpleConsumerGroup=false, members=(memberId=order-validations-d5fbca62-ab2b-48d7-96ba-0ae72dff72a6, groupInstanceId=null, clientId=order-validations, host=/127.0.0.1, assignment=(topicPartitions=rtl_orderReceive-0,rtl_orderReceive-1,rtl_orderReceive-2,rtl_orderReceive-3,rtl_orderReceive-4,rtl_orderReceive-5,rtl_orderReceive-6,rtl_orderReceive-7,rtl_orderReceive-8,rtl_orderReceive-9)), partitionAssignor=RoundRobinAssigner, state=Stable, coordinator=f0527.cluster.cl:31047 (id: 1 rack: null), authorizedOperations=[])
It has assigned all the partitions to rtl_orderReceive, but when getting the commited offsets:
Map(rtl_orderReceive-0 -> null, rtl_orderReceive-1 -> OffsetAndMetadata{offset=39, leaderEpoch=null, metadata=''}, rtl_orderReceive-2 -> null, rtl_orderReceive-3 -> OffsetAndMetadata{offset=33, leaderEpoch=null, metadata=''}, rtl_orderReceive-4 -> null, rtl_orderReceive-5 -> null, rtl_orderReceive-7 -> null, rtl_orderReceive-8 -> null)
Even that partition 6 was assigned, the aren't values commited for it (even not a -1).
That is for the case of assigned partitions, but for unassigned partitions, thinking it now, as it is the subsets of commited offsets that aren't assigned, it makes no sense to have non defined commited unassigned partition (because if there weren't a commited partition, then wouldn't exist the unassigned partition).
So I am thinking on:
val commitedOffsets = Map(
testTopicPartition1 -> new OffsetAndMetadata(100),
testTopicPartition2 -> null,
testTopicPartition3 -> new OffsetAndMetadata(100),
testTopicPartition4 -> new OffsetAndMetadata(100),
testTopicPartition5 -> null,
).asJava
Just letting testTopicPartition0 as undefined, but defining testTopicPartition3 (unassigned).
Makes sense?
There was a problem hiding this comment.
Thanks for the clarification. In this case, it might be better to keep committing in both cases in oder to be robust and to adjust the rest of the test accordingly.
| when(admin.listOffsets(offsetsArgMatcher, any())) | ||
| .thenReturn(new ListOffsetsResult(endOffsets.asJava)) |
There was a problem hiding this comment.
I was not fully satisfied with providing all the endOffsets in all the cases here so I took a bit of time to play with Mockito. I was able to make it work as follow:
def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
topicPartitionOffsets => topicPartitionOffsets != null && topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
}
when(admin.listOffsets(
ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
any()
)).thenReturn(new ListOffsetsResult(endOffsets.view.filterKeys(assignedTopicPartitions.contains).toMap.asJava))
when(admin.listOffsets(
ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)),
any()
)).thenReturn(new ListOffsetsResult(endOffsets.view.filterKeys(unassignedTopicPartitions.contains).toMap.asJava))
Note the topicPartitionOffsets != null condition. It seems that Mockito call the argument matcher once with null somehow.
Interestingly, the test fails when I use this modified version. I think that this is due to the fact that not all the end offsets are returned all the time now and that committedOffsets must contain all the partitions. See my previous comment about this.
What do you think?
There was a problem hiding this comment.
Run the test defining all the topic-partitoins with some value but failed too.
The stack:
ConsumerGroupServiceTest > testAdminRequestsForDescribeNegativeOffsets() FAILED java.lang.NullPointerException at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.getLogEndOffsets(ConsumerGroupCommand.scala:638) at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$2(ConsumerGroupCommand.scala:411) at scala.collection.Iterator$$anon$9.next(Iterator.scala:575) at scala.collection.mutable.Growable.addAll(Growable.scala:62) at scala.collection.mutable.Growable.addAll$(Growable.scala:57) at scala.collection.mutable.HashMap.addAll(HashMap.scala:117) at scala.collection.mutable.HashMap$.from(HashMap.scala:589) at scala.collection.mutable.HashMap$.from(HashMap.scala:582) at scala.collection.MapOps$WithFilter.map(Map.scala:381) at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:560) at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupOffsets(ConsumerGroupCommand.scala:551) at kafka.admin.ConsumerGroupServiceTest.testAdminRequestsForDescribeNegativeOffsets(ConsumerGroupServiceTest.scala:134)
There was a problem hiding this comment.
Going to do some research, but seems like the problem I had before, the second call to when(..).then(..) doesnt get call, so the admin.listOffsets is call directly with no mock results.
Going to take a look.
There was a problem hiding this comment.
You might need to adjust the expected arguments.
There was a problem hiding this comment.
Actually yes, is because of the fact that not all the end offsets are returned all the time now, but only for unassigned partitions case.
Because the ConsumerGroupCommand defines unassigned topic partition as:
val unassignedPartitions = committedOffsets.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }
The definition that I gave to the test is breaking that logic, because if the topic partition is not in the commitedOffsets, by definition shouldn't be on the unassignedPartitions.
The test, as as I primarily defined, is declaring testTopicPartition3 as an unassigned partition, but that partition is not being defined also on the commitedOffsets. So is not respecting the previous statment val unassignedPartitions = committedOffsets.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }. That is the reason that the test is failing.
So the test case should be:
val commitedOffsets = Map(
testTopicPartition1 -> new OffsetAndMetadata(100),
testTopicPartition2 -> null,
testTopicPartition3 -> new OffsetAndMetadata(100),
testTopicPartition4 -> new OffsetAndMetadata(100),
testTopicPartition5 -> null,
).asJava
Just the testTopicPartition0 (which is assigned) as non defined, because as I seem there is never going to be a case where there is a non defined unassigned partition, because it is a requirement to be defined on commitedOffsets in order to be consider as unasigned.
On the other way, there could be assigned but not commited partitions.
There was a problem hiding this comment.
Once defined that way, test runs OK.
Makes sense?
There was a problem hiding this comment.
And why the test was passing before, was because of this:
def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
val expectedOffsetsUnassignedTopics = commitedOffsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
val expectedOffsetsAssignedTopics = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
(map.keySet.asScala == expectedOffsetsUnassignedTopics.keySet || map.keySet.asScala == expectedOffsetsAssignedTopics.keySet) && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
}
}
The expectedOffsetsUnassigned was a subset of commitedOffsets (commitedOffsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap)
The expectedOffsetsAssignedTopics was a subset of endOffsets (endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap)
It was defined that way to preserve the logic of val unassignedPartitions = committedOffsets.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.
But your suggestion of use endOffsets on both as a filter seems cleaner to me. Just needed to adjust the test case.
|
Hi @dajac, |
|
Some compileTestScala Error arised. Looking on it. |
|
Encounter and error on compileTestScala, so change to
|
dajac
left a comment
There was a problem hiding this comment.
Thanks for the update. I left a few minor comments.
|
Thanks @dajac for review and comments! |
dajac
left a comment
There was a problem hiding this comment.
LGTM! @IgnacioAcunaF Thanks for the patch!
|
Thanks @dajac for review and comments. |
… negative offsets while running kafka-consumer-groups.sh (apache#10858) This patch fixes the `ConsumerGroupCommand` to correctly handle missing offsets, which are returned as `null` by the admin API. Reviewers: David Jacot <djacot@confluent.io>

Jira: https://issues.apache.org/jira/browse/KAFKA-12926
Instead of setting "null" to negative offsets partition (as in KAFKA-9507), this PR aims to skip those cases from the returned list, because setting them in "null" can cause java.lang.NullPointerExceptions on downstreams methods that tries to access the data on them, because they are expecting an OffsetAndMetadata and they encouter null values.
For example, at ConsumerGroupCommand.scala at core:
If topicPartition has an negative offset, the committedOffsets.get(topicPartition) is null (as defined on KAFKA-9507), which translates into null.map(_.offset), which will lead to: Error: Executing consumer group command failed due to null
java.lang.NullPointerException
Example:
./kafka-consumer-groups.sh --describe --group order-validationsError: Executing consumer group command failed due to null java.lang.NullPointerException at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$6(ConsumerGroupCommand.scala:579) at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:99) at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:86) at scala.collection.convert.JavaCollectionWrappers$JSetWrapper.map(JavaCollectionWrappers.scala:180) at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$5(ConsumerGroupCommand.scala:578) at scala.collection.immutable.List.flatMap(List.scala:293) at scala.collection.immutable.List.flatMap(List.scala:79) at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$2(ConsumerGroupCommand.scala:574) at scala.collection.Iterator$$anon$9.next(Iterator.scala:575) at scala.collection.mutable.Growable.addAll(Growable.scala:62) at scala.collection.mutable.Growable.addAll$(Growable.scala:59) at scala.collection.mutable.HashMap.addAll(HashMap.scala:117) at scala.collection.mutable.HashMap$.from(HashMap.scala:570) at scala.collection.mutable.HashMap$.from(HashMap.scala:563) at scala.collection.MapOps$WithFilter.map(Map.scala:358) at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:569) at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:369) at kafka.admin.ConsumerGroupCommand$.run(ConsumerGroupCommand.scala:76) at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63) at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)Unit tests added to assert that topics's partitions with an INVALID_OFFSET are not considered on the returned list of the consmer groups's offsets, so the downstream methods receive only valid OffsetAndMetadata information.
Committer Checklist (excluded from commit message)