Skip to content

KAFKA-8496: System test for KIP-429 upgrades and compatibility#7529

Merged
guozhangwang merged 13 commits intoapache:trunkfrom
bbejeck:KAFKA-8496_system_test_for_kip_429_upgrades_and_compatibility
Oct 17, 2019
Merged

KAFKA-8496: System test for KIP-429 upgrades and compatibility#7529
guozhangwang merged 13 commits intoapache:trunkfrom
bbejeck:KAFKA-8496_system_test_for_kip_429_upgrades_and_compatibility

Conversation

@bbejeck
Copy link
Copy Markdown
Member

@bbejeck bbejeck commented Oct 16, 2019

More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@bbejeck bbejeck changed the title KAFKA-8496: System test for kip 429 upgrades and compatibility[Don KAFKA-8496: System test for kip 429 upgrades and compatibility[Don't merge yet] Oct 16, 2019
@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Oct 16, 2019

ping @guozhangwang @vvcephei @ableegoldman for reviews.

The current PR doesn't include updates from all versions ATM. That will come soon in another push but the logic won't change at all. It may involve some adjustments to the new test class in
the older version with parameters to run ect.

@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Oct 16, 2019

@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Oct 16, 2019

will fix checkstyle errors on next push

@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Oct 16, 2019

small changes and fixed checkstyle errors

@bbejeck bbejeck changed the title KAFKA-8496: System test for kip 429 upgrades and compatibility[Don't merge yet] KAFKA-8496: System test for kip 429 upgrades and compatibility Oct 16, 2019
@bbejeck bbejeck changed the title KAFKA-8496: System test for kip 429 upgrades and compatibility KAFKA-8496: System test for KIP-429 upgrades and compatibility Oct 16, 2019
@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Oct 16, 2019

ping @guozhangwang, @mjsax, @vvcephei, @ableegoldman for reviews

}
}

log.info("Cooperative rebalancing enabled now");
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added for test and thinking this could be useful for users

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm +1 on this: actually I'm thinking it would be nice to make the log message above symmetric, ie something like
log.info("Eager rebalancing enabled now for upgrade from {}.x", upgradeFrom);
so we can just grep for "eager" or "cooperative" to figure out what we're on?

Copy link
Copy Markdown
Member Author

@bbejeck bbejeck Oct 16, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, but there's already this log statement https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java#L150

Did you mean to change the wording of this one? NM I re-read your comment and I get it now. I'll update the log statement and the system test and push.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on @ableegoldman 's suggestion.

@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Oct 16, 2019

I'll fix the checkstyle errors along with changes from comments above

Copy link
Copy Markdown
Member

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Bill! This looks great and tests exactly what we wanted. Left a few comments but no pressing issues that I see.
I did have the idea while reviewing to run each "processor" with more than one thread -- and then do all the usual verification for non-overlapping tasks between the consumers (rather than at instance level).
Don't really have a sense of whether that would be a nontrivial extension or a huge additional effort, so feel free to ignore or not. Let's call it a "beneficial, but not necessary" addition

config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just out of curiosity why do we set the commit interval to 1s?

}

if (newState == State.REBALANCING) {
System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not saying you need to change this or anything, but just want to point out that once everyone is on cooperative this state change actually indicates the end of a rebalance.


public class StreamsUpgradeToCooperativeRebalanceTest {


Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: there's a few double linebreaks throughout the PR, not sure if intentional or not?

import java.util.Properties;
import java.util.Set;

public class StreamsUpgradeToCooperativeRebalanceTest {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming these are all just the same code and I just need to look over one, is that fair? For my own education, why do we need all these copies?

Copy link
Copy Markdown
Member Author

@bbejeck bbejeck Oct 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed for using older streams versions in system tests. All of these classes go in the corresponding upgrade-system-tests-XX modules.

try:
del properties['upgrade.from']
except KeyError:
self.logger.info("Key 'upgrade.from' not there, better safe than sorry")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice 😄

for processor in processors:
self.verify_processing(processor, self.first_bounce_phase + self.processing_message)

# second rolling bounce without "upgrade.from" conifg
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: conifg -> config (applies elsewhere too)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made a pass and left some comments. cc @ableegoldman could you take a look at my comments and lmk wdyt?

}
}

log.info("Cooperative rebalancing enabled now");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on @ableegoldman 's suggestion.

+ (args.length > 0 ? args[0] + " " : "")
+ (args.length > 1 ? args[1] : ""));
}
final String kafka = args[0];
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This parameter is not used?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

final Properties config = new Properties();

System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v0.10.1)");
System.out.println("kafka=" + kafka);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

System.err.println("StreamsUpgradeTest requires three argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] + " " : ""));
}
final String kafka = args[0];
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we do not read the args[0] at all, is that intentional?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, as we only need the prop file configs. Maybe I can refactor these upgrade tests in a follow-on PR?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is args[0] not the name of the program in Java? ...Huh

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not C :)


