KAFKA-8012: Ensure partitionStates have not been removed before truncating (#6333)
Conversation
stanislavkozlovski
left a comment
Seems to be a regression from d152989
Nice find, LGTM!
| info(s"Failed to truncate $tp", e) | ||
| partitionsWithError += tp | ||
| val partitionState = partitionStates.stateValue(tp) | ||
| if (partitionState != null) { |
nit: Looking at the usage in the file, I think we would prefer:
Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState =>
  ...
}
instead of the null check
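The `Option(...)` pattern the reviewer suggests can be sketched with a toy model. The class names below (`TopicPartition`, `PartitionFetchState`, `stateValue`) are simplified stand-ins for Kafka's, not the real ones:

```scala
import java.util.concurrent.ConcurrentHashMap

// Minimal sketch (not Kafka's actual classes): a toy partition-state map
// illustrating the Option(...) pattern over an explicit null check.
object OptionGuardSketch {
  // Hypothetical stand-ins for Kafka's TopicPartition and PartitionFetchState.
  final case class TopicPartition(topic: String, partition: Int)
  final case class PartitionFetchState(fetchOffset: Long)

  val states = new ConcurrentHashMap[TopicPartition, PartitionFetchState]()

  // Like partitionStates.stateValue(tp): returns null once the partition
  // has been removed.
  def stateValue(tp: TopicPartition): PartitionFetchState = states.get(tp)

  def describe(tp: TopicPartition): Option[String] =
    // Option(...) lifts the possibly-null Java result into Scala's Option,
    // so the body runs only while the partition is still tracked.
    Option(stateValue(tp)).map(s => s"truncating $tp at offset ${s.fetchOffset}")
}
```

The upside of `Option(...).foreach` over `if (x != null)` is that the null never escapes into surrounding code, and it matches the style already used elsewhere in the file.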
}
@Test
def testTruncateToHighWatermarkDuringRemovePartitions(): Unit = {
Thanks for the test case. Maybe we can add a similar test case for the truncateToEpochEndOffsets path? I think testLeaderEpochChangeDuringFetchEpochsFromLeader has most of what we want, but we don't need to put the partition back after removing it.
@hachikuji @stanislavkozlovski thanks again for the reviews. I added a test to exercise the
hachikuji
left a comment
LGTM. Thanks for the patch!
retest this please
KAFKA-8012; Ensure partitionStates have not been removed before truncating. (#6333)

This patch fixes a regression in the replica fetcher which occurs when the replica fetcher manager simultaneously calls `removeFetcherForPartitions`, removing the corresponding partitionStates, while a replica fetcher thread attempts to truncate the same partition(s) in `truncateToHighWatermark`. This causes an NPE which causes the fetcher to crash. This change simply checks that the `partitionState` is not null first. Note that a similar guard exists in `truncateToEpochEndOffsets`.

Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Jason Gustafson <jason@confluent.io>
* apache/trunk:
  - KAFKA-7880: Naming worker thread by task id (apache#6275)
  - improve some logging statements (apache#6078)
  - KAFKA-7312: Change broker port used in testMinimumRequestTimeouts and testForceClose
  - KAFKA-7997: Use automatic RPC generation in SaslAuthenticate
  - KAFKA-8002; Log dir reassignment stalls if future replica has different segment base offset (apache#6346)
  - KAFKA-3522: Add TimestampedKeyValueStore builder/runtime classes (apache#6152)
  - HOTFIX: add igore import to streams_upgrade_test
  - MINOR: ConsumerNetworkClient does not need to send the remaining requests when the node is not ready (apache#6264)
  - KAFKA-7922: Return authorized operations in describe consumer group responses (KIP-430 Part-1)
  - KAFKA-7918: Inline generic parameters Pt. III: in-memory window store (apache#6328)
  - KAFKA-8012; Ensure partitionStates have not been removed before truncating. (apache#6333)
  - KAFKA-8011: Fix for race condition causing concurrent modification exception (apache#6338)
  - KAFKA-7912: Support concurrent access in InMemoryKeyValueStore (apache#6336)
  - MINOR: Skip quota check when replica is in sync (apache#6344)
  - HOTFIX: Change header back to http instead of https to path license header test (apache#6347)
  - MINOR: fix release.py script (apache#6317)
  - MINOR: Remove types from caching stores (apache#6331)
  - MINOR: Improve logging for alter log dirs (apache#6302)
  - MINOR: state.cleanup.delay.ms default is 600,000 ms (10 minutes). (apache#6345)
  - MINOR: disable Streams system test for broker upgrade/downgrade (apache#6341)
When reproducing the flakiness of DynamicBrokerReconfigurationTest#testThreadPoolResize (KAFKA-7988) on my machine, I saw that failures correspond to an NPE in one or more replica fetcher threads. This happens when the replica fetcher manager simultaneously calls `removeFetcherForPartitions`, removing the corresponding partitionStates, while a replica fetcher thread attempts to truncate the same partition(s) in `truncateToHighWatermark`.

This change simply checks that the `partitionState` is not null first. Note that a similar guard exists in `truncateToEpochEndOffsets`.

Committer Checklist (excluded from commit message)
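The shape of the race and of the fix can be shown with a toy model. This is not Kafka code: `truncateToHighWatermark` and `removeFetcherForPartitions` below are simplified analogues of the real methods, using a plain `ConcurrentHashMap` keyed by partition name:

```scala
import java.util.concurrent.ConcurrentHashMap

// Toy reproduction (not Kafka code) of the race: the fetcher manager removes
// a partition's state while the fetcher thread is truncating. The guard makes
// truncation skip already-removed partitions instead of dereferencing the
// null state and crashing with a NullPointerException.
object TruncateRaceSketch {
  final case class PartitionFetchState(offset: Long)

  val states = new ConcurrentHashMap[String, PartitionFetchState]()

  // Analogue of the fixed truncateToHighWatermark: the state may have been
  // removed concurrently, so wrap the possibly-null lookup in Option(...).
  def truncateToHighWatermark(partitions: Seq[String]): Seq[String] =
    partitions.flatMap { tp =>
      Option(states.get(tp)).map(st => s"truncated $tp to ${st.offset}")
    }

  // Analogue of removeFetcherForPartitions: drops the partitions' states.
  def removeFetcherForPartitions(partitions: Seq[String]): Unit =
    partitions.foreach(tp => states.remove(tp))
}
```

Without the `Option` guard, a lookup returning null for a just-removed partition would blow up the fetcher thread; with it, the removed partition is simply skipped, which is the behavior the patch restores.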