From 1510910bbbd891a23ed6148f5e0f9f4ee776b149 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Sat, 2 Feb 2019 09:25:39 -0600 Subject: [PATCH 01/14] MINOR: add test for StreamsSmokeTestDriver Also, add more output for debuggability --- .../utils/EmbeddedKafkaCluster.java | 10 +- .../kafka/streams/tests/SmokeTestClient.java | 173 ++++------- .../kafka/streams/tests/SmokeTestDriver.java | 292 ++++++++++-------- .../streams/tests/SmokeTestDriverTest.java | 106 +++++++ .../kafka/streams/tests/SmokeTestUtil.java | 7 +- .../kafka/streams/tests/StreamsSmokeTest.java | 11 +- 6 files changed, 356 insertions(+), 243 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index 3aff0a2336ab9..d90a63e6722c1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -42,7 +42,7 @@ /** * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and supplied number of Kafka brokers. */ -public class EmbeddedKafkaCluster extends ExternalResource { +public class EmbeddedKafkaCluster extends ExternalResource implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected @@ -115,7 +115,7 @@ private void putIfAbsent(final Properties props, final String propertyKey, final /** * Stop the Kafka cluster. */ - private void stop() { + public void stop() { for (final KafkaEmbedded broker : brokers) { broker.stop(); } @@ -151,6 +151,11 @@ protected void after() { stop(); } + @Override + public void close() { + stop(); + } + /** * Create multiple Kafka topics each with 1 partition and a replication factor of 1. * @@ -298,6 +303,7 @@ public void waitForRemainingTopics(final long timeoutMs, final String... 
topics) TestUtils.waitForCondition(new TopicsRemainingCondition(topics), timeoutMs, "Topics are not expected after " + timeoutMs + " milli seconds."); } + private final class TopicsDeletedCondition implements TestCondition { final Set deletedTopics = new HashSet<>(); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index a396ad19fb9b3..d0aee00a82825 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -16,89 +16,86 @@ */ package org.apache.kafka.streams.tests; -import java.time.Duration; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Grouped; -import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.test.TestUtils; +import java.time.Duration; import java.util.Properties; -public class SmokeTestClient extends SmokeTestUtil { +public class SmokeTestClient extends SmokeTestUtil implements AutoCloseable { - private final Properties streamsProperties; + private final String name; private Thread thread; private KafkaStreams streams; private boolean uncaughtException = false; + private boolean started; - public SmokeTestClient(final Properties streamsProperties) { + SmokeTestClient(final String name) { super(); - this.streamsProperties = streamsProperties; + this.name = name; } - public void start() { + public boolean started() { + return started; + } + + public void start(final Properties streamsProperties) { streams = createKafkaStreams(streamsProperties); - streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(final Thread t, final Throwable e) { - System.out.println("SMOKE-TEST-CLIENT-EXCEPTION"); - uncaughtException = true; - e.printStackTrace(); - } + streams.setUncaughtExceptionHandler((t, e) -> { + System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION"); + System.out.println("SMOKE-TEST-CLIENT-EXCEPTION"); + uncaughtException = true; + e.printStackTrace(); }); - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - close(); - } - })); + Runtime.getRuntime().addShutdownHook(new Thread(this::close)); - thread = new Thread() { - public void run() { - streams.start(); - } - }; + thread = new Thread(() -> streams.start()); thread.start(); } + void closeAsync() { + 
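+        // Editor's note: Duration.ZERO asks Streams to initiate shutdown without
+        // waiting for it to complete, so the test can retire an instance while the
+        // rest of the group keeps running; the blocking close() below waits up to
+        // five seconds and prints the markers the health scripts look for.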
streams.close(Duration.ZERO);
+    }
+
     public void close() {
         streams.close(Duration.ofSeconds(5));
         // do not remove these printouts since they are needed for health scripts
         if (!uncaughtException) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
             System.out.println("SMOKE-TEST-CLIENT-CLOSED");
         }
         try {
             thread.join();
         } catch (final Exception ex) {
             // do not remove these printouts since they are needed for health scripts
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
             System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
             // ignore
         }
     }

-    private static Properties getStreamsConfig(final Properties props) {
+    private Properties getStreamsConfig(final Properties props) {
         final Properties fullProps = new Properties(props);
         fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
+        fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
         fullProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
         fullProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
         fullProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
@@ -106,23 +103,18 @@ private static Properties getStreamsConfig(final Properties props) {
         fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
         fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         fullProps.put(ProducerConfig.ACKS_CONFIG, "all");
-
+        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         fullProps.putAll(props);
         return fullProps;
     }

-    private static KafkaStreams createKafkaStreams(final Properties props) {
+    private KafkaStreams createKafkaStreams(final Properties props) {
         final StreamsBuilder builder = new StreamsBuilder();
         final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
         final KStream<String, Integer> source = builder.stream("data", stringIntConsumed);
         source.to("echo", Produced.with(stringSerde, intSerde));
-        final KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
-            @Override
-            public boolean test(final String key, final Integer value) {
-                return value == null || value != END;
-            }
-        });
-        data.process(SmokeTestUtil.printProcessorSupplier("data"));
+        final KStream<String, Integer> data = source.filter((key, value) -> value == null || value != END);
+        data.process(SmokeTestUtil.printProcessorSupplier("data", name));

         // min
         final KGroupedStream<String, Integer> groupedData = data.groupByKey(Grouped.with(stringSerde, intSerde));
@@ -130,97 +122,66 @@ public boolean test(final String key, final Integer value) {
         groupedData
             .windowedBy(TimeWindows.of(Duration.ofDays(1)))
             .aggregate(
-                new Initializer<Integer>() {
-                    public Integer apply() {
-                        return Integer.MAX_VALUE;
-                    }
-                },
-                new Aggregator<String, Integer, Integer>() {
-                    @Override
-                    public Integer apply(final String aggKey, final Integer value, final Integer aggregate) {
-                        return (value < aggregate) ? value : aggregate;
-                    }
-                },
+                () -> Integer.MAX_VALUE,
+                (aggKey, value, aggregate) -> (value < aggregate) ? 
value : aggregate, Materialized.>as("uwin-min").withValueSerde(intSerde)) - .toStream(new Unwindow()) + .toStream(new Unwindow<>()) .to("min", Produced.with(stringSerde, intSerde)); final KTable minTable = builder.table( "min", Consumed.with(stringSerde, intSerde), - Materialized.>as("minStoreName")); - minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min")); + Materialized.as("minStoreName")); + minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min", name)); // max groupedData .windowedBy(TimeWindows.of(Duration.ofDays(2))) .aggregate( - new Initializer() { - public Integer apply() { - return Integer.MIN_VALUE; - } - }, - new Aggregator() { - @Override - public Integer apply(final String aggKey, final Integer value, final Integer aggregate) { - return (value > aggregate) ? value : aggregate; - } - }, + () -> Integer.MIN_VALUE, + (aggKey, value, aggregate) -> (value > aggregate) ? value : aggregate, Materialized.>as("uwin-max").withValueSerde(intSerde)) - .toStream(new Unwindow()) + .toStream(new Unwindow<>()) .to("max", Produced.with(stringSerde, intSerde)); final KTable maxTable = builder.table( "max", Consumed.with(stringSerde, intSerde), - Materialized.>as("maxStoreName")); - maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max")); + Materialized.as("maxStoreName")); + maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max", name)); // sum groupedData .windowedBy(TimeWindows.of(Duration.ofDays(2))) .aggregate( - new Initializer() { - public Long apply() { - return 0L; - } - }, - new Aggregator() { - @Override - public Long apply(final String aggKey, final Integer value, final Long aggregate) { - return (long) value + aggregate; - } - }, + () -> 0L, + (aggKey, value, aggregate) -> (long) value + aggregate, Materialized.>as("win-sum").withValueSerde(longSerde)) - .toStream(new Unwindow()) + .toStream(new Unwindow<>()) .to("sum", Produced.with(stringSerde, longSerde)); final Consumed stringLongConsumed = Consumed.with(stringSerde, longSerde); final KTable sumTable = builder.table("sum", stringLongConsumed); - sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum")); + sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum", name)); // cnt groupedData .windowedBy(TimeWindows.of(Duration.ofDays(2))) - .count(Materialized.>as("uwin-cnt")) - .toStream(new Unwindow()) + .count(Materialized.as("uwin-cnt")) + .toStream(new Unwindow<>()) .to("cnt", Produced.with(stringSerde, longSerde)); final KTable cntTable = builder.table( "cnt", Consumed.with(stringSerde, longSerde), - Materialized.>as("cntStoreName")); - cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt")); + Materialized.as("cntStoreName")); + cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt", name)); // dif maxTable .join( minTable, - new ValueJoiner() { - public Integer apply(final Integer value1, final Integer value2) { - return value1 - value2; - } - }) + (value1, value2) -> value1 - value2) .toStream() .to("dif", Produced.with(stringSerde, intSerde)); @@ -228,31 +189,31 @@ public Integer apply(final Integer value1, final Integer value2) { sumTable .join( cntTable, - new ValueJoiner() { - public Double apply(final Long value1, final Long value2) { - return (double) value1 / (double) value2; - } - }) + (value1, value2) -> (double) value1 / (double) value2) .toStream() .to("avg", Produced.with(stringSerde, doubleSerde)); // test repartition final Agg agg = new Agg(); cntTable.groupBy(agg.selector(), 
Grouped.with(stringSerde, longSerde)) - .aggregate(agg.init(), agg.adder(), agg.remover(), - Materialized.as(Stores.inMemoryKeyValueStore("cntByCnt")) - .withKeySerde(Serdes.String()) - .withValueSerde(Serdes.Long())) - .toStream() - .to("tagg", Produced.with(stringSerde, longSerde)); + .aggregate(agg.init(), agg.adder(), agg.remover(), + Materialized.as(Stores.inMemoryKeyValueStore("cntByCnt")) + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.Long())) + .toStream() + .to("tagg", Produced.with(stringSerde, longSerde)); final KafkaStreams streamsClient = new KafkaStreams(builder.build(), getStreamsConfig(props)); - streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(final Thread t, final Throwable e) { - System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e); - streamsClient.close(Duration.ofSeconds(30)); + streamsClient.setStateListener(((newState, oldState) -> { + System.out.printf("%s: %s -> %s%n", name, oldState, newState); + if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) { + started = true; } + })); + streamsClient.setUncaughtExceptionHandler((t, e) -> { + System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e); + System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e); + streamsClient.close(Duration.ofSeconds(30)); }); return streamsClient; diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index 078cbe4b5ccfe..94b14a6935fc2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -30,23 +30,25 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.test.TestUtils; -import java.io.File; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Random; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class SmokeTestDriver extends SmokeTestUtil { @@ -77,71 +79,6 @@ int next() { } } - // This main() is not used by the system test. It is intended to be used for local debugging. 
- public static void main(final String[] args) throws InterruptedException { - final String kafka = "localhost:9092"; - final File stateDir = TestUtils.tempDirectory(); - - final int numKeys = 20; - final int maxRecordsPerKey = 1000; - - final Thread driver = new Thread(() -> { - try { - final Map> allData = generate(kafka, numKeys, maxRecordsPerKey); - verify(kafka, allData, maxRecordsPerKey); - } catch (final Exception ex) { - ex.printStackTrace(); - } - }); - - final Properties props = new Properties(); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); - props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, "1").getAbsolutePath()); - final SmokeTestClient streams1 = new SmokeTestClient(props); - props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, "2").getAbsolutePath()); - final SmokeTestClient streams2 = new SmokeTestClient(props); - props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, "3").getAbsolutePath()); - final SmokeTestClient streams3 = new SmokeTestClient(props); - props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, "4").getAbsolutePath()); - final SmokeTestClient streams4 = new SmokeTestClient(props); - - System.out.println("starting the driver"); - driver.start(); - - System.out.println("starting the first and second client"); - streams1.start(); - streams2.start(); - - sleep(10000); - - System.out.println("starting the third client"); - streams3.start(); - - System.out.println("closing the first client"); - streams1.close(); - System.out.println("closed the first client"); - - sleep(10000); - - System.out.println("starting the forth client"); - streams4.start(); - - driver.join(); - - System.out.println("driver stopped"); - streams2.close(); - streams3.close(); - streams4.close(); - - System.out.println("shutdown"); - } - - public static Map> generate(final String kafka, - final int numKeys, - final int maxRecordsPerKey) { - return generate(kafka, numKeys, maxRecordsPerKey, true); - } - public static Map> generate(final String kafka, final int numKeys, final int maxRecordsPerKey, @@ -252,20 +189,32 @@ private static void shuffle(final int[] data, final int windowSize) { } } - public static void verify(final String kafka, final Map> allData, final int maxRecordsPerKey) { + public static boolean verify(final String kafka, final Map> allData, final int maxRecordsPerKey) { final Properties props = new Properties(); props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier"); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - final KafkaConsumer consumer = new KafkaConsumer<>(props); - final List partitions = getAllPartitions(consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg"); + final KafkaConsumer consumer = new KafkaConsumer<>(props); + final List partitions = getAllPartitions(consumer, "data", "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg"); consumer.assign(partitions); consumer.seekToBeginning(partitions); final int recordsGenerated = allData.size() * maxRecordsPerKey; int recordsProcessed = 0; + final Map processed = Stream.of("data", "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg").collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0))); + + final HashMap> dataEvents = new HashMap<>(); + final HashMap> 
echoEvents = new HashMap<>(); + final HashMap> maxEvents = new HashMap<>(); + final HashMap> minEvents = new HashMap<>(); + final HashMap> difEvents = new HashMap<>(); + final HashMap> sumEvents = new HashMap<>(); + final HashMap> cntEvents = new HashMap<>(); + final HashMap> avgEvents = new HashMap<>(); + final HashMap> wcntEvents = new HashMap<>(); + final HashMap> taggEvents = new HashMap<>(); final HashMap max = new HashMap<>(); final HashMap min = new HashMap<>(); @@ -280,65 +229,96 @@ public static void verify(final String kafka, final Map> al final HashMap> received = new HashMap<>(); for (final String key : allData.keySet()) { keys.add(key); - received.put(key, new HashSet()); + received.put(key, new HashSet<>()); } int retry = 0; final long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) { - final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); + final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); if (records.isEmpty() && recordsProcessed >= recordsGenerated) { - if (verifyMin(min, allData, false) - && verifyMax(max, allData, false) - && verifyDif(dif, allData, false) - && verifySum(sum, allData, false) - && verifyCnt(cnt, allData, false) - && verifyAvg(avg, allData, false) - && verifyTAgg(tagg, allData, false)) { + if (verifyMin(min, allData, false, dataEvents, minEvents) + && verifyMax(max, allData, false, dataEvents, maxEvents) + && verifyDif(dif, allData, false, dataEvents, difEvents) + && verifySum(sum, allData, false, dataEvents, sumEvents) + && verifyCnt(cnt, allData, false, dataEvents, cntEvents) + && verifyAvg(avg, allData, false, dataEvents, avgEvents) + && verifyTAgg(tagg, allData, false, dataEvents, taggEvents)) { break; } if (retry++ > MAX_RECORD_EMPTY_RETRIES) { break; } } else { - for (final ConsumerRecord record : records) { - final String key = stringSerde.deserializer().deserialize("", record.key()); + for (final ConsumerRecord record : records) { + final String key = record.key(); + processed.get(record.topic()).incrementAndGet(); switch (record.topic()) { - case "echo": - final Integer value = intSerde.deserializer().deserialize("", record.value()); + case "data": { + addEvent(key, dataEvents, intSerde.deserializer().deserialize("", record.value())); + break; + } + case "echo": { recordsProcessed++; if (recordsProcessed % 100 == 0) { System.out.println("Echo records processed = " + recordsProcessed); } - received.get(key).add(value); + final Integer deserialize = intSerde.deserializer().deserialize("", record.value()); + received.get(key).add(deserialize); + addEvent(key, echoEvents, deserialize); break; - case "min": - min.put(key, intSerde.deserializer().deserialize("", record.value())); + } + case "min": { + final Integer deserialize = intSerde.deserializer().deserialize("", record.value()); + min.put(key, deserialize); + addEvent(key, minEvents, deserialize); break; - case "max": - max.put(key, intSerde.deserializer().deserialize("", record.value())); + } + case "max": { + final Integer deserialize = intSerde.deserializer().deserialize("", record.value()); + max.put(key, deserialize); + addEvent(key, maxEvents, deserialize); break; - case "dif": - dif.put(key, intSerde.deserializer().deserialize("", record.value())); + } + case "dif": { + final Integer deserialize = intSerde.deserializer().deserialize("", record.value()); + dif.put(key, deserialize); + addEvent(key, difEvents, deserialize); break; - case "sum": - sum.put(key, 
longSerde.deserializer().deserialize("", record.value())); + } + case "sum": { + final Long deserialize = longSerde.deserializer().deserialize("", record.value()); + sum.put(key, deserialize); break; - case "cnt": - cnt.put(key, longSerde.deserializer().deserialize("", record.value())); + } + case "cnt": { + final Long deserialize = longSerde.deserializer().deserialize("", record.value()); + cnt.put(key, deserialize); + addEvent(key, cntEvents, deserialize); break; - case "avg": - avg.put(key, doubleSerde.deserializer().deserialize("", record.value())); + } + case "avg": { + final Double deserialize = doubleSerde.deserializer().deserialize("", record.value()); + avg.put(key, deserialize); + addEvent(key, avgEvents, deserialize); break; - case "wcnt": - wcnt.put(key, longSerde.deserializer().deserialize("", record.value())); + } + case "wcnt": { + final Long deserialize = longSerde.deserializer().deserialize("", record.value()); + wcnt.put(key, deserialize); + addEvent(key, wcntEvents, deserialize); break; - case "tagg": - tagg.put(key, longSerde.deserializer().deserialize("", record.value())); + } + case "tagg": { + final Long deserialize = longSerde.deserializer().deserialize("", record.value()); + tagg.put(key, deserialize); + addEvent(key, taggEvents, deserialize); break; + } default: System.out.println("unknown topic: " + record.topic()); } } + System.out.println(processed); } } consumer.close(); @@ -369,18 +349,30 @@ && verifyTAgg(tagg, allData, false)) { System.out.println("missedRecords=" + missedCount); } - success &= verifyMin(min, allData, true); - success &= verifyMax(max, allData, true); - success &= verifyDif(dif, allData, true); - success &= verifySum(sum, allData, true); - success &= verifyCnt(cnt, allData, true); - success &= verifyAvg(avg, allData, true); - success &= verifyTAgg(tagg, allData, true); + success &= verifyMin(min, allData, true, dataEvents, minEvents); + success &= verifyMax(max, allData, true, dataEvents, maxEvents); + success &= verifyDif(dif, allData, true, dataEvents, difEvents); + success &= verifySum(sum, allData, true, dataEvents, sumEvents); + success &= verifyCnt(cnt, allData, true, dataEvents, cntEvents); + success &= verifyAvg(avg, allData, true, dataEvents, avgEvents); + success &= verifyTAgg(tagg, allData, true, dataEvents, taggEvents); System.out.println(success ? 
"SUCCESS" : "FAILURE"); + return success; } - private static boolean verifyMin(final Map map, final Map> allData, final boolean print) { + private static void addEvent(final String key, final HashMap> eventsMap, final V value) { + if (!eventsMap.containsKey(key)) { + eventsMap.put(key, new LinkedList<>()); + } + eventsMap.get(key).add(value); + } + + private static boolean verifyMin(final Map map, + final Map> allData, + final boolean print, + final HashMap> dataEvents, + final HashMap> minEvents) { if (map.isEmpty()) { if (print) { System.out.println("min is empty"); @@ -398,10 +390,16 @@ private static boolean verifyMin(final Map map, final Map entry : map.entrySet()) { - final int expected = getMin(entry.getKey()); + final String key = entry.getKey(); + final int expected = getMin(key); if (expected != entry.getValue()) { if (print) { - System.out.println("fail: key=" + entry.getKey() + " min=" + entry.getValue() + " expected=" + expected); + System.out.printf("fail: key=%s min=%d expected=%d%n\tdata=%s%n\tmins=%s%n", + key, + entry.getValue(), + expected, + dataEvents.get(key), + minEvents.get(key)); } return false; } @@ -410,7 +408,11 @@ private static boolean verifyMin(final Map map, final Map map, final Map> allData, final boolean print) { + private static boolean verifyMax(final Map map, + final Map> allData, + final boolean print, + final HashMap> dataEvents, + final HashMap> maxEvents) { if (map.isEmpty()) { if (print) { System.out.println("max is empty"); @@ -428,10 +430,16 @@ private static boolean verifyMax(final Map map, final Map entry : map.entrySet()) { - final int expected = getMax(entry.getKey()); + final String key = entry.getKey(); + final int expected = getMax(key); if (expected != entry.getValue()) { if (print) { - System.out.println("fail: key=" + entry.getKey() + " max=" + entry.getValue() + " expected=" + expected); + System.out.printf("fail: key=%s max=%d expected=%d%n\tdata=%s%n\tmaxs=%s%n", + key, + entry.getValue(), + expected, + dataEvents.get(key), + maxEvents.get(key)); } return false; } @@ -440,7 +448,11 @@ private static boolean verifyMax(final Map map, final Map map, final Map> allData, final boolean print) { + private static boolean verifyDif(final Map map, + final Map> allData, + final boolean print, + final HashMap> dataEvents, + final HashMap> difEvents) { if (map.isEmpty()) { if (print) { System.out.println("dif is empty"); @@ -458,12 +470,18 @@ private static boolean verifyDif(final Map map, final Map entry : map.entrySet()) { - final int min = getMin(entry.getKey()); - final int max = getMax(entry.getKey()); + final String key = entry.getKey(); + final int min = getMin(key); + final int max = getMax(key); final int expected = max - min; if (entry.getValue() == null || expected != entry.getValue()) { if (print) { - System.out.println("fail: key=" + entry.getKey() + " dif=" + entry.getValue() + " expected=" + expected); + System.out.printf("fail: key=%s dif=%d expected=%d%n\tdata=%s%n\tdifs=%s%n", + key, + entry.getValue(), + expected, + dataEvents.get(key), + difEvents.get(key)); } return false; } @@ -472,7 +490,11 @@ private static boolean verifyDif(final Map map, final Map map, final Map> allData, final boolean print) { + private static boolean verifyCnt(final Map map, + final Map> allData, + final boolean print, + final HashMap> dataEvents, + final HashMap> cntEvents) { if (map.isEmpty()) { if (print) { System.out.println("cnt is empty"); @@ -490,12 +512,18 @@ private static boolean verifyCnt(final Map map, final Map entry : map.entrySet()) { - 
final int min = getMin(entry.getKey()); - final int max = getMax(entry.getKey()); + final String key = entry.getKey(); + final int min = getMin(key); + final int max = getMax(key); final long expected = (max - min) + 1L; if (expected != entry.getValue()) { if (print) { - System.out.println("fail: key=" + entry.getKey() + " cnt=" + entry.getValue() + " expected=" + expected); + System.out.printf("fail: key=%s cnt=%d expected=%d%n\tdata=%s%n\tcnts=%s%n", + key, + entry.getValue(), + expected, + dataEvents.get(key), + cntEvents.get(key)); } return false; } @@ -504,7 +532,11 @@ private static boolean verifyCnt(final Map map, final Map map, final Map> allData, final boolean print) { + private static boolean verifySum(final Map map, + final Map> allData, + final boolean print, + final HashMap> dataEvents, + final HashMap> sumEvents) { if (map.isEmpty()) { if (print) { System.out.println("sum is empty"); @@ -536,7 +568,11 @@ private static boolean verifySum(final Map map, final Map map, final Map> allData, final boolean print) { + private static boolean verifyAvg(final Map map, + final Map> allData, + final boolean print, + final HashMap> dataEvents, + final HashMap> avgEvents) { if (map.isEmpty()) { if (print) { System.out.println("avg is empty"); @@ -570,7 +606,11 @@ private static boolean verifyAvg(final Map map, final Map map, final Map> allData, final boolean print) { + private static boolean verifyTAgg(final Map map, + final Map> allData, + final boolean print, + final HashMap> dataEvents, + final HashMap> taggEvents) { if (map.isEmpty()) { if (print) { System.out.println("tagg is empty"); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverTest.java new file mode 100644 index 0000000000000..5d22495a08f56 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverTest.java @@ -0,0 +1,106 @@ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.kafka.streams.tests.SmokeTestDriver.generate; +import static org.apache.kafka.streams.tests.SmokeTestDriver.verify; + +public class SmokeTestDriverTest { + private static class Driver extends Thread { + private String bootstrapServers; + private int numKeys; + private int maxRecordsPerKey; + private Exception exception = null; + private boolean success; + + private Driver(final String bootstrapServers, final int numKeys, final int maxRecordsPerKey) { + this.bootstrapServers = bootstrapServers; + this.numKeys = numKeys; + this.maxRecordsPerKey = maxRecordsPerKey; + } + + @Override + public void run() { + try { + final Map> allData = generate(bootstrapServers, numKeys, maxRecordsPerKey, true); + success = verify(bootstrapServers, allData, maxRecordsPerKey); + + } catch (final Exception ex) { + this.exception = ex; + } + } + + public Exception exception() { + return exception; + } + + public boolean success() { + return success; + } + + } + + @Test + public void shouldWorkWithRebalance() throws InterruptedException, IOException { + int numClientsCreated = 0; + final ArrayList clients = new ArrayList<>(); + try (final EmbeddedKafkaCluster 
embeddedKafkaCluster = new EmbeddedKafkaCluster(3); + ) { + embeddedKafkaCluster.start(); + embeddedKafkaCluster.createTopics("data", "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg"); + + final String bootstrapServers = embeddedKafkaCluster.bootstrapServers(); + final Driver driver = new Driver(bootstrapServers, 10, 1000); + driver.start(); + System.out.println("started streams"); + + + final Properties props = new Properties(); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + + // cycle out Streams instances as long as the test is running. + while (driver.isAlive()) { + // wait for the last added client to start + if (!clients.isEmpty()) { + while (!clients.get(clients.size() - 1).started()) { + Thread.sleep(100); + } + } + + // take a nap + Thread.sleep(1000); + + // add a new client + final SmokeTestClient smokeTestClient = new SmokeTestClient("streams" + numClientsCreated++); + clients.add(smokeTestClient); + smokeTestClient.start(props); + + // let the oldest client die of "natural causes" + if (clients.size() >= 3) { + clients.remove(0).closeAsync(); + } + } + driver.join(); + Assert.assertNull(driver.exception()); + Assert.assertTrue(driver.success()); + } finally { + for (final SmokeTestClient client : clients) { + client.closeAsync(); + } + for (final SmokeTestClient client : clients) { + client.close(); + } + } + } + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java index 9e62e3fc9aecb..aa58d44c4d461 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java @@ -29,16 +29,17 @@ import org.apache.kafka.streams.processor.ProcessorSupplier; import java.io.File; +import java.time.Instant; public class SmokeTestUtil { final static int END = Integer.MAX_VALUE; static ProcessorSupplier printProcessorSupplier(final String topic) { - return printProcessorSupplier(topic, false); + return printProcessorSupplier(topic, ""); } - private static ProcessorSupplier printProcessorSupplier(final String topic, final boolean printOffset) { + static ProcessorSupplier printProcessorSupplier(final String topic, final String name) { return new ProcessorSupplier() { @Override public Processor get() { @@ -56,7 +57,7 @@ public void init(final ProcessorContext context) { public void process(final Object key, final Object value) { numRecordsProcessed++; if (numRecordsProcessed % 100 == 0) { - System.out.println(System.currentTimeMillis()); + System.out.printf("%s: %s%n", name, Instant.now()); System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java index f34471be72574..1de15b78d9225 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; public class StreamsSmokeTest { @@ -56,9 +58,6 @@ public static void main(final String[] args) throws InterruptedException, IOExce System.out.println("disableAutoTerminate=" + disableAutoTerminate); switch (command) { - case "standalone": - 
SmokeTestDriver.main(args);
-                break;
             case "run":
                 // this starts the driver (data generation and result verification)
                 final int numKeys = 10;
@@ -66,14 +65,14 @@ public static void main(final String[] args) throws InterruptedException, IOExce
                 if (disableAutoTerminate) {
                     SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, false);
                 } else {
-                    final Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
+                    final Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, true);
                     SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
                 }
                 break;
             case "process":
                 // this starts a KafkaStreams client
-                final SmokeTestClient client = new SmokeTestClient(streamsProperties);
-                client.start();
+                final SmokeTestClient client = new SmokeTestClient(UUID.randomUUID().toString());
+                client.start(streamsProperties);
                 break;
             case "close-deadlock-test":
                 final ShutdownDeadlockTest test = new ShutdownDeadlockTest(kafka);

From ec757b47f0db302cd1fd4c91898fb860d3639bda Mon Sep 17 00:00:00 2001
From: John Roesler
Date: Mon, 4 Feb 2019 17:56:21 -0600
Subject: [PATCH 02/14] cleanup

---
 .../kafka/streams/integration/utils/EmbeddedKafkaCluster.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index d90a63e6722c1..351c6d46a6f4a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -115,7 +115,7 @@ private void putIfAbsent(final Properties props, final String propertyKey, final

     /**
      * Stop the Kafka cluster.
     
*/ - public void stop() { + private void stop() { for (final KafkaEmbedded broker : brokers) { broker.stop(); } From 29ed93e6695e11d92a855014413028bbac028c1b Mon Sep 17 00:00:00 2001 From: John Roesler Date: Mon, 4 Feb 2019 17:58:01 -0600 Subject: [PATCH 03/14] cleanup --- .../org/apache/kafka/streams/tests/SmokeTestClient.java | 2 +- .../org/apache/kafka/streams/tests/SmokeTestDriverTest.java | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index d0aee00a82825..daa23fd3cf826 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -38,7 +38,7 @@ import java.time.Duration; import java.util.Properties; -public class SmokeTestClient extends SmokeTestUtil implements AutoCloseable { +public class SmokeTestClient extends SmokeTestUtil { private final String name; diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverTest.java index 5d22495a08f56..7c5cdd73011fa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverTest.java @@ -10,7 +10,6 @@ import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; import static org.apache.kafka.streams.tests.SmokeTestDriver.generate; import static org.apache.kafka.streams.tests.SmokeTestDriver.verify; @@ -44,7 +43,7 @@ public Exception exception() { return exception; } - public boolean success() { + boolean success() { return success; } @@ -54,8 +53,7 @@ public boolean success() { public void shouldWorkWithRebalance() throws InterruptedException, IOException { int numClientsCreated = 0; final ArrayList clients = new ArrayList<>(); - try (final EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster(3); - ) { + try (final EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster(3)) { embeddedKafkaCluster.start(); embeddedKafkaCluster.createTopics("data", "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg"); From c5c6f6e5d7693fd77833e9ec0a898d311fcdc8ac Mon Sep 17 00:00:00 2001 From: John Roesler Date: Tue, 5 Feb 2019 12:21:43 -0600 Subject: [PATCH 04/14] refactor --- .../kafka/streams/tests/SmokeTestClient.java | 4 +- .../kafka/streams/tests/SmokeTestDriver.java | 521 +++++------------- ...va => SmokeTestDriverIntegrationTest.java} | 49 +- .../kafka/streams/tests/StreamsSmokeTest.java | 1 - 4 files changed, 182 insertions(+), 393 deletions(-) rename streams/src/test/java/org/apache/kafka/streams/tests/{SmokeTestDriverTest.java => SmokeTestDriverIntegrationTest.java} (65%) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index daa23fd3cf826..0ce58fbf2dea8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -204,12 +204,12 @@ private KafkaStreams createKafkaStreams(final Properties props) { .to("tagg", Produced.with(stringSerde, longSerde)); final KafkaStreams streamsClient = new KafkaStreams(builder.build(), getStreamsConfig(props)); - 
streamsClient.setStateListener(((newState, oldState) -> { + streamsClient.setStateListener((newState, oldState) -> { System.out.printf("%s: %s -> %s%n", name, oldState, newState); if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) { started = true; } - })); + }); streamsClient.setUncaughtExceptionHandler((t, e) -> { System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e); System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index 94b14a6935fc2..f88d5aa010153 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -47,12 +47,13 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; -public class SmokeTestDriver extends SmokeTestUtil { +import static org.apache.kafka.common.utils.Utils.mkEntry; - private static final int MAX_RECORD_EMPTY_RETRIES = 60; +public class SmokeTestDriver extends SmokeTestUtil { private static class ValueList { public final String key; @@ -79,10 +80,10 @@ int next() { } } - public static Map> generate(final String kafka, - final int numKeys, - final int maxRecordsPerKey, - final boolean autoTerminate) { + static Map> generate(final String kafka, + final int numKeys, + final int maxRecordsPerKey, + final boolean autoTerminate) { final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); @@ -120,7 +121,11 @@ public static Map> generate(final String kafka, } else { final ProducerRecord record = - new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value)); + new ProducerRecord<>( + "data", + stringSerde.serializer().serialize("", key), + intSerde.serializer().serialize("", value) + ); producer.send(record, new TestCallback(record, needRetry)); @@ -176,7 +181,7 @@ public void onCompletion(final RecordMetadata metadata, final Exception exceptio } } - private static void shuffle(final int[] data, final int windowSize) { + private static void shuffle(final int[] data, @SuppressWarnings("SameParameterValue") final int windowSize) { final Random rand = new Random(); for (int i = 0; i < data.length; i++) { // we shuffle data within windowSize @@ -189,7 +194,9 @@ private static void shuffle(final int[] data, final int windowSize) { } } - public static boolean verify(final String kafka, final Map> allData, final int maxRecordsPerKey) { + public static boolean verify(final String kafka, + final Map> inputs, + final int maxRecordsPerKey) { final Properties props = new Properties(); props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier"); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); @@ -197,127 +204,43 @@ public static boolean verify(final String kafka, final Map> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); final KafkaConsumer consumer = new KafkaConsumer<>(props); - final List partitions = getAllPartitions(consumer, "data", "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", 
"tagg"); + final String[] topics = {"data", "echo", "max", "min", "dif", "sum", "cnt", "avg", "tagg"}; + final List partitions = getAllPartitions(consumer, topics); consumer.assign(partitions); consumer.seekToBeginning(partitions); - final int recordsGenerated = allData.size() * maxRecordsPerKey; + final int recordsGenerated = inputs.size() * maxRecordsPerKey; int recordsProcessed = 0; - final Map processed = Stream.of("data", "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg").collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0))); - - final HashMap> dataEvents = new HashMap<>(); - final HashMap> echoEvents = new HashMap<>(); - final HashMap> maxEvents = new HashMap<>(); - final HashMap> minEvents = new HashMap<>(); - final HashMap> difEvents = new HashMap<>(); - final HashMap> sumEvents = new HashMap<>(); - final HashMap> cntEvents = new HashMap<>(); - final HashMap> avgEvents = new HashMap<>(); - final HashMap> wcntEvents = new HashMap<>(); - final HashMap> taggEvents = new HashMap<>(); - - final HashMap max = new HashMap<>(); - final HashMap min = new HashMap<>(); - final HashMap dif = new HashMap<>(); - final HashMap sum = new HashMap<>(); - final HashMap cnt = new HashMap<>(); - final HashMap avg = new HashMap<>(); - final HashMap wcnt = new HashMap<>(); - final HashMap tagg = new HashMap<>(); - - final HashSet keys = new HashSet<>(); - final HashMap> received = new HashMap<>(); - for (final String key : allData.keySet()) { - keys.add(key); - received.put(key, new HashSet<>()); - } - int retry = 0; + final Map processed = + Stream.of(topics) + .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0))); + + final Map>> events = new HashMap<>(); + final long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) { - final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); + final ConsumerRecords records = consumer.poll(Duration.ofSeconds(30)); if (records.isEmpty() && recordsProcessed >= recordsGenerated) { - if (verifyMin(min, allData, false, dataEvents, minEvents) - && verifyMax(max, allData, false, dataEvents, maxEvents) - && verifyDif(dif, allData, false, dataEvents, difEvents) - && verifySum(sum, allData, false, dataEvents, sumEvents) - && verifyCnt(cnt, allData, false, dataEvents, cntEvents) - && verifyAvg(avg, allData, false, dataEvents, avgEvents) - && verifyTAgg(tagg, allData, false, dataEvents, taggEvents)) { - break; - } - if (retry++ > MAX_RECORD_EMPTY_RETRIES) { - break; - } + break; } else { for (final ConsumerRecord record : records) { final String key = record.key(); - processed.get(record.topic()).incrementAndGet(); - switch (record.topic()) { - case "data": { - addEvent(key, dataEvents, intSerde.deserializer().deserialize("", record.value())); - break; - } - case "echo": { - recordsProcessed++; - if (recordsProcessed % 100 == 0) { - System.out.println("Echo records processed = " + recordsProcessed); - } - final Integer deserialize = intSerde.deserializer().deserialize("", record.value()); - received.get(key).add(deserialize); - addEvent(key, echoEvents, deserialize); - break; - } - case "min": { - final Integer deserialize = intSerde.deserializer().deserialize("", record.value()); - min.put(key, deserialize); - addEvent(key, minEvents, deserialize); - break; - } - case "max": { - final Integer deserialize = intSerde.deserializer().deserialize("", record.value()); - max.put(key, deserialize); - addEvent(key, maxEvents, deserialize); - break; - } - case "dif": { - final Integer 
deserialize = intSerde.deserializer().deserialize("", record.value()); - dif.put(key, deserialize); - addEvent(key, difEvents, deserialize); - break; - } - case "sum": { - final Long deserialize = longSerde.deserializer().deserialize("", record.value()); - sum.put(key, deserialize); - break; - } - case "cnt": { - final Long deserialize = longSerde.deserializer().deserialize("", record.value()); - cnt.put(key, deserialize); - addEvent(key, cntEvents, deserialize); - break; - } - case "avg": { - final Double deserialize = doubleSerde.deserializer().deserialize("", record.value()); - avg.put(key, deserialize); - addEvent(key, avgEvents, deserialize); - break; - } - case "wcnt": { - final Long deserialize = longSerde.deserializer().deserialize("", record.value()); - wcnt.put(key, deserialize); - addEvent(key, wcntEvents, deserialize); - break; - } - case "tagg": { - final Long deserialize = longSerde.deserializer().deserialize("", record.value()); - tagg.put(key, deserialize); - addEvent(key, taggEvents, deserialize); - break; + + final String topic = record.topic(); + processed.get(topic).incrementAndGet(); + + if (topic.equals("echo")) { + recordsProcessed++; + if (recordsProcessed % 100 == 0) { + System.out.println("Echo records processed = " + recordsProcessed); } - default: - System.out.println("unknown topic: " + record.topic()); } + + events.computeIfAbsent(topic, t -> new HashMap<>()) + .computeIfAbsent(key, k -> new LinkedList<>()) + .add(deserializeValue(record)); } + System.out.println(processed); } } @@ -337,316 +260,168 @@ && verifyTAgg(tagg, allData, false, dataEvents, taggEvents)) { } boolean success; - success = allData.equals(received); + + final Map> received = + events.get("echo") + .entrySet() + .stream() + .map(entry -> mkEntry(entry.getKey(), new HashSet<>(entry.getValue()))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + success = inputs.equals(received); if (success) { System.out.println("ALL-RECORDS-DELIVERED"); } else { int missedCount = 0; - for (final Map.Entry> entry : allData.entrySet()) { + for (final Map.Entry> entry : inputs.entrySet()) { missedCount += received.get(entry.getKey()).size(); } System.out.println("missedRecords=" + missedCount); } - success &= verifyMin(min, allData, true, dataEvents, minEvents); - success &= verifyMax(max, allData, true, dataEvents, maxEvents); - success &= verifyDif(dif, allData, true, dataEvents, difEvents); - success &= verifySum(sum, allData, true, dataEvents, sumEvents); - success &= verifyCnt(cnt, allData, true, dataEvents, cntEvents); - success &= verifyAvg(avg, allData, true, dataEvents, avgEvents); - success &= verifyTAgg(tagg, allData, true, dataEvents, taggEvents); + success &= verifyAll(inputs, events); System.out.println(success ? 
"SUCCESS" : "FAILURE"); return success; } - private static void addEvent(final String key, final HashMap> eventsMap, final V value) { - if (!eventsMap.containsKey(key)) { - eventsMap.put(key, new LinkedList<>()); - } - eventsMap.get(key).add(value); - } - - private static boolean verifyMin(final Map map, - final Map> allData, - final boolean print, - final HashMap> dataEvents, - final HashMap> minEvents) { - if (map.isEmpty()) { - if (print) { - System.out.println("min is empty"); + private static Number deserializeValue(final ConsumerRecord record) { + final Number value; + switch (record.topic()) { + case "data": { + value = intSerde.deserializer().deserialize("", record.value()); + break; } - return false; - } else { - if (print) { - System.out.println("verifying min"); + case "echo": { + value = intSerde.deserializer().deserialize("", record.value()); + break; } - - if (map.size() != allData.size()) { - if (print) { - System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size()); - } - return false; + case "min": { + value = intSerde.deserializer().deserialize("", record.value()); + break; } - for (final Map.Entry entry : map.entrySet()) { - final String key = entry.getKey(); - final int expected = getMin(key); - if (expected != entry.getValue()) { - if (print) { - System.out.printf("fail: key=%s min=%d expected=%d%n\tdata=%s%n\tmins=%s%n", - key, - entry.getValue(), - expected, - dataEvents.get(key), - minEvents.get(key)); - } - return false; - } + case "max": { + value = intSerde.deserializer().deserialize("", record.value()); + break; } - } - return true; - } - - private static boolean verifyMax(final Map map, - final Map> allData, - final boolean print, - final HashMap> dataEvents, - final HashMap> maxEvents) { - if (map.isEmpty()) { - if (print) { - System.out.println("max is empty"); + case "dif": { + value = intSerde.deserializer().deserialize("", record.value()); + break; } - return false; - } else { - if (print) { - System.out.println("verifying max"); + case "sum": { + value = longSerde.deserializer().deserialize("", record.value()); + break; } - - if (map.size() != allData.size()) { - if (print) { - System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size()); - } - return false; + case "cnt": { + value = longSerde.deserializer().deserialize("", record.value()); + break; } - for (final Map.Entry entry : map.entrySet()) { - final String key = entry.getKey(); - final int expected = getMax(key); - if (expected != entry.getValue()) { - if (print) { - System.out.printf("fail: key=%s max=%d expected=%d%n\tdata=%s%n\tmaxs=%s%n", - key, - entry.getValue(), - expected, - dataEvents.get(key), - maxEvents.get(key)); - } - return false; - } + case "avg": { + value = doubleSerde.deserializer().deserialize("", record.value()); + break; + } + case "tagg": { + value = longSerde.deserializer().deserialize("", record.value()); + break; } + default: + throw new RuntimeException("unknown topic: " + record.topic()); } - return true; + return value; } - private static boolean verifyDif(final Map map, - final Map> allData, - final boolean print, - final HashMap> dataEvents, - final HashMap> difEvents) { - if (map.isEmpty()) { - if (print) { - System.out.println("dif is empty"); - } - return false; - } else { - if (print) { - System.out.println("verifying dif"); - } - - if (map.size() != allData.size()) { - if (print) { - System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size()); - } - return false; - } - 
for (final Map.Entry entry : map.entrySet()) { - final String key = entry.getKey(); - final int min = getMin(key); - final int max = getMax(key); - final int expected = max - min; - if (entry.getValue() == null || expected != entry.getValue()) { - if (print) { - System.out.printf("fail: key=%s dif=%d expected=%d%n\tdata=%s%n\tdifs=%s%n", - key, - entry.getValue(), - expected, - dataEvents.get(key), - difEvents.get(key)); - } - return false; - } - } - } - return true; + private static boolean verifyAll(final Map> inputs, + final Map>> events) { + final Map> observedInputEvents = events.get("data"); + boolean pass; + pass = verifyTAgg(inputs, events.get("tagg")); + pass &= verify("min", inputs, observedInputEvents, events, SmokeTestDriver::getMin); + pass &= verify("max", inputs, observedInputEvents, events, SmokeTestDriver::getMax); + pass &= verify("dif", inputs, observedInputEvents, events, key -> getMax(key).intValue() - getMin(key).intValue()); + pass &= verify("sum", inputs, observedInputEvents, events, SmokeTestDriver::getSum); + pass &= verify("cnt", inputs, observedInputEvents, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L); + pass &= verify("avg", inputs, observedInputEvents, events, SmokeTestDriver::getAvg); + return pass; } - private static boolean verifyCnt(final Map map, - final Map> allData, - final boolean print, - final HashMap> dataEvents, - final HashMap> cntEvents) { - if (map.isEmpty()) { - if (print) { - System.out.println("cnt is empty"); - } + private static boolean verify(final String topicName, + final Map> inputData, + final Map> consumedInputEvents, + final Map>> allEvents, + final Function keyToExpectation) { + final Map> outputEvents = allEvents.get(topicName); + if (outputEvents.isEmpty()) { + System.out.println(topicName + " is empty"); return false; } else { - if (print) { - System.out.println("verifying cnt"); - } + System.out.println("verifying " + topicName); - if (map.size() != allData.size()) { - if (print) { - System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size()); - } + if (outputEvents.size() != inputData.size()) { + System.out.println("fail: resultCount=" + outputEvents.size() + " expectedCount=" + inputData.size()); return false; } - for (final Map.Entry entry : map.entrySet()) { + for (final Map.Entry> entry : outputEvents.entrySet()) { final String key = entry.getKey(); - final int min = getMin(key); - final int max = getMax(key); - final long expected = (max - min) + 1L; - if (expected != entry.getValue()) { - if (print) { - System.out.printf("fail: key=%s cnt=%d expected=%d%n\tdata=%s%n\tcnts=%s%n", - key, - entry.getValue(), - expected, - dataEvents.get(key), - cntEvents.get(key)); - } + final Number expected = keyToExpectation.apply(key); + final Number actual = entry.getValue().getLast(); + if (!expected.equals(actual)) { + System.out.printf("fail: key=%s %s=%s expected=%s%n\t inputEvents=%s%n\toutputEvents=%s%n", + key, + topicName, + actual, + expected, + consumedInputEvents.get(key), + entry.getValue()); return false; } } + return true; } - return true; } - private static boolean verifySum(final Map map, - final Map> allData, - final boolean print, - final HashMap> dataEvents, - final HashMap> sumEvents) { - if (map.isEmpty()) { - if (print) { - System.out.println("sum is empty"); - } - return false; - } else { - if (print) { - System.out.println("verifying sum"); - } - - if (map.size() != allData.size()) { - if (print) { - System.out.println("fail: resultCount=" + map.size() + " 
expectedCount=" + allData.size()); - } - return false; - } - for (final Map.Entry entry : map.entrySet()) { - final int min = getMin(entry.getKey()); - final int max = getMax(entry.getKey()); - final long expected = ((long) min + (long) max) * (max - min + 1L) / 2L; - if (expected != entry.getValue()) { - if (print) { - System.out.println("fail: key=" + entry.getKey() + " sum=" + entry.getValue() + " expected=" + expected); - } - return false; - } - } - } - return true; + private static Long getSum(final String key) { + final int min = getMin(key).intValue(); + final int max = getMax(key).intValue(); + return ((long) min + (long) max) * (max - min + 1L) / 2L; } - private static boolean verifyAvg(final Map map, - final Map> allData, - final boolean print, - final HashMap> dataEvents, - final HashMap> avgEvents) { - if (map.isEmpty()) { - if (print) { - System.out.println("avg is empty"); - } - return false; - } else { - if (print) { - System.out.println("verifying avg"); - } - - if (map.size() != allData.size()) { - if (print) { - System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size()); - } - return false; - } - for (final Map.Entry entry : map.entrySet()) { - final int min = getMin(entry.getKey()); - final int max = getMax(entry.getKey()); - final double expected = ((long) min + (long) max) / 2.0; - - if (entry.getValue() == null || expected != entry.getValue()) { - if (print) { - System.out.println("fail: key=" + entry.getKey() + " avg=" + entry.getValue() + " expected=" + expected); - } - return false; - } - } - } - return true; + private static Double getAvg(final String key) { + final int min = getMin(key).intValue(); + final int max = getMax(key).intValue(); + return ((long) min + (long) max) / 2.0; } - private static boolean verifyTAgg(final Map map, - final Map> allData, - final boolean print, - final HashMap> dataEvents, - final HashMap> taggEvents) { - if (map.isEmpty()) { - if (print) { - System.out.println("tagg is empty"); - } + private static boolean verifyTAgg(final Map> allData, + final Map> taggEvents) { + if (taggEvents.isEmpty()) { + System.out.println("tagg is empty"); return false; } else { - if (print) { - System.out.println("verifying tagg"); - } + System.out.println("verifying tagg"); // generate expected answer final Map expected = new HashMap<>(); for (final String key : allData.keySet()) { - final int min = getMin(key); - final int max = getMax(key); + final int min = getMin(key).intValue(); + final int max = getMax(key).intValue(); final String cnt = Long.toString(max - min + 1L); - if (expected.containsKey(cnt)) { - expected.put(cnt, expected.get(cnt) + 1L); - } else { - expected.put(cnt, 1L); - } + expected.put(cnt, expected.getOrDefault(cnt, 0L) + 1); } // check the result - for (final Map.Entry entry : map.entrySet()) { + for (final Map.Entry> entry : taggEvents.entrySet()) { final String key = entry.getKey(); Long expectedCount = expected.remove(key); if (expectedCount == null) { expectedCount = 0L; } - if (entry.getValue().longValue() != expectedCount.longValue()) { - if (print) { - System.out.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expected.get(key)); - } + if (entry.getValue().getLast().longValue() != expectedCount) { + System.out.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expected.get(key)); + System.out.println("\t outputEvents: " + entry.getValue()); return false; } } @@ -655,26 +430,14 @@ private static boolean verifyTAgg(final Map map, return true; } - 
private static int getMin(final String key) { + private static Number getMin(final String key) { return Integer.parseInt(key.split("-")[0]); } - private static int getMax(final String key) { + private static Number getMax(final String key) { return Integer.parseInt(key.split("-")[1]); } - private static int getMinFromWKey(final String key) { - return getMin(key.split("@")[0]); - } - - private static int getMaxFromWKey(final String key) { - return getMax(key.split("@")[0]); - } - - private static long getStartFromWKey(final String key) { - return Long.parseLong(key.split("@")[1]); - } - private static List getAllPartitions(final KafkaConsumer consumer, final String... topics) { final ArrayList partitions = new ArrayList<>(); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverIntegrationTest.java similarity index 65% rename from streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverTest.java rename to streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverIntegrationTest.java index 7c5cdd73011fa..5bfa2886d2dce 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverIntegrationTest.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
 package org.apache.kafka.streams.tests;
 
 import org.apache.kafka.streams.StreamsConfig;
@@ -14,7 +30,7 @@
 import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
 import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
 
-public class SmokeTestDriverTest {
+public class SmokeTestDriverIntegrationTest {
     private static class Driver extends Thread {
         private String bootstrapServers;
         private int numKeys;
         private int maxRecordsPerKey;
@@ -79,7 +95,7 @@ public void shouldWorkWithRebalance() throws InterruptedException, IOException {
             Thread.sleep(1000);
 
             // add a new client
-            final SmokeTestClient smokeTestClient = new SmokeTestClient("streams" + numClientsCreated++);
+            final SmokeTestClient smokeTestClient = new SmokeTestClient("streams-" + numClientsCreated++);
             clients.add(smokeTestClient);
             smokeTestClient.start(props);
 
@@ -88,16 +104,27 @@ public void shouldWorkWithRebalance() throws InterruptedException, IOException {
                 clients.remove(0).closeAsync();
             }
         }
-            driver.join();
-            Assert.assertNull(driver.exception());
-            Assert.assertTrue(driver.success());
-        } finally {
-            for (final SmokeTestClient client : clients) {
-                client.closeAsync();
-            }
-            for (final SmokeTestClient client : clients) {
-                client.close();
+            try {
+                // wait for verification to finish
+                driver.join();
+
+                // check to make sure that it actually succeeded
+                Assert.assertNull(driver.exception());
+                Assert.assertTrue(driver.success());
+
+            } finally {
+                // whether or not the assertions failed, tell all the streams instances to stop
+                for (final SmokeTestClient client : clients) {
+                    client.closeAsync();
+                }
+
+                // then, wait for them to stop
+                for (final SmokeTestClient client : clients) {
+                    client.close();
+                }
             }
+
+            // When the try-with-resources block exits, Java will close the EmbeddedKafkaCluster
         }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
index 1de15b78d9225..4c2f6d20d8459 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -24,7 +24,6 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
 
 public class StreamsSmokeTest {

From f949650a9e6f76d3dffd77b1efc0afdeb92de783 Mon Sep 17 00:00:00 2001
From: John Roesler
Date: Tue, 5 Feb 2019 13:39:58 -0600
Subject: [PATCH 05/14] refactor

---
 .../tests/SmokeTestDriverIntegrationTest.java | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverIntegrationTest.java
index 5bfa2886d2dce..67e0382989c37 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverIntegrationTest.java
@@ -66,11 +66,15 @@ boolean success() {
     }
 
     @Test
-    public void shouldWorkWithRebalance() throws InterruptedException, IOException {
+    public void shouldWorkWithRebalance() throws InterruptedException {
         int numClientsCreated = 0;
         final ArrayList<SmokeTestClient> clients = new ArrayList<>();
         try (final EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster(3)) {
-            embeddedKafkaCluster.start();
+            try {
+                embeddedKafkaCluster.start();
+            } catch (final IOException e) {
+                throw new RuntimeException(e);
+            }
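// A stand-alone sketch of the idiom introduced above, for reference (illustrative
// only; "cluster" is a placeholder name, not from the patch): wrapping the checked
// IOException from start() in a RuntimeException keeps the checked type out of the
// @Test method's throws clause while still failing fast with the original cause.
//
//     try {
//         cluster.start();                   // declared to throw IOException
//     } catch (final IOException e) {
//         throw new RuntimeException(e);     // unchecked, cause preserved
//     }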
embeddedKafkaCluster.createTopics("data", "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg"); final String bootstrapServers = embeddedKafkaCluster.bootstrapServers(); @@ -84,13 +88,6 @@ public void shouldWorkWithRebalance() throws InterruptedException, IOException { // cycle out Streams instances as long as the test is running. while (driver.isAlive()) { - // wait for the last added client to start - if (!clients.isEmpty()) { - while (!clients.get(clients.size() - 1).started()) { - Thread.sleep(100); - } - } - // take a nap Thread.sleep(1000); @@ -99,6 +96,10 @@ public void shouldWorkWithRebalance() throws InterruptedException, IOException { clients.add(smokeTestClient); smokeTestClient.start(props); + while (!clients.get(clients.size() - 1).started()) { + Thread.sleep(100); + } + // let the oldest client die of "natural causes" if (clients.size() >= 3) { clients.remove(0).closeAsync(); From 80bd3f10ba94711b42497e47ff4b5b03c7824507 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Tue, 5 Feb 2019 13:43:30 -0600 Subject: [PATCH 06/14] remove redundant printlns --- .../java/org/apache/kafka/streams/tests/SmokeTestClient.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index 0ce58fbf2dea8..cb4ceb33a148a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -60,7 +60,6 @@ public void start(final Properties streamsProperties) { streams = createKafkaStreams(streamsProperties); streams.setUncaughtExceptionHandler((t, e) -> { System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION"); - System.out.println("SMOKE-TEST-CLIENT-EXCEPTION"); uncaughtException = true; e.printStackTrace(); }); @@ -80,14 +79,12 @@ public void close() { // do not remove these printouts since they are needed for health scripts if (!uncaughtException) { System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED"); - System.out.println("SMOKE-TEST-CLIENT-CLOSED"); } try { thread.join(); } catch (final Exception ex) { // do not remove these printouts since they are needed for health scripts System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION"); - System.out.println("SMOKE-TEST-CLIENT-EXCEPTION"); // ignore } } @@ -212,7 +209,6 @@ private KafkaStreams createKafkaStreams(final Properties props) { }); streamsClient.setUncaughtExceptionHandler((t, e) -> { System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e); - System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e); streamsClient.close(Duration.ofSeconds(30)); }); From 69592814dea6abf8361d1474f1ca5db6fc260744 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Tue, 5 Feb 2019 13:56:41 -0600 Subject: [PATCH 07/14] Update EmbeddedKafkaCluster.java --- .../kafka/streams/integration/utils/EmbeddedKafkaCluster.java | 1 - 1 file changed, 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index 351c6d46a6f4a..f88cb1499f1ef 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -303,7 +303,6 @@ public void 
waitForRemainingTopics(final long timeoutMs, final String... topics) TestUtils.waitForCondition(new TopicsRemainingCondition(topics), timeoutMs, "Topics are not expected after " + timeoutMs + " milli seconds."); } - private final class TopicsDeletedCondition implements TestCondition { final Set deletedTopics = new HashSet<>(); From cae80e18f1e9f568654812df96510ba703ed8509 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Tue, 5 Feb 2019 14:42:10 -0600 Subject: [PATCH 08/14] move to integration package --- .../SmokeTestDriverIntegrationTest.java | 3 ++- .../org/apache/kafka/streams/tests/SmokeTestClient.java | 4 ++-- .../org/apache/kafka/streams/tests/SmokeTestDriver.java | 8 ++++---- 3 files changed, 8 insertions(+), 7 deletions(-) rename streams/src/test/java/org/apache/kafka/streams/{tests => integration}/SmokeTestDriverIntegrationTest.java (97%) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java similarity index 97% rename from streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverIntegrationTest.java rename to streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java index 67e0382989c37..cb71d77a34210 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriverIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java @@ -14,10 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.streams.tests; +package org.apache.kafka.streams.integration; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.tests.SmokeTestClient; import org.junit.Assert; import org.junit.Test; diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index cb4ceb33a148a..13eb397574192 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -47,7 +47,7 @@ public class SmokeTestClient extends SmokeTestUtil { private boolean uncaughtException = false; private boolean started; - SmokeTestClient(final String name) { + public SmokeTestClient(final String name) { super(); this.name = name; } @@ -70,7 +70,7 @@ public void start(final Properties streamsProperties) { thread.start(); } - void closeAsync() { + public void closeAsync() { streams.close(Duration.ZERO); } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index f88d5aa010153..87cf246ed13da 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -80,10 +80,10 @@ int next() { } } - static Map> generate(final String kafka, - final int numKeys, - final int maxRecordsPerKey, - final boolean autoTerminate) { + public static Map> generate(final String kafka, + final int numKeys, + final int maxRecordsPerKey, + final boolean autoTerminate) { final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); 
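// Aside (hypothetical cross-package usage, not part of this hunk): generate() is
// widened to public here because the test is moving to a different package,
// org.apache.kafka.streams.integration, and presumably calls it roughly like so:
//
//     final Map<String, Set<Integer>> allData =
//         SmokeTestDriver.generate(bootstrapServers, 10, 1000, true);
//     final boolean passed = SmokeTestDriver.verify(bootstrapServers, allData, 1000);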
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); From 913d8fa80e8471dd8a9803474918bf0f442af474 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Wed, 6 Feb 2019 12:42:11 -0600 Subject: [PATCH 09/14] replace early-exit on pass --- .../kafka/streams/tests/SmokeTestDriver.java | 109 ++++++++++++------ 1 file changed, 76 insertions(+), 33 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index 87cf246ed13da..fb157e28b8fba 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -34,6 +34,9 @@ import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -55,6 +58,8 @@ public class SmokeTestDriver extends SmokeTestUtil { + private static final int MAX_RECORD_EMPTY_RETRIES = 60; + private static class ValueList { public final String key; private final int[] values; @@ -217,12 +222,21 @@ public static boolean verify(final String kafka, final Map>> events = new HashMap<>(); + VerificationResult verificationResult = new VerificationResult(false, "no results yet"); + int retry = 0; final long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) { - final ConsumerRecords records = consumer.poll(Duration.ofSeconds(30)); + final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); if (records.isEmpty() && recordsProcessed >= recordsGenerated) { - break; + verificationResult = verifyAll(inputs, events); + if (verificationResult.passed()) { + break; + } + if (retry++ > MAX_RECORD_EMPTY_RETRIES) { + break; + } } else { + retry = 0; for (final ConsumerRecord record : records) { final String key = record.key(); @@ -280,7 +294,13 @@ public static boolean verify(final String kafka, System.out.println("missedRecords=" + missedCount); } - success &= verifyAll(inputs, events); + // give it one more try if it's not already passing. + if (!verificationResult.passed()) { + verificationResult = verifyAll(inputs, events); + } + success &= verificationResult.passed(); + + System.out.println(verificationResult.result()); System.out.println(success ? 
"SUCCESS" : "FAILURE"); return success; @@ -331,34 +351,56 @@ private static Number deserializeValue(final ConsumerRecord reco return value; } - private static boolean verifyAll(final Map> inputs, - final Map>> events) { - final Map> observedInputEvents = events.get("data"); + private static class VerificationResult { + private final boolean passed; + private final String result; + + VerificationResult(final boolean passed, final String result) { + this.passed = passed; + this.result = result; + } + + boolean passed() { + return passed; + } + + String result() { + return result; + } + } + + private static VerificationResult verifyAll(final Map> inputs, + final Map>> events) { + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + final PrintStream resultStream = new PrintStream(byteArrayOutputStream); boolean pass; - pass = verifyTAgg(inputs, events.get("tagg")); - pass &= verify("min", inputs, observedInputEvents, events, SmokeTestDriver::getMin); - pass &= verify("max", inputs, observedInputEvents, events, SmokeTestDriver::getMax); - pass &= verify("dif", inputs, observedInputEvents, events, key -> getMax(key).intValue() - getMin(key).intValue()); - pass &= verify("sum", inputs, observedInputEvents, events, SmokeTestDriver::getSum); - pass &= verify("cnt", inputs, observedInputEvents, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L); - pass &= verify("avg", inputs, observedInputEvents, events, SmokeTestDriver::getAvg); - return pass; + pass = verifyTAgg(resultStream, inputs, events.get("tagg")); + pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin); + pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax); + pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue()); + pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum); + pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L); + pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg); + resultStream.close(); + return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8)); } - private static boolean verify(final String topicName, + private static boolean verify(final PrintStream resultStream, + final String topicName, final Map> inputData, - final Map> consumedInputEvents, - final Map>> allEvents, + final Map>> events, final Function keyToExpectation) { - final Map> outputEvents = allEvents.get(topicName); + final Map> consumedInputEvents = events.get("data"); + + final Map> outputEvents = events.get(topicName); if (outputEvents.isEmpty()) { - System.out.println(topicName + " is empty"); + resultStream.println(topicName + " is empty"); return false; } else { - System.out.println("verifying " + topicName); + resultStream.println("verifying " + topicName); if (outputEvents.size() != inputData.size()) { - System.out.println("fail: resultCount=" + outputEvents.size() + " expectedCount=" + inputData.size()); + resultStream.println("fail: resultCount=" + outputEvents.size() + " expectedCount=" + inputData.size()); return false; } for (final Map.Entry> entry : outputEvents.entrySet()) { @@ -366,13 +408,13 @@ private static boolean verify(final String topicName, final Number expected = keyToExpectation.apply(key); final Number actual = entry.getValue().getLast(); if (!expected.equals(actual)) { - System.out.printf("fail: key=%s %s=%s 
expected=%s%n\t inputEvents=%s%n\toutputEvents=%s%n", - key, - topicName, - actual, - expected, - consumedInputEvents.get(key), - entry.getValue()); + resultStream.printf("fail: key=%s %s=%s expected=%s%n\t inputEvents=%s%n\toutputEvents=%s%n", + key, + topicName, + actual, + expected, + consumedInputEvents.get(key), + entry.getValue()); return false; } } @@ -393,13 +435,14 @@ private static Double getAvg(final String key) { } - private static boolean verifyTAgg(final Map> allData, + private static boolean verifyTAgg(final PrintStream resultStream, + final Map> allData, final Map> taggEvents) { if (taggEvents.isEmpty()) { - System.out.println("tagg is empty"); + resultStream.println("tagg is empty"); return false; } else { - System.out.println("verifying tagg"); + resultStream.println("verifying tagg"); // generate expected answer final Map expected = new HashMap<>(); @@ -420,8 +463,8 @@ private static boolean verifyTAgg(final Map> allData, } if (entry.getValue().getLast().longValue() != expectedCount) { - System.out.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expected.get(key)); - System.out.println("\t outputEvents: " + entry.getValue()); + resultStream.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expected.get(key)); + resultStream.println("\t outputEvents: " + entry.getValue()); return false; } } From dcd1f082d67b7a0239eb34022b573069df62ae90 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Wed, 6 Feb 2019 12:49:53 -0600 Subject: [PATCH 10/14] use classrule for embedded kafka --- .../SmokeTestDriverIntegrationTest.java | 102 +++++++++--------- .../utils/EmbeddedKafkaCluster.java | 7 +- .../kafka/streams/tests/SmokeTestDriver.java | 14 +-- 3 files changed, 58 insertions(+), 65 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java index cb71d77a34210..5e646d057d555 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java @@ -19,10 +19,11 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.tests.SmokeTestClient; +import org.apache.kafka.streams.tests.SmokeTestDriver; import org.junit.Assert; +import org.junit.ClassRule; import org.junit.Test; -import java.io.IOException; import java.util.ArrayList; import java.util.Map; import java.util.Properties; @@ -32,12 +33,16 @@ import static org.apache.kafka.streams.tests.SmokeTestDriver.verify; public class SmokeTestDriverIntegrationTest { + @ClassRule + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3); + + private static class Driver extends Thread { private String bootstrapServers; private int numKeys; private int maxRecordsPerKey; private Exception exception = null; - private boolean success; + private SmokeTestDriver.VerificationResult result; private Driver(final String bootstrapServers, final int numKeys, final int maxRecordsPerKey) { this.bootstrapServers = bootstrapServers; @@ -49,7 +54,7 @@ private Driver(final String bootstrapServers, final int numKeys, final int maxRe public void run() { try { final Map> allData = generate(bootstrapServers, numKeys, maxRecordsPerKey, true); - success = verify(bootstrapServers, allData, maxRecordsPerKey); + 
result = verify(bootstrapServers, allData, maxRecordsPerKey); } catch (final Exception ex) { this.exception = ex; @@ -60,8 +65,8 @@ public Exception exception() { return exception; } - boolean success() { - return success; + SmokeTestDriver.VerificationResult result() { + return result; } } @@ -70,64 +75,57 @@ boolean success() { public void shouldWorkWithRebalance() throws InterruptedException { int numClientsCreated = 0; final ArrayList clients = new ArrayList<>(); - try (final EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster(3)) { - try { - embeddedKafkaCluster.start(); - } catch (final IOException e) { - throw new RuntimeException(e); - } - embeddedKafkaCluster.createTopics("data", "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg"); - final String bootstrapServers = embeddedKafkaCluster.bootstrapServers(); - final Driver driver = new Driver(bootstrapServers, 10, 1000); - driver.start(); - System.out.println("started streams"); + CLUSTER.createTopics("data", "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg"); + final String bootstrapServers = CLUSTER.bootstrapServers(); + final Driver driver = new Driver(bootstrapServers, 10, 1000); + driver.start(); + System.out.println("started streams"); - final Properties props = new Properties(); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - // cycle out Streams instances as long as the test is running. - while (driver.isAlive()) { - // take a nap - Thread.sleep(1000); + final Properties props = new Properties(); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - // add a new client - final SmokeTestClient smokeTestClient = new SmokeTestClient("streams-" + numClientsCreated++); - clients.add(smokeTestClient); - smokeTestClient.start(props); + // cycle out Streams instances as long as the test is running. 
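// Condensed shape of the cycling loop that follows, with explanatory comments
// (a reference sketch of the same logic, not an additional patch hunk):
//
//     while (driver.isAlive()) {                       // while generate/verify still runs
//         Thread.sleep(1000);                          // let the current group make progress
//         final SmokeTestClient client = new SmokeTestClient("streams-" + numClientsCreated++);
//         clients.add(client);
//         client.start(props);                         // a member joining forces a rebalance
//         while (!clients.get(clients.size() - 1).started()) {
//             Thread.sleep(100);                       // don't churn again until it is RUNNING
//         }
//         if (clients.size() >= 3) {
//             clients.remove(0).closeAsync();          // a member leaving forces another rebalance
//         }
//     }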
+ while (driver.isAlive()) { + // take a nap + Thread.sleep(1000); - while (!clients.get(clients.size() - 1).started()) { - Thread.sleep(100); - } + // add a new client + final SmokeTestClient smokeTestClient = new SmokeTestClient("streams-" + numClientsCreated++); + clients.add(smokeTestClient); + smokeTestClient.start(props); - // let the oldest client die of "natural causes" - if (clients.size() >= 3) { - clients.remove(0).closeAsync(); - } + while (!clients.get(clients.size() - 1).started()) { + Thread.sleep(100); } - try { - // wait for verification to finish - driver.join(); - - // check to make sure that it actually succeeded - Assert.assertNull(driver.exception()); - Assert.assertTrue(driver.success()); - - } finally { - // whether or not the assertions failed, tell all the streams instances to stop - for (final SmokeTestClient client : clients) { - client.closeAsync(); - } - - // then, wait for them to stop - for (final SmokeTestClient client : clients) { - client.close(); - } + + // let the oldest client die of "natural causes" + if (clients.size() >= 3) { + clients.remove(0).closeAsync(); + } + } + try { + // wait for verification to finish + driver.join(); + + + } finally { + // whether or not the assertions failed, tell all the streams instances to stop + for (final SmokeTestClient client : clients) { + client.closeAsync(); } - // When the try-with-resources block exits, Java will close the EmbeddedKafkaCluster + // then, wait for them to stop + for (final SmokeTestClient client : clients) { + client.close(); + } } + + // check to make sure that it actually succeeded + Assert.assertNull(driver.exception()); + Assert.assertTrue(driver.result().passed()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index f88cb1499f1ef..3aff0a2336ab9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -42,7 +42,7 @@ /** * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and supplied number of Kafka brokers. */ -public class EmbeddedKafkaCluster extends ExternalResource implements AutoCloseable { +public class EmbeddedKafkaCluster extends ExternalResource { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected @@ -151,11 +151,6 @@ protected void after() { stop(); } - @Override - public void close() { - stop(); - } - /** * Create multiple Kafka topics each with 1 partition and a replication factor of 1. 
* diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index fb157e28b8fba..f9560e6ddcb8f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -199,9 +199,9 @@ private static void shuffle(final int[] data, @SuppressWarnings("SameParameterVa } } - public static boolean verify(final String kafka, - final Map> inputs, - final int maxRecordsPerKey) { + public static VerificationResult verify(final String kafka, + final Map> inputs, + final int maxRecordsPerKey) { final Properties props = new Properties(); props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier"); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); @@ -303,7 +303,7 @@ public static boolean verify(final String kafka, System.out.println(verificationResult.result()); System.out.println(success ? "SUCCESS" : "FAILURE"); - return success; + return verificationResult; } private static Number deserializeValue(final ConsumerRecord record) { @@ -351,7 +351,7 @@ private static Number deserializeValue(final ConsumerRecord reco return value; } - private static class VerificationResult { + public static class VerificationResult { private final boolean passed; private final String result; @@ -360,11 +360,11 @@ private static class VerificationResult { this.result = result; } - boolean passed() { + public boolean passed() { return passed; } - String result() { + public String result() { return result; } } From 512ea92c3a235965893d13f90081dd388b8e0cdf Mon Sep 17 00:00:00 2001 From: John Roesler Date: Tue, 12 Feb 2019 16:21:52 -0600 Subject: [PATCH 11/14] pull in smoke test improvements from side branch --- .../SmokeTestDriverIntegrationTest.java | 11 +- .../kafka/streams/tests/SmokeTestClient.java | 32 ++-- .../kafka/streams/tests/SmokeTestDriver.java | 179 ++++++++++-------- 3 files changed, 126 insertions(+), 96 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java index 5e646d057d555..82f86c28b1dfb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java @@ -76,12 +76,12 @@ public void shouldWorkWithRebalance() throws InterruptedException { int numClientsCreated = 0; final ArrayList clients = new ArrayList<>(); - CLUSTER.createTopics("data", "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg"); + CLUSTER.createTopics(SmokeTestDriver.topics()); final String bootstrapServers = CLUSTER.bootstrapServers(); final Driver driver = new Driver(bootstrapServers, 10, 1000); driver.start(); - System.out.println("started streams"); + System.out.println("started driver"); final Properties props = new Properties(); @@ -124,8 +124,11 @@ public void shouldWorkWithRebalance() throws InterruptedException { } // check to make sure that it actually succeeded - Assert.assertNull(driver.exception()); - Assert.assertTrue(driver.result().passed()); + if (driver.exception() != null) { + driver.exception().printStackTrace(); + throw new AssertionError(driver.exception()); + } + Assert.assertTrue(driver.result().result(), driver.result().passed()); } } diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index 13eb397574192..a8f974a3cc0c6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KGroupedStream; @@ -106,6 +107,23 @@ private Properties getStreamsConfig(final Properties props) { } private KafkaStreams createKafkaStreams(final Properties props) { + final Topology build = getTopology(); + final KafkaStreams streamsClient = new KafkaStreams(build, getStreamsConfig(props)); + streamsClient.setStateListener((newState, oldState) -> { + System.out.printf("%s: %s -> %s%n", name, oldState, newState); + if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) { + started = true; + } + }); + streamsClient.setUncaughtExceptionHandler((t, e) -> { + System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e); + streamsClient.close(Duration.ofSeconds(30)); + }); + + return streamsClient; + } + + public Topology getTopology() { final StreamsBuilder builder = new StreamsBuilder(); final Consumed stringIntConsumed = Consumed.with(stringSerde, intSerde); final KStream source = builder.stream("data", stringIntConsumed); @@ -200,18 +218,6 @@ private KafkaStreams createKafkaStreams(final Properties props) { .toStream() .to("tagg", Produced.with(stringSerde, longSerde)); - final KafkaStreams streamsClient = new KafkaStreams(builder.build(), getStreamsConfig(props)); - streamsClient.setStateListener((newState, oldState) -> { - System.out.printf("%s: %s -> %s%n", name, oldState, newState); - if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) { - started = true; - } - }); - streamsClient.setUncaughtExceptionHandler((t, e) -> { - System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e); - streamsClient.close(Duration.ofSeconds(30)); - }); - - return streamsClient; + return builder.build(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index f9560e6ddcb8f..bc3a4f6a4b81d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -28,8 +28,8 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; @@ -38,7 +38,9 @@ import java.io.PrintStream; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import 
java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -54,11 +56,23 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.Collections.emptyMap; import static org.apache.kafka.common.utils.Utils.mkEntry; public class SmokeTestDriver extends SmokeTestUtil { - - private static final int MAX_RECORD_EMPTY_RETRIES = 60; + private static final String[] TOPICS = new String[] { + "data", + "echo", + "max", + "min", "min-suppressed", "min-raw", + "dif", + "sum", + "cnt", + "avg", + "tagg" + }; + + private static final int MAX_RECORD_EMPTY_RETRIES = 30; private static class ValueList { public final String key; @@ -85,6 +99,10 @@ int next() { } } + public static String[] topics() { + return Arrays.copyOf(TOPICS, TOPICS.length); + } + public static Map> generate(final String kafka, final int numKeys, final int maxRecordsPerKey, @@ -148,6 +166,7 @@ public static Map> generate(final String kafka, while (!needRetry.isEmpty()) { final List> needRetry2 = new ArrayList<>(); for (final ProducerRecord record : needRetry) { + System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key())); producer.send(record, new TestCallback(record, needRetry2)); } producer.flush(); @@ -199,6 +218,46 @@ private static void shuffle(final int[] data, @SuppressWarnings("SameParameterVa } } + public static class NumberDeserializer implements Deserializer { + + @Override + public void configure(final Map configs, final boolean isKey) { + + } + + @Override + public Number deserialize(final String topic, final byte[] data) { + final Number value; + switch (topic) { + case "data": + case "echo": + case "min": + case "min-raw": + case "min-suppressed": + case "max": + case "dif": + value = intSerde.deserializer().deserialize(topic, data); + break; + case "sum": + case "cnt": + case "tagg": + value = longSerde.deserializer().deserialize(topic, data); + break; + case "avg": + value = doubleSerde.deserializer().deserialize(topic, data); + break; + default: + throw new RuntimeException("unknown topic: " + topic); + } + return value; + } + + @Override + public void close() { + + } + } + public static VerificationResult verify(final String kafka, final Map> inputs, final int maxRecordsPerKey) { @@ -206,38 +265,39 @@ public static VerificationResult verify(final String kafka, props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier"); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class); - final KafkaConsumer consumer = new KafkaConsumer<>(props); - final String[] topics = {"data", "echo", "max", "min", "dif", "sum", "cnt", "avg", "tagg"}; - final List partitions = getAllPartitions(consumer, topics); + final KafkaConsumer consumer = new KafkaConsumer<>(props); + final List partitions = getAllPartitions(consumer, TOPICS); consumer.assign(partitions); consumer.seekToBeginning(partitions); final int recordsGenerated = inputs.size() * maxRecordsPerKey; int recordsProcessed = 0; final Map processed = - Stream.of(topics) + Stream.of(TOPICS) .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0))); - final Map>> events = new HashMap<>(); + final Map>>> events = new HashMap<>(); VerificationResult verificationResult = new VerificationResult(false, "no results yet"); int retry = 0; final 
long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) { - final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); + final ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); if (records.isEmpty() && recordsProcessed >= recordsGenerated) { verificationResult = verifyAll(inputs, events); if (verificationResult.passed()) { break; - } - if (retry++ > MAX_RECORD_EMPTY_RETRIES) { + } else if (retry++ > MAX_RECORD_EMPTY_RETRIES) { + System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries."); break; + } else { + System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..."); } } else { retry = 0; - for (final ConsumerRecord record : records) { + for (final ConsumerRecord record : records) { final String key = record.key(); final String topic = record.topic(); @@ -252,7 +312,7 @@ public static VerificationResult verify(final String kafka, events.computeIfAbsent(topic, t -> new HashMap<>()) .computeIfAbsent(key, k -> new LinkedList<>()) - .add(deserializeValue(record)); + .add(record); } System.out.println(processed); @@ -275,11 +335,14 @@ public static VerificationResult verify(final String kafka, boolean success; - final Map> received = + final Map> received = events.get("echo") .entrySet() .stream() - .map(entry -> mkEntry(entry.getKey(), new HashSet<>(entry.getValue()))) + .map(entry -> mkEntry( + entry.getKey(), + entry.getValue().stream().map(ConsumerRecord::value).collect(Collectors.toSet())) + ) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); success = inputs.equals(received); @@ -306,51 +369,6 @@ public static VerificationResult verify(final String kafka, return verificationResult; } - private static Number deserializeValue(final ConsumerRecord record) { - final Number value; - switch (record.topic()) { - case "data": { - value = intSerde.deserializer().deserialize("", record.value()); - break; - } - case "echo": { - value = intSerde.deserializer().deserialize("", record.value()); - break; - } - case "min": { - value = intSerde.deserializer().deserialize("", record.value()); - break; - } - case "max": { - value = intSerde.deserializer().deserialize("", record.value()); - break; - } - case "dif": { - value = intSerde.deserializer().deserialize("", record.value()); - break; - } - case "sum": { - value = longSerde.deserializer().deserialize("", record.value()); - break; - } - case "cnt": { - value = longSerde.deserializer().deserialize("", record.value()); - break; - } - case "avg": { - value = doubleSerde.deserializer().deserialize("", record.value()); - break; - } - case "tagg": { - value = longSerde.deserializer().deserialize("", record.value()); - break; - } - default: - throw new RuntimeException("unknown topic: " + record.topic()); - } - return value; - } - public static class VerificationResult { private final boolean passed; private final String result; @@ -370,7 +388,7 @@ public String result() { } private static VerificationResult verifyAll(final Map> inputs, - final Map>> events) { + final Map>>> events) { final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); final PrintStream resultStream = new PrintStream(byteArrayOutputStream); boolean pass; @@ -386,34 +404,34 @@ private static VerificationResult verifyAll(final Map> inpu } private static boolean verify(final PrintStream resultStream, - final String topicName, + final String topic, final 
Map> inputData, - final Map>> events, + final Map>>> events, final Function keyToExpectation) { - final Map> consumedInputEvents = events.get("data"); - - final Map> outputEvents = events.get(topicName); + final Map>> observedInputEvents = events.get("data"); + final Map>> outputEvents = events.getOrDefault(topic, emptyMap()); if (outputEvents.isEmpty()) { - resultStream.println(topicName + " is empty"); + resultStream.println(topic + " is empty"); return false; } else { - resultStream.println("verifying " + topicName); + resultStream.printf("verifying %s with %d keys%n", topic, outputEvents.size()); if (outputEvents.size() != inputData.size()) { - resultStream.println("fail: resultCount=" + outputEvents.size() + " expectedCount=" + inputData.size()); + resultStream.printf("fail: resultCount=%d expectedCount=%s%n\tresult=%s%n\texpected=%s%n", + outputEvents.size(), inputData.size(), outputEvents.keySet(), inputData.keySet()); return false; } - for (final Map.Entry> entry : outputEvents.entrySet()) { + for (final Map.Entry>> entry : outputEvents.entrySet()) { final String key = entry.getKey(); final Number expected = keyToExpectation.apply(key); - final Number actual = entry.getValue().getLast(); + final Number actual = entry.getValue().getLast().value(); if (!expected.equals(actual)) { - resultStream.printf("fail: key=%s %s=%s expected=%s%n\t inputEvents=%s%n\toutputEvents=%s%n", + resultStream.printf("%s fail: key=%s actual=%s expected=%s%n\t inputEvents=%s%n\toutputEvents=%s%n", + topic, key, - topicName, actual, expected, - consumedInputEvents.get(key), + observedInputEvents.get(key), entry.getValue()); return false; } @@ -437,8 +455,11 @@ private static Double getAvg(final String key) { private static boolean verifyTAgg(final PrintStream resultStream, final Map> allData, - final Map> taggEvents) { - if (taggEvents.isEmpty()) { + final Map>> taggEvents) { + if (taggEvents == null) { + resultStream.println("tagg is missing"); + return false; + } else if (taggEvents.isEmpty()) { resultStream.println("tagg is empty"); return false; } else { @@ -455,14 +476,14 @@ private static boolean verifyTAgg(final PrintStream resultStream, } // check the result - for (final Map.Entry> entry : taggEvents.entrySet()) { + for (final Map.Entry>> entry : taggEvents.entrySet()) { final String key = entry.getKey(); Long expectedCount = expected.remove(key); if (expectedCount == null) { expectedCount = 0L; } - if (entry.getValue().getLast().longValue() != expectedCount) { + if (entry.getValue().getLast().value().longValue() != expectedCount) { resultStream.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expected.get(key)); resultStream.println("\t outputEvents: " + entry.getValue()); return false; From d3ba59ff8c3c1327c0c5cbe335a1bc59141a6321 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Tue, 12 Feb 2019 16:28:44 -0600 Subject: [PATCH 12/14] try-with-resources --- .../kafka/streams/tests/SmokeTestDriver.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index bc3a4f6a4b81d..6fbb01519f2cb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -390,16 +390,16 @@ public String result() { private static VerificationResult verifyAll(final Map> inputs, final Map>>> events) { final 
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - final PrintStream resultStream = new PrintStream(byteArrayOutputStream); boolean pass; - pass = verifyTAgg(resultStream, inputs, events.get("tagg")); - pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin); - pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax); - pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue()); - pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum); - pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L); - pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg); - resultStream.close(); + try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) { + pass = verifyTAgg(resultStream, inputs, events.get("tagg")); + pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin); + pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax); + pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue()); + pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum); + pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L); + pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg); + } return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8)); } From 59d84752b331781fd7b84cbc4517928ee880993c Mon Sep 17 00:00:00 2001 From: John Roesler Date: Wed, 13 Feb 2019 10:11:06 -0600 Subject: [PATCH 13/14] format events instead of printing long lines --- .../apache/kafka/streams/tests/SmokeTestDriver.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index 6fbb01519f2cb..466109874602a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -431,8 +431,8 @@ private static boolean verify(final PrintStream resultStream, key, actual, expected, - observedInputEvents.get(key), - entry.getValue()); + indent("\t\t", observedInputEvents.get(key)), + indent("\t\t", entry.getValue())); return false; } } @@ -440,6 +440,15 @@ private static boolean verify(final PrintStream resultStream, } } + private static String indent(@SuppressWarnings("SameParameterValue") final String prefix, + final LinkedList> list) { + final StringBuilder stringBuilder = new StringBuilder(); + for (final ConsumerRecord record : list) { + stringBuilder.append(prefix).append(record).append('\n'); + } + return stringBuilder.toString(); + } + private static Long getSum(final String key) { final int min = getMin(key).intValue(); final int max = getMax(key).intValue(); From c5633477428c1b393d469431d92e5b3c61a2d03c Mon Sep 17 00:00:00 2001 From: John Roesler Date: Thu, 14 Feb 2019 12:07:43 -0600 Subject: [PATCH 14/14] minor formatting fix --- .../java/org/apache/kafka/streams/tests/SmokeTestDriver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java 
index 466109874602a..de40818288666 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -426,7 +426,7 @@ private static boolean verify(final PrintStream resultStream,
                 final Number expected = keyToExpectation.apply(key);
                 final Number actual = entry.getValue().getLast().value();
                 if (!expected.equals(actual)) {
-                    resultStream.printf("%s fail: key=%s actual=%s expected=%s%n\t inputEvents=%s%n\toutputEvents=%s%n",
+                    resultStream.printf("%s fail: key=%s actual=%s expected=%s%n\t inputEvents=%n%s%n\toutputEvents=%n%s%n",
                         topic,
                         key,
                         actual,
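A closing note on that one-character-per-placeholder change, with an illustrative
sketch (the "indented" variable below is an assumption, not taken from the patch):
after PATCH 13, the input and output events are rendered by indent("\t\t", ...) as
one ConsumerRecord per "\t\t"-prefixed line, so the dump only lines up if it starts
on a fresh line of its own. Inserting %n before each %s accomplishes exactly that:

    // before: the first record lands on the same line as "inputEvents=",
    // out of alignment with the \t\t-prefixed records beneath it
    resultStream.printf("\t inputEvents=%s%n", indented);

    // after: %n breaks the line first, so every record shares the same prefix column
    resultStream.printf("\t inputEvents=%n%s%n", indented);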