KAFKA-6145: KIP-441 Build state constrained assignment from balanced one#8497

Merged
vvcephei merged 16 commits into apache:trunk from ableegoldman:KIP-441-redo-assignors-to-converge on Apr 21, 2020

Member

@ableegoldman ableegoldman commented Apr 16, 2020

John's awesome TaskAssignorConvergenceTest revealed some issues with the current assignor, which he nailed down as being due to the state constrained and balanced assignments not converging.

One way to get an assignment that is as close to the balanced assignment as possible while still being state constrained is of course to start with the balanced assignment and move tasks around as necessary to satisfy the state constraint. With this basic approach, the convergence test now passes.

This PR also includes some semi-orthogonal refactoring, most significantly the removal of the assignment maps; we now just immediately assign tasks to the ClientState rather than first sticking them in an intermediate map.

Also moves ValidClientsByTaskLoadQueue to its own file.

Apologies for the length of this PR due to the above, but it didn't seem reasonable to do things the wrong way in the parts I changed, just so they could be undone in a followup PR along with the other parts.
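
For illustration, here is a minimal sketch of the approach described above: start from a balanced assignment and move only the tasks whose assigned client is not caught up. This is not the code in this PR. The class name, the `constrain` method, and the use of plain strings and integers for clients and tasks are assumptions made for the example, and standbys and warmup replicas are ignored entirely.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch, not the HighAvailabilityTaskAssignor implementation.
class StateConstrainedSketch {

    // balanced:        client -> tasks, as produced by some balanced assignor
    // caughtUpClients: task -> clients that are caught up on it
    //                  (a missing or empty entry means the task is unconstrained)
    public static Map<String, Set<Integer>> constrain(
            Map<String, Set<Integer>> balanced,
            Map<Integer, Set<String>> caughtUpClients) {
        // Copy the balanced assignment so the input is left untouched.
        Map<String, Set<Integer>> result = new TreeMap<>();
        balanced.forEach((client, tasks) -> result.put(client, new TreeSet<>(tasks)));

        for (Map.Entry<String, Set<Integer>> entry : balanced.entrySet()) {
            String owner = entry.getKey();
            for (Integer task : entry.getValue()) {
                Set<String> caughtUp =
                    caughtUpClients.getOrDefault(task, Collections.emptySet());
                if (caughtUp.isEmpty() || caughtUp.contains(owner)) {
                    continue; // unconstrained, or the constraint already holds
                }
                // Move the task to the least-loaded caught-up client,
                // breaking ties by client name to stay deterministic.
                String target = caughtUp.stream()
                    .filter(result::containsKey)
                    .min(Comparator.<String>comparingInt(c -> result.get(c).size())
                        .thenComparing(Comparator.naturalOrder()))
                    .orElse(owner);
                result.get(owner).remove(task);
                result.get(target).add(task);
            }
        }
        return result;
    }
}
```

Because every task starts where the balanced assignor put it, the result differs from the balanced assignment only for tasks that actually violate the state constraint.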

@ableegoldman ableegoldman changed the title KAFKA-6145: KIP-441 Build state constrained assignment from balanced one [WIP] KAFKA-6145: KIP-441 Build state constrained assignment from balanced one Apr 16, 2020
Member Author

I just moved this class to its own file from HATA, with one main change: we now just pass in the criterion for considering a client a valid candidate for a task.
The original criterion was that the client has no other version of this task already, but now we are flexible enough to use other validation criteria (e.g., that the client is caught up on this task).
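
As a rough illustration of passing the validity criterion in, the sketch below polls a load-ordered queue until it finds a client that the supplied predicate accepts for the task. The class name, the `pollValid` method, and its signature are assumptions for this example and do not reflect the actual ValidClientsByTaskLoadQueue API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.UUID;
import java.util.function.BiPredicate;

// Hypothetical sketch of a queue poll with a pluggable validity criterion.
class ValidClientPollSketch {

    // Returns the first client in queue order that the criterion accepts for
    // the given task, or null if no client is valid. Rejected clients are put
    // back so that later polls for other tasks can still see them.
    public static <T> UUID pollValid(PriorityQueue<UUID> clientsByLoad,
                                     T task,
                                     BiPredicate<UUID, T> isValid) {
        List<UUID> rejected = new ArrayList<>();
        UUID found = null;
        while (!clientsByLoad.isEmpty()) {
            UUID candidate = clientsByLoad.poll();
            if (isValid.test(candidate, task)) {
                found = candidate;
                break;
            }
            rejected.add(candidate);
        }
        clientsByLoad.addAll(rejected);
        return found;
    }
}
```

A caller could pass `(client, task) -> !hasTask(client, task)` for the original criterion or `(client, task) -> isCaughtUp(client, task)` for the caught-up one, where both helpers are placeholders.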

Contributor

I'd just like to say what an awesome tool for optimization this class is. Kudos to you and @cadonna .

Member

I do not remember having contributed to this awesomeness. It is all @ableegoldman 's merit.

Contributor

Ah, sorry about that @ableegoldman ; I wasn't able (or was too lazy) to follow the git praise trail through the class movement. Well, kudos to you, then. :)

@vvcephei
Contributor

test this please

@ableegoldman ableegoldman changed the title [WIP] KAFKA-6145: KIP-441 Build state constrained assignment from balanced one KAFKA-6145: KIP-441 Build state constrained assignment from balanced one Apr 17, 2020
@ableegoldman ableegoldman force-pushed the KIP-441-redo-assignors-to-converge branch from 26f0557 to 82d4541 Compare April 17, 2020 20:44
Contributor

@vvcephei vvcephei left a comment

Dropping in my first (partial pass) batch of review comments, so I can kick off the tests again for you. I'm actively continuing with my review...

Comment on lines 94 to 105
Contributor

Suggested change

-   final PriorityQueue<UUID> queue = new PriorityQueue<>(
-       (client, other) -> {
-           final double clientTaskLoad = clientStates.get(client).taskLoad();
-           final double otherTaskLoad = clientStates.get(other).taskLoad();
-           if (clientTaskLoad < otherTaskLoad) {
-               return -1;
-           } else if (clientTaskLoad > otherTaskLoad) {
-               return 1;
-           } else {
-               return client.compareTo(other);
-           }
-       });
+   final PriorityQueue<UUID> queue = new PriorityQueue<>(
+       Comparator.comparingDouble(k -> clientStates.get(k).taskLoad())
+   );

Upon second reading, this does the same thing, right?

Member Author

Almost, we want to fall back to comparing the actual UUIDs if the taskLoad happens to be equal
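
For reference, the UUID fallback can also be expressed with comparator chaining. The sketch below is only an illustration: the class name, the `byLoadThenId` method, and the plain `Map<UUID, Double>` standing in for `clientStates.get(k).taskLoad()` are assumptions.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: order clients by task load, then by UUID on ties.
class LoadThenIdComparatorSketch {

    public static Comparator<UUID> byLoadThenId(Map<UUID, Double> taskLoad) {
        return Comparator
            // Primary key: the client's current task load.
            .comparingDouble((UUID client) -> taskLoad.get(client))
            // Tie-breaker: the UUID's natural ordering, so equal loads
            // always resolve the same way.
            .thenComparing(Comparator.naturalOrder());
    }
}
```

The tie-breaker matters because `java.util.PriorityQueue` breaks ties between equal elements arbitrarily, so without it two clients with the same load could come out in either order.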

Contributor

Ah, thanks.

@vvcephei
Contributor

test this please

@ableegoldman ableegoldman force-pushed the KIP-441-redo-assignors-to-converge branch from 69a3a9c to 596f228 Compare April 17, 2020 21:54
Member Author

This test now produces different assignments depending on which task assignor is used. Since the only thing it's verifying is the actual assignment, and that's not really the responsibility of the StreamsPartitionAssignor anyway, I thought it made the most sense to just remove it.

Contributor

Should we instead adapt the test to verify that it produces a valid assignment for mixed instances during version probing? Or is that already covered?

Member Author

I don't mean to totally cop out on this, but I think we should do this in a followup PR. I'll make a ticket and assign it to myself for later so I can't escape, but I don't even think it's worth marking it @Ignore for now.
Tbh we should have removed it a while ago, rather than changing it over time to become its useless self today. It's a long history, and I'm mostly responsible, but just looking ahead the question now is: what do we even want to validate? The task assignor has no knowledge of version probing, and the partition assignor is not responsible for the task assignment (whereas it used to be with version probing, hence this test). What we should do is validate the inputs are being assembled sensibly during version probing.
Anyways this will be really difficult to do just based on the final partition assignment, and even harder to distinguish a real failure from an unrelated one. So I'd propose to kick this into the future, when we embed the actual assignor class in the configs instead of this flag, and then pass in a VersionProbingClientStatesValidatingAssignor or whatever...SG?

Member Author

Probably a much longer answer than you ever wanted, but this test has been haunting me over many PRs 👀

@vvcephei
Contributor

test this please

@ableegoldman
Member Author

One build failed with unrelated RocksDBTimestampedStoreTest.shouldVerifyThatMetricsGetMeasurementsFromRocksDB (should actually be fixed already by #8510)

Contributor

@vvcephei vvcephei left a comment

Ok @ableegoldman , I finally got the whole review in! Unfortunately, there are at least a few things that should be addressed before we can merge it.

Thanks so much for this fix!

Contributor

Unless I missed something, it's possible for tasksToCaughtUpClients not to contain a task, which would give us an NPE. Can we either add a test and handle the case or assert it here with an IllegalStateException, so we don't have to chase down an NPE later?

Member Author

Well, by definition every task in here must have at least one caught-up client. I'll add the IllegalStateException

Contributor

I might have dropped the thread of logic here. Why is that by definition? It looks like all we know about the task is that it's not caught up on the destination client. Why do we think it's caught up on some other client?

Member Author

I think we've hit the same source of confusion as in the other thread. But anything in taskMovements, and in fact any task that has an associated TaskMovement object, must have at least one caught-up client. If it didn't, we wouldn't be creating a warmup task for it; that's just a normal standby. A warmup replica always implies there is an active version elsewhere on a caught-up client.

Contributor

Yep, you're right. I'm on the same page now. So the only risk is that the code changes elsewhere and breaks your invariant.

I'll leave it to you whether you want to check the invariant and throw an exception or just let it be an NPE if that happens.

Member Author

Yeah I am a bit worried about protecting against future changes (very much including those by myself a few months from now). I have a thought about how to enforce things a bit better, let's see where this goes...

Member Author

Alright I decided to just push the validation into the TaskMovement constructor, and skip the check here
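
A minimal sketch of what validating the invariant in a constructor could look like follows. The class name, field names, and parameter types are assumptions for illustration; the real TaskMovement class in this PR may be shaped differently.

```java
import java.util.SortedSet;
import java.util.UUID;

// Hypothetical sketch of enforcing "every movement has a caught-up client".
class TaskMovementSketch {
    final String task;
    final UUID destination;
    final SortedSet<UUID> caughtUpClients;

    public TaskMovementSketch(String task, UUID destination, SortedSet<UUID> caughtUpClients) {
        // Fail fast with a descriptive message instead of letting a later
        // lookup surface as a NullPointerException far from the cause.
        if (caughtUpClients == null || caughtUpClients.isEmpty()) {
            throw new IllegalStateException(
                "Cannot create a movement for task " + task + " with no caught-up clients");
        }
        this.task = task;
        this.destination = destination;
        this.caughtUpClients = caughtUpClients;
    }
}
```

Checking once at construction means every later use of the object can rely on the invariant without repeating the check.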

Contributor

Yay!

Member

@cadonna cadonna left a comment

@ableegoldman Thank you for the PR

Here is my feedback.

Member

I love it when a comment gets killed by a meaningful method name!

Member

Q: I do not understand why we need uniqueClients here? Would it not suffice to check for clientsByTaskLoad.contains(client)?

Contributor

I think it's just a computer-sciencey matter of principle. clientsByTaskLoad is a linear collection, so every offer would become O(n) if we did a contains call on it every time. Right now, it's only O(n) when we need to remove the prior record for the same client, and O(log(n)) otherwise.

Does it really matter? I'm not sure.

Member

Got it!

That means, we get O(n) for all cases where we first poll() and then offer() the same clients because those clients are contained in uniqueClients, i.e.:

  • HighAvailabilityTaskAssignor@155
  • HighAvailabilityTaskAssignor@131
  • TaskMovement@94
  • ValidClientsByTaskLoadQueue@76
  • ValidClientsByTaskLoadQueue@88

Those are the majority of the calls to offer() and offerAll(). Additionally, the last two occurrences in the list are called in each call to poll(). In poll(), if the top does not satisfy the criteria it is added to invalidPolledClients which then is added with offerAll(). For each element of invalidPolledClients the whole queue clientsByTaskLoad is scanned, since each element is contained in uniqueClients but not in clientsByTaskLoad. This results in O(n^2).

AFAIU, we need the uniqueness check because of TaskMovement@99.

If we update uniqueClients also in poll(), we would avoid O(n^2) for poll() and restrict O(n) to the case at TaskMovement@99.

Does it really matter?

I'm not sure either. A performance test would be the only way to tell.

Contributor

Gah! You're right. We should also remove the client from uniqueClients when we poll.

Member Author

@cadonna you're right, I forgot to remove from uniqueClients in poll. Good catch
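
To make the bookkeeping discussed in this thread concrete, here is a rough sketch of a priority queue paired with a set for constant-time membership checks, with the set kept in sync in `poll`. The class name and methods are assumptions for illustration and are not the ValidClientsByTaskLoadQueue code; only the field names `clientsByTaskLoad` and `uniqueClients` are borrowed from the discussion.

```java
import java.util.Comparator;
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.UUID;

// Hypothetical sketch of the queue-plus-set bookkeeping.
class UniqueClientQueueSketch {
    private final PriorityQueue<UUID> clientsByTaskLoad;
    private final Set<UUID> uniqueClients = new HashSet<>();

    public UniqueClientQueueSketch(Comparator<UUID> byTaskLoad) {
        this.clientsByTaskLoad = new PriorityQueue<>(byTaskLoad);
    }

    public void offer(UUID client) {
        if (uniqueClients.contains(client)) {
            // The client is already queued, possibly under an outdated load:
            // remove the old entry first. PriorityQueue.remove(Object) is linear.
            clientsByTaskLoad.remove(client);
        } else {
            uniqueClients.add(client);
        }
        clientsByTaskLoad.offer(client); // logarithmic insertion
    }

    public UUID poll() {
        UUID client = clientsByTaskLoad.poll();
        if (client != null) {
            // Keep the set in sync so that a later offer of this client
            // does not trigger a pointless linear scan of the queue.
            uniqueClients.remove(client);
        }
        return client;
    }

    public int size() {
        return clientsByTaskLoad.size();
    }
}
```

With the removal in `poll`, the linear `remove` only runs when a client is offered while it is still in the queue, which is the case the uniqueness check exists for.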

Member

Q: Why do we even care at all whether the task was running on the client? What if we just assign a real stand-by task if we have a spare one?

@ableegoldman ableegoldman force-pushed the KIP-441-redo-assignors-to-converge branch from e1920fe to 864a539 Compare April 21, 2020 03:31

        final ClientState destinationClientState,
        final AtomicInteger remainingWarmupReplicas,
        final Map<TaskId, Integer> tasksToRemainingStandbys) {
    if (destinationClientState.previousAssignedTasks().contains(task) && tasksToRemainingStandbys.get(task) > 0) {
Member

This question from my previous review went unnoticed (or you simply did not care ;-)).

Q: Why do we even care at all whether the task was running on the client? What if we just assign a real stand-by task if we have a spare one?

Contributor

I think I answered this already. We're trying not to decrease the overall availability the standbys are providing, which could happen if we drop a caught-up standby in order to warm up an empty node. We can certainly do better than what we do now, which is not very efficient in terms of task movement, but I think it's good enough for this PR.

@vvcephei
Contributor

Test this, please.

Contributor

@vvcephei vvcephei left a comment

The only outstanding thing I see is that we need to remove the client from the priority queue in poll

@vvcephei
Contributor

test this please

@vvcephei
Contributor

Unrelated Java 11 failures:
kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores

@vvcephei vvcephei merged commit 5c548e5 into apache:trunk Apr 21, 2020
@ableegoldman ableegoldman deleted the KIP-441-redo-assignors-to-converge branch June 26, 2020 22:39

3 participants