MINOR: Refactor controller partition reassignment logic into separate class #7339

Closed
stanislavkozlovski wants to merge 6 commits into apache:trunk from
stanislavkozlovski:reassignments-refactor

@stanislavkozlovski
Contributor

This patch adds a ReassignmentManager class which encapsulates most of the nitty-gritty details of reassigning a partition. Splitting the logic helps with testability, and this patch leverages that to add unit tests for partition reassignments.

@stanislavkozlovski
Contributor Author

stanislavkozlovski commented Sep 16, 2019

Of note to reviewers:

  • Most of the logic is copy-pasted unless explicitly mentioned
  • The reassignment logic used both controllerContext.epoch and KafkaController#epoch. I decided to continue with controllerContext#epoch
  • KafkaController#sendUpdateMetadata() is used within KafkaController for separate functionality. I duplicated its 3-line logic in onPartitionReassignment()'s B7 step
  • Modified the private methods used by onPartitionReassignment to bubble up IllegalStateExceptions, which are now handled by the caller of onPartitionReassignment, due to KafkaController's handleIllegalState(e) method (effect should be the same)
  • Any feedback, however nitty, on how to make the new tests more readable, simpler and better is greatly appreciated since any subsequent tests would likely copy from the existing ones

@stanislavkozlovski stanislavkozlovski force-pushed the reassignments-refactor branch 2 times, most recently from 3ef4dc5 to a05b75c Compare September 16, 2019 18:06
@stanislavkozlovski
Contributor Author

JDK 8 passed. Both JDK 11 builds were grey, with Task :core:integrationTest FAILED in the logs. I couldn't diagnose them properly, so let me run the tests locally.

retest this please

@stanislavkozlovski
Contributor Author

I'm having trouble running locally. I have only verified the ReassignPartitionsClusterTest and DeleteTopicsTest locally

Member

We should be using Mockito for new tests.

Contributor Author

The use of Nil: _* as a second argument is needed because this does not compile otherwise in Scala, due to its auto-tupling feature (article explaining it).
There are some workarounds, like adding a helper method or using mockito-scala. I opted for repeating the argument for the time being.
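
To illustrate the syntax only, here is a minimal sketch that uses a hypothetical stand-in class instead of Mockito. The names Stubber and willReturn are assumptions made for this example and are not Mockito's API. It shows how Nil: _* passes an empty sequence as the varargs argument, so the call explicitly targets the varargs overload.

```scala
object VarargsSketch {
  // Hypothetical stand-in for an API that offers both a single-argument
  // overload and a varargs overload of the same method.
  class Stubber {
    def willReturn(value: Any): String = "single"
    def willReturn(value: Any, more: Any*): String = s"varargs(${more.size})"
  }

  val stubber = new Stubber

  // Nil: _* expands an empty list into the varargs parameter, so only the
  // two-parameter overload is applicable to this call.
  val explicit: String = stubber.willReturn(1, Nil: _*)
}
```

A helper method that hides the Nil: _* argument would be the alternative workaround mentioned above.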

Contributor Author

Moved all these instantiations below the handlers so that ReassignmentManager can reference a handler.

@stanislavkozlovski stanislavkozlovski marked this pull request as ready for review September 20, 2019 11:54
@stanislavkozlovski
Contributor Author

JDK 11/2.13 failed, although it seems to have expired rather than hit test failures:

JDK 11 and Scala 2.13: FAILURE, 10547 tests run, 61 skipped, 0 failed.

JDK 11/2.12 and JDK 8/2.11 passed:

JDK 11 and Scala 2.12: SUCCESS, 12040 tests run, 81 skipped, 0 failed.
JDK 8 and Scala 2.11: SUCCESS, 12040 tests run, 81 skipped, 0 failed.

@stanislavkozlovski
Contributor Author

retest this please

@stanislavkozlovski
Contributor Author

JDK 11 / Scala 2.13 - kafka.api.SaslSslAdminClientIntegrationTest.testAlterReplicaLogDirs - known flake https://issues.apache.org/jira/browse/KAFKA-6331

JDK 8 / Scala 2.11 - kafka.api.SaslSslAdminClientIntegrationTest.testCreateTopicsResponseMetadataAndConfig

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
	at kafka.api.SaslSslAdminClientIntegrationTest.testCreateTopicsResponseMetadataAndConfig(SaslSslAdminClientIntegrationTest.scala:452)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

Seems like a flake. Created https://issues.apache.org/jira/browse/KAFKA-8967

@stanislavkozlovski
Contributor Author

retest this please

Contributor

@viktorsomogyi viktorsomogyi left a comment

Hey, this is great stuff. I was also looking at this class (and KafkaApis) as something we could tear apart. For KafkaApis it would maybe be better to separate based on API calls, but that's for another evening.

So here are a few questions/suggestions:

  • Is there any reason for passing the epoch as a function, or is it just preference? Can't you just use the one in controllerContext, as both seem to do the same thing?
  • For sendUpdateMetadata I think we could pass the method itself to the ReassignmentHelper. Even though it's a 3-liner, I wouldn't copy-paste it.
  • For tests I think we might be able to use the parameterized unit tests if the goal is to test the ReassignmentHelper (one example I did recently is https://github.com/apache/kafka/pull/7361/files#diff-3e5b61802d5dae0d374bf75f6c06a10a)

@stanislavkozlovski
Contributor Author

stanislavkozlovski commented Oct 11, 2019

Is there any reason for passing the epoch as a function, or is it just preference? Can't you just use the one in controllerContext, as both seem to do the same thing?

I had said

The reassignment logic used both controllerContext.epoch and KafkaController#epoch. I decided to continue with KafkaController#epoch so I sent the function as an argument

We can use the same epoch; I just wanted to keep the code as similar as possible.

For sendUpdateMetadata I think we could pass the method itself to the ReassignmentHelper. Even though it's a 3-liner, I wouldn't copy-paste it.

That was my initial approach, but in the end I went without it because the logic is so simple. Passing a method looked weirder to me. I don't have a strong opinion.

For tests I think we might be able to use the parameterized unit tests if the goal is to test the ReassignmentHelper

Thanks! That is a good example. I would prefer we defer this to another PR

@stanislavkozlovski
Contributor Author

stanislavkozlovski commented Oct 11, 2019

Rebased with c620b73

cc @hachikuji @cmccabe for a second round

Contributor Author

I don't really like passing in the eventManager here. It is used in two places:

  1. when we remove the /reassign_partitions znode
  2. when we register a new ISR ZNodeChangeHandler

We could circumvent this by passing in two methods, onZNodeDeletion and onReassignmentStart, depending on what people think.

Contributor

I like your idea of adding callbacks. I think we can probably turn it into a Listener or something. For example:

trait ReassignmentListener {
  def onReassignmentUpdated(): Unit // invoked in `updateCurrentReassignment`
  def onReassignmentResumed(): Unit // invoked at the start of `onPartitionReassignment`
  def onReassignmentFinished(): Unit // invoked at the end of `onPartitionReassignment` (case B)
}

Using this approach, we can probably also get rid of the dependence on TopicDeletionManager.
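
As a rough sketch of how such a listener could help the tests, the example below has the manager report progress only through the listener, and a unit test passes in a recording implementation. The TopicPartition stand-in, the callback parameter, RecordingListener and the simplified Manager are all assumptions made for illustration, not code from this patch.

```scala
import scala.collection.mutable.ArrayBuffer

object ListenerSketch {
  // Hypothetical stand-in for Kafka's TopicPartition, to keep the sketch self-contained.
  final case class TopicPartition(topic: String, partition: Int)

  // The proposed callbacks, with an assumed TopicPartition parameter.
  trait ReassignmentListener {
    def onReassignmentUpdated(tp: TopicPartition): Unit
    def onReassignmentResumed(tp: TopicPartition): Unit
    def onReassignmentFinished(tp: TopicPartition): Unit
  }

  // Records every callback so a test can assert on the order of events.
  class RecordingListener extends ReassignmentListener {
    private val buffer = ArrayBuffer.empty[String]
    def events: List[String] = buffer.toList
    private def record(kind: String, tp: TopicPartition): Unit = {
      buffer += s"$kind:${tp.topic}-${tp.partition}"
      ()
    }
    override def onReassignmentUpdated(tp: TopicPartition): Unit = record("updated", tp)
    override def onReassignmentResumed(tp: TopicPartition): Unit = record("resumed", tp)
    override def onReassignmentFinished(tp: TopicPartition): Unit = record("finished", tp)
  }

  // Heavily simplified manager: it knows nothing about the event manager or
  // topic deletion, it only notifies the listener.
  class Manager(listener: ReassignmentListener) {
    def onPartitionReassignment(tp: TopicPartition, allNewReplicasInIsr: Boolean): Unit = {
      listener.onReassignmentResumed(tp)
      if (allNewReplicasInIsr) listener.onReassignmentFinished(tp)
    }
  }
}
```

In the controller, the real listener implementation would be the place to enqueue events or resume topic deletion, which keeps those dependencies out of the manager.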

stanislavkozlovski and others added 2 commits November 14, 2019 17:00
… class

This patch adds a ReassignmentManager class which encapsulates most of the nitty-gritty details of reassigning a partition. Splitting the logic helps with testability and this patch leverages that to add unit tests for partition reassignments

Contributor

@hachikuji hachikuji left a comment

Thanks for the refactor. Left a few comments.

* A helper class which contains logic for driving partition reassignments.
* This class is not thread-safe.
*/
class ReassignmentsManager(controllerContext: ControllerContext,

Contributor

nit: maybe just ReassignmentManager. More in line with classes like ReplicaManager.

!zkPartitionsResumed.contains(tp)
}
} catch {
case e: IllegalStateException => handleIllegalState(e)

Contributor

Prior to this patch, handleIllegalState is only protecting calls to sendRequestsToBrokers which re-throws all unexpected exceptions as IllegalStateException. If we want this to be useful here, we should do the same.

However, to be honest, I am not sure when it makes sense to force resignation of the current controller. The current protection only for sendRequestsToBrokers seems arbitrary. I'm inclined to say it should be rare though which makes me doubt the changes here. We need to be sure that the next controller will actually be able to recover. In some cases, it seems like it would clearly be preferable to just let the current operation fail and go on to the next event.

Can we leave behavioral changes like this out of this PR since the focus here is improving testability? I'd prefer to try and come up with a principled approach to handling unexpected errors in the controller.

Contributor Author

I did this because maybeResumeReassignments calls onPartitionReassignment which calls sendRequestsToBrokers in phase B or in phase A's updateLeaderEpochAndSendRequest call.

I wanted to ensure that any errors there are caught; otherwise this patch would change the behavior by letting them propagate. Not catching them is also a behavioral change.

Is there concern that other methods may raise an IllegalStateException?
Perhaps we can re-throw sendRequestsToBrokers' exception as something else and catch only that?
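
A minimal sketch of that last idea, under the assumption that a dedicated exception type is introduced. BrokerRequestSendException, the simplified sendRequestsToBrokers and process are hypothetical names for this example and do not exist in the patch.

```scala
import scala.util.control.NonFatal

object ExceptionSketch {
  // Hypothetical dedicated exception type wrapping failures from sending requests.
  final class BrokerRequestSendException(cause: Throwable) extends RuntimeException(cause)

  // Stand-in for sendRequestsToBrokers: any unexpected failure is re-thrown
  // as the dedicated type instead of IllegalStateException.
  def sendRequestsToBrokers(send: () => Unit): Unit =
    try send()
    catch { case NonFatal(e) => throw new BrokerRequestSendException(e) }

  // Stand-in for the caller: only the dedicated type takes the resignation
  // path; any other exception, including IllegalStateException, propagates.
  def process(event: () => Unit): String =
    try { event(); "processed" }
    catch { case _: BrokerRequestSendException => "resign" }
}
```

With this shape, an IllegalStateException raised elsewhere in the reassignment logic would no longer be mistaken for a failure in sendRequestsToBrokers.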

Contributor

@hachikuji hachikuji Nov 27, 2019

Yeah, my concern was that something else might raise IllegalStateException. I am actually a bit tempted to get rid of handleIllegalState altogether. It just seems so arbitrary. Looking through the code, I cannot see the specific case we're trying to protect. The call to sendRequestsToBrokers just builds the requests and puts them in a queue. Perhaps I'm missing something?

Member

Contributor

@hachikuji hachikuji Nov 27, 2019

Thanks @ijuma. That helps. It looks like it was specifically trying to protect the validation we do in newBatch(), but the root cause of the reported issue was evidently unknown. And neither have I heard of any recurrence of it. So my first inclination after seeing this is to also get rid of newBatch along with the logic to resign the controller (which seems like massive overkill). Will look more carefully tomorrow if I get a chance.

}

/**
* Phase B of a partition reassignment is the part where all the new replicas are in ISR

Contributor

Is this duplication necessary? It seems likely to diverge over time.

Contributor Author

Theoretically, if we change the code these tests would fail and this would get updated, but I can see how it's likely to get missed. Let's keep the comments in between the test code, though.

mockTopicDeletionManager = Mockito.mock(classOf[TopicDeletionManager])
mockControllerBrokerRequestBatch = Mockito.mock(classOf[ControllerBrokerRequestBatch])
mockReplicaStateMachine = Mockito.mock(classOf[ReplicaStateMachine])
mockPartitionStateMachine = Mockito.mock(classOf[PartitionStateMachine])

Contributor

I'd suggest using MockPartitionStateMachine and MockReplicaStateMachine to simplify these test cases.

Contributor Author

Makes sense on the MockPartitionStateMachine. With the replica state machine, isn't it more useful to keep it an EasyMock for now, since we only use it for the assertion we have in A2 of testPhaseAOfPartitionReassignment?

Contributor

The nice thing that comes from MockReplicaStateMachine is validation of the state changes.

new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
partitionStateMachine, new ControllerDeletionClient(this, zkClient))
val reassignmentsManager = new ReassignmentsManager(controllerContext, zkClient, topicDeletionManager,

Contributor

@hachikuji hachikuji Nov 21, 2019

High level, I think this refactor is a good improvement. It is similar to the refactor that introduced TopicDeletionManager and it does make testing a bit easier. That said, I want to mention a kind of drawback. Although it succeeds in factoring some of the logic out of KafkaController, it doesn't really do anything about the complex interdependencies between the various components and the fact that any one of them can mutate the controller state. And in fact, it makes it a little harder to track all these mutations because they are spread over more classes.

I think it would be a useful exercise to try and think about some of these components in more of a functional way. Rather than allowing the reassignment manager to directly mutate any and all state that the controller owns, perhaps we can treat it more like a function which accepts the current state of the world, makes some modifications, and then returns the proposed new state of the world. Then it could be up to the controller to decide how to enact the new state (e.g. by making changes in ZK and sending UpdateMetadata requests). The nice thing then is that we don't need all the dependencies and all the nasty mocking that comes with them.

Anyway, this is more of a "food for thought" comment. I'm not exactly sure how to do this myself.
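
One way to picture this, purely as a sketch: the reassignment step becomes a pure function from the current state to a proposed state plus a list of actions for the controller to enact. State, Action and reassign below are hypothetical names, and the two branches are a strong simplification of the real reassignment phases.

```scala
object FunctionalSketch {
  // Hypothetical stand-in for Kafka's TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  // Hypothetical immutable view of the state that matters for reassignment.
  final case class State(replicas: Map[TopicPartition, Seq[Int]],
                         isr: Map[TopicPartition, Set[Int]])

  // Side effects are described as data; the controller decides how to enact them.
  sealed trait Action
  final case class WriteAssignmentToZk(tp: TopicPartition, replicas: Seq[Int]) extends Action
  final case class SendUpdateMetadata(tp: TopicPartition) extends Action

  // Pure function: current state and target replicas in, proposed state and actions out.
  def reassign(state: State, tp: TopicPartition, target: Seq[Int]): (State, Seq[Action]) = {
    val inSync = state.isr.getOrElse(tp, Set.empty[Int])
    if (target.forall(inSync.contains)) {
      // All target replicas are in the ISR: propose the final assignment.
      val next = state.copy(replicas = state.replicas.updated(tp, target))
      (next, Seq(WriteAssignmentToZk(tp, target), SendUpdateMetadata(tp)))
    } else {
      // Otherwise propose the union of current and target replicas and wait for catch-up.
      val current = state.replicas.getOrElse(tp, Seq.empty[Int])
      val expanded = (current ++ target).distinct
      val next = state.copy(replicas = state.replicas.updated(tp, expanded))
      (next, Seq(WriteAssignmentToZk(tp, expanded)))
    }
  }
}
```

A test for such a function needs no mocks at all: it builds a State, calls reassign, and compares the returned state and actions.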

Contributor Author

I do agree it does not fix the underlying issue of complexity and I agree it makes the mutation changes harder to track.

If we are to start refactoring towards a more functional approach, I think it would be easier to start with the lowest level classes which still mutate the state - the state machines.
Otherwise it'd be pretty difficult to make the ReassignmentManager functional when components it uses mutate the state underneath

zkClient.unregisterZNodeChangeHandler(path)
if (deletedZNode) {
// Ensure we detect future reassignments
eventManager.put(ZkPartitionReassignment)

Contributor Author

I wonder if we can just call

isActive && zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)

to outright register the change handler here?

@stanislavkozlovski
Contributor Author

retest this please

}
val reassignmentsManager = new ReassignmentManager(controllerContext, zkClient, reassignmentListener,
replicaStateMachine, partitionStateMachine, brokerRequestBatch, stateChangeLogger,
shouldSkipReassignment = tp => {

Contributor

I had one suggestion which you are free to reject. We have this kind of awkward back and forth between the controller and the manager when a reassignment is triggered:

  1. Controller detects reassignment
  2. Controller delegates trigger to Manager
  3. Manager asks Controller if reassignment is allowed
  4. Manager executes reassignment

I am wondering if it would be better to leave all of the trigger logic inside the controller and only delegate to the manager after step 3. In other words, perhaps maybeTriggerPartitionReassignment can be left inside the controller. What do you think?
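
Roughly what I have in mind, as a sketch only. The Controller and Manager below are hypothetical simplifications, and the skip condition (topic queued for deletion) is an assumption standing in for whatever shouldSkipReassignment checks.

```scala
import scala.collection.mutable.ArrayBuffer

object TriggerSketch {
  // Hypothetical stand-in for Kafka's TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  // The manager only executes reassignments; it never asks the controller anything.
  class Manager {
    private val started = ArrayBuffer.empty[TopicPartition]
    def executed: List[TopicPartition] = started.toList
    def onPartitionReassignment(tp: TopicPartition): Unit = {
      started += tp
      ()
    }
  }

  // The controller keeps the trigger logic (steps 1 to 3) and delegates only
  // the partitions that are allowed to proceed.
  class Controller(manager: Manager, topicsQueuedForDeletion: Set[String]) {
    // Returns the partitions whose reassignment was skipped.
    def maybeTriggerPartitionReassignment(partitions: Seq[TopicPartition]): Seq[TopicPartition] = {
      val (skipped, allowed) =
        partitions.partition(tp => topicsQueuedForDeletion.contains(tp.topic))
      allowed.foreach(manager.onPartitionReassignment)
      skipped
    }
  }
}
```

With this split the manager no longer needs a shouldSkipReassignment callback.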

@github-actions

This PR is being marked as stale since it has not had any activity in 90 days. If you
would like to keep this PR alive, please leave a comment asking for a review. If the PR has
merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact).

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

@github-actions github-actions Bot added the stale Stale PRs label Nov 21, 2024
@github-actions

This PR has been closed since it has not had any activity in 120 days. If you feel like this
was a mistake, or you would like to continue working on it, please feel free to re-open the
PR and ask for a review.

@github-actions github-actions Bot added the closed-stale PRs that were closed due to inactivity label Dec 23, 2024
@github-actions github-actions Bot closed this Dec 23, 2024

Labels

closed-stale PRs that were closed due to inactivity stale Stale PRs

4 participants