source_topic = "source"
sink_topic = "sink"
task_delimiter = "#"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to make TASK_DELIMITER configurable? I feel it is unnecessarily complicating the setup..

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack, although the test uses this delimiter (or will in the near future) so I'll leave the definition here but remove from the configs passed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah my point here is that we can just hard-coded it than passing it through.

processor.SINK_TOPIC = self.sink_topic
processor.TASK_DELIMITER = self.task_delimiter
processor.REPORT_INTERVAL = self.report_interval
processor.UPGRADE_FROM = upgrade_from
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we now let the log4j to print the upgrade_from string already, do we still need this and to print within the state listener any more?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I follow here. We need the upgrade_from field for setting in the CooperativeRebalanceUpgradeService service to set the streams config.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, I thought the upgradePhase-Starting a REBALANCE message is used somewhere in verification and thought with the augmented log4j it can now be replaced, but now I see it is only for debugging purposes.

err_msg="Never saw '%s' message " % rebalance_msg + str(
processor.node.account))

# verify processing
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we are verifying processing twice: once here inside maybe_upgrade_rolling_bounce_and_verify, and once after maybe_upgrade_rolling_bounce_and_verify in verify_processing. Here we did the verify right after bouncing each node, and outside the maybe_upgrade_rolling_bounce_and_verify we just loop over them again and verify again. Is that intentional and necessary?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's intentional. The check inside the maybe_upgrade_rolling_bounce_and_verify is for validating the individual node resumes processing after it gets bounced. The additional check outside maybe_upgrade_rolling_bounce_and_verify confirms that all nodes are continuing to process and that rebalancing has ceased. I can remove the outside check if you insist.

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang Oct 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, let's keep it in that sense.

EDIT: Actually, I'm wondering that if the monitor would always grep the same log4j entry in the outside verification or it always try to grep the new lines after the inner verification? If it's the first case, then the outside verification would always be redundant as we are doomed to just grep the same lines.

Copy link
Copy Markdown
Member Author

@bbejeck bbejeck Oct 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The external check uses a new monitor object which when it's created grabs an offset of the current number of bytes in the file. The monitor only checks in the file from that point forward. The issue you describe could happen if you were to re-use an existing monitor.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fancy 👍


# verify rebalanced into a running state
rebalance_msg = current_phase + self.running_state_msg
stdout_monitor.wait_until(rebalance_msg,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, on old versions, we do not have state listener and hence would not print current_phase + self.running_state_msg, how we would still pass here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point in the test, we've upgraded to the new version so we have a state listener.

self.get_tasks_for_processor(processor)
self.logger.info("Active tasks %s" % processor.active_tasks)

overlapping_tasks = processor1.active_tasks.intersection(processor2.active_tasks, processor3.active_tasks)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why only testing intersect(1, 2, 3) and intersect(2, 3) here? Reading on https://www.geeksforgeeks.org/intersection-function-python/ I think if there's a task owned by both 1 and 2 but not 3, or by both 2 and 3 but not 1, then this would still pass?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack, updated the tests to be

processor1.instersection(processor2)
processor1.instersection(processor3)
processor2.instersection(processor3)

@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Oct 17, 2019

I did have the idea while reviewing to run each "processor" with more than one thread -- and then do all the usual verification for non-overlapping tasks between the consumers (rather than at instance level).

Actually I want to run another test with multiple threads and standby tasks. But I'm thinking this may be good enough for starters and I'll add another test in a follow-on PR

@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Oct 17, 2019

updated this for checkstyle fixes and some comments, I'll address the other comments in another push

@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Oct 17, 2019

@guozhangwang updated this

@ableegoldman
Copy link
Copy Markdown
Member

Was reviewing mostly for correctness but gave it another pass (good catch on the intersection function @guozhangwang), I think it looks good as a test of the 429 upgrade path -- I realize those extensions (more threads, standbys, etc) would be good to cover but don't/shouldn't really be part of the upgrade test anyway. Gotta keep it modularized...
Thanks @bbejeck!

@guozhangwang guozhangwang merged commit b62f2a1 into apache:trunk Oct 17, 2019
guozhangwang pushed a commit that referenced this pull request Oct 17, 2019
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

# 2.3.x versions
V_2_3_0 = KafkaVersion("2.3.0")
V_2_3_1 = KafkaVersion("2.3.1")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2.3.1 is not released yet. Why do we add this here as we don't use it yet?

ableegoldman added a commit to ableegoldman/kafka that referenced this pull request Nov 13, 2019
@bbejeck bbejeck deleted the KAFKA-8496_system_test_for_kip_429_upgrades_and_compatibility branch July 10, 2024 13:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants