diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index 7b9a310bdc5f6..c315f723f825b 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -84,8 +84,6 @@ def __init__(self, test_context): 'echo' : { 'partitions': 5 }, 'data' : { 'partitions': 5 }, } - self.leader = None - self.leader_counter = {} processed_msg = "processed [0-9]* records" @@ -311,13 +309,6 @@ def test_version_probing_upgrade(self): self.processors = [self.processor1, self.processor2, self.processor3] self.old_processors = [self.processor1, self.processor2, self.processor3] self.upgraded_processors = [] - for p in self.processors: - self.leader_counter[p] = 2 - - self.update_leader() - for p in self.processors: - self.leader_counter[p] = 0 - self.leader_counter[self.leader] = 3 counter = 1 current_generation = 3 @@ -342,25 +333,6 @@ def test_version_probing_upgrade(self): timeout_sec=60, err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account)) - def update_leader(self): - self.leader = None - retries = 10 - while retries > 0: - for p in self.processors: - found = list(p.node.account.ssh_capture("grep \"Finished assignment for group\" %s" % p.LOG_FILE, allow_fail=True)) - if len(found) >= self.leader_counter[p] + 1: - self.leader = p - self.leader_counter[p] = self.leader_counter[p] + 1 - - if self.leader is None: - retries = retries - 1 - time.sleep(5) - else: - break - - if self.leader is None: - raise Exception("Could not identify leader") - def get_version_string(self, version): if version.startswith("0") or version.startswith("1") \ or version.startswith("2.0") or version.startswith("2.1"): @@ -524,7 +496,6 @@ def do_rolling_bounce(self, processor, counter, current_generation): node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE + "." + str(counter), allow_fail=False) node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE + "." + str(counter), allow_fail=False) node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE + "." + str(counter), allow_fail=False) - self.leader_counter[processor] = 0 with node.account.monitor_log(processor.LOG_FILE) as log_monitor: processor.set_upgrade_to("future_version") @@ -540,11 +511,6 @@ def do_rolling_bounce(self, processor, counter, current_generation): timeout_sec=60, err_msg="Could not detect FutureStreamsPartitionAssignor in " + str(node.account)) - if processor == self.leader: - self.update_leader() - else: - self.leader_counter[self.leader] = self.leader_counter[self.leader] + 1 - monitors = {} monitors[processor] = log_monitor monitors[first_other_processor] = first_other_monitor @@ -598,12 +564,8 @@ def do_rolling_bounce(self, processor, counter, current_generation): if generation_synchronized == False: raise Exception("Never saw all three processors have the synchronized generation number") - if processor == self.leader: - self.update_leader() - else: - self.leader_counter[self.leader] = self.leader_counter[self.leader] + 1 - if self.leader in self.old_processors or len(self.old_processors) > 0: + if len(self.old_processors) > 0: self.verify_metadata_no_upgraded_yet() return current_generation @@ -616,9 +578,9 @@ def extract_highest_generation(self, found_generations): def verify_metadata_no_upgraded_yet(self): for p in self.processors: - found = list(p.node.account.ssh_capture("grep \"Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.\" " + p.LOG_FILE, allow_fail=True)) + found = list(p.node.account.ssh_capture("grep \"Sent a version 4 subscription and group.s latest commonly supported version is 5 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 5 for next rebalance.\" " + p.LOG_FILE, allow_fail=True)) if len(found) > 0: - raise Exception("Kafka Streams failed with 'group member upgraded to metadata 4 too early'") + raise Exception("Kafka Streams failed with 'group member upgraded to metadata 5 too early'") def get_topics_count(self): count = 0