Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 3 additions & 41 deletions tests/kafkatest/tests/streams/streams_upgrade_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ def __init__(self, test_context):
'echo' : { 'partitions': 5 },
'data' : { 'partitions': 5 },
}
self.leader = None
self.leader_counter = {}

processed_msg = "processed [0-9]* records"

Expand Down Expand Up @@ -311,13 +309,6 @@ def test_version_probing_upgrade(self):
self.processors = [self.processor1, self.processor2, self.processor3]
self.old_processors = [self.processor1, self.processor2, self.processor3]
self.upgraded_processors = []
for p in self.processors:
self.leader_counter[p] = 2

self.update_leader()
for p in self.processors:
self.leader_counter[p] = 0
self.leader_counter[self.leader] = 3

counter = 1
current_generation = 3
Expand All @@ -342,25 +333,6 @@ def test_version_probing_upgrade(self):
timeout_sec=60,
err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))

def update_leader(self):
self.leader = None
retries = 10
while retries > 0:
for p in self.processors:
found = list(p.node.account.ssh_capture("grep \"Finished assignment for group\" %s" % p.LOG_FILE, allow_fail=True))
if len(found) >= self.leader_counter[p] + 1:
self.leader = p
self.leader_counter[p] = self.leader_counter[p] + 1

if self.leader is None:
retries = retries - 1
time.sleep(5)
else:
break

if self.leader is None:
raise Exception("Could not identify leader")

def get_version_string(self, version):
if version.startswith("0") or version.startswith("1") \
or version.startswith("2.0") or version.startswith("2.1"):
Expand Down Expand Up @@ -524,7 +496,6 @@ def do_rolling_bounce(self, processor, counter, current_generation):
node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE + "." + str(counter), allow_fail=False)
node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE + "." + str(counter), allow_fail=False)
node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE + "." + str(counter), allow_fail=False)
self.leader_counter[processor] = 0

with node.account.monitor_log(processor.LOG_FILE) as log_monitor:
processor.set_upgrade_to("future_version")
Expand All @@ -540,11 +511,6 @@ def do_rolling_bounce(self, processor, counter, current_generation):
timeout_sec=60,
err_msg="Could not detect FutureStreamsPartitionAssignor in " + str(node.account))

if processor == self.leader:
self.update_leader()
else:
self.leader_counter[self.leader] = self.leader_counter[self.leader] + 1

monitors = {}
monitors[processor] = log_monitor
monitors[first_other_processor] = first_other_monitor
Expand Down Expand Up @@ -598,12 +564,8 @@ def do_rolling_bounce(self, processor, counter, current_generation):
if generation_synchronized == False:
raise Exception("Never saw all three processors have the synchronized generation number")

if processor == self.leader:
self.update_leader()
else:
self.leader_counter[self.leader] = self.leader_counter[self.leader] + 1

if self.leader in self.old_processors or len(self.old_processors) > 0:
if len(self.old_processors) > 0:
self.verify_metadata_no_upgraded_yet()

return current_generation
Expand All @@ -616,9 +578,9 @@ def extract_highest_generation(self, found_generations):

def verify_metadata_no_upgraded_yet(self):
for p in self.processors:
found = list(p.node.account.ssh_capture("grep \"Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.\" " + p.LOG_FILE, allow_fail=True))
found = list(p.node.account.ssh_capture("grep \"Sent a version 4 subscription and group.s latest commonly supported version is 5 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 5 for next rebalance.\" " + p.LOG_FILE, allow_fail=True))
if len(found) > 0:
raise Exception("Kafka Streams failed with 'group member upgraded to metadata 4 too early'")
raise Exception("Kafka Streams failed with 'group member upgraded to metadata 5 too early'")

def get_topics_count(self):
count = 0
Expand Down