KAFKA-5692: Change PreferredReplicaLeaderElectionCommand to use Admin…#3848

Merged
junrao merged 22 commits into apache:trunk from
tombentley:KAFKA-5692-elect-preferred
Jan 25, 2019

Conversation

@tombentley
Member

…Client

See also KIP-183.

The contribution is my original work and I license the work to the project under the project's open source license.

This implements the following algorithm:

  1. AdminClient sends ElectPreferredLeadersRequest.
  2. KafkaApis receives ElectPreferredLeadersRequest and delegates to
    ReplicaManager.electPreferredLeaders().
  3. ReplicaManager delegates to KafkaController.electPreferredLeaders().
  4. KafkaController adds a PreferredReplicaLeaderElection to the EventManager.
  5. ReplicaManager.electPreferredLeaders()'s callback uses the
    delayedElectPreferredReplicasPurgatory to wait for the results of the
    election to appear in the metadata cache. If there are no results
    because of errors, or because the preferred leaders are already leading
    the partitions, then a response is returned immediately.
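
The completion check in step 5 can be sketched roughly as follows. This is a simplified Java illustration rather than the PR's Scala code: the class DelayedElectionSketch, both of its methods, and the use of String keys in place of TopicPartition are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch; none of these names come from the PR.
final class DelayedElectionSketch {
    /** Returns the partitions whose leader in the metadata cache is not yet the expected one. */
    static Set<String> stillWaiting(Map<String, Integer> expectedLeaders,
                                    Map<String, Integer> cachedLeaders) {
        Set<String> waiting = new TreeSet<>();
        for (Map.Entry<String, Integer> e : expectedLeaders.entrySet()) {
            // A partition missing from the cache counts as still pending.
            if (!e.getValue().equals(cachedLeaders.get(e.getKey()))) {
                waiting.add(e.getKey());
            }
        }
        return waiting;
    }

    /** The delayed operation may complete once nothing is pending or the deadline has passed. */
    static boolean canComplete(Map<String, Integer> expectedLeaders,
                               Map<String, Integer> cachedLeaders,
                               long nowMs, long deadlineMs) {
        return nowMs >= deadlineMs || stillWaiting(expectedLeaders, cachedLeaders).isEmpty();
    }

    public static void main(String[] args) {
        Map<String, Integer> expected = new HashMap<>();
        expected.put("t-0", 1);
        expected.put("t-1", 2);
        Map<String, Integer> cached = new HashMap<>();
        cached.put("t-0", 1);
        cached.put("t-1", 3);
        System.out.println(stillWaiting(expected, cached));          // [t-1]
        System.out.println(canComplete(expected, cached, 0L, 100L)); // false
        cached.put("t-1", 2); // the metadata cache catches up
        System.out.println(canComplete(expected, cached, 0L, 100L)); // true
    }
}
```

In this sketch a timeout completes the operation just as a successful election does; deciding which error to report for the partitions still pending is left out.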

In the EventManager work thread the preferred leader is elected as follows:

  1. The EventManager runs PreferredReplicaLeaderElection.process()
  2. process() calls KafkaController.onPreferredReplicaElectionWithResults()
  3. KafkaController.onPreferredReplicaElectionWithResults()
    calls the PartitionStateMachine.handleStateChangesWithResults() to
    perform the election (asynchronously the PSM will send LeaderAndIsrRequest
    to the new and old leaders and UpdateMetadataRequest to all brokers)
    then invokes the callback.

Note the change in parameter type for CollectionUtils.groupDataByTopic().
This makes sense because the AdminClient APIs use Collection consistently,
rather than List or Set. If binary compatibility is a consideration, the old
version should be kept, delegating to the new version.

I had to add PartitionStateMachine.handleStateChangesWithResults()
in order to be able to process a set of state changes in the
PartitionStateMachine and get back individual results.
At the same time I noticed that all callers of existing handleStateChange()
were destructuring a TopicAndPartition that they already had in order
to call handleStateChange(), and that handleStateChange() immediately
instantiated a new TopicAndPartition. Since TopicAndPartition is immutable
this is pointless, so I refactored it. handleStateChange() also now returns
any exception it caught, which is necessary for handleStateChangesWithResults().
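
The idea of processing a set of state changes and getting back individual results can be sketched like this. It is a hedged Java illustration, not the Scala code in the PR: StateChangeResultsSketch, the String partition names, and the Consumer parameter standing in for the single-partition handler are all hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch; the real method lives in the Scala PartitionStateMachine.
final class StateChangeResultsSketch {
    /**
     * Applies the single-partition handler to every partition and records
     * the exception it threw, or null when the change succeeded.
     */
    static Map<String, Throwable> handleStateChangesWithResults(
            Collection<String> partitions, Consumer<String> handleStateChange) {
        Map<String, Throwable> results = new LinkedHashMap<>();
        for (String partition : partitions) {
            try {
                handleStateChange.accept(partition);
                results.put(partition, null);
            } catch (RuntimeException e) {
                // One failure must not prevent the remaining partitions from being processed.
                results.put(partition, e);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, Throwable> results = handleStateChangesWithResults(
            Arrays.asList("t-0", "t-1"),
            p -> {
                if (p.equals("t-1")) {
                    throw new IllegalStateException("preferred replica not in ISR");
                }
            });
        System.out.println(results.get("t-0"));              // null
        System.out.println(results.get("t-1").getMessage()); // preferred replica not in ISR
    }
}
```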

Contributor

This is backed by List.
Change variable name accordingly.

@tombentley tombentley force-pushed the KAFKA-5692-elect-preferred branch 4 times, most recently from 3086bf3 to a693d47 Compare September 19, 2017 09:57
@tombentley
Member Author

@ijuma I've rebased this PR for the Protocol refactoring.

@tombentley tombentley force-pushed the KAFKA-5692-elect-preferred branch from a693d47 to 3e9408d Compare September 28, 2017 14:40
@tombentley tombentley force-pushed the KAFKA-5692-elect-preferred branch from 3e9408d to 64fcbfc Compare October 12, 2017 11:58
@tombentley
Member Author

Rebased for merge conflict. Please could you review this when you have time @ijuma?

@ijuma
Member

ijuma commented Oct 27, 2017

@cmccabe can you please review this (particularly the AdminClient and protocol parts, I can take a look at the Controller bits).

}

/** Return a new future that has completed exceptionally */
private static <T> KafkaFuture<T> exceptionalFuture(Throwable e) {

Contributor

This is a good utility function. It makes sense to add this function to KafkaFuture.java, right? This is similar to KafkaFuture#completedFuture, which returns a future completed with the given result.
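
A minimal sketch of such a helper is shown below. To stay self-contained it uses java.util.concurrent.CompletableFuture as a stand-in for KafkaFuture, which is an assumption; the class name ExceptionalFutureSketch and the causeOf helper are likewise hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch; CompletableFuture stands in for KafkaFuture.
final class ExceptionalFutureSketch {
    /** Returns a new future that has already completed exceptionally with the given error. */
    static <T> CompletableFuture<T> exceptionalFuture(Throwable e) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(e);
        return future;
    }

    /** Demo helper: returns the failure cause of an already completed future, or null on success. */
    static Throwable causeOf(CompletableFuture<?> future) {
        try {
            future.get();
            return null;
        } catch (ExecutionException ex) {
            return ex.getCause();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return ex;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = exceptionalFuture(new IllegalStateException("boom"));
        System.out.println(failed.isCompletedExceptionally()); // true
        System.out.println(causeOf(failed).getMessage());      // boom
    }
}
```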

public KafkaFuture<Void> partitionResult(TopicPartition partition) {
final Map<TopicPartition, ? extends KafkaFuture<Void>> map;
try {
map = futures.get();

Contributor

You are calling KafkaFuture#get here, which is blocking. Don't do that.

Member Author

@cmccabe You're right, but to fix it I'll need to implement a KafkaFuture.thenCompose(Function<T, KafkaFuture<R>>) method on KafkaFuture<T>. That seems to be awkward because the API of KafkaFuture doesn't provide a public addWaiter() method, but the Function<A, KafkaFuture<B>> in the API of thenApply() is returning a KafkaFuture, not an Impl. I've pushed a WIP patch, so you can see what I mean. Thoughts?

Contributor

I don't see why you need thenCompose. You already have thenApply, and you can just do something like this:

final KafkaFutureImpl<Void> partitionFuture = new KafkaFutureImpl<>();
rpcFuture.thenApply(new Function<Map<TopicPartition, Throwable>, Void>() {
  @Override
  public Void apply(Map<TopicPartition, Throwable> map) {
    // check map for this partition and ensure there is no error
    if (not found in map) {
      partitionFuture.completeExceptionally(not there);
    } else if (there was some error) {
      partitionFuture.completeExceptionally(error);
    } else {
      partitionFuture.complete(null);
    }
  }
});

I am not opposed to adding thenCompose to KafkaFuture, but I just don't think you need it for this.

Member Author

@cmccabe that code makes no sense to me. Is it supposed to be a replacement for what's in ElectPreferredLeadersResult.partitionResult()? What's supposed to happen with your partitionFuture? It gets completed, but doesn't otherwise get used/returned/passed to anything, so nothing can handle or wait on its result. Your apply() lacks a return; if it always returns null, then the result of the thenApply() is a Future<Void> that is always successful.

I think we need thenCompose() because, in effect, we want a Function that can return a Future<Void>. thenApply() forces us into returning the actual Void, but then we are having to call get() in the Function, which defeats the point. Or we can use thenApply() and return a Future<Void> from the Function, but then the result of thenApply() is a double Future<Future<Void>>.
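
The difference can be illustrated with java.util.concurrent.CompletableFuture, which already offers both operations. This is only an analogy under stated assumptions: KafkaFuture had no thenCompose() at this point in the thread, and ComposeSketch, its partitionResult method, and the String partition keys are hypothetical names.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; CompletableFuture stands in for KafkaFuture.
final class ComposeSketch {
    /** Looks up one partition's future inside a future map without blocking. */
    static CompletableFuture<Void> partitionResult(
            CompletableFuture<Map<String, CompletableFuture<Void>>> futures, String partition) {
        // With thenApply this lambda would produce CompletableFuture<CompletableFuture<Void>>;
        // thenCompose flattens the nesting into a single CompletableFuture<Void>.
        return futures.thenCompose(map -> {
            CompletableFuture<Void> found = map.get(partition);
            if (found == null) {
                CompletableFuture<Void> missing = new CompletableFuture<>();
                missing.completeExceptionally(
                    new IllegalArgumentException("Unknown partition " + partition));
                return missing;
            }
            return found;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Map<String, CompletableFuture<Void>>> outer = new CompletableFuture<>();
        CompletableFuture<Void> result = partitionResult(outer, "t-0");
        System.out.println(result.isDone()); // false: nothing blocked, nothing completed yet

        CompletableFuture<Void> inner = CompletableFuture.completedFuture(null);
        outer.complete(Collections.singletonMap("t-0", inner));
        System.out.println(result.isDone()); // true
    }
}
```

The caller never invokes get(), so the lookup stays asynchronous however long the outer future takes to complete.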

Member Author

@cmccabe I need your help to resolve this.

public KafkaFuture<Set<TopicPartition>> partitions() {
final Map<TopicPartition, ? extends KafkaFuture<Void>> map;
try {
map = futures.get();

Contributor

same issue here.

public KafkaFuture<Void> all() {
final Map<TopicPartition, ? extends KafkaFuture<Void>> map;
try {
map = futures.get();

Contributor

and here.

TopicPartition partition = entry.getKey();
KafkaFutureImpl<Void> future;
if (knownPartitions) {
future = mapOfFutures.get(partition);

Contributor

You need to handle the case where there is an incorrect server response and there is no such partition in our request. Right now, your code gets a null pointer error in this case.
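
One way to guard against such a response is sketched below. It is a hedged illustration, not the fix that was committed: ResponseHandlingSketch and handleResponse are hypothetical names, CompletableFuture stands in for KafkaFutureImpl, and String keys stand in for TopicPartition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; CompletableFuture stands in for KafkaFutureImpl.
final class ResponseHandlingSketch {
    /**
     * Completes the per-partition futures from a response (a null error means success)
     * and returns the partitions the response mentioned that were never requested.
     */
    static List<String> handleResponse(Map<String, CompletableFuture<Void>> requested,
                                       Map<String, Exception> response) {
        List<String> unexpected = new ArrayList<>();
        for (Map.Entry<String, Exception> entry : response.entrySet()) {
            CompletableFuture<Void> future = requested.get(entry.getKey());
            if (future == null) {
                // Not in our request: record it instead of dereferencing null.
                unexpected.add(entry.getKey());
            } else if (entry.getValue() != null) {
                future.completeExceptionally(entry.getValue());
            } else {
                future.complete(null);
            }
        }
        // Fail anything the response omitted; this is a no-op for futures completed above.
        for (Map.Entry<String, CompletableFuture<Void>> entry : requested.entrySet()) {
            entry.getValue().completeExceptionally(
                new IllegalStateException("No result for " + entry.getKey()));
        }
        return unexpected;
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<Void>> requested = new HashMap<>();
        requested.put("a-0", new CompletableFuture<>());
        requested.put("a-1", new CompletableFuture<>());
        Map<String, Exception> response = new HashMap<>();
        response.put("a-0", null); // success
        response.put("b-0", null); // never requested
        System.out.println(handleResponse(requested, response));            // [b-0]
        System.out.println(requested.get("a-1").isCompletedExceptionally()); // true
    }
}
```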

future = mapOfFutures.get(partition);
} else {
future = new KafkaFutureImpl<Void>();
mapOfFutures.put(partition, future);

Contributor

You can't modify this map here because it is concurrently accessed by people accessing ElectPreferredLeadersResult.

Member Author

@cmccabe there are two cases here, depending on the state of knownPartitions. We only modify mapOfFutures when knownPartitions == false and in this case it's impossible for anyone to access the map via the ElectPreferredLeadersResult because the "outer" futures won't be completed.

@cmccabe
Contributor

cmccabe commented Oct 27, 2017

Thanks for the patch, @tombentley. There are some issues here, however. It seems like all the async APIs are actually sync, because the code to retrieve the future is calling Future#get. This obviously defeats the point of an async API.

Another issue I see is that you modify a map in the results object concurrently with users accessing the map. If you want to have your code inserting elements into the map, you need locking to do that.

You also need to handle error conditions like the server sending the wrong thing, etc.

@cmccabe
Contributor

cmccabe commented Nov 6, 2017

Tom, there is a patch here which shows what I'm talking about: #4183

@tombentley
Member Author

@cmccabe thanks for going to the trouble of pointing out what must have seemed blindingly obvious.

Your patch raises a broader question in my mind: If it's OK to just use a Future<Map<Key, Value>> for this Result class why is that approach also not used in the other result classes, such as CreateTopicsResult, which use a Map<Key, Future<Value>> but likewise get a single response from a broker?

It was the precedents set by the existing Results which set me on the path of using a Map<Key, Future<Value>>. Personally I couldn't see a great benefit in a Map<Key, Future<Value>> when all the results are available at once. I feel I must be missing some subtlety.

@tombentley
Member Author

@cmccabe a problem with your patch is that it doesn't work when the electionFuture completes abnormally (e.g. the Call times out and the future is completed via handleFailure()). thenApply() only invokes the Function when the receiver future completes normally, so abnormal completion of the electionFuture means the result future is never completed. We could fix this by exposing the BiConsumer and addWaiter(). wdyt?
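
The behaviour described here can be reproduced with java.util.concurrent.CompletableFuture, whose whenComplete() takes a BiConsumer and plays roughly the role proposed for addWaiter(). That mapping is an assumption made for illustration, and AbnormalCompletionSketch and its two methods are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch; CompletableFuture stands in for KafkaFuture.
final class AbnormalCompletionSketch {
    /** Completes the result from a thenApply function, which only runs on normal completion. */
    static CompletableFuture<Void> viaApplyOnly(CompletableFuture<String> source) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        source.thenApply(value -> {
            result.complete(null);
            return null;
        });
        return result;
    }

    /** Completes the result from a BiConsumer, which runs on both normal and abnormal completion. */
    static CompletableFuture<Void> viaWhenComplete(CompletableFuture<String> source) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        source.whenComplete((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(null);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> electionFuture = new CompletableFuture<>();
        electionFuture.completeExceptionally(new TimeoutException("call timed out"));

        System.out.println(viaApplyOnly(electionFuture).isDone());                     // false
        System.out.println(viaWhenComplete(electionFuture).isCompletedExceptionally()); // true
    }
}
```

The first result future is never completed, so anything waiting on it would hang, which is the failure mode raised in the comment.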

@tombentley
Member Author

@cmccabe any thoughts about the preferred way to deal with abnormal completion of the electionFuture?

@cmccabe
Contributor

cmccabe commented Nov 25, 2017

@tombentley wrote: thanks for going to the trouble of pointing out what must have seemed blindingly obvious.

It's OK. It wasn't obvious to me when I started doing async programming. But over time it became easier :)

a problem with your patch is that it doesn't work when the electionFuture completes abnormally (e.g. the Call times out and the future is completed via handleFailure()). thenApply() only invokes the Function when the receiver future completes normally, so abnormal completion of the electionFuture means the result future is never completed. We could fix this by exposing the BiConsumer and addWaiter(). wdyt?

Good catch. I think this shortcoming is why I didn't use this approach for things like createTopics.

@tombentley wrote: any thoughts about the preferred way to deal with abnormal completion of the electionFuture?

Sorry, this got buried in my email box. But I fixed my email settings to make notifications more visible...

I think @tombentley's idea of exposing the BiConsumer as public is the best way to go. We should probably make it a pure interface to make things look nicer in Java 8. (Function should have been a pure interface, too.....)

@tombentley tombentley force-pushed the KAFKA-5692-elect-preferred branch from 3a92833 to 0630c4c Compare December 8, 2017 15:12
@tombentley
Member Author

Rebased onto current trunk. This PR now assumes that KIP-218 gets accepted and merged, since it uses KafkaFutureImpl.addWaiter() (using a dummy patch in place of the real KIP-218 work). This will probably need to be rebased again soon due to "KAFKA-5647: Use KafkaZkClient in ReassignPartitionsCommand and PreferredReplicaLeaderElectionCommand (#4260)".

@cmccabe
Contributor

cmccabe commented Dec 12, 2017

Thanks for being patient, @tombentley. We're working on getting the KafkaFuture change(s) in.

@tombentley tombentley force-pushed the KAFKA-5692-elect-preferred branch from 0630c4c to d6c739c Compare March 12, 2018 15:11
@tombentley
Member Author

@cmccabe, @ijuma I have rebased this onto latest trunk. Would be grateful for review.

@tombentley
Member Author

@cmccabe, @ijuma is one of you able to review for me? Thanks.

Contributor

Is it really necessary to keep around both functions? It seems like this utility function is only supposed to be used by Kafka, not by end-users.

Member Author

My intention was to retain binary compatibility. Since it's not in an internal package how would an end-user know that they weren't supposed to use this API?

Contributor

@cmccabe cmccabe Apr 26, 2018

They should know because there isn't an interface annotation that declares it a public API for use outside Kafka.

Member

We publish javadoc for public APIs. Everything else is not public. This is a bit opaque and it would be better to have annotations as @cmccabe suggests.

Contributor

It would be nice to avoid this and similar whitespace changes

@tombentley tombentley force-pushed the KAFKA-5692-elect-preferred branch from a6d1b7a to 2cd37d5 Compare January 21, 2019 14:09
@tombentley
Member Author

Rebased for trunk conflict. I've also made a stab at invoking the callbacks when clearing the controller queue. I'm also now using the generated protocol messages in the Request and Response.

@junrao @cmccabe let me know what you think.

Contributor

@junrao junrao left a comment

@tombentley : Thanks for the updated patch. A few more comments below.

"about": "The topic partitions to elect the preferred leader of.",
"fields": [
{ "name": "Topic", "type": "string", "versions": "0+",
"about": "The name of a topic." },

Contributor

It seems that we should mark this field as mapKey to enforce uniqueness. @cmccabe : Would you agree?

Member Author

If I add "mapKey": true then I end up with a subclass of ImplicitLinkedHashMultiSet<TopicPartitions>, which wouldn't enforce uniqueness because it's a multiset. @cmccabe can you enlighten me? Is there a way to get a subclass of ImplicitLinkedHashSet<TopicPartitions> instead?

Contributor

@tombentley, you are correct that making this a mapKey would not inherently enforce uniqueness. However, it would make it easier to add that enforcement to the code if you need to. You could easily look up "are there any other topic entries that have the same name as this one?" and set an error if so.
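
A uniqueness check of that kind could look roughly like the following. It is a minimal sketch that operates on plain topic names rather than on the generated message classes; DuplicateTopicSketch and duplicateTopics are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; works on plain strings, not on the generated request classes.
final class DuplicateTopicSketch {
    /** Returns the topic names that occur more than once, in the order a repeat is first seen. */
    static Set<String> duplicateTopics(List<String> topicNames) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new LinkedHashSet<>();
        for (String name : topicNames) {
            // Set.add returns false when the name was already present.
            if (!seen.add(name)) {
                duplicates.add(name);
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        System.out.println(duplicateTopics(Arrays.asList("a", "b", "a", "c", "b"))); // [a, b]
    }
}
```

A request handler could then set an error on every entry whose topic appears in the returned set.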

"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ReplicaElectionResults", "type": "[]ReplicaElectionResult", "versions": "0+",
"about": "The error code, or 0 if there was no error.", "fields": [
{ "name": "Topic", "type": "string", "versions": "0+",

Contributor

Similar to the above, it seems that we should mark the topic field and the PartitionId field as mapKey?

Comment thread clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java Outdated
Contributor

As discussed earlier, it would be useful not to use Controller.controllerContext here since it's meant to be internal to the controller.

Comment thread core/src/main/scala/kafka/controller/KafkaController.scala Outdated
Comment thread core/src/main/scala/kafka/controller/KafkaController.scala Outdated
Contributor

As mentioned earlier, this only works if the controller is the bootstrap server. Otherwise, by the time that PreferredReplicaLeaderElectionCommand runs, a new controller could have been elected.

Comment thread clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java Outdated
@tombentley
Member Author

tombentley commented Jan 24, 2019

@junrao I think I addressed all your comments except the mapKey-related ones, needing @cmccabe's input on those.

Contributor

@junrao junrao left a comment

@tombentley : Thanks for addressing the comments. We are pretty close. Just a few more comments below.

Comment thread core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala Outdated
val deadline = time.milliseconds() + requestTimeout

def electionCallback(waiting: Set[TopicPartition],
expectedLeaders: Set[ElectPreferredLeaderMetadata],

Contributor

Hmm, the waiting param seems redundant. Could we just pass in expectedLeaders? Also, instead of creating a new type ElectPreferredLeaderMetadata, could we just pass in a Map[TP, int]?

Comment thread clients/src/main/resources/common/message/ElectPreferredLeadersRequest.json Outdated
Comment thread core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala Outdated
@tombentley
Member Author

@junrao I think I addressed all your comments. Still waiting to hear from @cmccabe about the mapKey.

Contributor

@junrao junrao left a comment

@tombentley : Thanks for addressing all the comments in the PR. LGTM. About using mapKey, since currently it doesn't enforce uniqueness, it's kind of optional. So, we can just leave them out for now.

@junrao junrao merged commit 269b652 into apache:trunk Jan 25, 2019
@junrao
Contributor

junrao commented Jan 25, 2019

@tombentley : I merged the PR. The implementation differs slightly from what's described in KIP-183. For example, the ElectPreferredLeadersRequest now has a timeout field. Could you update the KIP wiki and send a follow-up email to the original voting thread about the minor changes? We can then see if anyone has any additional feedback.

@tombentley
Member Author

@junrao sure, I can do that. Thanks for your help!

pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jun Rao <junrao@gmail.com>