Merged
20 changes: 9 additions & 11 deletions tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -303,13 +303,9 @@ def test_version_probing_upgrade(self):

self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
self.driver.disable_auto_terminate()
-        # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
        self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
-        self.processor1.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor")
        self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
-        self.processor2.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor")
        self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
-        self.processor3.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor")

self.driver.start()
self.start_all_nodes_with("") # run with TRUNK
@@ -532,20 +528,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
timeout_sec=60,
err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
else:
log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
cadonna (Member, Author) commented on May 4, 2020:
This verification only holds if the two other processors haven't rebalanced before the processor that was bounced last re-joins the group. If such a rebalance occurs first, the commonly supported version is already 8 when the last processor joins.

Actually, the test test_version_probing_upgrade is independent of the task assignor used, yet this issue was surfaced by the HighAvailabilityTaskAssignor and not by the StickyTaskAssignor. I cannot say for sure why.

IMO, removing this verification should be OK, since afterwards we check whether the processors have synchronized generations, which means that all three processors successfully joined the group in the end. The state that we no longer explicitly verify is the transient state where version 7 is still in use but all processors are already able to use version 8.
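The end-state check mentioned here (synchronized generations) boils down to "did every processor report the same rebalance generation". A minimal sketch, assuming processor generations are collected into a plain mapping (the helper name and data shape are illustrative, not taken from the actual test):

```python
# Illustrative sketch, not from streams_upgrade_test.py: the end-state
# verification reduces to checking that every processor reports the same
# rebalance generation after the rolling bounce.
def generations_synchronized(generations):
    """generations: mapping of processor name -> last reported generation."""
    return len(set(generations.values())) == 1

# Example: all three processors converged on generation 12.
assert generations_synchronized({"processor1": 12, "processor2": 12, "processor3": 12})
# Example: processor2 lags one generation behind, so the group has not synced.
assert not generations_synchronized({"processor1": 12, "processor2": 11, "processor3": 12})
```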

Contributor commented:
Thanks @cadonna; I agree. This test should just verify that we first converge on version 7, and then that we converge on version 8.

log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",
Contributor commented:
I know that this check was here in some fashion before, but I'm drawing a blank on why we need to verify this log line. It seems like just checking the version number logs and nothing else would be the key to a long and happy life.

ableegoldman (Member) commented on May 5, 2020:
I think the idea is to verify that the actual version probing rebalance takes place, i.e. that the partition assignor actually handles the version probing once it's detected, and that it signals to the stream thread, which in turn also handles it correctly. But I don't know -- I've probably broken and fixed the version probing test two or three times now due to this one line in particular.

So, I'd be happy to see it go. I probably have too much bad history to make an unbiased call here though 😄

cadonna (Member, Author) commented on May 5, 2020:
We can leave it because it verifies whether the assignment was triggered in the assignor, which is better than nothing. However, it does not give us any guarantee that the rebalance actually took place.

I guess what we really need is a way to check whether a group has stabilized and whether the assignment is valid. We try to do that by verifying that the generations of the processors are synced. However, I ran into cases where all processors had the same generation, but one processor did not have any tasks assigned. So we would actually need to check whether the highest generation is in sync across the processors AND whether all processors have at least one task assigned (AND whether all tasks were assigned).
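The stricter stabilization check described here could be sketched as follows. This is a hypothetical helper, assuming processor state is available as (generation, set of assigned task ids); the real test services do not expose state in exactly this form:

```python
# Hypothetical sketch of the stricter check: generations in sync, every
# processor owns at least one task, and no task is left unassigned.
def group_stabilized(processors, all_task_ids):
    """processors: mapping name -> (generation, set of assigned task ids)."""
    generations = {gen for gen, _ in processors.values()}
    if len(generations) != 1:
        return False  # generations not yet in sync across processors
    assignments = [tasks for _, tasks in processors.values()]
    if any(not tasks for tasks in assignments):
        return False  # a processor re-joined but received no tasks
    assigned = set().union(*assignments)
    return assigned == set(all_task_ids)  # every task assigned somewhere

# Example of the failure mode described above: generations are synced,
# but processor3 got no tasks, so the group is not actually stabilized.
state = {"p1": (12, {"0_0", "0_1"}), "p2": (12, {"1_0"}), "p3": (12, set())}
assert not group_stabilized(state, {"0_0", "0_1", "1_0"})
```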

Contributor commented:
Thanks, all. This doesn't seem like the best way to verify what we're trying to verify, but it also seems about the same as before. I'm happy to leave this here for now.

If/when the test breaks again, I'd prefer for us to put in a more reliable and direct mechanism.

timeout_sec=60,
-                               err_msg="Could not detect 'successful version probing with upgraded leader' at upgrading node " + str(node.account))
+                               err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))
else:
first_other_monitor.wait_until("Sent a version 7 subscription and group.s latest commonly supported version is 8 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 8 for next rebalance.",
timeout_sec=60,
err_msg="Never saw output 'Upgrade metadata to version 8' on " + str(first_other_node.account))
first_other_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",
timeout_sec=60,
err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))
second_other_monitor.wait_until("Sent a version 7 subscription and group.s latest commonly supported version is 8 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 8 for next rebalance.",
timeout_sec=60,
err_msg="Never saw output 'Upgrade metadata to version 8' on " + str(second_other_node.account))

log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",
timeout_sec=60,
err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))
second_other_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",
timeout_sec=60,
err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))

# version probing should trigger second rebalance
# now we check that after consecutive rebalances we have synchronized generation
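The wait_until calls in this hunk follow the usual ducktape poll-until-timeout pattern. A minimal self-contained stand-in (not ducktape's actual implementation) shows the mechanics:

```python
import time

# Minimal stand-in for the ducktape-style wait_until used by the log
# monitors: poll `condition` until it returns truthy or the timeout
# elapses, then fail with the supplied error message.
def wait_until(condition, timeout_sec, err_msg, backoff_sec=0.1):
    deadline = time.monotonic() + timeout_sec
    while time.monotonic() < deadline:
        if condition():
            return
        time.sleep(backoff_sec)
    raise TimeoutError(err_msg)

# Example: the condition is immediately true, so this returns at once.
wait_until(lambda: True, timeout_sec=1, err_msg="never became true")
```

In the test itself the condition is "this log line appeared on the node", which is why a missed or reordered log message (like the version-7 probing line discussed above) surfaces as a timeout with the given err_msg.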