diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index 16b9d60c75206..7e1b27cfe669d 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -303,13 +303,9 @@ def test_version_probing_upgrade(self): self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka) self.driver.disable_auto_terminate() - # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka) - self.processor1.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor") self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka) - self.processor2.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor") self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka) - self.processor3.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor") self.driver.start() self.start_all_nodes_with("") # run with TRUNK @@ -532,20 +528,22 @@ def do_rolling_bounce(self, processor, counter, current_generation): log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.", timeout_sec=60, err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account)) - else: - log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.", + log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.", timeout_sec=60, - err_msg="Could not detect 'successful version probing with upgraded leader' at upgrading node " + str(node.account)) + err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account)) + else: first_other_monitor.wait_until("Sent a version 7 subscription and group.s latest commonly supported version is 8 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 8 for next rebalance.", timeout_sec=60, err_msg="Never saw output 'Upgrade metadata to version 8' on" + str(first_other_node.account)) + first_other_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.", + timeout_sec=60, + err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account)) second_other_monitor.wait_until("Sent a version 7 subscription and group.s latest commonly supported version is 8 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 8 for next rebalance.", timeout_sec=60, err_msg="Never saw output 'Upgrade metadata to version 8' on" + str(second_other_node.account)) - - log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.", - timeout_sec=60, - err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account)) + second_other_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.", + timeout_sec=60, + err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account)) # version probing should trigger second rebalance # now we check that after consecutive rebalances we have synchronized generation