KAFKA-15680: Partition-Count is not getting updated Correctly in the Incremental Co-operative Rebalancing(ICR) Mode of Rebalancing.#14630
C0urante merged 9 commits into apache:trunk from kumarpritam863:KAFKA-15680
kumarpritam863
commented
Oct 25, 2023
- In ICR (Incremental Cooperative Rebalancing) mode, whenever a new worker, say Worker3, joins, the leader, say Worker1, computes a new global assignment that results in the revocation of one task from each existing worker, i.e. Worker1 and Worker2.
- Once the new member's join is complete, the ConsumerCoordinator.onJoinComplete() method is called. It computes the newly assigned partitions and the revoked partitions, and updates the subscription object.
- In the revocation case, which is detected by checking the “partitionsRevoked” list, the method “invokePartitionsRevoked()” is called. It internally calls “updatePartitionCount()”, which reads the partitions from the assignment object before that object has been updated with the new assignment.
- The assignment is updated only just before the “invokePartitionsAssigned()” method is called, by invoking the following → subscriptions.assignFromSubscribed(assignedPartitions);
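The ordering problem described above can be illustrated with a small, self-contained sketch. Everything in it (ToyConsumer, Listener, recordedCounts, the partition names) is a hypothetical stand-in and not Kafka code; it only assumes, as the description says, that the revoke callback fires before the assignment is replaced.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins for illustration only; these are not Kafka classes.
final class PartitionCountOrdering {

    /** Minimal listener shape, loosely modeled on a rebalance listener. */
    interface Listener {
        void onRevoked(Set<String> revoked);
        void onAssigned(Set<String> added);
    }

    /** Toy consumer that fires the revoke callback before updating its assignment. */
    static class ToyConsumer {
        private Set<String> assignment = new HashSet<>();

        Set<String> assignment() {
            return assignment;
        }

        void rebalance(Set<String> newAssignment, Listener listener) {
            Set<String> revoked = new HashSet<>(assignment);
            revoked.removeAll(newAssignment);
            Set<String> added = new HashSet<>(newAssignment);
            added.removeAll(assignment);

            if (!revoked.isEmpty()) {
                listener.onRevoked(revoked); // assignment is still the old one here
            }
            assignment = new HashSet<>(newAssignment); // updated only now
            listener.onAssigned(added);
        }
    }

    /** Returns the partition counts recorded during one revocation-only rebalance. */
    static List<Integer> recordedCounts(boolean recordOnRevoke) {
        ToyConsumer consumer = new ToyConsumer();
        List<Integer> recorded = new ArrayList<>();
        Listener listener = new Listener() {
            @Override
            public void onRevoked(Set<String> revoked) {
                if (recordOnRevoke) recorded.add(consumer.assignment().size());
            }

            @Override
            public void onAssigned(Set<String> added) {
                if (!recordOnRevoke) recorded.add(consumer.assignment().size());
            }
        };
        consumer.rebalance(Set.of("t-0", "t-1"), listener); // initial assignment
        recorded.clear();                                   // ignore the initial rebalance
        consumer.rebalance(Set.of("t-0"), listener);        // t-1 is revoked
        return recorded;
    }

    public static void main(String[] args) {
        System.out.println("recorded on revoke: " + recordedCounts(true));
        System.out.println("recorded on assign: " + recordedCounts(false));
    }
}
```

In this toy model, recording the count in the revoke callback yields [2] (the stale size), while recording it in the assign callback yields [1].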
…partition count in both partition invocation or addition case. Partition-Count is not getting updated Correctly in the Incremental Co-operative Rebalancing(ICR) Mode of Rebalancing.
|
@C0urante Can you please review? |
|
@kumarpritam863 is this the same as #12622 and https://issues.apache.org/jira/browse/KAFKA-14220? And if so, can you shed some light on why that PR and ticket were wiped and closed without comment after the initial round of review? |
Yes @C0urante, this is the same. I closed the PR and JIRA at that time because of a privacy policy that I was unsure about. Really sorry for that; it was a miss on my side. Can we please proceed with this new PR and JIRA? |
|
@C0urante If you have any questions or need further clarification about the changes, please feel free to reach out to me. Your insights and expertise are highly valued, and your approval would be really helpful. Thank you in advance for your time and consideration. Best regards, |
|
I can take a look next week. In the meantime, do you think you could add some unit tests for this change? |
|
@C0urante These are a few reasons why adding a unit test is difficult for this:
Can we please take this without a new test case, or with the existing test cases? This is a really small change but of utmost importance to us, as we rely heavily on this metric. Thank you in advance for your time and consideration. Best regards, |
|
@kumarpritam863 I understand that the code changes are small, and I'm glad to hear that pre-prod testing indicates that this commit has the intended effect. However, I still think that unit testing coverage is warranted--we want to make sure not just that this change works now, but that we don't accidentally regress in the future. I think it should be possible to use the MockConsumer class (possibly with some small tweaks to its rebalance method to make the ordering of steps more closely match the ordering in the You may also notice that existing unit tests are failing now (see the CI results, or feel free to test this PR locally with Finally, can you update the title of the PR to describe what changes are actually made? You can see examples of good PR titles in the list of recently-merged commits to trunk. Thanks! |
…n call. The call to the closePartition method will no longer invoke consumer.assignment(). To accommodate that, I have reduced the count in the times call to 1.
|
@C0urante |
|
@C0urante All the tests passed locally. But I will check for the failing tests again and will update here. |
|
I ran the following command:
|
|
That's fine, we can change the left-hand type for |
|
@C0urante I did not follow the above part. Do you want me to change the type of consumer from KafkaConsumer<byte[], byte[]> to Consumer<byte[], byte[]> in the WorkerSinkTask class? |
|
Yes, exactly 👍 |
… flow to update the partition count, which records a wrong value before the actual update of the subscription object. 2. The correct flow, which updates the partition count after the subscription object is updated with the correct subscription. As part of the test case, update the signature for consumer from KafkaConsumer to Consumer in WorkerSinkTask and WorkerSinkTaskContext.
|
@C0urante I have added the test for both the scenarios:
Can you please review and let me know if any other change or modification is required. |
C0urante
left a comment
Thanks @kumarpritam863, this is headed in the right direction. I think we can do a little better on the tests though; left my thoughts.
| transformationChain, mockConsumer, pluginLoader, time, | ||
| RetryWithToleranceOperatorTest.NOOP_OPERATOR, null, statusBackingStore, Collections::emptyList); | ||
| // Subscribing to Topic = "test" with "MockHandleRebalance" | ||
| mockConsumer.subscribe(asList(TOPIC), new MockHandleRebalancePass()); |
We shouldn't be subscribing directly here. Instead, the WorkerSinkTask instance should be issuing this call as part of its initializeAndStart method. This will also allow us to test the real consumer rebalance listener used by the WorkerSinkTask class, instead of a mocked listener.
I think the sequence of calls should be something like this:
- MockConsumer::updateBeginningOffsets (necessary because the WorkerSinkTask instance's rebalance listener will try to get the current position of its assignment in onPartitionsAssigned)
- WorkerSinkTask::initialize (you can use the TASK_CONFIG constant in the test class as the argument)
- WorkerSinkTask::initializeAndStart
- assertSinkMetricValue, MockConsumer::rebalance (as many times as you want)
Thanks @C0urante. Yes, this would be nice; I have added this step and removed the MockHandleRebalance classes.
| private StatusBackingStore statusBackingStore; | ||
| @Mock | ||
| private KafkaConsumer<byte[], byte[]> consumer; | ||
| private Consumer<byte[], byte[]> mockConsumer; |
Nit: it's a little strange to call this mockConsumer when there's also a consumer field that itself contains a mocked consumer. To avoid confusion, can we just make this a local variable for any of the tests that require it?
Yeah, I have made this a local variable inside that test method.
|
|
||
| @Test | ||
| public void testPartitionCountInCaseOfPartitionRevocation() { | ||
| mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); |
If we make this a local variable, we can change the left-hand type to MockConsumer<byte[], byte[]> and then the casts later on at lines 2019 and 2022 won't be necessary.
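A minimal sketch of the point about the left-hand type, using hypothetical stand-in types (SimpleConsumer and SimpleMockConsumer) rather than the Kafka classes: a variable declared with the interface type needs a cast to reach a mock-only method, while one declared with the concrete mock type does not.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical types for illustration; they only mimic the interface/mock split.
final class LeftHandTypeSketch {

    interface SimpleConsumer {
        Collection<String> assignment();
    }

    /** Test double with an extra, test-only method that the interface lacks. */
    static class SimpleMockConsumer implements SimpleConsumer {
        private final List<String> assigned = new ArrayList<>();

        @Override
        public Collection<String> assignment() {
            return assigned;
        }

        void rebalance(Collection<String> partitions) {
            assigned.clear();
            assigned.addAll(partitions);
        }
    }

    static int viaInterfaceType() {
        SimpleConsumer consumer = new SimpleMockConsumer();
        // rebalance is not declared on the interface, so a cast is required.
        ((SimpleMockConsumer) consumer).rebalance(List.of("t-0", "t-1"));
        return consumer.assignment().size();
    }

    static int viaConcreteType() {
        SimpleMockConsumer consumer = new SimpleMockConsumer();
        consumer.rebalance(List.of("t-0", "t-1")); // no cast needed
        return consumer.assignment().size();
    }

    public static void main(String[] args) {
        System.out.println(viaInterfaceType() + " " + viaConcreteType());
    }
}
```

Both methods do the same work; only the declared type of the local variable differs.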
Yes, I missed this. Thanks, I have made this change.
| } | ||
|
|
||
| @Test | ||
| public void testPartitionCountInCaseOfPartitionRevocationFail() { |
This is testing the MockConsumer behavior more than anything else. If we want to do that (which I don't think is necessary but wouldn't oppose), then we should move this kind of testing into the MockConsumerTest suite.
Actually, I added this to test the failure scenario, but now that the test uses the HandleRebalance listener of WorkerSinkTask, it is no longer needed.
| /* | ||
| Correct Order to Call the updatePartitionCount | ||
| */ | ||
| private class MockHandleRebalancePass implements ConsumerRebalanceListener { | ||
| @Override | ||
| public void onPartitionsAssigned(Collection<TopicPartition> partitions) { | ||
| workerTask.sinkTaskMetricsGroup().recordPartitionCount(mockConsumer.assignment().size()); | ||
| if (partitions.isEmpty()) { | ||
| return; | ||
| } | ||
| // Not doing anything as the objective is to test only Partition Count | ||
| } | ||
| @Override | ||
| public void onPartitionsRevoked(Collection<TopicPartition> partitions) { | ||
| // Not doing anything as we do not updatePartitionCount here. | ||
| } | ||
| } | ||
| /* | ||
| Existing flow which calls updatePartitionCount before updation of Subscription. | ||
| */ | ||
| private class MockHandleRebalanceFail implements ConsumerRebalanceListener { | ||
| @Override | ||
| public void onPartitionsAssigned(Collection<TopicPartition> partitions) { | ||
| if (partitions.isEmpty()) { | ||
| return; | ||
| } | ||
| // Not doing anything as the objective is to test only Partition Count | ||
| workerTask.sinkTaskMetricsGroup().recordPartitionCount(mockConsumer.assignment().size()); | ||
| } | ||
| @Override | ||
| public void onPartitionsRevoked(Collection<TopicPartition> partitions) { | ||
| // Not doing anything as we do not updatePartitionCount here. | ||
| workerTask.sinkTaskMetricsGroup().recordPartitionCount(mockConsumer.assignment().size()); | ||
| } | ||
| } |
I don't think we need any custom rebalance listeners for this PR, we should just be testing the one used by the WorkerSinkTask class.
Yes @C0urante, we do not need them. I missed this; I have made this change.
… of Worker Sink Task
…g of worker sink task
|
@C0urante I have addressed all the comments and made the changes accordingly. Can you please review once and let me know if any changes are required or I missed anything. |
C0urante
left a comment
LGTM, thanks @kumarpritam863!
…nsumer protocol is used (#14630) Reviewers: Chris Egerton <chrise@aiven.io>
|
Thanks @C0urante for all the help. |