KAFKA-14911: Add system tests for rolling upgrade path of KIP-904 #13656
mjsax wants to merge 3 commits into apache:trunk from
Conversation
- fix data gen to use ring-buffer
- fix application-id for older versions
```java
    }

    int next() {
        return (index < values.length) ? values[index++] : -1;
```
The test produces too much data, and thus ended up sending -1 for a lot of records at the end.
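For context, a minimal sketch of the ring-buffer behavior the fix introduces (Python here purely for illustration; the actual generator is Java, and the class name and values are hypothetical):

```python
# Hypothetical sketch of the ring-buffer data generator: once all values
# have been emitted, wrap the index back to 0 instead of returning a -1
# sentinel, so the generator never runs out of real data.
class RingBufferGen:
    def __init__(self, values):
        self.values = values
        self.index = 0

    def next(self):
        value = self.values[self.index]
        self.index += 1
        if self.index >= len(self.values):  # wrap instead of stopping
            self.index = 0
        return value

gen = RingBufferGen([10, 20, 30])
assert [gen.next() for _ in range(5)] == [10, 20, 30, 10, 20]
```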
```diff
     }

-    final Random rand = new Random();
+    final Random rand = new Random(System.currentTimeMillis());
```
```java
            final List<String> expectedAggValues) {
        final KStream<Integer, String> result = sourceTable
            .groupBy(
                (k, v) -> new KeyValue<>(v, aggProduceValue),
```
Changed this to use `v` as the key -- works just fine
```java
        }

        final boolean shouldLog = updated || (agg.recordsProcessed % 10 == 0); // value of 10 is chosen for debugging purposes. can increase to 100 once test is passing.
        if (shouldLog) {
```
Changed this slightly to avoid spamming the output
Hmm, I'm not seeing what the change was. Should we increase the value in the line above from 10 to 100? Currently the comment still says "value of 10 is chosen for debugging purposes. can increase to 100 once test is passing"
Originally it was

```java
if (shouldLog && seenValues.containsAll(expectedAggValues)) {
    ...
} else {
    ...
}
```

So it always logged something.
Ah, must've put that in for debugging and forgotten to leave a note. The new mechanism makes sense 👍 If you think `recordsProcessed % 10` is the right frequency, rather than `recordsProcessed % 100`, then perhaps we can remove the comment entirely and just leave it as `recordsProcessed % 10`.
```java
        final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
            "test.run_fk_join",
            "false"));
        final boolean runTableAgg = Boolean.parseBoolean(streamsProperties.getProperty(
```
Backported the table-aggregation step to older versions -- without it, the first app instances we start up don't have it.
This must be done for other older versions we want to test, too.
Doh! This is the step I was missing when I was testing these test changes earlier. Thanks for solving my mystery :)
```diff
         config.setProperty(
             StreamsConfig.APPLICATION_ID_CONFIG,
-            "StreamsUpgradeTest-" + new Random().nextLong());
+            "StreamsUpgradeTest");
```
```python
metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8),
                    str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)]
table_agg_versions = [str(LATEST_3_3)]
```
We should add more versions here -- not sure how far back we want to go?
What's the reason for adding older versions? Do we expect that upgrading from a version older than 3.3 will be different than upgrading from 3.3?
Guess it should not be different -- but in the past, we basically tested all versions. If we think it's too excessive, we can also cut down the test matrix in general.
You have a better sense of how long these tests take to run vs how much additional value testing multiple older versions gives, so I trust your judgment. My instinct would've been to say that since this is a new test, we don't need to add in the older versions unless we expect them to be different, but no strong preference :)
```diff
     log_monitor.wait_until(connected_message,
-                           timeout_sec=120,
+                           timeout_sec=60,
```
Not sure why this timeout was higher than all others. Side cleanup.
```python
agg_success_str = "Table aggregate processor saw expected values. Seen: " + expected_values
with self.processor1.node.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
    with self.processor2.node.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
        with self.processor3.node.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor:
```
All three monitors need to tail STDOUT (not the LOG files)
```java
        return printProcessorSupplier(topic, "");
    }

    static ProcessorSupplier<Object, Object, Void, Void> printTaskProcessorSupplier(final String topic, final String name) {
```
nit: this new processor is the same as the existing one except that it doesn't track or print the number of records processed, right? Would it be better to have a boolean to toggle the print behavior, rather than duplicating the rest of the processor code? (Not a big deal either way since it's not much code, but as a reader I had to spend some time determining/verifying that the print behavior is the only difference.)
Yeah, it was a quick-and-dirty thing -- guess it might make sense to actually have a single Processor and let the original one inherit from the new one.
```java
        return printProcessorSupplier(topic, "");
    }

    static ProcessorSupplier<Object, Object, Void, Void> printTaskProcessorSupplier(final String topic, final String name) {
```
It seems the parameter name is not used.
```java
        if (index >= values.length) {
            index = 0;
        }
```
Doesn't this risk bringing a lot of disorder into the timestamps? I am referring to the comment on line 100. What are the consequences of such disorder?
Seems the comment is outdated. The custom TS-extractor was removed years ago: 52e3979
Let me delete the comment.
```python
extra_properties['test.agg_produce_value'] = 'B'
extra_properties['test.expected_agg_values'] = 'A,B'
for p in self.processors[:-1]:
    self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
```
I do not understand `from_version[:-2]` here. Doesn't this return a sublist?
Yes, `[n:m]` returns a sub-string (`from_version` is a string), from index n to m (m exclusive). If you omit n, it slices from the beginning; if you omit m, it slices to the end.
A negative index counts from the end, so here we get everything from the beginning up to the second-to-last character, i.e., we cut off the last two chars, i.e., the `.x` bug-fix suffix.
Ah, I misinterpreted the code. I thought the whole list of from_versions is passed into the function. Now I see that it is just one version, obviously. My fault, sorry!
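A quick illustration of the slicing semantics discussed above (the version string is a hypothetical example):

```python
# String slicing: s[n:m] is the substring from index n (inclusive) to m
# (exclusive); omitted bounds default to the ends of the string, and
# negative indices count from the end.
from_version = "2.4.1"  # hypothetical version string with a ".x" bug-fix suffix
assert from_version[:-2] == "2.4"   # drop the last two chars (".1")
assert from_version[2:] == "4.1"    # omit m: slice runs to the end
assert from_version[0:3] == "2.4"   # explicit bounds, m exclusive
```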
```python
# bounce two instances to new version (verifies that new version can process records
# written by old version)
```
Do we have any guarantee that the instances on the new version actually read any records from the repartition topic?
Do we need to insert some sleeps to ensure that enough records are written to the repartition topics?
Do we need to increase the commit interval to ensure that records are not purged from the repartition topic?
We have a guarantee. Inside do_stop_start_bounce we wait until the processor prints that it has processed records.
Not sure if we want to tighten this check, but I think it should be ok?
I see the check, but it does not guarantee that the instance that was not rolled had enough time to write records in the old format to partitions that will be read by the rolled instances. It also does not guarantee that any records the not-yet-rolled instance wrote to those partitions were not already consumed by that instance itself before the rolled instances start processing. The same is true for the subsequent rolls.
Does this make sense, or am I missing something?
fqaiser94 left a comment:
I really like the approach you've taken in this PR, I had something much more complicated in mind (when I was thinking about this a month ago and then forgot ...).
Unless I'm missing some nuance in the testing framework, I think we can tighten the test assertions up a bit more to give us more confidence, see comments for details.
```python
# encoding different target values for different versions
# - old version: value=A
# - new version with `upgrade_from` flag set: value=B
# - new version w/o `upgrade_from` set set: value=C
```

Suggested change:

```diff
-# - new version w/o `upgrade_from` set set: value=C
+# - new version w/o `upgrade_from` flag set: value=C
```
```java
        if (!agg.seenValues.contains(v)) {
            seenValues = new ArrayList<>(agg.seenValues);
            seenValues.add(v);
            Collections.sort(seenValues);
```
Is there a reason we need to sort?
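One plausible reason, not confirmed in the thread, is that sorting makes the logged "Seen:" string deterministic regardless of the order in which the aggregate observes the values, so the system test can grep for a single fixed string:

```python
# Hypothetical illustration: different processors may observe the values
# in different orders, but after sorting both runs log the same
# "Seen: A,B" string that the system test waits for.
arrival_order_1 = ["A", "B"]
arrival_order_2 = ["B", "A"]
seen_1 = ",".join(sorted(arrival_order_1))
seen_2 = ",".join(sorted(arrival_order_2))
assert seen_1 == seen_2 == "A,B"
```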
```python
        self.stop_and_await()

    def wait_for_table_agg_success(self, expected_values):
        agg_success_str = "Table aggregate processor saw expected values. Seen: " + expected_values
```
Surprised you don't need to specify the full string? I guess we just check for any lines beginning with `agg_success_str` rather than lines that are an exact match with `agg_success_str`.
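A sketch of the prefix-matching assumption, i.e., that the log monitor searches each line for the pattern rather than requiring an exact match (the log line below is hypothetical):

```python
import re

# Hypothetical STDOUT line; the real line may carry extra trailing detail.
line = "Table aggregate processor saw expected values. Seen: A,B after 120 records"
agg_success_str = "Table aggregate processor saw expected values. Seen: A,B"

# A substring/regex search succeeds even though the line is not an exact
# match, which is why the prefix alone is sufficient.
assert re.search(re.escape(agg_success_str), line) is not None
assert line != agg_success_str
```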
```python
p3 = self.processors[-1]
for p in self.processors:
    p.CLEAN_NODE_ENABLED = False
```
```python
for p in self.processors[:-1]:
    self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
    counter = counter + 1
```
Suggested change:

```diff
+# verify that old version can process records from new version
+self.wait_for_table_agg_success('A,B')
```
Can we assert this condition earlier, like here?
I don't think we need to bounce the remaining instance before asserting this condition.
```python
# bounce remaining instance on old version (just for verification purposes, to verify that
# instance on old version can process records written by new version)
extra_properties = extra_properties.copy()
extra_properties['test.agg_produce_value'] = 'A'
extra_properties['test.expected_agg_values'] = 'A,B'
self.do_stop_start_bounce(p3, None, from_version, counter, extra_properties)
counter = counter + 1

self.wait_for_table_agg_success('A,B')
```
Suggested change:

```python
# bounce remaining instance on old version (just for verification purposes, to verify that
# instance on old version can process records written by new version)
extra_properties = extra_properties.copy()
extra_properties['test.agg_produce_value'] = 'A'
extra_properties['test.expected_agg_values'] = 'A,B'
self.do_stop_start_bounce(p3, None, from_version, counter, extra_properties)
counter = counter + 1
self.wait_for_table_agg_success('A,B')
# bounce remaining instance on old version to produce a new unique value
extra_properties = extra_properties.copy()
extra_properties['test.agg_produce_value'] = 'C'
extra_properties['test.expected_agg_values'] = 'A,B,C'
self.do_stop_start_bounce(p3, None, from_version, counter, extra_properties)
counter = counter + 1
# verify that new version can process records from old version
self.wait_for_table_agg_success('A,B,C')
```
I think it might be better to have this old (not-upgraded) instance start producing a new value 'C' when we bounce it here. That way, we can assert using self.wait_for_table_agg_success('A,B,C') and be sure that the two upgraded instances have successfully processed messages from the old (not-upgraded) instance as well.
(Note, if you accept this change, you will need to make changes below here to produce new unique values like D,E,F, etc.)
```python
# rolling bounce
random.shuffle(self.processors)
p3 = self.processors[-1]
for p in self.processors:
    p.CLEAN_NODE_ENABLED = False
```
Suggested change:

```diff
-# rolling bounce
 random.shuffle(self.processors)
 p3 = self.processors[-1]
 for p in self.processors:
     p.CLEAN_NODE_ENABLED = False
```
It seems to me these lines of code have nothing to do with executing a "rolling bounce"?
This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Closing PR for now -- we still have the Jira ticket... Can create a new one when I find time to resume this. |



Co-Author: @vcrfxia