KAFKA-12675: improve the sticky general assignor scalability and performance #10552
guozhangwang merged 6 commits into apache:trunk
Conversation
refactor 1:
We used to have two maps: consumer2AllPotentialPartitions and partition2AllPotentialConsumers. They need a lot of memory. For example, with 1 million partitions and 2,000 consumers, consumer2AllPotentialPartitions needs 2,000 maps, each containing 1M partitions. But actually, we only need to store the topics of the potential partitions/consumers and map them with partitionsPerTopic. So I changed them to topic2AllPotentialConsumers and consumer2AllPotentialTopics, which saves both memory and time.
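A minimal sketch of the topic-level bookkeeping described above (illustrative names and simplified types, not the actual Kafka code): only O(topics) map entries are kept, and partition-level detail is recovered from partitionsPerTopic on demand.

```java
import java.util.*;

public class TopicLevelMaps {
    public static void main(String[] args) {
        // With 1M partitions and 2,000 consumers, per-partition maps explode in size.
        // Topic-level maps plus partitionsPerTopic carry the same information.
        Map<String, Integer> partitionsPerTopic = new LinkedHashMap<>();
        partitionsPerTopic.put("topicA", 3);
        partitionsPerTopic.put("topicB", 2);

        Map<String, List<String>> subscriptions = new LinkedHashMap<>();
        subscriptions.put("consumer1", List.of("topicA", "topicB"));
        subscriptions.put("consumer2", List.of("topicA"));

        Map<String, List<String>> topic2AllPotentialConsumers = new LinkedHashMap<>();
        Map<String, List<String>> consumer2AllPotentialTopics = new LinkedHashMap<>();
        for (String topic : partitionsPerTopic.keySet())
            topic2AllPotentialConsumers.put(topic, new ArrayList<>());
        for (Map.Entry<String, List<String>> e : subscriptions.entrySet()) {
            consumer2AllPotentialTopics.put(e.getKey(), new ArrayList<>(e.getValue()));
            for (String topic : e.getValue())
                topic2AllPotentialConsumers.get(topic).add(e.getKey());
        }
        // Prints {topicA=[consumer1, consumer2], topicB=[consumer1]}
        System.out.println(topic2AllPotentialConsumers);
    }
}
```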
refactor 2:
We used to have an ArrayList unassignedPartitions containing all sorted partitions (e.g. 1 million partitions), and loop through the current assignment to remove already-assigned partitions (e.g. 999,000 of them) so that only 1,000 partitions are left. However, ArrayList element removal is pretty slow at huge sizes, because it needs to find the element first and then arrayCopy the remaining array of size (originalSize - 1). This situation happens a lot, since each rebalance should only involve a small set of changes (e.g. 1 consumer dropped), so this is an important improvement.
To refactor it, I used the two-pointer technique to loop through the two sorted lists sortedPartitions and sortedToBeRemovedPartitions, and add only the difference of the two lists. Looping and element appending are very fast on an ArrayList, so this improves things a lot.
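The two-pointer pass described above can be sketched like this (a simplified illustration with Strings standing in for TopicPartition; it assumes both lists are sorted by the same comparator and every to-be-removed element appears in the base list):

```java
import java.util.*;

public class TwoPointerDiff {
    // Collect elements of `sorted` that are absent from `sortedToRemove` in one O(n) pass,
    // instead of repeated ArrayList.remove() calls which are O(n) each.
    static List<String> difference(List<String> sorted, List<String> sortedToRemove) {
        List<String> unassigned = new ArrayList<>(sorted.size() - sortedToRemove.size());
        int r = 0;
        for (String p : sorted) {
            if (r < sortedToRemove.size() && p.equals(sortedToRemove.get(r))) {
                r++;                 // present in both lists: skip it
            } else {
                unassigned.add(p);   // only in the base list: keep it
            }
        }
        return unassigned;
    }

    public static void main(String[] args) {
        List<String> sorted = List.of("t-0", "t-1", "t-2", "t-3", "t-4");
        List<String> toRemove = List.of("t-1", "t-3");
        System.out.println(difference(sorted, toRemove)); // [t-0, t-2, t-4]
    }
}
```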
We use unassignedPartitions and sortedPartitions as the base lists, so we make them refer to the same list to save memory in the brand-new-assignment case.
refactor 3:
We used to have a sortedPartitionConsumersByGeneration map storing every partition with all of its generation/consumer pairs, used to compute currentAssignment and prevAssignment. It consumes a lot of memory and slows down the calculation. Improve it by computing currentAssignment and prevAssignment while looping through the subscriptions list (see the allSubscriptionsEqual method :)).
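A rough sketch of that single-pass idea (the record shape, names, and rules here are illustrative, not the actual AbstractStickyAssignor code): track the max generation while looping the member data once; owners at the max generation form currentAssignment, and older generations feed prevAssignment.

```java
import java.util.*;

public class GenerationTracking {
    // Illustrative stand-in for the deserialized member data: owned partitions + generation.
    record MemberData(List<String> partitions, int generation) {}

    public static void main(String[] args) {
        Map<String, MemberData> members = new LinkedHashMap<>();
        members.put("c1", new MemberData(List.of("t-0", "t-1"), 5));
        members.put("c2", new MemberData(List.of("t-2"), 4));
        members.put("c3", new MemberData(List.of("t-3"), 5));

        int maxGeneration = Integer.MIN_VALUE;
        Map<String, List<String>> currentAssignment = new LinkedHashMap<>();
        Map<String, String> prevAssignment = new LinkedHashMap<>(); // partition -> consumer

        for (Map.Entry<String, MemberData> e : members.entrySet()) {
            MemberData md = e.getValue();
            if (md.generation() > maxGeneration) {
                // Found a newer generation: everything collected so far becomes "previous".
                for (Map.Entry<String, List<String>> old : currentAssignment.entrySet())
                    for (String p : old.getValue())
                        prevAssignment.put(p, old.getKey());
                currentAssignment.clear();
                maxGeneration = md.generation();
                currentAssignment.put(e.getKey(), md.partitions());
            } else if (md.generation() == maxGeneration) {
                currentAssignment.put(e.getKey(), md.partitions());
            } else {
                for (String p : md.partitions())
                    prevAssignment.put(p, e.getKey());
            }
        }
        System.out.println(currentAssignment); // {c1=[t-0, t-1], c3=[t-3]}
        System.out.println(prevAssignment);    // {t-2=c2}
    }
}
```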
refactor 4: To build the sortedPartitions list, we used to sort all of the partitions. To improve it, I sort all topics first (only 500 topics to sort, compared to the original 1 million partitions), and then add the partitions by looping through the sorted topics.
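A minimal sketch of that ordering trick (illustrative names; topic-partition pairs are shown as plain strings): sort the small topic list, then expand each topic into its partitions in index order, which yields a fully sorted partition list without an O(P log P) sort over all partitions.

```java
import java.util.*;

public class SortTopicsFirst {
    public static void main(String[] args) {
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        partitionsPerTopic.put("b", 2);
        partitionsPerTopic.put("a", 3);

        // O(T log T), T = number of topics (e.g. 500), instead of sorting 1M partitions.
        List<String> sortedAllTopics = new ArrayList<>(partitionsPerTopic.keySet());
        Collections.sort(sortedAllTopics);

        List<String> sortedAllPartitions = new ArrayList<>();
        for (String topic : sortedAllTopics)
            for (int i = 0; i < partitionsPerTopic.get(topic); i++)
                sortedAllPartitions.add(topic + "-" + i); // stands in for new TopicPartition(topic, i)

        System.out.println(sortedAllPartitions); // [a-0, a-1, a-2, b-0, b-1]
    }
}
```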
subscribe to only 1 topic for the last consumer
@ableegoldman , please help review this PR. Thank you.
As @showuon pointed out in github.com/apache/kafka/pull/10552, tracking partitionPotentials (in Java, partition2AllPotentialConsumers) is a huge waste of memory when we only need to know potential topic consumers. If we knock that out, we knock out the most expensive allocation as well as a lot of hot looping. Also, if the members are consistently heap sorted by the least loaded member, then assigning partitions gets much faster. Lastly, we can knock out more allocations by getting rid of partNum. This does unfortunately slow things down in the complex graph case, but that only happened in one benchmark. Overall it may be a wash.

```
name                           old time/op    new time/op    delta
Large-8                        9.32ms ± 1%    4.95ms ± 1%    -46.89%  (p=0.000 n=10+10)
LargeWithExisting-8            15.7ms ± 1%    12.8ms ± 1%    -18.66%  (p=0.000 n=10+10)
LargeImbalanced-8              25.7ms ±27%   144.7ms ±14%   +462.35%  (p=0.000 n=10+10)
LargeWithExistingImbalanced-8  15.7ms ± 1%    12.7ms ± 1%    -19.38%  (p=0.000 n=10+10)
Java/large-8                    2.63s ± 1%     0.40s ± 2%    -84.72%  (p=0.000 n=10+10)
Java/large_imbalance-8          13.4s ± 5%      0.5s ± 3%    -96.62%  (p=0.000 n=9+10)
Java/medium-8                  70.9ms ± 1%    17.9ms ± 1%    -74.73%  (p=0.000 n=10+9)
Java/medium_imbalance-8         216ms ± 1%      22ms ± 1%    -89.78%  (p=0.000 n=10+9)
Java/small-8                   49.3ms ± 0%    14.4ms ± 1%    -70.79%  (p=0.000 n=10+10)
Java/small_imbalance-8          149ms ± 0%      17ms ± 1%    -88.46%  (p=0.000 n=9+10)

name                           old alloc/op   new alloc/op   delta
Large-8                        7.12MB ± 0%    4.43MB ± 0%    -37.73%  (p=0.000 n=10+10)
LargeWithExisting-8            9.60MB ± 0%    6.94MB ± 0%    -27.71%  (p=0.000 n=9+9)
LargeImbalanced-8              17.0MB ± 0%     4.7MB ± 1%    -72.09%  (p=0.000 n=9+10)
LargeWithExistingImbalanced-8  9.60MB ± 0%    7.00MB ± 0%    -27.13%  (p=0.000 n=10+9)
Java/large-8                    531MB ± 0%     441MB ± 0%    -17.09%  (p=0.000 n=10+9)
Java/large_imbalance-8         8.54GB ± 0%    0.50GB ± 0%    -94.10%  (p=0.000 n=10+7)
Java/medium-8                  22.5MB ± 0%    17.1MB ± 0%    -23.90%  (p=0.000 n=10+10)
Java/medium_imbalance-8         223MB ± 0%      33MB ± 0%    -85.08%  (p=0.000 n=10+10)
Java/small-8                   18.8MB ± 0%    13.7MB ± 0%    -27.32%  (p=0.000 n=10+10)
Java/small_imbalance-8          147MB ± 0%      24MB ± 0%    -83.67%  (p=0.000 n=9+10)

name                           old allocs/op  new allocs/op  delta
Large-8                         9.56k ± 0%     9.44k ± 0%     -1.31%  (p=0.000 n=9+10)
LargeWithExisting-8             34.0k ± 0%     34.0k ± 1%     -0.18%  (p=0.002 n=9+10)
LargeImbalanced-8               9.93k ± 0%     9.82k ± 1%     -1.14%  (p=0.000 n=9+10)
LargeWithExistingImbalanced-8   33.8k ± 0%     33.8k ± 0%       ~     (p=0.183 n=10+10)
Java/large-8                    1.04M ± 0%     1.04M ± 0%       ~     (p=0.968 n=10+9)
Java/large_imbalance-8          1.04M ± 0%     1.04M ± 0%     -0.18%  (p=0.000 n=10+8)
Java/medium-8                   56.1k ± 0%     56.1k ± 0%       ~     (p=0.127 n=10+10)
Java/medium_imbalance-8         56.1k ± 0%     56.1k ± 0%       ~     (p=0.473 n=10+8)
Java/small-8                    44.9k ± 0%     44.9k ± 0%       ~     (p=0.468 n=10+10)
Java/small_imbalance-8          44.9k ± 0%     44.9k ± 0%     -0.11%  (p=0.000 n=8+10)
```
The performance comparison in Jenkins for uniform subscription and non-equal subscription with the setting: I think after this PR, the performance is acceptable for the non-equal subscription cases. We can make incremental improvements in follow-up stories. Thank you.
Failed tests are all flaky and unrelated. Thanks.
@guozhangwang @ableegoldman , PR is ready for review. Thank you. :)
guozhangwang
left a comment
One minor question: do we want to preserve the exact logic of prepopulateCurrentAssignments in this refactoring? I'm not sure that is the case. I'm fine if we do not really want to preserve that logic and just want to do it in a more efficient way; I'm just bringing this up for clarification.
```java
for (Entry<String, Integer> entry : partitionsPerTopic.entrySet()) {
    for (int i = 0; i < entry.getValue(); ++i)
        partition2AllPotentialConsumers.put(new TopicPartition(entry.getKey(), i), new ArrayList<>());
    topic2AllPotentialConsumers.put(entry.getKey(), new ArrayList<>());
```
Hmm is this right? Wouldn't we put the same empty list for the key N times?
Nice catch! Updated. Thank you.
```java
for (Iterator<TopicPartition> partitionIter = entry.getValue().iterator(); partitionIter.hasNext();) {
    TopicPartition partition = partitionIter.next();
    if (!partition2AllPotentialConsumers.containsKey(partition)) {
    if (!topic2AllPotentialConsumers.containsKey(partition.topic())) {
```
nit: the following comment needs to be updated as well.
```java
    ownedPartitions.addAll(memberData.partitions);
} else if (!memberData.generation.isPresent()) {
    // current maxGeneration is larger than DEFAULT_GENERATION,
    // put all partitions as DEFAULT_GENERATION into provAssignment
```
I'm not 100% sure the refactored code now has exactly the same logic as the old code, since its branching conditions have largely changed. E.g. do we still detect whether a partition is assigned to different consumers within a generation?
My refactor is just trying to reach the same currentAssignment and prevAssignment as before. So, if you meant this:

```java
if (memberData.generation.isPresent() && consumers.containsKey(memberData.generation.get())) {
    // same partition is assigned to two consumers during the same rebalance.
    // log a warning and skip this record
    log.warn("Partition '{}' is assigned to multiple consumers following sticky assignment generation {}.",
        partition, memberData.generation);
```

I think this check is unnecessary, since we didn't do anything with it, and we cannot do anything about it, either. That's my thought. Thanks.
I'll do some refinement to this PR. Please wait for a while. Thanks.
```java
private static final Logger log = LoggerFactory.getLogger(AbstractStickyAssignor.class);

public static final int DEFAULT_GENERATION = -1;
public int maxGeneration = DEFAULT_GENERATION;
```
Put maxGeneration into class scope, so we can reuse it in prepopulateCurrentAssignments.
```java
} else if (isAllSubscriptionsEqual && !(subscription.topics().size() == subscribedTopics.size()
        && subscribedTopics.containsAll(subscription.topics()))) {
    return false;
    isAllSubscriptionsEqual = false;
```
Now we'll run through all the subscriptions, since the consumerToOwnedPartitions data will also be passed into generalAssign.
```java
int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
List<String> sortedAllTopics = new ArrayList<>(topic2AllPotentialConsumers.keySet());
Collections.sort(sortedAllTopics, new TopicComparator(topic2AllPotentialConsumers));
List<TopicPartition> sortedAllPartitions = getAllTopicPartitions(partitionsPerTopic, sortedAllTopics, totalPartitionsCount);
```
Reuse getAllTopicPartitions from constrainedAssign.
```java
 * @param topic2AllPotentialConsumers: topics mapped to all consumers that subscribed to it
 * @return the partitions that are not assigned to any current consumer
 */
private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedAllPartitions,
```
Put the two getUnassignedPartitions overloads (this one and the following one) together for readability.
```java
if (memberData.generation.isPresent() && memberData.generation.get() < maxGeneration) {
    // If the current member's generation is lower than maxGeneration, put into prevAssignment if needed
    updatePrevAssignment(prevAssignment, memberData.partitions, consumer, memberData.generation.get());
} else if (!memberData.generation.isPresent()) {
```
Since we now already have currentAssignment from the allSubscriptionsEqual method, as well as the maxGeneration data, we can simplify the logic here to handle prevAssignment only.
@ableegoldman , I've done the code refinement and refactoring. It's basically what we discussed in the constrainedAssign PR. Please take a look when available.
I saw that some cooperative sticky tests failed. I'll update them, and add more tests, tomorrow or later. Thanks.
The broken tests are fixed, and new tests are added covering multiple generations in the unequal subscription cases. Thanks.
guozhangwang
left a comment
I do not have further comments here. Could @vahidhashemian (author of the sticky algorithm) or @ableegoldman lend another pair of eyes before we proceed? Thanks.
It's on my list to review in the next couple of weeks, if not sooner. Sorry I haven't had time to get to this one yet, but I will 🙂 (and I agree we should also get feedback from @vahidhashemian if he sees this)
Thanks for tagging me @guozhangwang @ableegoldman.
Thank you, guys! :)
vahidhashemian
left a comment
Thanks for this detailed work and improvement. Left some initial comments.
```java
// all partitions that needed to be assigned
List<TopicPartition> unassignedPartitions = getUnassignedPartitions(sortedAllPartitions, assignedPartitions, topic2AllPotentialConsumers);
assignedPartitions = null;
```
Is this null assignment needed? Don't see the variable used after this.
Yes, it just tells the GC that this memory can be freed, to avoid OOM. I know that at this step we should have already allocated all the memory we need, but it's just in case. What do you think?
So this is assuming the following balance() call could run beyond the next GC?
In that case imho assignedPartitions.clear() would look better (having almost the same impact).
Yes, assignedPartitions.clear() would have the same impact, but it loops through the whole ArrayList and nullifies the elements one by one. I think we can either null it out or remove this line. What do you think?

```java
/**
 * Removes all of the elements from this list. The list will
 * be empty after this call returns.
 */
public void clear() {
    modCount++;
    final Object[] es = elementData;
    for (int to = size, i = size = 0; i < to; i++)
        es[i] = null;
}
```
@vahidhashemian , thanks for your comments. I've updated. Please take a look again. Thank you.
```java
if (subscription.userData() != null) {
    // since this is our 2nd time to deserialize memberData, rewinding userData is necessary
    subscription.userData().rewind();
}
```
This block didn't exist before, why is it needed now?
This is actually a bug introduced after constrainedAssign was implemented. Since then, we run allSubscriptionsEqual to decide whether to use constrainedAssign or generalAssign. In allSubscriptionsEqual, we not only check whether the subscriptions are equal but also deserialize the user data. Once it has been deserialized, the position of the userData ByteBuffer has moved to the end of the buffer, so we have to rewind it here.
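A small self-contained illustration of the underlying ByteBuffer behavior (standard java.nio, not Kafka code): reading advances the buffer's position, so a second deserialization pass sees an exhausted buffer unless it is rewound first.

```java
import java.nio.ByteBuffer;

public class RewindDemo {
    public static void main(String[] args) {
        ByteBuffer userData = ByteBuffer.allocate(4);
        userData.putInt(42);
        userData.flip();               // make the buffer ready for reading

        int first = userData.getInt(); // first read: position moves to the end
        userData.rewind();             // reset position to 0 so we can read again
        int second = userData.getInt();// second read succeeds

        System.out.println(first + " " + second); // 42 42
    }
}
```

Without the rewind(), the second getInt() would throw BufferUnderflowException, which is why the rewind is needed before re-deserializing.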
@vahidhashemian , thanks for the comments. I've updated. Please take a look again. Thank you.
Thanks for addressing my comments @showuon. I ran a couple of unit tests and saw the difference this change makes.
@vahidhashemian , thank you for your review! :)
vahidhashemian
left a comment
Had a chance to run some tests against this PR and they all went fine. +1 from me. Thanks again for the improvements.
The failed tests are unrelated to this PR; I'm merging to trunk now.
Thank you @showuon !!
I did code refactoring/optimization in this PR, keeping the same algorithm.
I've achieved:
Where we used to complete in 10 seconds, after my code refactor the time is down to 100~200 ms.
No OutOfMemory is thrown anymore; that case now takes 4~5 seconds.