KAFKA-8179: Part 7, cooperative rebalancing in Streams #7386
guozhangwang merged 18 commits into apache:trunk from
Conversation
Force-pushed from dd021e4 to fd95d72
Don't let the SubscriptionState wipe out existing partition state (including uncommitted offsets) when the new assignment comes in
Nice find! I think we did not hit this since we always revoke everything before :P
Allow PARTITIONS_REVOKED to transition to itself (but callback is a no-op if no new partitions have been revoked)
These methods were duplicates
We removed the key from the map above and stored its contents in expectedCount; we should use that in the message.
Jenkins seems to be having some issues today, but all tests pass locally. Kicked off system tests here: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3031/
If the subscription came from a client following the cooperative protocol, it will have embedded ownedPartitions instead of activeTasks
Added a new way to generate assignments that falls back to the old (interleaved) assignment if we can't avoid giving away a previously owned partition and triggering a second rebalance. I tried to comment it sufficiently well to explain what it's doing, but please give it a look and lmk if anything doesn't make sense
Still debugging the Jenkins failures. Cleaned up, retest this please.
It's actually fine to call standby.commit during a rebalance, I just wasn't sure how that would work with the metrics (it would screw with commit-latency to be sometimes recording the latency of active + standby commits, and sometimes just standby). Maybe we could still commit the standbys but pretend like we didn't call commit (ie, not record anything?) Or would that screw up the metrics even more @guozhangwang
On a somewhat related note, I notice we don't actually call commit on restoring tasks but we do on standbys. Is there a particular reason for this? What is there to commit for a standby task but not for a restoring?
As long as we have separate tags for standby and primary tasks commit, I don't think the metrics will be messed up.
I don't think we do though -- currently we just call commitSensor.record(intervalCommitLatency / (double) committed, now); where the commitLatency is for committing all tasks
During version probing rebalances we used to just give all clients back their old assignments, except for "future" consumers who would get an empty assignment since we can't interpret their subscriptions. But we can now determine their previous active tasks by the ownedPartitions so we can now put them in their assignment.
But actually we might as well just do our best to generate a "real" (not version probing) assignment during this rebalance, so that on the second rebalance we will not have to revoke any partitions and trigger a third rebalance. So I consolidated the assignment generation into a separate method that both computeNewAssignment and versionProbingAssignment can use
There might be kind of an ugly merge/rebasing ahead once this version probing bugfix PR is fixed, but I think the overall behavior will remain the same
I thought it might be helpful to see that we are not committing some of what we've processed, because we can't commit during a rebalance. But I don't think it's absolutely necessary and can take it out if you don't think it adds much?
I think it's okay to leave it as TRACE, as practically we do not turn on TRACE that frequently.
We would like to restore this test if we choose to keep standby tasks metrics.
Removed the test because we no longer suspend standbys (we still have a test for recording metrics on task close)
The processor used here keeps track of the numRecordsProcessed and outputs this whenever it's a multiple of 100. It also resets in init, so the eos test watches for a rebalance and then looks for the numRecordsProcessed to hit 500 again, knowing it should have started over again from 0.
But the whole point of cooperative rebalancing is that we may not have needed to revoke/reinitialize this task during a rebalance, so the counter may not be reset after a rebalance.
Force-pushed from c0417bb to d388b3b
Nice cleanup! In #5501 we moved the close / abort logic into the caller of suspend, and we should actually have removed the parameter in that PR but overlooked it.
This is a meta comment: maybe another (equally hacky?) way to do this, is to expose the MemberState from ConsumerMetadata (we are exposing this in KIP-447). And then in Streams, we can check that state after each poll call -- remember that state can only change within the poll call, and depending on that we can decide whether or not commit.
As for now I think this way is fine, cannot really think of a better way that does not change public APIs.
It's a bit of an anti-pattern to use null and "" to indicate two different sentinel cases: the partition is owned by a different client, or the task is new. I think there's a better way that also saves us from redundantly iterating through taskPartitions: e.g. we let it return a Collection<String> (renamed to previousConsumers...) which is the union of the claimed owners of the partitions, and then in the caller we can treat 1) a singleton, 2) an empty set, 3) a plural, differently.
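A sketch of the suggested refactor (names like previousConsumers and ownedPartitionToConsumer are hypothetical illustrations, not the actual Streams code):

```java
import java.util.*;

public class PreviousConsumers {
    // Hypothetical sketch: instead of null/"" sentinels, return the union of
    // consumers that claim ownership of any of the task's partitions
    static Set<String> previousConsumers(Collection<String> taskPartitions,
                                         Map<String, String> ownedPartitionToConsumer) {
        Set<String> owners = new HashSet<>();
        for (String partition : taskPartitions) {
            String owner = ownedPartitionToConsumer.get(partition);
            if (owner != null) {
                owners.add(owner);
            }
        }
        return owners;
    }

    public static void main(String[] args) {
        Map<String, String> owned = new HashMap<>();
        owned.put("topic-0", "consumer-A");
        owned.put("topic-1", "consumer-B");
        Set<String> owners = previousConsumers(Arrays.asList("topic-0", "topic-1"), owned);
        // The caller now just branches on the size of the result:
        if (owners.isEmpty()) {
            System.out.println("task is new");
        } else if (owners.size() == 1) {
            System.out.println("sticky owner: " + owners.iterator().next());
        } else {
            System.out.println("partitions split across " + owners.size() + " consumers");
        }
    }
}
```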
Haha I knew this would get put down in the PR review but didn't get around to cleaning it up in time...thanks for the suggestion, that is much better 😄
Related to my other comment: since now allOwnedPartitions is just a union of clientOwnedPartitions I think it is not needed anymore inside the assign function.
I believe we still need allOwnedPartitions here because we need to distinguish between partitions which are not owned within this client vs partitions which are not owned by anyone in any client. ie just because no one in the client claims a partition as owned doesn't mean it is safe to give that partition/task away, since it might still be owned by someone in the group and need to be revoked
My read was that the keyset of Map<TopicPartition, String> clientOwnedPartitions and Set<TopicPartition> allOwnedPartitions are the same, but now I realized the former is only for a single client (as with my other comment above) actually.. Now this logic makes sense.
It's hard to give good self-explanatory names to all your variables 😉 Let me know if you think of any names that better explain what allOwnedPartitions is supposed to be
Why can this happen? We checked !newTasks.isEmpty() before already, right?
We have a loop inside the main while (!newTasks.isEmpty()) loop, where we are actively removing things from newTasks. So, we might have polled the last task in the while (consumerIt.hasNext()) loop before we get back to the outer loop and check newTasks.isEmpty
This code is actually pretty much the same code as in the interleaveTasksByGroupId method. I'll see if it can be moved out into a shared method.
nit: I think this variable can be reduced to a local one since it's only needed inside computeNewAssignment: before looping over the clients, initialize it to false. And then let giveTasksBackToConsumers return an empty map if it needs to break / fail fast (maybe rename to tryGiveTasksBackToConsumers). And the caller becomes:
if (rebalanceRequired || state.ownedPartitions().isEmpty())
assignment = interleaveTasksByGroupId
else if ((assignment = giveTasksBackToConsumers).equals(empty))
assignment = interleaveTasksByGroupId
rebalanceRequired = true
WDYT?
I think the test coverage is not sufficient with the augmented logic here, we should at least test the following code path:
- among clients, with EAGER set we still try to honor stickiness based on user metadata (this may be covered already).
- among clients, with COOPERATIVE we still try to honor stickiness based on owned partitions.
- within a client, we try to reassign tasks back to consumers if the prev consumers are fixed, and load balance is not violated.
- within a client, if we cannot satisfy 3), we interleave.
- within a client, upon version probing we interleave.
Yeah I was in the middle of extending the tests when I found the version probing bug -- I'll get back to them once I've rebased
Force-pushed from e92caf1 to b97e8ea
We can potentially be in PARTITIONS_REVOKED with cooperative rebalancing so we don't want to just block doing nothing during the rebalance -- not sure if it's worth polling for zero since this is rare with cooperative, or some other time < pollTime? cc/ @guozhangwang
In EAGER, during the PARTITIONS_REVOKED state we would not return any data from consumer anyways; In future COOPERATIVE, even if we can return some data during the rebalance, the transition of PARTITIONS_REVOKED -> PARTITIONS_ASSIGNED would happen in a single consumer.poll call, and only very rarely we would stay in PARTITION_REVOKED after consumer.poll if the subscription changed. So I think poll with zero sounds good to me.
Also note that in my PR for returning data in the middle of a rebalance, we still pass in non-zero timeout for finding the coordinator so that we are ensured to have one round-trip at least within that call.
For my own education purposes, why do we need to use TreeMap as the underlying return struct?
The motivation is to avoid the random order of consumers in HashMap, so that we hopefully end up with a similar task -> consumer assignment in subsequent rebalances.
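A minimal illustration of that determinism (a toy example, not the Streams assignor code): a TreeMap iterates its keys in sorted order regardless of insertion order, so the consumer ordering, and hence the resulting task interleaving, is stable across rebalances.

```java
import java.util.*;

public class DeterministicOrder {
    // Toy example: TreeMap iterates keys in sorted order, independent of
    // insertion order, while HashMap's iteration order is unspecified
    static List<String> consumerOrder(Collection<String> consumers) {
        TreeMap<String, List<Integer>> assignment = new TreeMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        return new ArrayList<>(assignment.keySet());
    }

    public static void main(String[] args) {
        // Same result no matter which order the subscriptions arrived in
        System.out.println(consumerOrder(Arrays.asList("consumer-3", "consumer-1", "consumer-2")));
        // [consumer-1, consumer-2, consumer-3]
    }
}
```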
For my own education purpose, what's the benefit we get from initializing a linked list vs a general array list?
I didn't choose this so I can only guess, but probably so we can easily/efficiently poll and remove the last element until empty..?
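For illustration (a toy sketch, not the actual code): LinkedList implements Deque, so removing from either end is O(1), without the element shifting that an ArrayList.remove(0) would incur.

```java
import java.util.*;

public class PollSketch {
    // Toy sketch: drain a LinkedList from the tail;
    // pollLast() removes and returns the last element in O(1)
    static List<Integer> drainFromTail(LinkedList<Integer> tasks) {
        List<Integer> order = new ArrayList<>();
        while (!tasks.isEmpty()) {
            order.add(tasks.pollLast());
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainFromTail(new LinkedList<>(Arrays.asList(1, 2, 3))));
        // [3, 2, 1]
    }
}
```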
How about state.activeTaskCount() / consumers.size() + 1?
Doesn't work when numConsumers evenly divides numTasks
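The usual fix is ceiling division, which handles both the evenly- and unevenly-divisible cases (a sketch with hypothetical names, not the actual assignor code):

```java
public class MaxTasksPerConsumer {
    // Ceiling division: the smallest cap such that cap * numConsumers >= numTasks
    static int maxTasksPerConsumer(int numTasks, int numConsumers) {
        return (numTasks + numConsumers - 1) / numConsumers;
    }

    public static void main(String[] args) {
        // 6 tasks over 3 consumers: the naive 6 / 3 + 1 would allow 3 per consumer
        System.out.println(maxTasksPerConsumer(6, 3)); // 2
        // 7 tasks over 3 consumers still rounds up
        System.out.println(maxTasksPerConsumer(7, 3)); // 3
    }
}
```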
I don't quite follow the logic here. If this partition is owned by the current owner, then it should appear in allOwnedPartitions which means we expect a count of 2 for the final result of previousConsumers.
While outside we check for this set to be > 1 and early terminate, is that true?
If this partition is owned by a consumer in this client, then currentPartitionConsumer != null and currentPartitionConsumer gets added to the previousConsumers set. We don't even check allOwnedPartitions in that case since it's in an "else if"
I think the logic here is not very intuitive as we are doing a double loop here. What we could do is loop through all the unfilled consumers and fetch the unfulfilled amount of new tasks to feed them. This could also reduce the number of loop cycles we have to go through.
Well, the goal here is to interleave the tasks of the same groupId (ie subtopology) across different consumers as much as possible, since some subtopologies might be quite heavy while others are quite light. (This is what we do in interleaveTasksByGroupId also)
It is a little unintuitive, I agree, so let me know if you have any suggestions for clearer code and/or comments. But if it's any consolation,
a) we probably aren't looping through that many remaining consumers and/or tasks
b) we may be doing a nested loop but within the inner loop we are removing things from the outer loop. So, in the end we are actually only looping over newTasks and hitting each task in it once
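A stripped-down sketch of that interleaving goal (task and consumer names here are hypothetical; the real interleaveTasksByGroupId works on TaskId objects): sorting tasks so tasks of the same groupId are adjacent, then dealing them out round-robin, spreads each subtopology across consumers.

```java
import java.util.*;

public class InterleaveSketch {
    // Round-robin tasks (sorted so tasks of the same subtopology/groupId are
    // adjacent) across consumers, so a heavy subtopology is not piled on one consumer
    static Map<String, List<String>> interleave(List<String> sortedTasks, List<String> consumers) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < sortedTasks.size(); i++) {
            assignment.get(consumers.get(i % consumers.size())).add(sortedTasks.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        // task names are groupId_partition, sorted by groupId then partition
        List<String> tasks = Arrays.asList("0_0", "0_1", "0_2", "1_0", "1_1", "1_2");
        System.out.println(interleave(tasks, Arrays.asList("c1", "c2")));
        // subtopology 0 ends up split across c1 and c2 rather than all on one consumer
    }
}
```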
I see your point, could you reflect your goal as meta comments for interleaveTasksByGroupId? It's a bit hard to interpret the goal just by reading this code.
Sure, I'll try to shore up the explanation for interleaveTasksByGroupId and then refer to that here
Hey @ableegoldman, I think you need to add the missing "upgrade from" versions to the configuredMetadataVersion method below as well. Otherwise, creating Streams will just throw an exception when you pass (for example) UPGRADE_FROM_23.
Oops, thanks, yeah good catch! That reminds me I still need to add a test that actually configures the assignor using these.
Also, not sure if it makes sense to have a test that you can actually construct and start Streams with all versions of "upgrade.from", which would have caught this.
Well no, I just added a test that uses UPGRADE_FROM_23 (and one that doesn't)
Which would have caught that. I'm wondering if it actually makes more sense to not throw an exception in the default and just check beforehand whether it's a valid config (ie is it contained in some "valid config values" set, which we actually may already have somewhere in StreamsConfig?)
It's not really practical to have to add new UPGRADE_FROM versions to an increasing number of places in the code or else get an exception thrown. Some cleanup to think about once this is over
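A sketch of that cleanup idea (the set contents and names here are illustrative assumptions, not the actual StreamsConfig constants): validate "upgrade.from" once against a single collection of accepted values instead of throwing from each switch default.

```java
import java.util.*;

public class UpgradeFromCheck {
    // Illustrative set of accepted "upgrade.from" values; the real list
    // lives in StreamsConfig and grows with each release
    static final Set<String> VALID_UPGRADE_FROM = new HashSet<>(Arrays.asList(
        "0.10.0", "0.10.1", "0.10.2", "0.11.0", "1.0", "1.1", "2.0", "2.1", "2.2", "2.3"));

    static void validateUpgradeFrom(String value) {
        // null means "not set", which is always allowed
        if (value != null && !VALID_UPGRADE_FROM.contains(value)) {
            throw new IllegalArgumentException("Unknown upgrade.from value: " + value);
        }
    }

    public static void main(String[] args) {
        validateUpgradeFrom("2.3");  // passes
        validateUpgradeFrom(null);   // passes
        try {
            validateUpgradeFrom("9.9");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```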
Agreed on the cleanup front. The current code makes sense when you just have one usage. As soon as we add a second, we probably need a different strategy.
Doing another run of system tests, but the initial run passed (except for the already flaky
turn on cooperative rebalancing
make StreamsPartitionAssignor sticky within a client
match any number records processed in eos test
Force-pushed from ded2f4b to da6d6c3
Another (this time completely) green run of the system tests: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3142/
Test failures come from connect and this one: Not sure if it is consistent, need to re-run.
guozhangwang left a comment
Made another pass on the PR, thanks for the added unit tests.
AssignedPartition(final TaskId taskId,
                  final TopicPartition partition) {
nit: how about just put these two in one line? We usually only use multi-lines if there are 3+ parameters.
Yeah must have changed that by mistake, will revert
private Map<String, Assignment> errorAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
    final String topic,
    final int errorCode) {
Is it intentional to not align parameters in the function definition?
Nope, IDE must have auto-indented some things (wrongly)
if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions()
        .isPresent()) {
    numPartitionsCandidate = repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
Is this suggested by IDE? :P I feel it is not necessary but if I do not feel strongly either.
I guess a bunch of the indentation got messed up somehow, I agree this is not necessary
") prevActiveTasks: (" + prevActiveTasks +
") prevStandbyTasks: (" + prevStandbyTasks +
") prevAssignedTasks: (" + prevAssignedTasks +
") prevOwnedPartitions: (" + ownedPartitions.keySet() +
nit: prevOwnedPartitionsByConsumerId
In that case, should we check in state.addPreviousActiveTasks that the task ids were not added in other client metadata? I might be paranoid here but what if client A claims (cooperative) its ownership of partition1 which maps to task1, while client B (eager) encodes task1 as its prev owned tasks with empty owned partitions?
// If the partition is new to this consumer but is still owned by another, remove it from the assignment
// until it has been revoked and can safely be reassigned according to the COOPERATIVE protocol
if (newPartitionForConsumer && allOwnedPartitions.contains(partition)) {
Is it possible that a task maps to multiple partitions, while only some of them have old owners, would that cause us to encode a partial task here?
Ah, I see what you mean. We should collect the assignedPartitions for a single task and only add them to the assignedPartitions list at the end, if all can be safely assigned
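A sketch of that all-or-nothing check (hypothetical shapes; the real code works with TaskId and TopicPartition objects): collect each task's partitions first and only add them to the assignment if none of them is still owned by another consumer.

```java
import java.util.*;

public class SafeTaskAssignment {
    // Hypothetical sketch: a task's partitions are assigned all-or-nothing,
    // so a multi-partition task is never encoded partially
    static List<String> assignablePartitions(Map<String, List<String>> taskToPartitions,
                                             Set<String> stillOwnedElsewhere) {
        List<String> assigned = new ArrayList<>();
        for (List<String> partitions : taskToPartitions.values()) {
            boolean safe = true;
            for (String partition : partitions) {
                if (stillOwnedElsewhere.contains(partition)) {
                    safe = false; // one unrevoked partition withholds the whole task
                }
            }
            if (safe) {
                assigned.addAll(partitions);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        Map<String, List<String>> tasks = new LinkedHashMap<>();
        tasks.put("0_0", Arrays.asList("A-0", "B-0"));
        tasks.put("0_1", Arrays.asList("A-1", "B-1"));
        // B-0 still has an old owner, so task 0_0 is withheld entirely
        System.out.println(assignablePartitions(tasks, new HashSet<>(Collections.singleton("B-0"))));
        // [A-1, B-1]
    }
}
```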
// If this consumer previously owned more tasks than it has capacity for, some must be revoked
if (assignments.get(consumer).size() >= maxTasksPerClient) {
    return Collections.emptyMap();
nit: add a debug entry as well?
for (final TaskId prevAssignedTask : clientState.getValue().prevStandbyTasks()) {
    previousStandbyTaskAssignment.computeIfAbsent(prevAssignedTask, t -> new HashSet<>());
private void assertEquivalentAssignment(final Map<String, List<TaskId>> thisAssignment,
Can we just rely on the equals function of map and list here? I think it checks for "exact equality", while here otherAssignment being a super-set of thisAssignment can still pass.
Well we also check for the size to be the same, but this could be improved I guess. I'm not sure we can use the map equals because the values are not necessarily equal until sorted.
But we could sort them and then use list equality instead.
The new jenkins failures are due to timing out after 270 min (2.12 and 2.11) and the connect integration test (2.13). And there are no overlapping test failures in consecutive runs. Will merge to trunk now.
Key improvements with this PR:
* tasks will remain available for IQ during a rebalance (but not during restore)
* continue restoring and processing standby tasks during a rebalance
* continue processing active tasks during rebalance until the RecordQueue is empty*
* only revoked tasks must be suspended/closed
* StreamsPartitionAssignor tries to return tasks to their previous consumers within a client
* but do not try to commit, for now (pending KAFKA-7312)

Reviewers: John Roesler <john@confluent.io>, Boyang Chen <boyang@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Also cherry-picked to 2.4
…t-for-generated-requests

* apache-github/trunk:
  KAFKA-8932; Add tag for CreateTopicsResponse.TopicConfigErrorCode (KIP-525) (apache#7464)
  KAFKA-8944: Fixed KTable compiler warning. (apache#7393)
  KAFKA-8964: Rename tag client-id for thread-level metrics and below (apache#7429)
  MINOR: remove unused imports in Streams system tests (apache#7468)
  KAFKA-7190; Retain producer state until transactionalIdExpiration time passes (apache#7388)
  KAFKA-8983; AdminClient deleteRecords should not fail all partitions unnecessarily (apache#7449)
  MINOR: Modified Exception handling for KIP-470 (apache#7461)
  KAFKA-7245: Deprecate WindowStore#put(key, value) (apache#7105)
  KAFKA-8179: Part 7, cooperative rebalancing in Streams (apache#7386)
  KAFKA-8985; Add flexible version support to inter-broker APIs (apache#7453)
  MINOR: Bump version to 2.5.0-SNAPSHOT (apache#7455)