Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
info.prevTasks(),
info.standbyTasks(),
info.userEndPoint())
.encode()));
.encode(),
subscription.ownedPartitions()
));
}
assignment = super.assign(metadata, new GroupSubscription(downgradedSubscriptions)).groupAssignment();
bumpUsedVersion = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,16 @@ public class StreamsUpgradeTest {

@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 3) {
System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] + " " : "")
+ (args.length > 1 ? args[1] : ""));
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two arguments (zookeeper-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] + " " : ""));
}
final String kafka = args[0];
final String zookeeper = args[1];
final String propFileName = args.length > 2 ? args[2] : null;
final String zookeeper = args[0];
final String propFileName = args[1];

final Properties streamsProperties = Utils.loadProps(propFileName);

System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.0)");
System.out.println("kafka=" + kafka);
System.out.println("zookeeper=" + zookeeper);
System.out.println("props=" + streamsProperties);

Expand All @@ -55,7 +52,6 @@ public static void main(final String[] args) throws Exception {

final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,16 @@ public class StreamsUpgradeTest {
*/
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 3) {
System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] + " " : "")
+ (args.length > 1 ? args[1] : ""));
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two arguments (zookeeper-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] + " " : ""));
}
final String kafka = args[0];
final String zookeeper = args[1];
final String propFileName = args.length > 2 ? args[2] : null;
final String zookeeper = args[0];
final String propFileName = args[1];

final Properties streamsProperties = Utils.loadProps(propFileName);

System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.1)");
System.out.println("kafka=" + kafka);
System.out.println("zookeeper=" + zookeeper);
System.out.println("props=" + streamsProperties);

Expand All @@ -58,7 +55,6 @@ public static void main(final String[] args) throws Exception {

final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,14 @@ public class StreamsUpgradeTest {

@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we remove args in print?

}
final String kafka = args[0];
final String propFileName = args.length > 1 ? args[1] : null;
final String propFileName = args[0];

final Properties streamsProperties = Utils.loadProps(propFileName);

System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.2)");
System.out.println("kafka=" + kafka);
System.out.println("props=" + streamsProperties);

final KStreamBuilder builder = new KStreamBuilder();
Expand All @@ -52,7 +49,6 @@ public static void main(final String[] args) throws Exception {

final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,14 @@ public class StreamsUpgradeTest {

@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
}
final String kafka = args[0];
final String propFileName = args.length > 1 ? args[1] : null;
final String propFileName = args[0];

final Properties streamsProperties = Utils.loadProps(propFileName);

System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.11.0)");
System.out.println("kafka=" + kafka);
System.out.println("props=" + streamsProperties);

final KStreamBuilder builder = new KStreamBuilder();
Expand All @@ -52,7 +49,6 @@ public static void main(final String[] args) throws Exception {

final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,14 @@ public class StreamsUpgradeTest {

@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
if (args.length < 1) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we hit NPE in this case?

System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
}
final String kafka = args[0];
final String propFileName = args.length > 1 ? args[1] : null;
final String propFileName = args[0];

final Properties streamsProperties = Utils.loadProps(propFileName);

System.out.println("StreamsTest instance started (StreamsUpgradeTest v1.0)");
System.out.println("kafka=" + kafka);
System.out.println("props=" + streamsProperties);

final StreamsBuilder builder = new StreamsBuilder();
Expand All @@ -52,7 +49,6 @@ public static void main(final String[] args) throws Exception {

final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,14 @@ public class StreamsUpgradeTest {

@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
}
final String kafka = args[0];
final String propFileName = args.length > 1 ? args[1] : null;
final String propFileName = args[0];

final Properties streamsProperties = Utils.loadProps(propFileName);

System.out.println("StreamsTest instance started (StreamsUpgradeTest v1.1)");
System.out.println("kafka=" + kafka);
System.out.println("props=" + streamsProperties);

final StreamsBuilder builder = new StreamsBuilder();
Expand All @@ -52,7 +49,6 @@ public static void main(final String[] args) throws Exception {

final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,14 @@ public class StreamsUpgradeTest {

@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
}
final String kafka = args[0];
final String propFileName = args.length > 1 ? args[1] : null;
final String propFileName = args[0];

final Properties streamsProperties = Utils.loadProps(propFileName);

System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.0)");
System.out.println("kafka=" + kafka);
System.out.println("props=" + streamsProperties);

final StreamsBuilder builder = new StreamsBuilder();
Expand All @@ -51,7 +48,6 @@ public static void main(final String[] args) throws Exception {

final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,14 @@ public class StreamsUpgradeTest {

@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
}
final String kafka = args[0];
final String propFileName = args.length > 1 ? args[1] : null;
final String propFileName = args[0];

final Properties streamsProperties = Utils.loadProps(propFileName);

System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.1)");
System.out.println("kafka=" + kafka);
System.out.println("props=" + streamsProperties);

final StreamsBuilder builder = new StreamsBuilder();
Expand All @@ -51,7 +48,6 @@ public static void main(final String[] args) throws Exception {

final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,14 @@ public class StreamsUpgradeTest {

@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
}
final String kafka = args[0];
final String propFileName = args.length > 1 ? args[1] : null;
final String propFileName = args[0];

final Properties streamsProperties = Utils.loadProps(propFileName);

System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.2)");
System.out.println("kafka=" + kafka);
System.out.println("props=" + streamsProperties);

final StreamsBuilder builder = new StreamsBuilder();
Expand All @@ -51,7 +48,6 @@ public static void main(final String[] args) throws Exception {

final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,14 @@ public class StreamsUpgradeTest {

@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
}
final String kafka = args[0];
final String propFileName = args.length > 1 ? args[1] : null;
final String propFileName = args[0];

final Properties streamsProperties = Utils.loadProps(propFileName);

System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.3)");
System.out.println("kafka=" + kafka);
System.out.println("props=" + streamsProperties);

final StreamsBuilder builder = new StreamsBuilder();
Expand All @@ -51,7 +48,6 @@ public static void main(final String[] args) throws Exception {

final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

Expand Down
9 changes: 2 additions & 7 deletions tests/kafkatest/services/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,12 +487,7 @@ def prop_file(self):

def start_cmd(self, node):
args = self.args.copy()
if self.KAFKA_STREAMS_VERSION in [str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2),
str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3)]:
args['kafka'] = self.kafka.bootstrap_servers()
else:
args['kafka'] = ""

if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1):
args['zk'] = self.kafka.zk.connect_setting()
else:
Expand All @@ -507,7 +502,7 @@ def start_cmd(self, node):

cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
" %(kafka_run_class)s %(streams_class_name)s %(kafka)s %(zk)s %(config_file)s " \
" %(kafka_run_class)s %(streams_class_name)s %(zk)s %(config_file)s " \
" & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args

self.logger.info("Executing: " + cmd)
Expand Down
4 changes: 2 additions & 2 deletions tests/kafkatest/tests/streams/streams_upgrade_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,11 +527,11 @@ def do_rolling_bounce(self, processor, counter, current_generation):
monitors[second_other_processor] = second_other_monitor

if len(self.old_processors) > 0:
log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version and trigger new rebalance.",
log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 5 and trigger new rebalance.",
timeout_sec=60,
err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
else:
log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version and trigger new rebalance.",
log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 6 and trigger new rebalance.",
timeout_sec=60,
err_msg="Could not detect 'successful version probing with upgraded leader' at upgrading node " + str(node.account))
first_other_monitor.wait_until("Sent a version 5 subscription and group.s latest commonly supported version is 6 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 6 for next rebalance.",
Expand Down