KAFKA-7962: Avoid NPE for StickyAssignor#6308
Conversation
|
@vahidhashemian Please review this patch. Thanks. |
vahidhashemian
left a comment
There was a problem hiding this comment.
Thanks for the PR @huxihx. Left a small comment.
There was a problem hiding this comment.
Wouldn't it be simpler and more readable to encompass the for loop in an if block that verifies the topic exists in partitionsPerTopic?
vahidhashemian
left a comment
There was a problem hiding this comment.
Thanks for addressing my comment, and sorry for the delay. It would be great if we can have another unit test as commented inline.
There was a problem hiding this comment.
Could you please also add a unit test that covers this scenario? If I'm not mistaken col will be empty when either the list of partitions or the list of consumers passes to the assigner is empty.
There was a problem hiding this comment.
nit: You can use the topic(...) helper function instead of Arrays.asList(...). See examples in other unit tests.
…ments if topic is deleted https://issues.apache.org/jira/browse/KAFKA-7962 Consumer using StickyAssignor throws NullPointerException if a subscribed topic was removed.
|
retest this please |
vahidhashemian
left a comment
There was a problem hiding this comment.
Thank you very much for addressing my comments. I believe the failed unit test is unrelated to this PR. LGTM.
* AK/trunk: (36 commits) KAFKA-7962: Avoid NPE for StickyAssignor (apache#6308) Address flakiness of CustomQuotaCallbackTest#testCustomQuotaCallback (apache#6330) KAFKA-7918: Inline generic parameters Pt. II: RocksDB Bytes Store and Memory LRU Caches (apache#6327) MINOR: fix parameter naming (apache#6316) KAFKA-7956 In ShutdownableThread, immediately complete the shutdown if the thread has not been started (apache#6218) MINOR: Refactor replica log dir fetching for improved logging (apache#6313) [TRIVIAL] Remove unused StreamsGraphNode#repartitionRequired (apache#6227) MINOR: Increase produce timeout to 120 seconds (apache#6326) KAFKA-7918: Inline generic parameters Pt. I: in-memory key-value store (apache#6293) MINOR: Fix line break issue in upgrade notes (apache#6320) KAFKA-7972: Use automatic RPC generation in SaslHandshake MINOR: Enable capture of full stack trace in StreamTask#process (apache#6310) KAFKA-7938: Fix test flakiness in DeleteConsumerGroupsTest (apache#6312) KAFKA-7937: Fix Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup (apache#6311) MINOR: Update docs to say 2.2 (apache#6315) KAFKA-7672 : force write checkpoint during StreamTask #suspend (apache#6115) KAFKA-7961; Ignore assignment for un-subscribed partitions (apache#6304) KAFKA-7672: Restoring tasks need to be closed upon task suspension (apache#6113) KAFKA-7864; validate partitions are 0-based (apache#6246) KAFKA-7492 : Updated javadocs for aggregate and reduce methods returning null behavior. (apache#6285) ...
* KAFKA-7962: StickyAssignor: throws NullPointerException during assignments if topic is deleted https://issues.apache.org/jira/browse/KAFKA-7962 Consumer using StickyAssignor throws NullPointerException if a subscribed topic was removed. * addressed vahidhashemian's comments * lower NPath Complexity * added a unit test
https://issues.apache.org/jira/browse/KAFKA-7962
Consumer using StickyAssignor throws NullPointerException if a subscribed topic was removed.
More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)