KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py#8613
Conversation
…grade_test.py This PR sets HighAvailabilityTaskAssignor as default task assignor in streams_upgrade_test.py. The verification of the test needed to be modified to because the HighAvailabilityTaskAssignor surfaced a flakiness in the test. More precisely, the verifications assume that the last client that is bounced joins the group before the other two clients are able to rebalance without the last client. This assumption does not always hold.
| timeout_sec=60, | ||
| err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account)) | ||
| else: | ||
| log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.", |
There was a problem hiding this comment.
This verification is only true if the two other processors haven't rebalanced before the processor that bounced last re-joins the group. If the rebalance occurs, the commonly supported version is already at 8 when the last processor joins.
Actually, the test test_version_probing_upgrade is independent of the used task assignor, but this issue was surfaced by the HighAvailabilityTaskAssignor but not by the StickyTaskAssignor. I cannot say for sure why.
IMO, removing this verification should be OK, since afterwards we check whether the processors have synchronized generations which means that all three processors successfully joined the group in the end. The state that we do not explicitly verify anymore is the transient state where version 7 is currently used, but all processor are able to use version 8.
There was a problem hiding this comment.
Thanks @cadonna ; I agree. This test should just be verifying that we first converge on 7, and then that we converge on 8.
|
Call for review: @vvcephei @ableegoldman |
|
System tests job: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3928/ |
| timeout_sec=60, | ||
| err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account)) | ||
| else: | ||
| log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.", |
There was a problem hiding this comment.
Thanks @cadonna ; I agree. This test should just be verifying that we first converge on 7, and then that we converge on 8.
| # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor | ||
| self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka) | ||
| self.processor1.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor") | ||
| self.processor1.set_config("internal.task.assignor.class", "org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor") |
There was a problem hiding this comment.
We can actually just delete these lines now.
| err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account)) | ||
| else: | ||
| log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.", | ||
| log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.", |
There was a problem hiding this comment.
I know that this check was here in some fashion before, but I'm drawing a blank on why we need to verify this log line. It seems like just checking the version number logs and nothing else would be the key to a long and happy life.
There was a problem hiding this comment.
I think the idea is to verify that the actual version probing rebalance takes place, ie that the partition assignor actually handles the version probing once it's detected. And that it signals to the stream thread which also handles it correctly in turn. But idk -- I've probably broken and fixed the version probing test 2 or 3 times now due to this one line in particular.
So, I'd be happy to see it go. I probably have too much bad history to make an unbiased call here though 😄
There was a problem hiding this comment.
We can leave it because it verifies whether the assignment was triggered in the assignor, which is better than nothing. However, it does not give us any guarantee that the rebalance took actually place.
I guess what we really would need is a way to check if a group stabilized and if the assignment is valid. We try to do that by verifying that the generations of the processors are synced. However, I ran into cases where all processors had the same generation, but one processor did not have any tasks assigned. So we would actually need to check if they have the highest generation in sync across the processors AND if all processors have at least one task assigned (AND if all tasks were assigned).
There was a problem hiding this comment.
Thanks, all. This doesn't seem like the best way to verify what we're trying to verify, but it also seems about the same as before. I'm happy to leave this here for now.
If/when the test breaks again, I'd prefer for us to put in a more reliable and direct mechanism.
| err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account)) | ||
| else: | ||
| log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.", | ||
| log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.", |
There was a problem hiding this comment.
Thanks, all. This doesn't seem like the best way to verify what we're trying to verify, but it also seems about the same as before. I'm happy to leave this here for now.
If/when the test breaks again, I'd prefer for us to put in a more reliable and direct mechanism.
|
test this please |
|
Just in case, I re-run the system tests: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3929/ |
|
Thanks @cadonna , Let's see how those tests play out. |
|
The test results are gone now, unfortunately. Retest this please |
|
Actually, since the only thing that changed was a python system test file, it couldn't cause any of the integration test failures, so I'll go ahead and merge. Here were the failures: org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false] |
* 'trunk' of github.com:apache/kafka: KAFKA-9290: Update IQ related JavaDocs (apache#8114) KAFKA-9928: Fix flaky GlobalKTableEOSIntegrationTest (apache#8600) KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py (apache#8613) KAFKA-9667: Connect JSON serde strip trailing zeros (apache#8230) MINOR: Log4j Improvements on Fetcher (apache#8629)
…grade_test.py (apache#8613) Generalize the verification in the upgrade test so that it does not rely on the task assignor's behavior. Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, John Roesler <vvcephei@apache.org>
This PR sets HighAvailabilityTaskAssignor as default task assignor in
streams_upgrade_test.py. The verification of the test needed to be
modified to because the HighAvailabilityTaskAssignor surfaced a flakiness
in the test. More precisely, the verifications assume that the last
client that is bounced joins the group before the other two clients
are able to rebalance without the last client. This assumption does not
always hold.
Committer Checklist (excluded from commit message)