From a6e1f292b82a17bad8c1e8f249d37f869b347894 Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Mon, 4 May 2020 16:28:56 +0200 Subject: [PATCH 1/2] KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py This PR sets HighAvailabilityTaskAssignor as default task assignor in streams_upgrade_test.py. The verification of the test needed to be modified to because the HighAvailabilityTaskAssignor surfaced a flakiness in the test. More precisely, the verifications assume that the last client that is bounced joins the group before the other two clients are able to rebalance without the last client. This assumption does not always hold. --- .../tests/streams/streams_upgrade_test.py | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index 16b9d60c75206..f9d4d5b61227b 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -305,11 +305,11 @@ def test_version_probing_upgrade(self): self.driver.disable_auto_terminate() # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka) - self.processor1.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor") + self.processor1.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor") self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka) - self.processor2.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor") + self.processor2.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor") self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka) - self.processor3.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor") + self.processor3.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor") self.driver.start() self.start_all_nodes_with("") # run with TRUNK @@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation): log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.", timeout_sec=60, err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account)) - else: - log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.", + log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.", timeout_sec=60, - err_msg="Could not detect 'successful version probing with upgraded leader' at upgrading node " + str(node.account)) + err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account)) + else: first_other_monitor.wait_until("Sent a version 7 subscription and group.s latest commonly supported version is 8 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 8 for next rebalance.", timeout_sec=60, err_msg="Never saw output 'Upgrade metadata to version 8' on" + str(first_other_node.account)) + first_other_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.", + timeout_sec=60, + err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account)) second_other_monitor.wait_until("Sent a version 7 subscription and group.s latest commonly supported version is 8 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 8 for next rebalance.", timeout_sec=60, err_msg="Never saw output 'Upgrade metadata to version 8' on" + str(second_other_node.account)) - - log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.", - timeout_sec=60, - err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account)) + second_other_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.", + timeout_sec=60, + err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account)) # version probing should trigger second rebalance # now we check that after consecutive rebalances we have synchronized generation From b80ef741379b3044715738f16648bc0b35370fed Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Mon, 4 May 2020 22:10:43 +0200 Subject: [PATCH 2/2] Remove calls to set the task assignor --- tests/kafkatest/tests/streams/streams_upgrade_test.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index f9d4d5b61227b..7e1b27cfe669d 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -303,13 +303,9 @@ def test_version_probing_upgrade(self): self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka) self.driver.disable_auto_terminate() - # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka) - self.processor1.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor") self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka) - self.processor2.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor") self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka) - self.processor3.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor") self.driver.start() self.start_all_nodes_with("") # run with TRUNK