
KAFKA-7786: Ignore OffsetsForLeaderEpoch response if leader epoch changed while request in flight#6101

Merged: hachikuji merged 5 commits into apache:trunk from apovzner:kafka-7786 on Jan 9, 2019
Conversation

@apovzner (Contributor) commented Jan 7, 2019

There is a race condition in ReplicaFetcherThread: we can update PartitionFetchState with a new leader epoch (same leader) before handling an OffsetsForLeaderEpoch response that carries a FENCED_LEADER_EPOCH error. Handling that stale error removes the partition from partitionStates, which in turn halts fetching until the next LeaderAndIsr request.

Our system test kafkatest.tests.core.reassign_partitions_test.ReassignPartitionsTest.test_reassign_partitions.bounce_brokers=True.reassign_from_offset_zero=True failed three times in the last couple of months due to this error. Since this test already exercises this condition, I am not adding any more tests.

Also added a toString() implementation to PartitionData, because some log messages did not show useful info, which I noticed while investigating the above system test failure.

cc @hachikuji who suggested the fix.
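For context, the suggested fix amounts to re-validating the leader epoch when the OffsetsForLeaderEpoch response arrives and dropping partitions whose epoch moved on while the request was in flight. A minimal sketch of that guard, using simplified stand-in names (FetchState, partitionsToKeep) rather than the actual Kafka types:

```scala
// Simplified sketch of the in-flight epoch guard; FetchState and
// partitionsToKeep are illustrative stand-ins, not the real Kafka types.
case class FetchState(currentLeaderEpoch: Int)

def partitionsToKeep(
    fetchedEpochs: Map[String, Int],       // partition -> epoch in the response
    requestedEpochs: Map[String, Int],     // partition -> epoch sent in the request
    currentStates: Map[String, FetchState] // partition -> live fetch state, if any
): Map[String, Int] =
  fetchedEpochs.filter { case (tp, _) =>
    val requested = requestedEpochs.getOrElse(tp,
      throw new IllegalStateException(s"Response for unrequested partition $tp"))
    // Keep the response only if the partition is still tracked and its leader
    // epoch has not changed since the request was sent.
    currentStates.get(tp).exists(_.currentLeaderEpoch == requested)
  }
```

With a guard like this, a stale FENCED_LEADER_EPOCH (or even a stale valid response) is simply ignored instead of removing the partition from the fetch state.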

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@ijuma (Member) commented Jan 7, 2019

It would be good to have a unit/integration test as well.

@hachikuji (Contributor) left a comment:

Thanks for the patch. If we can hit the case in a unit test, that is probably sufficient. It may be possible to do something through MockFetcherThread by letting it block in the call to fetchEpochsFromLeader.

//Check no leadership and no leader epoch changes happened whilst we were unlocked, fetching epochs
val leaderEpochs = fetchedEpochs.filter { case (tp, _) =>
val curPartitionState = partitionStates.stateValue(tp)
val leaderEpochInRequest = epochRequests.get(tp).get.currentLeaderEpoch.get
Contributor:

Perhaps no harm being a little more defensive here. At least perhaps we can ensure tp is contained in epochRequests?

Contributor Author:

But we still throw an exception, right? I guess it's better to throw IllegalStateException with a descriptive message vs. NPE.

Contributor:

Or we can log a warning and ignore that partition. Well, perhaps an exception with a nice message is preferable since this would suggest the remote broker gave us data for a partition that we didn't request.

Member:

Yeah, generally Option.get should never be used in favour of a more descriptive error.

@hachikuji (Contributor) left a comment:

Thanks, the fix looks good. Just a couple small comments.

import kafka.log.LogAppendInfo
import kafka.message.NoCompressionCodec
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.server.PartitionFetchState
Contributor:

nit: unneeded import

val curPartitionState = partitionStates.stateValue(tp)
val leaderEpochInRequest = epochRequests.get(tp) match {
case Some(request) => request.currentLeaderEpoch.get
case _ =>
Contributor:

nit: case None since there are no other alternatives
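Putting the review suggestions together (no Option.get, a case None branch, and a descriptive failure when the broker answers for a partition we never requested), the lookup might end up shaped like the following sketch; EpochRequest and epochInRequest are illustrative names, not the exact PR code:

```scala
// Sketch only: EpochRequest stands in for the real request type.
case class EpochRequest(currentLeaderEpoch: Int)

def epochInRequest(epochRequests: Map[String, EpochRequest], tp: String): Int =
  epochRequests.get(tp) match {
    case Some(request) => request.currentLeaderEpoch
    case None =>
      // Fail loudly: the broker returned data for a partition we never requested.
      throw new IllegalStateException(
        s"Received OffsetsForLeaderEpoch response for unrequested partition $tp")
  }
```

Compared with Option.get, this fails with a message that points directly at the protocol violation instead of a bare NoSuchElementException.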


val leaderLog = Seq(
mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("c".getBytes)))
val leaderState = MockFetcherThread.PartitionState(leaderLog, leaderEpoch = 1, highWatermark = 0L)
Contributor:

In this case, the leader has updated the epoch before the follower and sends back the fenced error. It is also possible that the leader is still on the old epoch and returns a valid response (which we should also ignore). Is it worthwhile having a separate test for that case?

Contributor Author:

Good point, I agree it's worthwhile to have a separate test like that.

@hachikuji (Contributor) left a comment:

LGTM. Thanks for the patch!

val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime())
val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
thread.addPartitions(Map(t1p0 -> offsetAndEpoch(0L), t2p1 -> offsetAndEpoch(0L)))
thread.addPartitions(Map(t1p0 -> offsetAndEpoch(0L), t1p1 -> offsetAndEpoch(0L)))
Contributor:

Nice catch.

@apovzner (Contributor Author) commented Jan 8, 2019

@hachikuji thanks for the review. The PR builder's JDK 11 build failed consistently on each of 3 PR builds due to ReplicaManagerTest.testBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate, so the failure is most likely related to this PR. It looks like an issue with how we do mocking (still investigating, since it does not fail locally for me). Let me add a commit to fix this once I find the issue.

@apovzner (Contributor Author) commented Jan 9, 2019

@hachikuji I fixed the ReplicaManagerTest. A bug in the test caused exactly the race condition this PR fixes (but not intentionally, hence the failure), and the test itself had a race condition, which is why it did not fail 100% of the time.

Most recent build failures are unrelated to this PR:
JDK 8:
kafka.api.AdminClientIntegrationTest.testForceClose
org.junit.runners.model.TestTimedOutException: test timed out after 120000 milliseconds

JDK 11:

  1. kafka.api.ConsumerBounceTest.testCloseDuringRebalance
    org.apache.kafka.common.KafkaException: Socket server failed to bind to localhost:43476: Address already in use.
  2. org.apache.kafka.streams.KafkaStreamsTest.statefulTopologyShouldCreateStateDirectory
    java.io.UncheckedIOException: java.nio.file.NoSuchFileException: /tmp/kafka-9Bd3Q/appId/1_0/rocksdb/statefulTopologyShouldCreateStateDirectory-counts/MANIFEST-000001

@hachikuji (Contributor) left a comment:

LGTM

@hachikuji hachikuji merged commit b2b79c4 into apache:trunk Jan 9, 2019
hachikuji pushed a commit that referenced this pull request Jan 9, 2019
…ile request in flight (#6101)

There is a race condition in ReplicaFetcherThread: we can update PartitionFetchState with a new leader epoch (same leader) before handling an OffsetsForLeaderEpoch response that carries a FENCED_LEADER_EPOCH error. Handling that stale error removes the partition from partitionStates, which in turn halts fetching until the next LeaderAndIsr request.

This patch adds logic to ensure that the leader epoch doesn't change while an OffsetsForLeaderEpoch request is in flight (which could happen with back-to-back leader elections). If it has changed, we ignore the response.

Also added a toString() implementation to PartitionData, because some log messages did not show useful info, which I noticed while investigating the above system test failure.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…ile request in flight (apache#6101)
