Skip to content

Kafka 5692 elect preferred#4183

Closed
cmccabe wants to merge 13 commits intoapache:trunkfrom
cmccabe:KAFKA-5692-elect-preferred
Closed

Kafka 5692 elect preferred#4183
cmccabe wants to merge 13 commits intoapache:trunkfrom
cmccabe:KAFKA-5692-elect-preferred

Conversation

@cmccabe
Copy link
Copy Markdown
Contributor

@cmccabe cmccabe commented Nov 6, 2017

No description provided.

tombentley and others added 13 commits October 12, 2017 10:42
…Client

See also KIP-183.

This implements the following algorithm:

1. AdminClient sends ElectPreferredLeadersRequest.
2. KafakApis receives ElectPreferredLeadersRequest and delegates to
   ReplicaManager.electPreferredLeaders()
3. ReplicaManager delegates to KafkaController.electPreferredLeaders()
4. KafkaController adds a PreferredReplicaLeaderElection to the EventManager,
5. ReplicaManager.electPreferredLeaders()'s callback uses the
   delayedElectPreferredReplicasPurgatory to wait for the results of the
   election to appear in the metadata cache. If there are no results
   because of errors, or because the preferred leaders are already leading
   the partitions then a response is returned immediately.

In the EventManager work thread the preferred leader is elected as follows:

1. The EventManager runs PreferredReplicaLeaderElection.process()
2. process() calls KafkaController.onPreferredReplicaElectionWithResults()
3. KafkaController.onPreferredReplicaElectionWithResults()
   calls the PartitionStateMachine.handleStateChangesWithResults() to
   perform the election (asynchronously the PSM will send LeaderAndIsrRequest
   to the new and old leaders and UpdateMetadataRequest to all brokers)
   then invokes the callback.

Note: the change in parameter type for CollectionUtils.groupDataByTopic().
This makes sense because the AdminClient APIs use Collection consistently,
rather than List or Set. If binary compatiblity is a consideration the old
version should be kept, delegating to the new version.

I had to add PartitionStateMachine.handleStateChangesWithResults()
in order to be able to process a set of state changes in the
PartitionStateMachine *and get back individual results*.
At the same time I noticed that all callers of existing handleStateChange()
were destructuring a TopicAndPartition that they already had in order
to call handleStateChange(), and that handleStateChange() immediately
instantiated a new TopicAndPartition. Since TopicAndPartition is immutable
this is pointless, so I refactored it. handleStateChange() also now returns
any exception it caught, which is necessary for handleStateChangesWithResults()
TODO: Add Authorizer test
@asfgit
Copy link
Copy Markdown

asfgit commented Nov 6, 2017

FAILURE
No test results found.
--none--

@asfgit
Copy link
Copy Markdown

asfgit commented Nov 7, 2017

FAILURE
7145 tests run, 1 skipped, 0 failed.
--none--

1 similar comment
@asfgit
Copy link
Copy Markdown

asfgit commented Nov 7, 2017

FAILURE
7145 tests run, 1 skipped, 0 failed.
--none--

@cmccabe cmccabe closed this Mar 4, 2019
@cmccabe
Copy link
Copy Markdown
Contributor Author

cmccabe commented Mar 4, 2019

See #3848

@cmccabe cmccabe deleted the KAFKA-5692-elect-preferred branch March 4, 2019 16:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants