From adc67d52704eddc263291cb04fb5701df190f0af Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Mon, 24 Apr 2023 23:45:02 -0700 Subject: [PATCH 1/3] add system test --- .../streams/tests/StreamsUpgradeTest.java | 112 ++++++++++++++++++ .../tests/streams/streams_upgrade_test.py | 86 ++++++++++++++ tests/kafkatest/version.py | 2 +- 3 files changed, 199 insertions(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 3f0a8000c6a69..e07b01a43b316 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.tests; +import java.util.Arrays; +import java.util.Collections; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; @@ -24,18 +26,24 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.ByteBufferInputStream; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; @@ -69,6 +77,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION; import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde; +import static org.apache.kafka.streams.tests.SmokeTestUtil.longSerde; import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde; public class StreamsUpgradeTest { @@ -107,6 +116,9 @@ public static KafkaStreams buildStreams(final Properties streamsProperties) { final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty( "test.run_fk_join", "false")); + final boolean runTableAgg = Boolean.parseBoolean(streamsProperties.getProperty( + "test.run_table_agg", + "false")); if (runFkJoin) { try { final KTable fkTable = builder.table( @@ -116,6 +128,23 @@ public static KafkaStreams buildStreams(final Properties streamsProperties) { System.err.println("Caught " + e.getMessage()); } } + if (runTableAgg) { + final String aggProducePrefix = streamsProperties.getProperty("test.agg_produce_prefix", ""); + if (aggProducePrefix.isEmpty()) { + System.err.printf("'%s' must be specified when '%s' is true.", 
"test.agg_produce_prefix", "test.run_table_agg"); + } + final String expectedAggPrefixesStr = streamsProperties.getProperty("test.expected_agg_prefixes", ""); + if (expectedAggPrefixesStr.isEmpty()) { + System.err.printf("'%s' must be specified when '%s' is true.", "test.expected_agg_prefixes", "test.run_table_agg"); + } + final List expectedAggPrefixes = Arrays.asList(expectedAggPrefixesStr.split(",")); + + try { + buildTableAgg(dataTable, aggProducePrefix, expectedAggPrefixes); + } catch (final Exception e) { + System.err.println("Caught " + e.getMessage()); + } + } final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); @@ -143,6 +172,89 @@ private static void buildFKTable(final KStream primaryTable, kStream.to("fk-result", Produced.with(stringSerde, stringSerde)); } + private static void buildTableAgg(final KTable sourceTable, + final String aggProducePrefix, + final List expectedAggPrefixes) { + sourceTable + .groupBy( + (k, v) -> new KeyValue<>((int) (Math.random() * 15), aggProducePrefix), // group into smaller number of keys, while still allowing for distribution to all downstream tasks + Grouped.with(intSerde, stringSerde)) + .aggregate( + () -> new Agg(Collections.emptyList(), 0), + (k, v, agg) -> { + final List seenPrefixes; + final boolean updated; + if (!agg.seenPrefixes.contains(v)) { + seenPrefixes = new ArrayList<>(agg.seenPrefixes); + seenPrefixes.add(v); + Collections.sort(seenPrefixes); + updated = true; + } else { + seenPrefixes = agg.seenPrefixes; + updated = false; + } + + final boolean shouldLog = updated || (agg.recordsProcessed % 10 == 0); // value of 10 is chosen for debugging purposes. can increase to 100 once test is passing. + if (shouldLog && seenPrefixes.containsAll(expectedAggPrefixes)) { + System.out.printf("Table aggregate processor saw expected prefixes: %s%n", String.join(",", expectedAggPrefixes)); + } else { + System.out.printf("Table aggregate processor did not see expected prefixes. Seen: %s. Expected: %s%n", String.join(",", seenPrefixes), String.join(",", expectedAggPrefixes)); // this line for debugging purposes only. 
+ } + + return new Agg(seenPrefixes, agg.recordsProcessed + 1); + }, + (k, v, agg) -> agg, + Materialized.with(intSerde, new AggSerde())) + .mapValues((k, vAgg) -> String.join(",", vAgg.seenPrefixes)) + .toStream() + .to("table-agg-result", Produced.with(intSerde, stringSerde)); + } + + private static class Agg { + private final List seenPrefixes; + private final long recordsProcessed; + + Agg(final List seenPrefixes, final long recordsProcessed) { + this.seenPrefixes = seenPrefixes; + this.recordsProcessed = recordsProcessed; + } + + byte[] serialize() { + final byte[] rawSeenPrefixes = stringSerde.serializer().serialize("", String.join(",", seenPrefixes)); + final byte[] rawRecordsProcessed = longSerde.serializer().serialize("", recordsProcessed); + return ByteBuffer + .allocate(rawSeenPrefixes.length + rawRecordsProcessed.length) + .put(rawSeenPrefixes) + .put(rawRecordsProcessed) + .array(); + } + + static Agg deserialize(final byte[] rawAgg) { + final byte[] rawSeenPrefixes = new byte[rawAgg.length - 8]; + System.arraycopy(rawAgg, 0, rawSeenPrefixes, 0, rawSeenPrefixes.length); + final List seenPrefixes = Arrays.asList(stringSerde.deserializer().deserialize("", rawSeenPrefixes).split(",")); + + final byte[] rawRecordsProcessed = new byte[8]; + System.arraycopy(rawAgg, rawAgg.length - 8, rawRecordsProcessed, 0, 8); + final long recordsProcessed = longSerde.deserializer().deserialize("", rawRecordsProcessed); + + return new Agg(seenPrefixes, recordsProcessed); + } + } + + private static class AggSerde implements Serde { + + @Override + public Serializer serializer() { + return (topic, agg) -> agg.serialize(); + } + + @Override + public Deserializer deserializer() { + return (topic, rawAgg) -> Agg.deserialize(rawAgg); + } + } + private static class FutureKafkaClientSupplier extends DefaultKafkaClientSupplier { @Override public Consumer getConsumer(final Map config) { diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index cf276364ea457..04f8fd8579110 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -41,6 +41,7 @@ metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)] fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)] +table_agg_versions = [str(LATEST_3_3)] """ After each release one should first check that the released version has been uploaded to @@ -236,6 +237,91 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version): self.stop_and_await() + @cluster(num_nodes=6) + @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)]) + def test_rolling_upgrade_for_table_agg(self, from_version, to_version): + """ + This test verifies that the cluster successfully upgrades despite changes in the table + repartition topic format. 
+
+        Starts 3 KafkaStreams instances with from_version and upgrades them one-by-one to to_version
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_prefix'] = 'A'
+        extra_properties['test.expected_agg_prefixes'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_prefix'] = 'B'
+        extra_properties['test.expected_agg_prefixes'] = 'A,B'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
+            counter = counter + 1
+
+        # bounce remaining instance on old version (just for verification purposes, to verify that
+        # instance on old version can process records written by new version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_prefix'] = 'A'
+        extra_properties['test.expected_agg_prefixes'] = 'A,B'
+        self.do_stop_start_bounce(p3, None, from_version, counter, extra_properties)
+        counter = counter + 1
+
+        self.wait_for_table_agg_success('A,B')
+
+        # bounce remaining instance to new version (verifies that new version without upgrade_from
+        # can process records written by new version with upgrade_from)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_prefix'] = 'C'
+        extra_properties['test.expected_agg_prefixes'] = 'A,B,C'
+        self.do_stop_start_bounce(p3, None, to_version, counter, extra_properties)
+        counter = counter + 1
+
+        # bounce first instances again without removing upgrade_from (just for verification purposes,
+        # to verify that instance with upgrade_from can process records written without upgrade_from)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_prefix'] = 'B'
+        extra_properties['test.expected_agg_prefixes'] = 'A,B,C'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
+            counter = counter + 1
+
+        self.wait_for_table_agg_success('A,B,C')
+
+        self.stop_and_await()
+
+    def wait_for_table_agg_success(self, expected_prefixes):
+        agg_success_str = "Table aggregate processor saw expected prefixes: " + expected_prefixes
+        with self.processor1.node.account.monitor_log(self.processor1.LOG_FILE) as first_monitor:
+            with self.processor2.node.account.monitor_log(self.processor2.LOG_FILE) as second_monitor:
+                with self.processor3.node.account.monitor_log(self.processor3.LOG_FILE) as third_monitor:
+                    first_monitor.wait_until(agg_success_str,
+                                             timeout_sec=60,
+                                             err_msg="Could not verify table aggregate processor success for '" + expected_prefixes + "' in " + str(self.processor1.node.account))
+                    second_monitor.wait_until(agg_success_str,
+                                              timeout_sec=60,
+                                              err_msg="Could not verify table aggregate processor success for '" + expected_prefixes + "' in " + str(self.processor2.node.account))
+                    third_monitor.wait_until(agg_success_str,
+                                             timeout_sec=60,
+                                             err_msg="Could not verify table aggregate processor success for '" + expected_prefixes + "' in " + str(self.processor3.node.account))
+
     @cluster(num_nodes=6)
     def test_version_probing_upgrade(self):
         """
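
A note on the bounce mechanics above: do_stop_start_bounce receives from_version[:-2] (for example "3.3" when from_version is "3.3.2"), which the harness passes through to the restarted instance as Kafka Streams' upgrade.from setting during the first rolling bounce; the final bounce omits it. As a rough sketch of what each bounced instance effectively configures (class name, topology, and bootstrap server below are illustrative placeholders, not values taken from this test):

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class UpgradeFromSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.table("data"); // stand-in topology; the real test builds the table aggregation
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // first rolling bounce: run the new jars, but keep the old rebalance protocol and formats
            props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "3.3");
            // final bounce: identical, except UPGRADE_FROM_CONFIG is left unset
            new KafkaStreams(builder.build(), props).start();
        }
    }

diff --git 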
a/tests/kafkatest/version.py b/tests/kafkatest/version.py index c31adc256bab0..7c49dfaf23ee8 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -119,7 +119,7 @@ def get_version(node=None): return DEV_BRANCH DEV_BRANCH = KafkaVersion("dev") -DEV_VERSION = KafkaVersion("3.5.0-SNAPSHOT") +DEV_VERSION = KafkaVersion("3.6.0-SNAPSHOT") LATEST_METADATA_VERSION = "3.3" From c6d9447cd913b553474de4b651e25347827ce5eb Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 1 May 2023 21:03:20 -0700 Subject: [PATCH 2/3] - add table-aggregation to older versions - fix data gen to use ring-buffer - fix application-id for older versions --- .../kafka/streams/tests/SmokeTestDriver.java | 10 +- .../kafka/streams/tests/SmokeTestUtil.java | 24 ++++ .../streams/tests/StreamsUpgradeTest.java | 78 ++++++----- .../kafka/streams/tests/SmokeTestDriver.java | 10 +- .../kafka/streams/tests/SmokeTestUtil.java | 24 ++++ .../streams/tests/StreamsUpgradeTest.java | 124 +++++++++++++++++- .../tests/streams/streams_upgrade_test.py | 43 +++--- 7 files changed, 250 insertions(+), 63 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index f4622a15a7d61..c1248e3b99afd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -106,7 +106,11 @@ private static class ValueList { } int next() { - return (index < values.length) ? values[index++] : -1; + final int v = values[index++]; + if (index >= values.length) { + index = 0; + } + return v; } } @@ -126,7 +130,7 @@ static void generatePerpetually(final String kafka, data[i] = new ValueList(i, i + maxRecordsPerKey - 1); } - final Random rand = new Random(); + final Random rand = new Random(System.currentTimeMillis()); try (final KafkaProducer producer = new KafkaProducer<>(producerProps)) { while (true) { @@ -313,7 +317,7 @@ public void onCompletion(final RecordMetadata metadata, final Exception exceptio } private static void shuffle(final int[] data, @SuppressWarnings("SameParameterValue") final int windowSize) { - final Random rand = new Random(); + final Random rand = new Random(System.currentTimeMillis()); for (int i = 0; i < data.length; i++) { // we shuffle data within windowSize final int j = rand.nextInt(Math.min(data.length - i, windowSize)) + i; diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java index e0c45d0c1c954..c5d7ac9ff3b70 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java @@ -34,10 +34,34 @@ public class SmokeTestUtil { final static int END = Integer.MAX_VALUE; + static ProcessorSupplier printTaskProcessorSupplier(final String topic) { + return printTaskProcessorSupplier(topic, ""); + } + static ProcessorSupplier printProcessorSupplier(final String topic) { return printProcessorSupplier(topic, ""); } + static ProcessorSupplier printTaskProcessorSupplier(final String topic, final String name) { + return () -> new ContextualProcessor() { + @Override + public void init(final ProcessorContext context) { + super.init(context); + System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId()); + System.out.flush(); + } + + @Override + public void 
process(final Record record) { } + + @Override + public void close() { + System.out.printf("Close processor for task %s%n", context().taskId()); + System.out.flush(); + } + }; + } + static ProcessorSupplier printProcessorSupplier(final String topic, final String name) { return () -> new ContextualProcessor() { private int numRecordsProcessed = 0; diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index e07b01a43b316..b4c4879e99f04 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -93,6 +93,7 @@ public static void main(final String[] args) throws Exception { System.out.println("StreamsTest instance started (StreamsUpgradeTest trunk)"); System.out.println("props=" + streamsProperties); + // Do not use try-with-resources here; otherwise KafkaStreams will be closed when `main()` exits final KafkaStreams streams = buildStreams(streamsProperties); streams.start(); @@ -129,18 +130,18 @@ public static KafkaStreams buildStreams(final Properties streamsProperties) { } } if (runTableAgg) { - final String aggProducePrefix = streamsProperties.getProperty("test.agg_produce_prefix", ""); - if (aggProducePrefix.isEmpty()) { - System.err.printf("'%s' must be specified when '%s' is true.", "test.agg_produce_prefix", "test.run_table_agg"); + final String aggProduceValue = streamsProperties.getProperty("test.agg_produce_value", ""); + if (aggProduceValue.isEmpty()) { + System.err.printf("'%s' must be specified when '%s' is true.", "test.agg_produce_value", "test.run_table_agg"); } - final String expectedAggPrefixesStr = streamsProperties.getProperty("test.expected_agg_prefixes", ""); - if (expectedAggPrefixesStr.isEmpty()) { - System.err.printf("'%s' must be specified when '%s' is true.", "test.expected_agg_prefixes", "test.run_table_agg"); + final String expectedAggValuesStr = streamsProperties.getProperty("test.expected_agg_values", ""); + if (expectedAggValuesStr.isEmpty()) { + System.err.printf("'%s' must be specified when '%s' is true.", "test.expected_agg_values", "test.run_table_agg"); } - final List expectedAggPrefixes = Arrays.asList(expectedAggPrefixesStr.split(",")); + final List expectedAggValue = Arrays.asList(expectedAggValuesStr.split(",")); try { - buildTableAgg(dataTable, aggProducePrefix, expectedAggPrefixes); + buildTableAgg(dataTable, aggProduceValue, expectedAggValue); } catch (final Exception e) { System.err.println("Caught " + e.getMessage()); } @@ -173,72 +174,77 @@ private static void buildFKTable(final KStream primaryTable, } private static void buildTableAgg(final KTable sourceTable, - final String aggProducePrefix, - final List expectedAggPrefixes) { - sourceTable + final String aggProduceValue, + final List expectedAggValues) { + final KStream result = sourceTable .groupBy( - (k, v) -> new KeyValue<>((int) (Math.random() * 15), aggProducePrefix), // group into smaller number of keys, while still allowing for distribution to all downstream tasks + (k, v) -> new KeyValue<>(v, aggProduceValue), Grouped.with(intSerde, stringSerde)) .aggregate( () -> new Agg(Collections.emptyList(), 0), (k, v, agg) -> { - final List seenPrefixes; + final List seenValues; final boolean updated; - if (!agg.seenPrefixes.contains(v)) { - seenPrefixes = new ArrayList<>(agg.seenPrefixes); - seenPrefixes.add(v); - Collections.sort(seenPrefixes); + if 
(!agg.seenValues.contains(v)) { + seenValues = new ArrayList<>(agg.seenValues); + seenValues.add(v); + Collections.sort(seenValues); updated = true; } else { - seenPrefixes = agg.seenPrefixes; + seenValues = agg.seenValues; updated = false; } final boolean shouldLog = updated || (agg.recordsProcessed % 10 == 0); // value of 10 is chosen for debugging purposes. can increase to 100 once test is passing. - if (shouldLog && seenPrefixes.containsAll(expectedAggPrefixes)) { - System.out.printf("Table aggregate processor saw expected prefixes: %s%n", String.join(",", expectedAggPrefixes)); - } else { - System.out.printf("Table aggregate processor did not see expected prefixes. Seen: %s. Expected: %s%n", String.join(",", seenPrefixes), String.join(",", expectedAggPrefixes)); // this line for debugging purposes only. + if (shouldLog) { + if (seenValues.containsAll(expectedAggValues)) { + System.out.printf("Table aggregate processor saw expected values. Seen: %s. Expected: %s%n", String.join(",", seenValues), String.join(",", expectedAggValues)); + } else { + System.out.printf("Table aggregate processor did not see expected values. Seen: %s. Expected: %s%n", String.join(",", seenValues), String.join(",", expectedAggValues)); // this line for debugging purposes only. + } } - return new Agg(seenPrefixes, agg.recordsProcessed + 1); + return new Agg(seenValues, agg.recordsProcessed + 1); }, (k, v, agg) -> agg, Materialized.with(intSerde, new AggSerde())) - .mapValues((k, vAgg) -> String.join(",", vAgg.seenPrefixes)) - .toStream() - .to("table-agg-result", Produced.with(intSerde, stringSerde)); + .mapValues((k, vAgg) -> String.join(",", vAgg.seenValues)) + .toStream(); + + // adding dummy processor for better debugging (will print assigned tasks) + result.process(SmokeTestUtil.printTaskProcessorSupplier("table-repartition")); + result.to("table-agg-result", Produced.with(intSerde, stringSerde)); } private static class Agg { - private final List seenPrefixes; + private final List seenValues; private final long recordsProcessed; - Agg(final List seenPrefixes, final long recordsProcessed) { - this.seenPrefixes = seenPrefixes; + Agg(final List seenValues, final long recordsProcessed) { + this.seenValues = seenValues; this.recordsProcessed = recordsProcessed; } byte[] serialize() { - final byte[] rawSeenPrefixes = stringSerde.serializer().serialize("", String.join(",", seenPrefixes)); + final byte[] rawSeenValues = stringSerde.serializer().serialize("", String.join(",", seenValues)); final byte[] rawRecordsProcessed = longSerde.serializer().serialize("", recordsProcessed); return ByteBuffer - .allocate(rawSeenPrefixes.length + rawRecordsProcessed.length) - .put(rawSeenPrefixes) + .allocate(rawSeenValues.length + rawRecordsProcessed.length) + .put(rawSeenValues) .put(rawRecordsProcessed) .array(); } static Agg deserialize(final byte[] rawAgg) { - final byte[] rawSeenPrefixes = new byte[rawAgg.length - 8]; - System.arraycopy(rawAgg, 0, rawSeenPrefixes, 0, rawSeenPrefixes.length); - final List seenPrefixes = Arrays.asList(stringSerde.deserializer().deserialize("", rawSeenPrefixes).split(",")); + final byte[] rawSeenValues = new byte[rawAgg.length - 8]; + System.arraycopy(rawAgg, 0, rawSeenValues, 0, rawSeenValues.length); + final List seenValues = Arrays.asList(stringSerde.deserializer().deserialize("", rawSeenValues).split(",")); final byte[] rawRecordsProcessed = new byte[8]; System.arraycopy(rawAgg, rawAgg.length - 8, rawRecordsProcessed, 0, 8); final long recordsProcessed = 
longSerde.deserializer().deserialize("", rawRecordsProcessed); - return new Agg(seenPrefixes, recordsProcessed); + return new Agg(seenValues, recordsProcessed); } } diff --git a/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index dbacbb9625b61..63b41ff4c7cf0 100644 --- a/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -106,7 +106,11 @@ private static class ValueList { } int next() { - return (index < values.length) ? values[index++] : -1; + final int v = values[index++]; + if (index >= values.length) { + index = 0; + } + return v; } } @@ -126,7 +130,7 @@ static void generatePerpetually(final String kafka, data[i] = new ValueList(i, i + maxRecordsPerKey - 1); } - final Random rand = new Random(); + final Random rand = new Random(System.currentTimeMillis()); try (final KafkaProducer producer = new KafkaProducer<>(producerProps)) { while (true) { @@ -309,7 +313,7 @@ public void onCompletion(final RecordMetadata metadata, final Exception exceptio } private static void shuffle(final int[] data, @SuppressWarnings("SameParameterValue") final int windowSize) { - final Random rand = new Random(); + final Random rand = new Random(System.currentTimeMillis()); for (int i = 0; i < data.length; i++) { // we shuffle data within windowSize final int j = rand.nextInt(Math.min(data.length - i, windowSize)) + i; diff --git a/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java index 2a99f0b808597..f64fe18b9994b 100644 --- a/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java +++ b/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java @@ -34,10 +34,34 @@ public class SmokeTestUtil { final static int END = Integer.MAX_VALUE; + static ProcessorSupplier printTaskProcessorSupplier(final String topic) { + return printTaskProcessorSupplier(topic, ""); + } + static ProcessorSupplier printProcessorSupplier(final String topic) { return printProcessorSupplier(topic, ""); } + static ProcessorSupplier printTaskProcessorSupplier(final String topic, final String name) { + return () -> new ContextualProcessor() { + @Override + public void init(final ProcessorContext context) { + super.init(context); + System.out.println("[3.3] initializing processor: topic=" + topic + " taskId=" + context.taskId()); + System.out.flush(); + } + + @Override + public void process(final Record record) {} + + @Override + public void close() { + System.out.printf("Close processor for task %s%n", context().taskId()); + System.out.flush(); + } + }; + } + static ProcessorSupplier printProcessorSupplier(final String topic, final String name) { return () -> new ContextualProcessor() { private int numRecordsProcessed = 0; diff --git a/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 60b8305bc3587..e8eed0613927c 100644 --- a/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ 
b/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -16,23 +16,34 @@ */ package org.apache.kafka.streams.tests; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Properties; -import java.util.Random; import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde; +import static org.apache.kafka.streams.tests.SmokeTestUtil.longSerde; import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde; @@ -60,6 +71,9 @@ public static void main(final String[] args) throws Exception { final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty( "test.run_fk_join", "false")); + final boolean runTableAgg = Boolean.parseBoolean(streamsProperties.getProperty( + "test.run_table_agg", + "false")); if (runFkJoin) { try { final KTable fkTable = builder.table( @@ -69,14 +83,32 @@ public static void main(final String[] args) throws Exception { System.err.println("Caught " + e.getMessage()); } } + if (runTableAgg) { + final String aggProduceValue = streamsProperties.getProperty("test.agg_produce_value", ""); + if (aggProduceValue.isEmpty()) { + System.err.printf("'%s' must be specified when '%s' is true.", "test.agg_produce_value", "test.run_table_agg"); + } + final String expectedAggValuesStr = streamsProperties.getProperty("test.expected_agg_values", ""); + if (expectedAggValuesStr.isEmpty()) { + System.err.printf("'%s' must be specified when '%s' is true.", "test.expected_agg_values", "test.run_table_agg"); + } + final List expectedAggValues = Arrays.asList(expectedAggValuesStr.split(",")); + + try { + buildTableAgg(dataTable, aggProduceValue, expectedAggValues); + } catch (final Exception e) { + System.err.println("Caught " + e.getMessage()); + } + } final Properties config = new Properties(); config.setProperty( StreamsConfig.APPLICATION_ID_CONFIG, - "StreamsUpgradeTest-" + new Random().nextLong()); + "StreamsUpgradeTest"); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); + // Do not use try-with-resources here; otherwise KafkaStreams will be closed when `main()` exits final KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.start(); @@ -96,6 +128,94 @@ private static void buildFKTable(final KStream primaryTable, kStream.to("fk-result", Produced.with(stringSerde, stringSerde)); } + private static void buildTableAgg(final KTable sourceTable, + final String aggProduceValue, + final List expectedAggValues) { + final 
KStream result = sourceTable
+            .groupBy(
+                (k, v) -> new KeyValue<>(v, aggProduceValue),
+                Grouped.with(intSerde, stringSerde))
+            .aggregate(
+                () -> new Agg(Collections.emptyList(), 0),
+                (k, v, agg) -> {
+                    final List seenValues;
+                    final boolean updated;
+                    if (!agg.seenValues.contains(v)) {
+                        seenValues = new ArrayList<>(agg.seenValues);
+                        seenValues.add(v);
+                        Collections.sort(seenValues);
+                        updated = true;
+                    } else {
+                        seenValues = agg.seenValues;
+                        updated = false;
+                    }
+
+                    final boolean shouldLog = updated || (agg.recordsProcessed % 10 == 0); // value of 10 is chosen for debugging purposes. can increase to 100 once test is passing.
+                    if (shouldLog) {
+                        if (seenValues.containsAll(expectedAggValues)) {
+                            System.out.printf("Table aggregate processor saw expected values. Seen: %s. Expected: %s%n", String.join(",", seenValues), String.join(",", expectedAggValues));
+                        } else {
+                            System.out.printf("Table aggregate processor did not see expected values. Seen: %s. Expected: %s%n", String.join(",", seenValues), String.join(",", expectedAggValues)); // this line for debugging purposes only.
+                        }
+                    }
+
+                    return new Agg(seenValues, agg.recordsProcessed + 1);
+                },
+                (k, v, agg) -> agg,
+                Materialized.with(intSerde, new AggSerde()))
+            .mapValues((k, vAgg) -> String.join(",", vAgg.seenValues))
+            .toStream();
+
+        // adding dummy processor for better debugging (will print assigned tasks)
+        result.process(SmokeTestUtil.printTaskProcessorSupplier("table-repartition"));
+        result.to("table-agg-result", Produced.with(intSerde, stringSerde));
+    }
+
+    private static class Agg {
+        private final List seenValues;
+        private final long recordsProcessed;
+
+        Agg(final List seenValues, final long recordsProcessed) {
+            this.seenValues = seenValues;
+            this.recordsProcessed = recordsProcessed;
+        }
+
+        byte[] serialize() {
+            final byte[] rawSeenValues = stringSerde.serializer().serialize("", String.join(",", seenValues));
+            final byte[] rawRecordsProcessed = longSerde.serializer().serialize("", recordsProcessed);
+            return ByteBuffer
+                .allocate(rawSeenValues.length + rawRecordsProcessed.length)
+                .put(rawSeenValues)
+                .put(rawRecordsProcessed)
+                .array();
+        }
+
+        static Agg deserialize(final byte[] rawAgg) {
+            final byte[] rawSeenValues = new byte[rawAgg.length - 8];
+            System.arraycopy(rawAgg, 0, rawSeenValues, 0, rawSeenValues.length);
+            final List seenValues = Arrays.asList(stringSerde.deserializer().deserialize("", rawSeenValues).split(","));
+
+            final byte[] rawRecordsProcessed = new byte[8];
+            System.arraycopy(rawAgg, rawAgg.length - 8, rawRecordsProcessed, 0, 8);
+            final long recordsProcessed = longSerde.deserializer().deserialize("", rawRecordsProcessed);
+
+            return new Agg(seenValues, recordsProcessed);
+        }
+    }
+
+    private static class AggSerde implements Serde {
+
+        @Override
+        public Serializer serializer() {
+            return (topic, agg) -> agg.serialize();
+        }
+
+        @Override
+        public Deserializer deserializer() {
+            return (topic, rawAgg) -> Agg.deserialize(rawAgg);
+        }
+    }
+
     private static ProcessorSupplier printProcessorSupplier(final String topic) {
         return () -> new ContextualProcessor() {
             private int numRecordsProcessed = 0;
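
A note on the Agg byte layout shared by the trunk and 3.3 copies above: stringSerde and longSerde come from SmokeTestUtil and are the stock Kafka serdes (UTF-8 string, big-endian 8-byte long), so the wire format is the comma-separated seen values followed by exactly eight trailing bytes for recordsProcessed, which is what deserialize() relies on when it splits at rawAgg.length - 8. A minimal standalone sketch of that layout (class name and sample values are illustrative only):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class AggLayoutSketch {
        public static void main(final String[] args) {
            // serialize: CSV bytes first, then the record count as the trailing 8 bytes
            final byte[] csv = "A,B".getBytes(StandardCharsets.UTF_8);
            final byte[] raw = ByteBuffer.allocate(csv.length + 8)
                .put(csv)
                .putLong(42L) // ByteBuffer is big-endian by default, matching Kafka's LongSerializer
                .array();

            // deserialize: everything except the last 8 bytes is the CSV
            final ByteBuffer buf = ByteBuffer.wrap(raw);
            final byte[] csvOut = new byte[raw.length - 8];
            buf.get(csvOut);
            System.out.println(new String(csvOut, StandardCharsets.UTF_8)); // prints A,B
            System.out.println(buf.getLong());                              // prints 42
        }
    }

diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 04f8fd8579110..26ab291dcf152 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -176,7 +176,7 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version):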
         self.perform_broker_upgrade(to_version)
 
         log_monitor.wait_until(connected_message,
-                               timeout_sec=120,
+                               timeout_sec=60,
                                err_msg=("Never saw output '%s' on " % connected_message) + str(processor.node.account))
         stdout_monitor.wait_until(self.processed_data_msg,
@@ -253,9 +253,14 @@ def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
 
         self.driver.start()
 
+        # encoding different target values for different versions
+        # - old version: value=A
+        # - new version with `upgrade_from` flag set: value=B
+        # - new version w/o `upgrade_from` set: value=C
+
         extra_properties = extra_properties.copy()
-        extra_properties['test.agg_produce_prefix'] = 'A'
-        extra_properties['test.expected_agg_prefixes'] = 'A'
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
         self.start_all_nodes_with(from_version, extra_properties)
 
         counter = 1
@@ -270,8 +275,8 @@ def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
         # bounce two instances to new version (verifies that new version can process records
         # written by old version)
         extra_properties = extra_properties.copy()
-        extra_properties['test.agg_produce_prefix'] = 'B'
-        extra_properties['test.expected_agg_prefixes'] = 'A,B'
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B'
         for p in self.processors[:-1]:
             self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
             counter = counter + 1
@@ -279,8 +284,8 @@ def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
 
         # bounce remaining instance on old version (just for verification purposes, to verify that
         # instance on old version can process records written by new version)
         extra_properties = extra_properties.copy()
-        extra_properties['test.agg_produce_prefix'] = 'A'
-        extra_properties['test.expected_agg_prefixes'] = 'A,B'
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A,B'
         self.do_stop_start_bounce(p3, None, from_version, counter, extra_properties)
         counter = counter + 1
@@ -289,16 +294,16 @@ def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
         # bounce remaining instance to new version (verifies that new version without upgrade_from
         # can process records written by new version with upgrade_from)
         extra_properties = extra_properties.copy()
-        extra_properties['test.agg_produce_prefix'] = 'C'
-        extra_properties['test.expected_agg_prefixes'] = 'A,B,C'
+        extra_properties['test.agg_produce_value'] = 'C'
+        extra_properties['test.expected_agg_values'] = 'A,B,C'
         self.do_stop_start_bounce(p3, None, to_version, counter, extra_properties)
         counter = counter + 1
 
         # bounce first instances again without removing upgrade_from (just for verification purposes,
         # to verify that instance with upgrade_from can process records written without upgrade_from)
         extra_properties = extra_properties.copy()
-        extra_properties['test.agg_produce_prefix'] = 'B'
-        extra_properties['test.expected_agg_prefixes'] = 'A,B,C'
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B,C'
         for p in self.processors[:-1]:
             self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
             counter = counter + 1
@@ -307,20 +312,20 @@ def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
 
         self.stop_and_await()
 
-    def wait_for_table_agg_success(self, expected_prefixes):
-        agg_success_str = "Table aggregate processor saw expected prefixes: " + 
expected_prefixes - with self.processor1.node.account.monitor_log(self.processor1.LOG_FILE) as first_monitor: - with self.processor2.node.account.monitor_log(self.processor2.LOG_FILE) as second_monitor: - with self.processor3.node.account.monitor_log(self.processor3.LOG_FILE) as third_monitor: + def wait_for_table_agg_success(self, expected_values): + agg_success_str = "Table aggregate processor saw expected values. Seen: " + expected_values + with self.processor1.node.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor: + with self.processor2.node.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor: + with self.processor3.node.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor: first_monitor.wait_until(agg_success_str, timeout_sec=60, - err_msg="Could not verify table aggregate processor success for '" + expected_prefixes + "' in " + str(self.processor1.node.account)) + err_msg="Could not verify table aggregate processor success for '" + expected_values + "' in " + str(self.processor1.node.account)) second_monitor.wait_until(agg_success_str, timeout_sec=60, - err_msg="Could not verify table aggregate processor success for '" + expected_prefixes + "' in " + str(self.processor2.node.account)) + err_msg="Could not verify table aggregate processor success for '" + expected_values + "' in " + str(self.processor2.node.account)) third_monitor.wait_until(agg_success_str, timeout_sec=60, - err_msg="Could not verify table aggregate processor success for '" + expected_prefixes + "' in " + str(self.processor3.node.account)) + err_msg="Could not verify table aggregate processor success for '" + expected_values + "' in " + str(self.processor3.node.account)) @cluster(num_nodes=6) def test_version_probing_upgrade(self): From a70e6ba464cc6ef2196a7c2e419a308b627f41f1 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Thu, 4 May 2023 17:41:53 -0700 Subject: [PATCH 3/3] Github comments --- .../kafka/streams/tests/SmokeTestDriver.java | 2 - .../kafka/streams/tests/SmokeTestUtil.java | 51 ++++++++++--------- .../kafka/streams/tests/SmokeTestDriver.java | 3 -- .../kafka/streams/tests/SmokeTestUtil.java | 51 ++++++++++--------- 4 files changed, 52 insertions(+), 55 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index c1248e3b99afd..6a0abb0dbc110 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -98,8 +98,6 @@ private static class ValueList { values[i] = min + i; } // We want to randomize the order of data to test not completely predictable processing order - // However, values are also use as a timestamp of the record. (TODO: separate data and timestamp) - // We keep some correlation of time and order. 
Thus, the shuffling is done with a sliding window shuffle(values, 10); index = 0; diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java index c5d7ac9ff3b70..ba2371d6480f0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java @@ -35,35 +35,15 @@ public class SmokeTestUtil { final static int END = Integer.MAX_VALUE; static ProcessorSupplier printTaskProcessorSupplier(final String topic) { - return printTaskProcessorSupplier(topic, ""); + return () -> new SmokeTestProcessor(topic); } static ProcessorSupplier printProcessorSupplier(final String topic) { return printProcessorSupplier(topic, ""); } - static ProcessorSupplier printTaskProcessorSupplier(final String topic, final String name) { - return () -> new ContextualProcessor() { - @Override - public void init(final ProcessorContext context) { - super.init(context); - System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId()); - System.out.flush(); - } - - @Override - public void process(final Record record) { } - - @Override - public void close() { - System.out.printf("Close processor for task %s%n", context().taskId()); - System.out.flush(); - } - }; - } - static ProcessorSupplier printProcessorSupplier(final String topic, final String name) { - return () -> new ContextualProcessor() { + return () -> new SmokeTestProcessor(topic) { private int numRecordsProcessed = 0; private long smallestOffset = Long.MAX_VALUE; private long largestOffset = Long.MIN_VALUE; @@ -71,8 +51,6 @@ static ProcessorSupplier printProcessorSupplier(fina @Override public void init(final ProcessorContext context) { super.init(context); - System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId()); - System.out.flush(); numRecordsProcessed = 0; smallestOffset = Long.MAX_VALUE; largestOffset = Long.MIN_VALUE; @@ -98,7 +76,7 @@ public void process(final Record record) { @Override public void close() { - System.out.printf("Close processor for task %s%n", context().taskId()); + super.close(); System.out.println("processed " + numRecordsProcessed + " records"); final long processed; if (largestOffset >= smallestOffset) { @@ -152,4 +130,27 @@ public static void sleep(final long duration) { } catch (final Exception ignore) { } } + private static class SmokeTestProcessor extends ContextualProcessor { + private final String topic; + + public SmokeTestProcessor(String topic) { + this.topic = topic; + } + + @Override + public void init(final ProcessorContext context) { + super.init(context); + System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId()); + System.out.flush(); + } + + @Override + public void process(final Record record) { } + + @Override + public void close() { + System.out.printf("Close processor for task %s%n", context().taskId()); + System.out.flush(); + } + } } diff --git a/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index 63b41ff4c7cf0..acfb754bfd903 100644 --- a/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -97,9 +97,6 @@ 
private static class ValueList { for (int i = 0; i < values.length; i++) { values[i] = min + i; } - // We want to randomize the order of data to test not completely predictable processing order - // However, values are also use as a timestamp of the record. (TODO: separate data and timestamp) - // We keep some correlation of time and order. Thus, the shuffling is done with a sliding window shuffle(values, 10); index = 0; diff --git a/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java index f64fe18b9994b..2ba3c29c198b5 100644 --- a/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java +++ b/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java @@ -35,35 +35,15 @@ public class SmokeTestUtil { final static int END = Integer.MAX_VALUE; static ProcessorSupplier printTaskProcessorSupplier(final String topic) { - return printTaskProcessorSupplier(topic, ""); + return () -> new SmokeTestProcessor(topic); } static ProcessorSupplier printProcessorSupplier(final String topic) { return printProcessorSupplier(topic, ""); } - static ProcessorSupplier printTaskProcessorSupplier(final String topic, final String name) { - return () -> new ContextualProcessor() { - @Override - public void init(final ProcessorContext context) { - super.init(context); - System.out.println("[3.3] initializing processor: topic=" + topic + " taskId=" + context.taskId()); - System.out.flush(); - } - - @Override - public void process(final Record record) {} - - @Override - public void close() { - System.out.printf("Close processor for task %s%n", context().taskId()); - System.out.flush(); - } - }; - } - static ProcessorSupplier printProcessorSupplier(final String topic, final String name) { - return () -> new ContextualProcessor() { + return () -> new SmokeTestProcessor(topic) { private int numRecordsProcessed = 0; private long smallestOffset = Long.MAX_VALUE; private long largestOffset = Long.MIN_VALUE; @@ -71,8 +51,6 @@ static ProcessorSupplier printProcessorSupplier(fina @Override public void init(final ProcessorContext context) { super.init(context); - System.out.println("[3.3] initializing processor: topic=" + topic + " taskId=" + context.taskId()); - System.out.flush(); numRecordsProcessed = 0; smallestOffset = Long.MAX_VALUE; largestOffset = Long.MIN_VALUE; @@ -98,7 +76,7 @@ public void process(final Record record) { @Override public void close() { - System.out.printf("Close processor for task %s%n", context().taskId()); + super.close(); System.out.println("processed " + numRecordsProcessed + " records"); final long processed; if (largestOffset >= smallestOffset) { @@ -152,4 +130,27 @@ public static void sleep(final long duration) { } catch (final Exception ignore) { } } + private static class SmokeTestProcessor extends ContextualProcessor { + private final String topic; + + public SmokeTestProcessor(String topic) { + this.topic = topic; + } + + @Override + public void init(final ProcessorContext context) { + super.init(context); + System.out.println("[3.3] initializing processor: topic=" + topic + " taskId=" + context.taskId()); + System.out.flush(); + } + + @Override + public void process(final Record record) {} + + @Override + public void close() { + System.out.printf("Close processor for task %s%n", context().taskId()); + System.out.flush(); + } + } }
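
The generic type parameters did not survive in the listings above (all angle-bracket contents were stripped), which makes the new shared helper harder to read than it is. As a reading aid, here is a best-guess reconstruction of the SmokeTestProcessor base class from this last patch with plausible type parameters filled back in; treat the exact parameters as an assumption rather than a quote from the repository:

    import org.apache.kafka.streams.processor.api.ContextualProcessor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.ProcessorSupplier;
    import org.apache.kafka.streams.processor.api.Record;

    public class SmokeTestProcessorSketch {

        // base processor: logs task assignment on init/close and forwards nothing
        private static class SmokeTestProcessor<KIn, VIn> extends ContextualProcessor<KIn, VIn, Void, Void> {
            private final String topic;

            SmokeTestProcessor(final String topic) {
                this.topic = topic;
            }

            @Override
            public void init(final ProcessorContext<Void, Void> context) {
                super.init(context);
                System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
                System.out.flush();
            }

            @Override
            public void process(final Record<KIn, VIn> record) { } // deliberate no-op: only lifecycle logging matters here

            @Override
            public void close() {
                System.out.printf("Close processor for task %s%n", context().taskId());
                System.out.flush();
            }
        }

        // mirrors the refactored printTaskProcessorSupplier(topic)
        static <KIn, VIn> ProcessorSupplier<KIn, VIn, Void, Void> printTaskProcessorSupplier(final String topic) {
            return () -> new SmokeTestProcessor<>(topic);
        }
    }

The refactor itself is conservative: only the shared init/close logging moves into the base class, while printProcessorSupplier keeps its per-record counting in an anonymous subclass that overrides process() and calls super.close() before printing its totals.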