diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 185fa7c3bb14e..318154889ca60 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -240,7 +240,9 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr info.prevTasks(), info.standbyTasks(), info.userEndPoint()) - .encode())); + .encode(), + subscription.ownedPartitions() + )); } assignment = super.assign(metadata, new GroupSubscription(downgradedSubscriptions)).groupAssignment(); bumpUsedVersion = true; diff --git a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index c2d8c4ddb3799..3c1d99fde2585 100644 --- a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -32,19 +32,16 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 3) { - System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] + " " : "") - + (args.length > 1 ? args[1] : "")); + if (args.length < 2) { + System.err.println("StreamsUpgradeTest requires two arguments (zookeeper-url, properties-file) but only " + args.length + " provided: " + + (args.length > 0 ? args[0] + " " : "")); } - final String kafka = args[0]; - final String zookeeper = args[1]; - final String propFileName = args.length > 2 ? args[2] : null; + final String zookeeper = args[0]; + final String propFileName = args[1]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.0)"); - System.out.println("kafka=" + kafka); System.out.println("zookeeper=" + zookeeper); System.out.println("props=" + streamsProperties); @@ -55,7 +52,6 @@ public static void main(final String[] args) throws Exception { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index e525658f6cf72..53ee0dcc42b80 100644 --- a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -35,19 +35,16 @@ public class StreamsUpgradeTest { */ @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 3) { - System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] + " " : "") - + (args.length > 1 ? args[1] : "")); + if (args.length < 2) { + System.err.println("StreamsUpgradeTest requires two arguments (zookeeper-url, properties-file) but only " + args.length + " provided: " + + (args.length > 0 ? args[0] + " " : "")); } - final String kafka = args[0]; - final String zookeeper = args[1]; - final String propFileName = args.length > 2 ? args[2] : null; + final String zookeeper = args[0]; + final String propFileName = args[1]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.1)"); - System.out.println("kafka=" + kafka); System.out.println("zookeeper=" + zookeeper); System.out.println("props=" + streamsProperties); @@ -58,7 +55,6 @@ public static void main(final String[] args) throws Exception { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index fa855521a64fb..d8e355b2e9889 100644 --- a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -32,17 +32,14 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 2) { - System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] : "")); + if (args.length < 1) { + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } - final String kafka = args[0]; - final String propFileName = args.length > 1 ? args[1] : null; + final String propFileName = args[0]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.2)"); - System.out.println("kafka=" + kafka); System.out.println("props=" + streamsProperties); final KStreamBuilder builder = new KStreamBuilder(); @@ -52,7 +49,6 @@ public static void main(final String[] args) throws Exception { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index efb96ff5f0da7..a1187578a623c 100644 --- a/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -32,17 +32,14 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 2) { - System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] : "")); + if (args.length < 1) { + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } - final String kafka = args[0]; - final String propFileName = args.length > 1 ? args[1] : null; + final String propFileName = args[0]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.11.0)"); - System.out.println("kafka=" + kafka); System.out.println("props=" + streamsProperties); final KStreamBuilder builder = new KStreamBuilder(); @@ -52,7 +49,6 @@ public static void main(final String[] args) throws Exception { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 6f63f8dd7558e..f69162c7d8043 100644 --- a/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -32,17 +32,14 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 2) { - System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] : "")); + if (args.length < 1) { + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } - final String kafka = args[0]; - final String propFileName = args.length > 1 ? args[1] : null; + final String propFileName = args[0]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest v1.0)"); - System.out.println("kafka=" + kafka); System.out.println("props=" + streamsProperties); final StreamsBuilder builder = new StreamsBuilder(); @@ -52,7 +49,6 @@ public static void main(final String[] args) throws Exception { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index b5759f56a5a28..ca284f22bfb75 100644 --- a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -32,17 +32,14 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 2) { - System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] : "")); + if (args.length < 1) { + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } - final String kafka = args[0]; - final String propFileName = args.length > 1 ? args[1] : null; + final String propFileName = args[0]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest v1.1)"); - System.out.println("kafka=" + kafka); System.out.println("props=" + streamsProperties); final StreamsBuilder builder = new StreamsBuilder(); @@ -52,7 +49,6 @@ public static void main(final String[] args) throws Exception { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index d963e4a1f65c1..f66c7a42d7121 100644 --- a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -31,17 +31,14 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 2) { - System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] : "")); + if (args.length < 1) { + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } - final String kafka = args[0]; - final String propFileName = args.length > 1 ? args[1] : null; + final String propFileName = args[0]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.0)"); - System.out.println("kafka=" + kafka); System.out.println("props=" + streamsProperties); final StreamsBuilder builder = new StreamsBuilder(); @@ -51,7 +48,6 @@ public static void main(final String[] args) throws Exception { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 6e409a00658b0..d467df099b6ae 100644 --- a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -31,17 +31,14 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 2) { - System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] : "")); + if (args.length < 1) { + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } - final String kafka = args[0]; - final String propFileName = args.length > 1 ? args[1] : null; + final String propFileName = args[0]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.1)"); - System.out.println("kafka=" + kafka); System.out.println("props=" + streamsProperties); final StreamsBuilder builder = new StreamsBuilder(); @@ -51,7 +48,6 @@ public static void main(final String[] args) throws Exception { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 7ff4d815406e1..83b68cc146039 100644 --- a/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -31,17 +31,14 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 2) { - System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] : "")); + if (args.length < 1) { + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } - final String kafka = args[0]; - final String propFileName = args.length > 1 ? args[1] : null; + final String propFileName = args[0]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.2)"); - System.out.println("kafka=" + kafka); System.out.println("props=" + streamsProperties); final StreamsBuilder builder = new StreamsBuilder(); @@ -51,7 +48,6 @@ public static void main(final String[] args) throws Exception { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index f9182c4b34704..6428ec6e4deff 100644 --- a/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -31,17 +31,14 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 2) { - System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] : "")); + if (args.length < 1) { + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } - final String kafka = args[0]; - final String propFileName = args.length > 1 ? args[1] : null; + final String propFileName = args[0]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.3)"); - System.out.println("kafka=" + kafka); System.out.println("props=" + streamsProperties); final StreamsBuilder builder = new StreamsBuilder(); @@ -51,7 +48,6 @@ public static void main(final String[] args) throws Exception { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 63efb4f662faa..80c2ee7367348 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -487,12 +487,7 @@ def prop_file(self): def start_cmd(self, node): args = self.args.copy() - if self.KAFKA_STREAMS_VERSION in [str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2), - str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), - str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3)]: - args['kafka'] = self.kafka.bootstrap_servers() - else: - args['kafka'] = "" + if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1): args['zk'] = self.kafka.zk.connect_setting() else: @@ -507,7 +502,7 @@ def start_cmd(self, node): cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \ "INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \ - " %(kafka_run_class)s %(streams_class_name)s %(kafka)s %(zk)s %(config_file)s " \ + " %(kafka_run_class)s %(streams_class_name)s %(zk)s %(config_file)s " \ " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args self.logger.info("Executing: " + cmd) diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index 00dbe355f8086..25fe065f8e55b 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -527,11 +527,11 @@ def do_rolling_bounce(self, processor, counter, current_generation): monitors[second_other_processor] = second_other_monitor if len(self.old_processors) > 0: - log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version and trigger new rebalance.", + log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 5 and trigger new rebalance.", timeout_sec=60, err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account)) else: - log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version and trigger new rebalance.", + log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 6 and trigger new rebalance.", timeout_sec=60, err_msg="Could not detect 'successful version probing with upgraded leader' at upgrading node " + str(node.account)) first_other_monitor.wait_until("Sent a version 5 subscription and group.s latest commonly supported version is 6 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 6 for next rebalance.",