
KAFKA-9750 Flaky test kafka.server.ReplicaManagerTest.testFencedError…#8344

Closed
chia7712 wants to merge 1 commit into apache:trunk from chia7712:fix_9750

Conversation

@chia7712
Member

      // change the epoch from 0 to 1 in order to trigger the fenced error
      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1), (_, _) => ())
      TestUtils.waitUntilTrue(() => replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.values.forall(_.partitionCount() == 0),
        s"the partition=$topicPartition should be removed from pending state")

The root cause is a race condition: if the epoch in ReplicaAlterLogDirsThread is increased, the partition is moved to the end of the queue instead of being removed. This PR includes the following changes.

  1. controls the lock of ReplicaAlterLogDirsThread so that the fenced error almost always happens
  2. waits for the completion of the thread

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@chia7712
Member Author

@jsancio FYI

@chia7712 chia7712 force-pushed the fix_9750 branch 2 times, most recently from bfa84b4 to a4aaa30 on March 25, 2020 16:42
@jsancio
Member

jsancio commented Mar 25, 2020

cc @hachikuji @junrao to help with the review.

Member

@jsancio jsancio left a comment

Thanks for the fix.

Member

At a high level, I am concerned with the stability of this test as we make changes to the code in the future. It seems like we are exposing internal concurrency requirements to the tests.

Member Author

Got it. Let me revert this change.

@chia7712
Member Author

The build error is tracked by #8361.

@hachikuji
Contributor

@chia7712 Will look more carefully later today. Just want to confirm that this is an issue with the test case only?

@chia7712
Member Author

Just want to confirm that this is an issue with the test case only?

@hachikuji Could you share the log with me?

@chia7712
Member Author

chia7712 commented Mar 26, 2020

        val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
        for (partition <- updatedPartitions) {
          val topicPartition = partition.topicPartition
          if (logManager.getLog(topicPartition, isFuture = true).isDefined) {
            partition.log.foreach { log =>
              val leader = BrokerEndPoint(config.brokerId, "localhost", -1)

              // Add future replica to partition's map
              partition.createLogIfNotExists(isNew = false, isFutureReplica = true,
                highWatermarkCheckpoints)

              // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
              // replica from source dir to destination dir
              logManager.abortAndPauseCleaning(topicPartition)

              futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(leader,
                partition.getLeaderEpoch, log.highWatermark))
            }
          }
        }
        replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)

It seems the second run (created by #becomeLeaderOrFollower) starts just as the first run (created by #alterReplicaLogDirs) completes. Hence, the future log is removed by the first run and the second run crashes. Finally, the assertion fails (since the second run crashed, the pending partition still exists) and we see the unhelpful error message.

#8223 introduced this issue since it tries to create a new alter thread (or add the partition to an existing thread) for an existing partition. Before #8223, this code path was taken only for new partitions.
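
A toy sketch of this race (all names here — `RaceSketch`, `futureLogs`, `addFetcher`, `completeMove` — are illustrative stand-ins, not Kafka's actual API):

```scala
import scala.collection.mutable

// Simplified model of the race: the alter-dir thread ("first run") finishes the
// move and removes the future log, then becomeLeaderOrFollower ("second run")
// tries to re-add a fetcher for a future log that no longer exists.
object RaceSketch {
  val futureLogs = mutable.Set("tp-0")

  // first run: the move completes and the future log is removed
  def completeMove(tp: String): Unit = futureLogs -= tp

  // second run: re-adding the fetcher assumes the future log still exists
  def addFetcher(tp: String): Unit =
    if (!futureLogs.contains(tp))
      throw new IllegalStateException(s"future log of $tp was already removed")

  // returns true only if the second run survives
  def demo(): Boolean = {
    completeMove("tp-0") // first run wins the race
    try { addFetcher("tp-0"); true }
    catch { case _: IllegalStateException => false } // second run crashes
  }
}
```

With the first run winning, `demo()` returns false: the second run crashes, the partition stays pending, and the test's assertion fails.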

@chia7712
Member Author

alter_error.txt

Apply the attachment and the error happens consistently :(

@chia7712
Member Author

There are three scenarios when ReplicaManager#becomeLeaderOrFollower updates the epoch while the alter thread is running.

  1. the thread is completed successfully before epoch is updated.
    • no future log
    • no pending partition
    • no failed partition
  2. the fenced error happens before the epoch in the alter thread is updated
    • the future log is alive
    • no pending partition
    • one failed partition
  3. the fenced error happens after the epoch in the alter thread is updated
    • the future log is alive
    • one pending partition
    • no failed partition

  1. For the first case, we should NOT add the partition back to the alter thread since the future log is removed.
  2. For the second case, we should remove the partition from the failed partitions and add it back to the pending partitions.
  3. For the third case, we should just update the epoch of the pending partitions.

In short, this PR adds a new method (AbstractFetcherThread#updateEpochs) to the alter thread, which lets us update the epoch of pending partitions and resume the thread after a fenced error.
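
The three cases above could be sketched roughly like this (`updateEpochs` echoes the method name this PR proposes, but the data structures are simplified stand-ins for the real AbstractFetcherThread state):

```scala
import scala.collection.mutable

// Hedged sketch of the three scenarios; not the actual Kafka implementation.
object UpdateEpochsSketch {
  val pending    = mutable.Map.empty[String, Int] // partition -> epoch
  val failed     = mutable.Set.empty[String]      // partitions that hit a fenced error
  val futureLogs = mutable.Set.empty[String]      // partitions whose future log is alive

  def updateEpochs(tp: String, newEpoch: Int): Unit =
    if (!futureLogs.contains(tp)) {
      // case 1: the move completed and the future log is gone -> do NOT re-add
    } else if (failed.contains(tp)) {
      // case 2: fenced before the thread saw the new epoch -> resume the partition
      failed -= tp
      pending += tp -> newEpoch
    } else {
      // case 3: fenced after the epoch was updated -> just refresh the epoch
      pending += tp -> newEpoch
    }
}
```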

Member Author

handle the fenced error only

Member Author

Update the epoch in the alter thread. The update makes the alter thread aware of the new epoch, so it is able to keep fetching data with the new epoch.

@chia7712 chia7712 force-pushed the fix_9750 branch 3 times, most recently from 2166e0a to eda568d on April 1, 2020 06:53
Member

typo: "two"

Member

@mumrah mumrah left a comment

Thanks for the patch! Only one real question:

Previously, online partitions and new partitions would get added to the updatedPartitions set and later pass their initial fetch state to AbstractFetcherManager#addFetcherForPartitions. Now it seems like only new partitions follow this code path, and existing online partitions simply update the epoch in the existing fetcher threads.

I'm wondering if this code still gets run in AbstractFetcherManager#addFetcherForPartitions:

          case Some(f) =>
            f.shutdown()
            addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)

Will new threads still be created in cases where the source broker has moved?

Member

Does this need to be synchronized here? Shouldn't it only be called from becomeLeaderOrFollower which holds the replicaStateChangeLock?

Member Author

The other methods used to update the inner threads are synchronized, so this new method is synchronized as well.

Member

nit: Can this be handled with a nested match/case using types? (rather than using getClass)

Member Author

I'm not sure how to match a dynamic class type in Scala, but I rewrote this match pattern using Errors.forException, which converts the exception to an Errors value.

          failedPartitions.exception(tp) match {
            case Some(e) =>
              Errors.forException(e) match {
                case Errors.FENCED_LEADER_EPOCH =>
                  addPartitions(Map(tp -> OffsetAndEpoch(state.initOffset, state.currentLeaderEpoch)))
                  failedPartitions.removeAll(Set(tp))
                case _ =>
              // the exception is NOT caused by an inconsistent epoch, so we can't recover from it here
              }
            case None =>
              Option(partitionStates.stateValue(tp)).map(_.copy(currentLeaderEpoch = state.currentLeaderEpoch))
                .foreach(partitionStates.updateAndMoveToEnd(tp, _))
          }
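
For what it's worth, Scala can match on an exception's runtime type directly with a typed pattern, which is the kind of nested match/case the reviewer suggested (the exception class below is a local stand-in for org.apache.kafka.common.errors.FencedLeaderEpochException):

```scala
// Local stand-in for org.apache.kafka.common.errors.FencedLeaderEpochException.
class FencedLeaderEpochException(msg: String) extends RuntimeException(msg)

// A typed pattern matches the dynamic (runtime) type, subclasses included,
// without touching getClass.
def isFenced(e: Throwable): Boolean = e match {
  case _: FencedLeaderEpochException => true
  case _                             => false
}
```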

@chia7712
Member Author

chia7712 commented Apr 2, 2020

Will new threads still be created in cases where the source broker has moved?

IIRC, the source folder and target folder must be on the same broker when altering a replica folder. The case you mentioned should not happen on ReplicaAlterLogDirsManager.

@hachikuji
Contributor

hachikuji commented Apr 2, 2020

@chia7712 @mumrah I wanted to suggest an alternative fix: hachikuji@20a9c6a. As I understand the problem, there is a race between adding the partition to the alter log dir fetcher on receiving the LeaderAndIsr request and the completion of the log dir reassignment which removes the partition directly from the fetcher thread. It seems like it might be a little simpler to check at the time the partition is added whether the future log dir still exists. Does this miss any cases?
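
A rough sketch of that alternative (the names `maybeAddFetcher` and `futureLogExists` are hypothetical, not the actual API in hachikuji@20a9c6a):

```scala
// Alternative fix sketched: check whether the future log still exists at the
// moment the partition is (re-)added to the alter-dir fetcher; if the move has
// already completed, skip the add instead of crashing later.
def maybeAddFetcher(tp: String,
                    futureLogExists: String => Boolean,
                    add: String => Unit): Boolean =
  if (futureLogExists(tp)) { add(tp); true } // still moving: add the fetcher
  else false                                 // move already done: nothing to add
```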

@hachikuji
Contributor

I turned this into a separate PR: #8412. I think this patch is simpler and less risky for 2.5, but let me know if I have missed anything.

@chia7712
Member Author

chia7712 commented Apr 3, 2020

@hachikuji I have left some comments on your PR. I will close this PR later :)

@chia7712 chia7712 closed this Apr 3, 2020
@chia7712 chia7712 deleted the fix_9750 branch March 25, 2024 15:23