diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
new file mode 100644
index 0000000000000..82f86c28b1dfb
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.tests.SmokeTestClient;
+import org.apache.kafka.streams.tests.SmokeTestDriver;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
+import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
+
+public class SmokeTestDriverIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
+
+
+    private static class Driver extends Thread {
+        private String bootstrapServers;
+        private int numKeys;
+        private int maxRecordsPerKey;
+        private Exception exception = null;
+        private SmokeTestDriver.VerificationResult result;
+
+        private Driver(final String bootstrapServers, final int numKeys, final int maxRecordsPerKey) {
+            this.bootstrapServers = bootstrapServers;
+            this.numKeys = numKeys;
+            this.maxRecordsPerKey = maxRecordsPerKey;
+        }
+
+        @Override
+        public void run() {
+            try {
+                final Map<String, Set<Integer>> allData = generate(bootstrapServers, numKeys, maxRecordsPerKey, true);
+                result = verify(bootstrapServers, allData, maxRecordsPerKey);
+
+            } catch (final Exception ex) {
+                this.exception = ex;
+            }
+        }
+
+        public Exception exception() {
+            return exception;
+        }
+
+        SmokeTestDriver.VerificationResult result() {
+            return result;
+        }
+
+    }
+
+    @Test
+    public void shouldWorkWithRebalance() throws InterruptedException {
+        int numClientsCreated = 0;
+        final ArrayList<SmokeTestClient> clients = new ArrayList<>();
+
+        CLUSTER.createTopics(SmokeTestDriver.topics());
+
+        final String bootstrapServers = CLUSTER.bootstrapServers();
+        final Driver driver = new Driver(bootstrapServers, 10, 1000);
+        driver.start();
+        System.out.println("started driver");
+
+
+        final Properties props = new Properties();
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+        // cycle out Streams instances as long as the test is running.
+        while (driver.isAlive()) {
+            // take a nap
+            Thread.sleep(1000);
+
+            // add a new client
+            final SmokeTestClient smokeTestClient = new SmokeTestClient("streams-" + numClientsCreated++);
+            clients.add(smokeTestClient);
+            smokeTestClient.start(props);
+
+            while (!clients.get(clients.size() - 1).started()) {
+                Thread.sleep(100);
+            }
+
+            // let the oldest client die of "natural causes"
+            if (clients.size() >= 3) {
+                clients.remove(0).closeAsync();
+            }
+        }
+        try {
+            // wait for verification to finish
+            driver.join();
+
+
+        } finally {
+            // whether or not the assertions failed, tell all the streams instances to stop
+            for (final SmokeTestClient client : clients) {
+                client.closeAsync();
+            }
+
+            // then, wait for them to stop
+            for (final SmokeTestClient client : clients) {
+                client.close();
+            }
+        }
+
+        // check to make sure that it actually succeeded
+        if (driver.exception() != null) {
+            driver.exception().printStackTrace();
+            throw new AssertionError(driver.exception());
+        }
+        Assert.assertTrue(driver.result().result(), driver.result().passed());
+    }
+
+}
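Note: the inner polling loop above waits for the newest client to reach RUNNING (via the started() flag added to SmokeTestClient below) before the oldest client is retired, so the group cycles through members one rebalance at a time. A minimal sketch of the same wait expressed with Kafka's existing test helper; the 60-second timeout is an illustrative choice, not part of this patch:

    import org.apache.kafka.test.TestUtils;

    // Hypothetical replacement for the Thread.sleep(100) polling loop:
    // re-evaluates started() until it returns true or the timeout expires.
    TestUtils.waitForCondition(
        smokeTestClient::started,
        60_000L,
        "the newest smoke test client never reached RUNNING");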
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index a396ad19fb9b3..a8f974a3cc0c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -16,89 +16,84 @@
  */
 package org.apache.kafka.streams.tests;
 
-import java.time.Duration;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
-import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.TestUtils;
 
+import java.time.Duration;
 import java.util.Properties;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
-    private final Properties streamsProperties;
+    private final String name;
 
     private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
+    private boolean started;
 
-    public SmokeTestClient(final Properties streamsProperties) {
+    public SmokeTestClient(final String name) {
         super();
-        this.streamsProperties = streamsProperties;
+        this.name = name;
     }
 
-    public void start() {
+    public boolean started() {
+        return started;
+    }
+
+    public void start(final Properties streamsProperties) {
         streams = createKafkaStreams(streamsProperties);
-        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(final Thread t, final Throwable e) {
-                System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
-                uncaughtException = true;
-                e.printStackTrace();
-            }
+        streams.setUncaughtExceptionHandler((t, e) -> {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            uncaughtException = true;
+            e.printStackTrace();
         });
 
-        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-            @Override
-            public void run() {
-                close();
-            }
-        }));
+        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
 
-        thread = new Thread() {
-            public void run() {
-                streams.start();
-            }
-        };
+        thread = new Thread(() -> streams.start());
         thread.start();
     }
 
+    public void closeAsync() {
+        streams.close(Duration.ZERO);
+    }
+
     public void close() {
         streams.close(Duration.ofSeconds(5));
         // do not remove these printouts since they are needed for health scripts
         if (!uncaughtException) {
-            System.out.println("SMOKE-TEST-CLIENT-CLOSED");
+            System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
         }
         try {
            thread.join();
        } catch (final Exception ex) {
            // do not remove these printouts since they are needed for health scripts
-            System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
            // ignore
        }
    }
 
-    private static Properties getStreamsConfig(final Properties props) {
+    private Properties getStreamsConfig(final Properties props) {
         final Properties fullProps = new Properties(props);
         fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
+        fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
         fullProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
         fullProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
         fullProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
@@ -106,23 +101,35 @@ private static Properties getStreamsConfig(final Properties props) {
         fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
         fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         fullProps.put(ProducerConfig.ACKS_CONFIG, "all");
-
+        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         fullProps.putAll(props);
         return fullProps;
     }
 
-    private static KafkaStreams createKafkaStreams(final Properties props) {
+    private KafkaStreams createKafkaStreams(final Properties props) {
+        final Topology build = getTopology();
+        final KafkaStreams streamsClient = new KafkaStreams(build, getStreamsConfig(props));
+        streamsClient.setStateListener((newState, oldState) -> {
+            System.out.printf("%s: %s -> %s%n", name, oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                started = true;
+            }
+        });
+        streamsClient.setUncaughtExceptionHandler((t, e) -> {
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            streamsClient.close(Duration.ofSeconds(30));
+        });
+
+        return streamsClient;
+    }
+
+    public Topology getTopology() {
         final StreamsBuilder builder = new StreamsBuilder();
         final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
         final KStream<String, Integer> source = builder.stream("data", stringIntConsumed);
         source.to("echo", Produced.with(stringSerde, intSerde));
-        final KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
-            @Override
-            public boolean test(final String key, final Integer value) {
-                return value == null || value != END;
-            }
-        });
-        data.process(SmokeTestUtil.printProcessorSupplier("data"));
+        final KStream<String, Integer> data = source.filter((key, value) -> value == null || value != END);
+        data.process(SmokeTestUtil.printProcessorSupplier("data", name));
 
         // min
         final KGroupedStream<String, Integer> groupedData = data.groupByKey(Grouped.with(stringSerde, intSerde));
@@ -130,97 +137,66 @@ public boolean test(final String key, final Integer value) {
         groupedData
             .windowedBy(TimeWindows.of(Duration.ofDays(1)))
             .aggregate(
-                new Initializer<Integer>() {
-                    public Integer apply() {
-                        return Integer.MAX_VALUE;
-                    }
-                },
-                new Aggregator<String, Integer, Integer>() {
-                    @Override
-                    public Integer apply(final String aggKey, final Integer value, final Integer aggregate) {
-                        return (value < aggregate) ? value : aggregate;
-                    }
-                },
+                () -> Integer.MAX_VALUE,
+                (aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate,
                 Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-min").withValueSerde(intSerde))
-            .toStream(new Unwindow<String, Integer>())
+            .toStream(new Unwindow<>())
             .to("min", Produced.with(stringSerde, intSerde));
 
         final KTable<String, Integer> minTable = builder.table(
             "min",
             Consumed.with(stringSerde, intSerde),
-            Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("minStoreName"));
-        minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min"));
+            Materialized.as("minStoreName"));
+        minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min", name));
 
         // max
         groupedData
             .windowedBy(TimeWindows.of(Duration.ofDays(2)))
             .aggregate(
-                new Initializer<Integer>() {
-                    public Integer apply() {
-                        return Integer.MIN_VALUE;
-                    }
-                },
-                new Aggregator<String, Integer, Integer>() {
-                    @Override
-                    public Integer apply(final String aggKey, final Integer value, final Integer aggregate) {
-                        return (value > aggregate) ? value : aggregate;
-                    }
-                },
+                () -> Integer.MIN_VALUE,
+                (aggKey, value, aggregate) -> (value > aggregate) ? value : aggregate,
                 Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-max").withValueSerde(intSerde))
-            .toStream(new Unwindow<String, Integer>())
+            .toStream(new Unwindow<>())
             .to("max", Produced.with(stringSerde, intSerde));
 
         final KTable<String, Integer> maxTable = builder.table(
             "max",
             Consumed.with(stringSerde, intSerde),
-            Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("maxStoreName"));
-        maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max"));
+            Materialized.as("maxStoreName"));
+        maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max", name));
 
         // sum
         groupedData
             .windowedBy(TimeWindows.of(Duration.ofDays(2)))
             .aggregate(
-                new Initializer<Long>() {
-                    public Long apply() {
-                        return 0L;
-                    }
-                },
-                new Aggregator<String, Integer, Long>() {
-                    @Override
-                    public Long apply(final String aggKey, final Integer value, final Long aggregate) {
-                        return (long) value + aggregate;
-                    }
-                },
+                () -> 0L,
+                (aggKey, value, aggregate) -> (long) value + aggregate,
                 Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("win-sum").withValueSerde(longSerde))
-            .toStream(new Unwindow<String, Long>())
+            .toStream(new Unwindow<>())
             .to("sum", Produced.with(stringSerde, longSerde));
 
         final Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde);
         final KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed);
-        sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum"));
+        sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum", name));
 
         // cnt
         groupedData
             .windowedBy(TimeWindows.of(Duration.ofDays(2)))
-            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("uwin-cnt"))
-            .toStream(new Unwindow<String, Long>())
+            .count(Materialized.as("uwin-cnt"))
+            .toStream(new Unwindow<>())
             .to("cnt", Produced.with(stringSerde, longSerde));
 
         final KTable<String, Long> cntTable = builder.table(
             "cnt",
             Consumed.with(stringSerde, longSerde),
-            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("cntStoreName"));
-        cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt"));
+            Materialized.as("cntStoreName"));
+        cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt", name));
 
         // dif
         maxTable
             .join(
                 minTable,
-                new ValueJoiner<Integer, Integer, Integer>() {
-                    public Integer apply(final Integer value1, final Integer value2) {
-                        return value1 - value2;
-                    }
-                })
+                (value1, value2) -> value1 - value2)
             .toStream()
             .to("dif", Produced.with(stringSerde, intSerde));
 
@@ -228,33 +204,20 @@ public Integer apply(final Integer value1, final Integer value2) {
         sumTable
             .join(
                 cntTable,
-                new ValueJoiner<Long, Long, Double>() {
-                    public Double apply(final Long value1, final Long value2) {
-                        return (double) value1 / (double) value2;
-                    }
-                })
+                (value1, value2) -> (double) value1 / (double) value2)
             .toStream()
             .to("avg", Produced.with(stringSerde, doubleSerde));
 
         // test repartition
         final Agg agg = new Agg();
         cntTable.groupBy(agg.selector(), Grouped.with(stringSerde, longSerde))
-            .aggregate(agg.init(), agg.adder(), agg.remover(),
-                Materialized.as(Stores.inMemoryKeyValueStore("cntByCnt"))
-                    .withKeySerde(Serdes.String())
-                    .withValueSerde(Serdes.Long()))
-            .toStream()
-            .to("tagg", Produced.with(stringSerde, longSerde));
-
-        final KafkaStreams streamsClient = new KafkaStreams(builder.build(), getStreamsConfig(props));
-        streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(final Thread t, final Throwable e) {
-                System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
-                streamsClient.close(Duration.ofSeconds(30));
-            }
-        });
-
-        return streamsClient;
+            .aggregate(agg.init(), agg.adder(), agg.remover(),
+                Materialized.as(Stores.inMemoryKeyValueStore("cntByCnt"))
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.Long()))
+            .toStream()
+            .to("tagg", Produced.with(stringSerde, longSerde));
+
+        return builder.build();
     }
 }
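Because createKafkaStreams() now delegates to a public getTopology(), the smoke-test topology can also be inspected without starting brokers or clients. A minimal sketch (the client name here is arbitrary):

    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.tests.SmokeTestClient;

    // Build and describe the processor/store wiring of the smoke-test
    // topology; no Kafka cluster is needed for this step.
    final Topology topology = new SmokeTestClient("describe-only").getTopology();
    System.out.println(topology.describe());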
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 078cbe4b5ccfe..de40818288666 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -28,29 +28,51 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.test.TestUtils;
 
-import java.io.File;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
-public class SmokeTestDriver extends SmokeTestUtil {
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
 
-    private static final int MAX_RECORD_EMPTY_RETRIES = 60;
+public class SmokeTestDriver extends SmokeTestUtil {
+    private static final String[] TOPICS = new String[] {
+        "data",
+        "echo",
+        "max",
+        "min", "min-suppressed", "min-raw",
+        "dif",
+        "sum",
+        "cnt",
+        "avg",
+        "tagg"
+    };
+
+    private static final int MAX_RECORD_EMPTY_RETRIES = 30;
 
     private static class ValueList {
         public final String key;
@@ -77,69 +99,8 @@ int next() {
         }
     }
 
-    // This main() is not used by the system test. It is intended to be used for local debugging.
-    public static void main(final String[] args) throws InterruptedException {
-        final String kafka = "localhost:9092";
-        final File stateDir = TestUtils.tempDirectory();
-
-        final int numKeys = 20;
-        final int maxRecordsPerKey = 1000;
-
-        final Thread driver = new Thread(() -> {
-            try {
-                final Map<String, Set<Integer>> allData = generate(kafka, numKeys, maxRecordsPerKey);
-                verify(kafka, allData, maxRecordsPerKey);
-            } catch (final Exception ex) {
-                ex.printStackTrace();
-            }
-        });
-
-        final Properties props = new Properties();
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, "1").getAbsolutePath());
-        final SmokeTestClient streams1 = new SmokeTestClient(props);
-        props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, "2").getAbsolutePath());
-        final SmokeTestClient streams2 = new SmokeTestClient(props);
-        props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, "3").getAbsolutePath());
-        final SmokeTestClient streams3 = new SmokeTestClient(props);
-        props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, "4").getAbsolutePath());
-        final SmokeTestClient streams4 = new SmokeTestClient(props);
-
-        System.out.println("starting the driver");
-        driver.start();
-
-        System.out.println("starting the first and second client");
-        streams1.start();
-        streams2.start();
-
-        sleep(10000);
-
-        System.out.println("starting the third client");
-        streams3.start();
-
-        System.out.println("closing the first client");
-        streams1.close();
-        System.out.println("closed the first client");
-
-        sleep(10000);
-
-        System.out.println("starting the forth client");
-        streams4.start();
-
-        driver.join();
-
-        System.out.println("driver stopped");
-        streams2.close();
-        streams3.close();
-        streams4.close();
-
-        System.out.println("shutdown");
-    }
-
-    public static Map<String, Set<Integer>> generate(final String kafka,
-                                                     final int numKeys,
-                                                     final int maxRecordsPerKey) {
-        return generate(kafka, numKeys, maxRecordsPerKey, true);
+    public static String[] topics() {
+        return Arrays.copyOf(TOPICS, TOPICS.length);
     }
 
     public static Map<String, Set<Integer>> generate(final String kafka,
@@ -183,7 +144,11 @@ public static Map<String, Set<Integer>> generate(final String kafka,
             } else {
 
                 final ProducerRecord<byte[], byte[]> record =
-                    new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value));
+                    new ProducerRecord<>(
+                        "data",
+                        stringSerde.serializer().serialize("", key),
+                        intSerde.serializer().serialize("", value)
+                    );
 
                 producer.send(record, new TestCallback(record, needRetry));
 
@@ -201,6 +166,7 @@ public static Map<String, Set<Integer>> generate(final String kafka,
             while (!needRetry.isEmpty()) {
                 final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
                 for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
                     producer.send(record, new TestCallback(record, needRetry2));
                 }
                 producer.flush();
@@ -239,7 +205,7 @@ public void onCompletion(final RecordMetadata metadata, final Exception exceptio
         }
     }
 
-    private static void shuffle(final int[] data, final int windowSize) {
+    private static void shuffle(final int[] data, @SuppressWarnings("SameParameterValue") final int windowSize) {
         final Random rand = new Random();
         for (int i = 0; i < data.length; i++) {
             // we shuffle data within windowSize
@@ -252,93 +218,104 @@ private static void shuffle(final int[] data, final int windowSize) {
         }
     }
 
-    public static void verify(final String kafka, final Map<String, Set<Integer>> allData, final int maxRecordsPerKey) {
+    public static class NumberDeserializer implements Deserializer<Number> {
+
+        @Override
+        public void configure(final Map<String, ?> configs, final boolean isKey) {
+
+        }
+
+        @Override
+        public Number deserialize(final String topic, final byte[] data) {
+            final Number value;
+            switch (topic) {
+                case "data":
+                case "echo":
+                case "min":
+                case "min-raw":
+                case "min-suppressed":
+                case "max":
+                case "dif":
+                    value = intSerde.deserializer().deserialize(topic, data);
+                    break;
+                case "sum":
+                case "cnt":
+                case "tagg":
+                    value = longSerde.deserializer().deserialize(topic, data);
+                    break;
+                case "avg":
+                    value = doubleSerde.deserializer().deserialize(topic, data);
+                    break;
+                default:
+                    throw new RuntimeException("unknown topic: " + topic);
+            }
+            return value;
+        }
+
+        @Override
+        public void close() {
+
+        }
+    }
+
+    public static VerificationResult verify(final String kafka,
+                                            final Map<String, Set<Integer>> inputs,
+                                            final int maxRecordsPerKey) {
         final Properties props = new Properties();
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
 
-        final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
-        final List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg");
+        final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
         consumer.assign(partitions);
         consumer.seekToBeginning(partitions);
 
-        final int recordsGenerated = allData.size() * maxRecordsPerKey;
+        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
         int recordsProcessed = 0;
+        final Map<String, AtomicInteger> processed =
+            Stream.of(TOPICS)
+                  .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
 
-        final HashMap<String, Integer> max = new HashMap<>();
-        final HashMap<String, Integer> min = new HashMap<>();
-        final HashMap<String, Integer> dif = new HashMap<>();
-        final HashMap<String, Long> sum = new HashMap<>();
-        final HashMap<String, Long> cnt = new HashMap<>();
-        final HashMap<String, Double> avg = new HashMap<>();
-        final HashMap<String, Long> wcnt = new HashMap<>();
-        final HashMap<String, Long> tagg = new HashMap<>();
-
-        final HashSet<String> keys = new HashSet<>();
-        final HashMap<String, Set<Integer>> received = new HashMap<>();
-        for (final String key : allData.keySet()) {
-            keys.add(key);
-            received.put(key, new HashSet<>());
-        }
+        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
+
+        VerificationResult verificationResult = new VerificationResult(false, "no results yet");
         int retry = 0;
         final long start = System.currentTimeMillis();
         while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
-            final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
+            final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(1));
             if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
-                if (verifyMin(min, allData, false)
-                    && verifyMax(max, allData, false)
-                    && verifyDif(dif, allData, false)
-                    && verifySum(sum, allData, false)
-                    && verifyCnt(cnt, allData, false)
-                    && verifyAvg(avg, allData, false)
-                    && verifyTAgg(tagg, allData, false)) {
+                verificationResult = verifyAll(inputs, events);
+                if (verificationResult.passed()) {
                     break;
-                }
-                if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
+                } else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
+                    System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
                     break;
+                } else {
+                    System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying...");
                 }
             } else {
-                for (final ConsumerRecord<byte[], byte[]> record : records) {
-                    final String key = stringSerde.deserializer().deserialize("", record.key());
-                    switch (record.topic()) {
-                        case "echo":
-                            final Integer value = intSerde.deserializer().deserialize("", record.value());
-                            recordsProcessed++;
-                            if (recordsProcessed % 100 == 0) {
-                                System.out.println("Echo records processed = " + recordsProcessed);
-                            }
-                            received.get(key).add(value);
-                            break;
-                        case "min":
-                            min.put(key, intSerde.deserializer().deserialize("", record.value()));
-                            break;
-                        case "max":
-                            max.put(key, intSerde.deserializer().deserialize("", record.value()));
-                            break;
-                        case "dif":
-                            dif.put(key, intSerde.deserializer().deserialize("", record.value()));
-                            break;
-                        case "sum":
-                            sum.put(key, longSerde.deserializer().deserialize("", record.value()));
-                            break;
-                        case "cnt":
-                            cnt.put(key, longSerde.deserializer().deserialize("", record.value()));
-                            break;
-                        case "avg":
-                            avg.put(key, doubleSerde.deserializer().deserialize("", record.value()));
-                            break;
-                        case "wcnt":
-                            wcnt.put(key, longSerde.deserializer().deserialize("", record.value()));
-                            break;
-                        case "tagg":
-                            tagg.put(key, longSerde.deserializer().deserialize("", record.value()));
-                            break;
-                        default:
-                            System.out.println("unknown topic: " + record.topic());
+                retry = 0;
+                for (final ConsumerRecord<String, Number> record : records) {
+                    final String key = record.key();
+
+                    final String topic = record.topic();
+                    processed.get(topic).incrementAndGet();
+
+                    if (topic.equals("echo")) {
+                        recordsProcessed++;
+                        if (recordsProcessed % 100 == 0) {
+                            System.out.println("Echo records processed = " + recordsProcessed);
+                        }
                     }
+
+                    events.computeIfAbsent(topic, t -> new HashMap<>())
+                          .computeIfAbsent(key, k -> new LinkedList<>())
+                          .add(record);
                 }
+
+                System.out.println(processed);
             }
         }
         consumer.close();
@@ -357,256 +334,167 @@ && verifyTAgg(tagg, allData, false)) {
         }
         boolean success;
-        success = allData.equals(received);
+
+        final Map<String, Set<Number>> received =
+            events.get("echo")
+                  .entrySet()
+                  .stream()
+                  .map(entry -> mkEntry(
+                      entry.getKey(),
+                      entry.getValue().stream().map(ConsumerRecord::value).collect(Collectors.toSet()))
+                  )
+                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        success = inputs.equals(received);
         if (success) {
             System.out.println("ALL-RECORDS-DELIVERED");
         } else {
             int missedCount = 0;
-            for (final Map.Entry<String, Set<Integer>> entry : allData.entrySet()) {
+            for (final Map.Entry<String, Set<Integer>> entry : inputs.entrySet()) {
                 missedCount += received.get(entry.getKey()).size();
             }
             System.out.println("missedRecords=" + missedCount);
         }
-        success &= verifyMin(min, allData, true);
-        success &= verifyMax(max, allData, true);
-        success &= verifyDif(dif, allData, true);
-        success &= verifySum(sum, allData, true);
-        success &= verifyCnt(cnt, allData, true);
-        success &= verifyAvg(avg, allData, true);
-        success &= verifyTAgg(tagg, allData, true);
+        // give it one more try if it's not already passing.
+        if (!verificationResult.passed()) {
+            verificationResult = verifyAll(inputs, events);
+        }
+        success &= verificationResult.passed();
+
+        System.out.println(verificationResult.result());
 
         System.out.println(success ? "SUCCESS" : "FAILURE");
+        return verificationResult;
     }
 
-    private static boolean verifyMin(final Map<String, Integer> map, final Map<String, Set<Integer>> allData, final boolean print) {
-        if (map.isEmpty()) {
-            if (print) {
-                System.out.println("min is empty");
-            }
-            return false;
-        } else {
-            if (print) {
-                System.out.println("verifying min");
-            }
+    public static class VerificationResult {
+        private final boolean passed;
+        private final String result;
 
-            if (map.size() != allData.size()) {
-                if (print) {
-                    System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
-                }
-                return false;
-            }
-            for (final Map.Entry<String, Integer> entry : map.entrySet()) {
-                final int expected = getMin(entry.getKey());
-                if (expected != entry.getValue()) {
-                    if (print) {
-                        System.out.println("fail: key=" + entry.getKey() + " min=" + entry.getValue() + " expected=" + expected);
-                    }
-                    return false;
-                }
-            }
+        VerificationResult(final boolean passed, final String result) {
+            this.passed = passed;
+            this.result = result;
         }
-        return true;
-    }
 
-    private static boolean verifyMax(final Map<String, Integer> map, final Map<String, Set<Integer>> allData, final boolean print) {
-        if (map.isEmpty()) {
-            if (print) {
-                System.out.println("max is empty");
-            }
-            return false;
-        } else {
-            if (print) {
-                System.out.println("verifying max");
-            }
+        public boolean passed() {
+            return passed;
+        }
 
-            if (map.size() != allData.size()) {
-                if (print) {
-                    System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
-                }
-                return false;
-            }
-            for (final Map.Entry<String, Integer> entry : map.entrySet()) {
-                final int expected = getMax(entry.getKey());
-                if (expected != entry.getValue()) {
-                    if (print) {
-                        System.out.println("fail: key=" + entry.getKey() + " max=" + entry.getValue() + " expected=" + expected);
-                    }
-                    return false;
-                }
-            }
+        public String result() {
+            return result;
         }
-        return true;
     }
 
-    private static boolean verifyDif(final Map<String, Integer> map, final Map<String, Set<Integer>> allData, final boolean print) {
-        if (map.isEmpty()) {
-            if (print) {
-                System.out.println("dif is empty");
-            }
-            return false;
-        } else {
-            if (print) {
-                System.out.println("verifying dif");
-            }
-
-            if (map.size() != allData.size()) {
-                if (print) {
-                    System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
-                }
-                return false;
-            }
-            for (final Map.Entry<String, Integer> entry : map.entrySet()) {
-                final int min = getMin(entry.getKey());
-                final int max = getMax(entry.getKey());
-                final int expected = max - min;
-                if (entry.getValue() == null || expected != entry.getValue()) {
-                    if (print) {
-                        System.out.println("fail: key=" + entry.getKey() + " dif=" + entry.getValue() + " expected=" + expected);
-                    }
-                    return false;
-                }
-            }
+    private static VerificationResult verifyAll(final Map<String, Set<Integer>> inputs,
+                                                final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events) {
+        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        boolean pass;
+        try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) {
+            pass = verifyTAgg(resultStream, inputs, events.get("tagg"));
+            pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin);
+            pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax);
+            pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue());
+            pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum);
+            pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L);
+            pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg);
         }
-        return true;
+        return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
     }
 
-    private static boolean verifyCnt(final Map<String, Long> map, final Map<String, Set<Integer>> allData, final boolean print) {
-        if (map.isEmpty()) {
-            if (print) {
-                System.out.println("cnt is empty");
-            }
+    private static boolean verify(final PrintStream resultStream,
+                                  final String topic,
+                                  final Map<String, Set<Integer>> inputData,
+                                  final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                  final Function<String, Number> keyToExpectation) {
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> observedInputEvents = events.get("data");
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> outputEvents = events.getOrDefault(topic, emptyMap());
+        if (outputEvents.isEmpty()) {
+            resultStream.println(topic + " is empty");
             return false;
         } else {
-            if (print) {
-                System.out.println("verifying cnt");
-            }
+            resultStream.printf("verifying %s with %d keys%n", topic, outputEvents.size());
 
-            if (map.size() != allData.size()) {
-                if (print) {
-                    System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
-                }
+            if (outputEvents.size() != inputData.size()) {
+                resultStream.printf("fail: resultCount=%d expectedCount=%s%n\tresult=%s%n\texpected=%s%n",
+                    outputEvents.size(), inputData.size(), outputEvents.keySet(), inputData.keySet());
                 return false;
             }
-            for (final Map.Entry<String, Long> entry : map.entrySet()) {
-                final int min = getMin(entry.getKey());
-                final int max = getMax(entry.getKey());
-                final long expected = (max - min) + 1L;
-                if (expected != entry.getValue()) {
-                    if (print) {
-                        System.out.println("fail: key=" + entry.getKey() + " cnt=" + entry.getValue() + " expected=" + expected);
-                    }
+            for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : outputEvents.entrySet()) {
+                final String key = entry.getKey();
+                final Number expected = keyToExpectation.apply(key);
+                final Number actual = entry.getValue().getLast().value();
+                if (!expected.equals(actual)) {
+                    resultStream.printf("%s fail: key=%s actual=%s expected=%s%n\t inputEvents=%n%s%n\toutputEvents=%n%s%n",
+                        topic,
+                        key,
+                        actual,
+                        expected,
+                        indent("\t\t", observedInputEvents.get(key)),
+                        indent("\t\t", entry.getValue()));
                    return false;
                }
            }
+            return true;
        }
-        return true;
    }
 
-    private static boolean verifySum(final Map<String, Long> map, final Map<String, Set<Integer>> allData, final boolean print) {
-        if (map.isEmpty()) {
-            if (print) {
-                System.out.println("sum is empty");
-            }
-            return false;
-        } else {
-            if (print) {
-                System.out.println("verifying sum");
-            }
-
-            if (map.size() != allData.size()) {
-                if (print) {
-                    System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
-                }
-                return false;
-            }
-            for (final Map.Entry<String, Long> entry : map.entrySet()) {
-                final int min = getMin(entry.getKey());
-                final int max = getMax(entry.getKey());
-                final long expected = ((long) min + (long) max) * (max - min + 1L) / 2L;
-                if (expected != entry.getValue()) {
-                    if (print) {
-                        System.out.println("fail: key=" + entry.getKey() + " sum=" + entry.getValue() + " expected=" + expected);
-                    }
-                    return false;
-                }
-            }
+    private static String indent(@SuppressWarnings("SameParameterValue") final String prefix,
+                                 final LinkedList<ConsumerRecord<String, Number>> list) {
+        final StringBuilder stringBuilder = new StringBuilder();
+        for (final ConsumerRecord<String, Number> record : list) {
+            stringBuilder.append(prefix).append(record).append('\n');
         }
-        return true;
+        return stringBuilder.toString();
     }
 
-    private static boolean verifyAvg(final Map<String, Double> map, final Map<String, Set<Integer>> allData, final boolean print) {
-        if (map.isEmpty()) {
-            if (print) {
-                System.out.println("avg is empty");
-            }
-            return false;
-        } else {
-            if (print) {
-                System.out.println("verifying avg");
-            }
+    private static Long getSum(final String key) {
+        final int min = getMin(key).intValue();
+        final int max = getMax(key).intValue();
+        return ((long) min + (long) max) * (max - min + 1L) / 2L;
+    }
 
-            if (map.size() != allData.size()) {
-                if (print) {
-                    System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
-                }
-                return false;
-            }
-            for (final Map.Entry<String, Double> entry : map.entrySet()) {
-                final int min = getMin(entry.getKey());
-                final int max = getMax(entry.getKey());
-                final double expected = ((long) min + (long) max) / 2.0;
-
-                if (entry.getValue() == null || expected != entry.getValue()) {
-                    if (print) {
-                        System.out.println("fail: key=" + entry.getKey() + " avg=" + entry.getValue() + " expected=" + expected);
-                    }
-                    return false;
-                }
-            }
-        }
-        return true;
+    private static Double getAvg(final String key) {
+        final int min = getMin(key).intValue();
+        final int max = getMax(key).intValue();
+        return ((long) min + (long) max) / 2.0;
     }
 
-    private static boolean verifyTAgg(final Map<String, Long> map, final Map<String, Set<Integer>> allData, final boolean print) {
-        if (map.isEmpty()) {
-            if (print) {
-                System.out.println("tagg is empty");
-            }
+    private static boolean verifyTAgg(final PrintStream resultStream,
+                                      final Map<String, Set<Integer>> allData,
+                                      final Map<String, LinkedList<ConsumerRecord<String, Number>>> taggEvents) {
+        if (taggEvents == null) {
+            resultStream.println("tagg is missing");
+            return false;
+        } else if (taggEvents.isEmpty()) {
+            resultStream.println("tagg is empty");
             return false;
         } else {
-            if (print) {
-                System.out.println("verifying tagg");
-            }
+            resultStream.println("verifying tagg");
 
             // generate expected answer
             final Map<String, Long> expected = new HashMap<>();
             for (final String key : allData.keySet()) {
-                final int min = getMin(key);
-                final int max = getMax(key);
+                final int min = getMin(key).intValue();
+                final int max = getMax(key).intValue();
                 final String cnt = Long.toString(max - min + 1L);
-                if (expected.containsKey(cnt)) {
-                    expected.put(cnt, expected.get(cnt) + 1L);
-                } else {
-                    expected.put(cnt, 1L);
-                }
+                expected.put(cnt, expected.getOrDefault(cnt, 0L) + 1);
             }
 
             // check the result
-            for (final Map.Entry<String, Long> entry : map.entrySet()) {
+            for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : taggEvents.entrySet()) {
                 final String key = entry.getKey();
                 Long expectedCount = expected.remove(key);
                 if (expectedCount == null) {
                     expectedCount = 0L;
                 }
 
-                if (entry.getValue().longValue() != expectedCount.longValue()) {
-                    if (print) {
-                        System.out.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expected.get(key));
-                    }
+                if (entry.getValue().getLast().value().longValue() != expectedCount) {
+                    resultStream.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expected.get(key));
+                    resultStream.println("\t outputEvents: " + entry.getValue());
                     return false;
                 }
             }
@@ -615,26 +503,14 @@ private static boolean verifyTAgg(final Map<String, Long> map, final Map<String
     private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer, final String... topics) {
         final ArrayList<TopicPartition> partitions = new ArrayList<>();
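The closed-form expectations above hold because each key's generated values form the consecutive range [min, max], so getSum is just the arithmetic-series identity. A standalone sanity check of that identity (the values 7 and 42 are illustrative, not part of the patch):

    // For consecutive values min..max:
    //   sum = (min + max) * (max - min + 1) / 2
    final int min = 7;
    final int max = 42;
    long bruteForce = 0;
    for (int i = min; i <= max; i++) {
        bruteForce += i;
    }
    final long closedForm = ((long) min + (long) max) * (max - min + 1L) / 2L;
    assert bruteForce == closedForm; // both evaluate to 882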
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 9e62e3fc9aecb..aa58d44c4d461 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -29,16 +29,17 @@
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 import java.io.File;
+import java.time.Instant;
 
 public class SmokeTestUtil {
 
     final static int END = Integer.MAX_VALUE;
 
     static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic) {
-        return printProcessorSupplier(topic, false);
+        return printProcessorSupplier(topic, "");
     }
 
-    private static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic, final boolean printOffset) {
+    static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic, final String name) {
         return new ProcessorSupplier<Object, Object>() {
             @Override
             public Processor<Object, Object> get() {
@@ -56,7 +57,7 @@ public void init(final ProcessorContext context) {
                 public void process(final Object key, final Object value) {
                     numRecordsProcessed++;
                     if (numRecordsProcessed % 100 == 0) {
-                        System.out.println(System.currentTimeMillis());
+                        System.out.printf("%s: %s%n", name, Instant.now());
                         System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                     }
                 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
index f34471be72574..4c2f6d20d8459 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -23,6 +23,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.UUID;
 
 public class StreamsSmokeTest {
 
@@ -56,9 +57,6 @@ public static void main(final String[] args) throws InterruptedException, IOExce
         System.out.println("disableAutoTerminate=" + disableAutoTerminate);
 
         switch (command) {
-            case "standalone":
-                SmokeTestDriver.main(args);
-                break;
             case "run":
                 // this starts the driver (data generation and result verification)
                 final int numKeys = 10;
@@ -66,14 +64,14 @@ public static void main(final String[] args) throws InterruptedException, IOExce
                 if (disableAutoTerminate) {
                     SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, false);
                 } else {
-                    final Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
+                    final Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, true);
                     SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
                 }
                 break;
             case "process":
                 // this starts a KafkaStreams client
-                final SmokeTestClient client = new SmokeTestClient(streamsProperties);
-                client.start();
+                final SmokeTestClient client = new SmokeTestClient(UUID.randomUUID().toString());
+                client.start(streamsProperties);
                 break;
             case "close-deadlock-test":
                 final ShutdownDeadlockTest test = new ShutdownDeadlockTest(kafka);
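With the standalone mode removed, ad-hoc debugging moves to SmokeTestDriverIntegrationTest, and each process-mode client is now named with a fresh UUID so the per-name printouts added above stay attributable when several instances run concurrently. A small sketch of the resulting log format (the name and timestamp shown are illustrative):

    import java.time.Instant;

    // printProcessorSupplier now prefixes progress lines with the client
    // name and an ISO-8601 timestamp instead of bare epoch millis, e.g.:
    //   streams-4: 2019-01-15T20:31:42.123Z
    //   processed 100 records from topic=data
    System.out.printf("%s: %s%n", "streams-4", Instant.now());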