From 37146f4cd2ae8c09330d2be45023ff2e5a6d0eb8 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 30 Mar 2020 18:58:48 -0700 Subject: [PATCH 01/15] MINOR: extend Kafka Streams EOS system test --- .../processor/internals/StandbyTask.java | 2 +- .../processor/internals/TaskManager.java | 4 +- .../kafka/streams/tests/EosTestClient.java | 94 ++----- .../kafka/streams/tests/EosTestDriver.java | 241 ++++++++++-------- .../kafka/streams/tests/SmokeTestUtil.java | 63 +++-- .../kafka/streams/tests/StreamsEosTest.java | 10 + tests/kafkatest/services/streams.py | 25 +- tests/kafkatest/services/streams_property.py | 1 + .../tests/streams/streams_eos_test.py | 41 +-- 9 files changed, 247 insertions(+), 234 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index aecbe2fa67caf..7bf89ce37ef75 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -72,7 +72,7 @@ public class StandbyTask extends AbstractTask implements Task { processorContext = new StandbyContextImpl(id, config, stateMgr, metrics); closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), metrics); - this.eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)); + eosEnabled = StreamThread.eosEnabled(config); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 97205e4435faa..f0e294a43a228 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -758,7 +758,9 @@ private int 
commitInternal(final Collection tasks) { } } - commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); + if (!consumedOffsetsAndMetadataPerTask.isEmpty()) { + commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); + } for (final Task task : tasks) { if (task.commitNeeded()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java index 6e7afa8a6c2d9..092ad20f2a745 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java @@ -16,21 +16,17 @@ */ package org.apache.kafka.streams.tests; -import java.time.Duration; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.Aggregator; -import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.state.KeyValueStore; +import java.time.Duration; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -75,26 +71,20 @@ public void start() { uncaughtException = false; streams = createKafkaStreams(properties); - streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(final Thread t, final Throwable e) { - System.out.println(System.currentTimeMillis()); - System.out.println("EOS-TEST-CLIENT-EXCEPTION"); - e.printStackTrace(); - System.out.flush(); - uncaughtException = true; - } + 
streams.setUncaughtExceptionHandler((t, e) -> { + System.out.println(System.currentTimeMillis()); + System.out.println("EOS-TEST-CLIENT-EXCEPTION"); + e.printStackTrace(); + System.out.flush(); + uncaughtException = true; }); - streams.setStateListener(new KafkaStreams.StateListener() { - @Override - public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) { - // don't remove this -- it's required test output - System.out.println(System.currentTimeMillis()); - System.out.println("StateChange: " + oldState + " -> " + newState); - System.out.flush(); - if (newState == KafkaStreams.State.NOT_RUNNING) { - notRunningCallbackReceived.set(true); - } + streams.setStateListener((newState, oldState) -> { + // don't remove this -- it's required test output + System.out.println(System.currentTimeMillis()); + System.out.println("StateChange: " + oldState + " -> " + newState); + System.out.flush(); + if (newState == KafkaStreams.State.NOT_RUNNING) { + notRunningCallbackReceived.set(true); } }); streams.start(); @@ -112,8 +102,8 @@ private KafkaStreams createKafkaStreams(final Properties props) { props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2); props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000); // increase commit interval to make sure a client is killed having an open transaction props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); @@ -127,41 +117,17 @@ private KafkaStreams createKafkaStreams(final Properties props) { // min groupedData .aggregate( - new Initializer() { - @Override - public Integer apply() { - return Integer.MAX_VALUE; - } - }, - new 
Aggregator() { - @Override - public Integer apply(final String aggKey, - final Integer value, - final Integer aggregate) { - return (value < aggregate) ? value : aggregate; - } - }, - Materialized.>with(null, intSerde)) + () -> Integer.MAX_VALUE, + (aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate, + Materialized.with(null, intSerde)) .toStream() .to("min", Produced.with(stringSerde, intSerde)); // sum groupedData.aggregate( - new Initializer() { - @Override - public Long apply() { - return 0L; - } - }, - new Aggregator() { - @Override - public Long apply(final String aggKey, - final Integer value, - final Long aggregate) { - return (long) value + aggregate; - } - }, - Materialized.>with(null, longSerde)) + () -> 0L, + (aggKey, value, aggregate) -> (long) value + aggregate, + Materialized.with(null, longSerde)) .toStream() .to("sum", Produced.with(stringSerde, longSerde)); @@ -174,21 +140,9 @@ public Long apply(final String aggKey, // max groupedDataAfterRepartitioning .aggregate( - new Initializer() { - @Override - public Integer apply() { - return Integer.MIN_VALUE; - } - }, - new Aggregator() { - @Override - public Integer apply(final String aggKey, - final Integer value, - final Integer aggregate) { - return (value > aggregate) ? value : aggregate; - } - }, - Materialized.>with(null, intSerde)) + () -> Integer.MIN_VALUE, + (aggKey, value, aggregate) -> (value > aggregate) ? 
value : aggregate, + Materialized.with(null, intSerde)) .toStream() .to("max", Produced.with(stringSerde, intSerde)); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java index 7dc006f092096..af28380f3348f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java @@ -30,7 +30,6 @@ import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; @@ -46,6 +45,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; @@ -59,7 +59,8 @@ public class EosTestDriver extends SmokeTestUtil { private static final int MAX_NUMBER_OF_KEYS = 20000; private static final long MAX_IDLE_TIME_MS = 600000L; - private static boolean isRunning = true; + private volatile static boolean isRunning = true; + private volatile static boolean terminated = false; private static int numRecordsProduced = 0; @@ -68,72 +69,115 @@ private static synchronized void updateNumRecordsProduces(final int delta) { } static void generate(final String kafka) { + try { + Exit.addShutdownHook("streams-eos-test-driver-shutdown-hook", () -> { + System.out.println("Terminating"); + isRunning = false; + + final long timeout = System.currentTimeMillis() + Duration.ofMinutes(5).toMillis(); + while (!terminated) { + if (System.currentTimeMillis() > timeout) { + System.out.println("Terminating with timeout"); + break; + } - 
Exit.addShutdownHook("streams-eos-test-driver-shutdown-hook", () -> { - System.out.println("Terminating"); - System.out.flush(); - isRunning = false; - }); + System.out.println("Waiting for main thread to exit"); + try { + Thread.sleep(1000L); + } catch (final InterruptedException swallow) { + swallow.printStackTrace(System.err); + System.err.flush(); + break; + } - final Properties producerProps = new Properties(); - producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "EosTest"); - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); - producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); - producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + } - final KafkaProducer producer = new KafkaProducer<>(producerProps); + System.out.println("Terminated"); + System.out.flush(); + }); + + final Properties producerProps = new Properties(); + producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "EosTest"); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); - final Random rand = new Random(System.currentTimeMillis()); + final KafkaProducer producer = new KafkaProducer<>(producerProps); - while (isRunning) { - final String key = "" + rand.nextInt(MAX_NUMBER_OF_KEYS); - final int value = rand.nextInt(10000); + final Random rand = new Random(System.currentTimeMillis()); + final Map> offsets = new HashMap<>(); - final ProducerRecord record = new ProducerRecord<>("data", key, value); + while (isRunning) { + final String key = "" + rand.nextInt(MAX_NUMBER_OF_KEYS); + final int value = rand.nextInt(10000); - producer.send(record, (metadata, exception) -> { - if (exception 
!= null) { - exception.printStackTrace(System.err); - System.err.flush(); - if (exception instanceof TimeoutException) { - try { - // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time - final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]); - updateNumRecordsProduces(-expired); - } catch (final Exception ignore) { } + final ProducerRecord record = new ProducerRecord<>("data", key, value); + + producer.send(record, (metadata, exception) -> { + if (exception != null) { + exception.printStackTrace(System.err); + System.err.flush(); + if (exception instanceof TimeoutException) { + try { + // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time + final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]); + updateNumRecordsProduces(-expired); + } catch (final Exception ignore) { + } + } + } else { + offsets.getOrDefault(metadata.partition(), new LinkedList<>()).add(metadata.offset()); } + }); + + updateNumRecordsProduces(1); + if (numRecordsProduced % 1000 == 0) { + System.out.println(numRecordsProduced + " records produced"); + System.out.flush(); } - }); + Utils.sleep(rand.nextInt(10)); + } + producer.close(); + System.out.println("Producer closed: " + numRecordsProduced + " records produced"); + System.out.flush(); - updateNumRecordsProduces(1); - if (numRecordsProduced % 1000 == 0) { - System.out.println(numRecordsProduced + " records produced"); - System.out.flush(); + // verify offsets + for (Map.Entry> offsetsOfPartition : offsets.entrySet()) { + offsetsOfPartition.getValue().sort(Long::compareTo); + for (int i = 0; i < offsetsOfPartition.getValue().size() - 1; ++i) { + if (offsetsOfPartition.getValue().get(i) != i) { + System.err.println("Offset for partition " + offsetsOfPartition.getKey() + " is not " + i + " as expected but " + 
offsetsOfPartition.getValue().get(i)); + System.err.flush(); + } + } + System.out.println("Max offset of partition " + offsetsOfPartition.getKey() + " is " + offsetsOfPartition.getValue().get(offsetsOfPartition.getValue().size() - 1)); } - Utils.sleep(rand.nextInt(10)); - } - producer.close(); - System.out.println("Producer closed: " + numRecordsProduced + " records produced"); - final Properties props = new Properties(); - props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier"); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT)); - try (final KafkaConsumer consumer = new KafkaConsumer<>(props)) { - final List partitions = getAllPartitions(consumer, "data"); - System.out.println("Partitions: " + partitions); - consumer.assign(partitions); - consumer.seekToEnd(partitions); + final Properties props = new Properties(); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier"); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT)); - for (final TopicPartition tp : partitions) { - System.out.println("End-offset for " + tp + " is " + consumer.position(tp)); + try (final KafkaConsumer consumer = new KafkaConsumer<>(props)) { + final List partitions = getAllPartitions(consumer, "data"); + System.out.println("Partitions: " + partitions); + System.out.flush(); + consumer.assign(partitions); + consumer.seekToEnd(partitions); + + for (final TopicPartition tp : partitions) { + 
System.out.println("End-offset for " + tp + " is " + consumer.position(tp)); + System.out.flush(); + } } + System.out.flush(); + } finally { + terminated = true; } - System.out.flush(); } public static void verify(final String kafka, final boolean withRepartitioning) { @@ -144,6 +188,14 @@ public static void verify(final String kafka, final boolean withRepartitioning) props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT)); + try (final KafkaConsumer committedConsumer = new KafkaConsumer<>(props)) { + verifyAllTransactionFinished(committedConsumer, kafka, withRepartitioning); + } catch (final Exception e) { + e.printStackTrace(System.err); + System.out.println("FAILED"); + return; + } + final Map committedOffsets; try (final Admin adminClient = Admin.create(props)) { ensureStreamsApplicationDown(adminClient); @@ -200,18 +252,6 @@ public static void verify(final String kafka, final boolean withRepartitioning) verifyCnt(inputRecordsPerTopicPerPartition.get("repartition"), outputRecordsPerTopicPerPartition.get("cnt")); } - try (final KafkaConsumer consumer = new KafkaConsumer<>(props)) { - final List partitions = getAllPartitions(consumer, allOutputTopics); - consumer.assign(partitions); - consumer.seekToBeginning(partitions); - - verifyAllTransactionFinished(consumer, kafka, withRepartitioning); - } catch (final Exception e) { - e.printStackTrace(System.err); - System.out.println("FAILED"); - return; - } - // do not modify: required test output System.out.println("ALL-RECORDS-DELIVERED"); System.out.flush(); @@ -226,11 +266,11 @@ private static void ensureStreamsApplicationDown(final Admin adminClient) { if (System.currentTimeMillis() > maxWaitTime && !description.members().isEmpty()) { throw new RuntimeException( - "Streams application not down after " + (MAX_IDLE_TIME_MS / 1000) + " seconds. 
" + + "Streams application not down after " + (MAX_IDLE_TIME_MS / 1000L) + " seconds. " + "Group: " + description ); } - sleep(1000); + sleep(1000L); } while (!description.members().isEmpty()); } @@ -242,7 +282,7 @@ private static Map getCommittedOffsets(final Admin adminCl try { final ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(EosTestClient.APP_ID); topicPartitionOffsetAndMetadataMap = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); - } catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) { + } catch (final Exception e) { e.printStackTrace(); throw new RuntimeException(e); } @@ -263,7 +303,7 @@ private static Map readEndOffsets, final boolean withRepartitioning, final boolean isInputTopic) { - System.err.println("read end offset: " + readEndOffsets); + System.out.println("read end offset: " + readEndOffsets); final Map>>> recordPerTopicPerPartition = new HashMap<>(); final Map maxReceivedOffsetPerPartition = new HashMap<>(); final Map maxConsumerPositionPerPartition = new HashMap<>(); @@ -271,7 +311,7 @@ private static Map receivedRecords = consumer.poll(Duration.ofMillis(100)); + final ConsumerRecords receivedRecords = consumer.poll(Duration.ofSeconds(1L)); for (final ConsumerRecord record : receivedRecords) { maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; @@ -300,7 +340,7 @@ private static Map> expectedRecord = expectedRecords.get(inputTopicPartition).iterator(); - for (final ConsumerRecord receivedRecord : partitionRecords.getValue()) { + for (final ConsumerRecord receivedRecord : partitionRecords.getValue().subList(0, partitionRecords.getValue().size() - 1)) { final ConsumerRecord expected = expectedRecord.next(); final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key()); @@ -382,7 +422,7 @@ private static void verifyMin(final Map> inputRecords = 
partitionInput.iterator(); - for (final ConsumerRecord receivedRecord : partitionMin) { + for (final ConsumerRecord receivedRecord : partitionMin.subList(0, partitionMin.size() - 1)) { final ConsumerRecord input = inputRecords.next(); final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key()); @@ -424,7 +464,7 @@ private static void verifySum(final Map> inputRecords = partitionInput.iterator(); - for (final ConsumerRecord receivedRecord : partitionSum) { + for (final ConsumerRecord receivedRecord : partitionSum.subList(0, partitionSum.size() - 1)) { final ConsumerRecord input = inputRecords.next(); final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key()); @@ -465,7 +505,7 @@ private static void verifyMax(final Map> inputRecords = partitionInput.iterator(); - for (final ConsumerRecord receivedRecord : partitionMax) { + for (final ConsumerRecord receivedRecord : partitionMax.subList(0, partitionMax.size() - 1)) { final ConsumerRecord input = inputRecords.next(); final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key()); @@ -506,7 +546,7 @@ private static void verifyCnt(final Map> inputRecords = partitionInput.iterator(); - for (final ConsumerRecord receivedRecord : partitionCnt) { + for (final ConsumerRecord receivedRecord : partitionCnt.subList(0, partitionCnt.size() - 1)) { final ConsumerRecord input = inputRecords.next(); final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key()); @@ -550,6 +590,8 @@ private static void verifyAllTransactionFinished(final KafkaConsumer endMarkerOffset = new HashMap<>(); + try (final KafkaProducer producer = new KafkaProducer<>(producerProps)) { for (final TopicPartition tp : partitions) { final ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), "key", "value"); @@ -559,47 +601,36 @@ private static void 
verifyAllTransactionFinished(final KafkaConsumer records = consumer.poll(Duration.ofMillis(100)); - if (records.isEmpty()) { - System.out.println("No data received."); - for (final TopicPartition tp : partitions) { - System.out.println(tp + " at position " + consumer.position(tp)); - } - } - for (final ConsumerRecord record : records) { - maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; - final String topic = record.topic(); - final TopicPartition tp = new TopicPartition(topic, record.partition()); + while (!endMarkerOffset.isEmpty() && System.currentTimeMillis() < maxWaitTime) { + consumer.seekToEnd(partitions); - try { - final String key = stringDeserializer.deserialize(topic, record.key()); - final String value = stringDeserializer.deserialize(topic, record.value()); + final Iterator iterator = partitions.iterator(); + while(iterator.hasNext()) { + final TopicPartition topicPartition = iterator.next(); - if (!("key".equals(key) && "value".equals(value) && partitions.remove(tp))) { - throw new RuntimeException("Post transactions verification failed. Received unexpected verification record: " + - "Expected record <'key','value'> from one of " + partitions + " but got" - + " <" + key + "," + value + "> [" + record.topic() + ", " + record.partition() + "]"); - } else { - System.out.println("Verifying " + tp + " successful."); - } - } catch (final SerializationException e) { - throw new RuntimeException("Post transactions verification failed. 
Received unexpected verification record: " + - "Expected record <'key','value'> from one of " + partitions + " but got " + record, e); + if (consumer.position(topicPartition) > endMarkerOffset.get(topicPartition)) { + iterator.remove(); + endMarkerOffset.remove(topicPartition); + System.out.println("Removing " + topicPartition + " at position " + consumer.position(topicPartition)); + } else { + System.out.println("Retry " + topicPartition + " at position " + consumer.position(topicPartition)); } - } + sleep(1000L); } - if (!partitions.isEmpty()) { - throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000) + " sec."); + + if (!endMarkerOffset.isEmpty()) { + throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000L) + " sec."); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java index ab9a14ea274a0..441448c98c69c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java @@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; -import java.io.File; import java.time.Instant; public class SmokeTestUtil { @@ -45,12 +44,17 @@ static ProcessorSupplier printProcessorSupplier(final String top public Processor get() { return new AbstractProcessor() { private int numRecordsProcessed = 0; + private long smallestOffset = Long.MAX_VALUE; + private long largestOffset = Long.MIN_VALUE; @Override public void init(final ProcessorContext context) { super.init(context); System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId()); + System.out.flush(); numRecordsProcessed = 0; + 
smallestOffset = Long.MAX_VALUE; + largestOffset = Long.MIN_VALUE; } @Override @@ -60,6 +64,27 @@ public void process(final Object key, final Object value) { System.out.printf("%s: %s%n", name, Instant.now()); System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic); } + + if (smallestOffset > context().offset()) { + smallestOffset = context().offset(); + } + if (largestOffset < context().offset()) { + largestOffset = context().offset(); + } + } + + @Override + public void close() { + System.out.printf("Close processor for task %s", context().taskId()); + System.out.println("processed " + numRecordsProcessed + " records"); + final long processed; + if (largestOffset >= smallestOffset) { + processed = 1L + largestOffset - smallestOffset; + } else { + processed = 0L; + } + System.out.println("offset " + smallestOffset + " to " + largestOffset + " -> processed " + processed); + System.out.flush(); } }; } @@ -76,39 +101,19 @@ public K apply(final Windowed winKey, final V value) { public static class Agg { KeyValueMapper> selector() { - return new KeyValueMapper>() { - @Override - public KeyValue apply(final String key, final Long value) { - return new KeyValue<>(value == null ? null : Long.toString(value), 1L); - } - }; + return (key, value) -> new KeyValue<>(value == null ? 
null : Long.toString(value), 1L); } public Initializer init() { - return new Initializer() { - @Override - public Long apply() { - return 0L; - } - }; + return () -> 0L; } Aggregator adder() { - return new Aggregator() { - @Override - public Long apply(final String aggKey, final Long value, final Long aggregate) { - return aggregate + value; - } - }; + return (aggKey, value, aggregate) -> aggregate + value; } Aggregator remover() { - return new Aggregator() { - @Override - public Long apply(final String aggKey, final Long value, final Long aggregate) { - return aggregate - value; - } - }; + return (aggKey, value, aggregate) -> aggregate - value; } } @@ -120,14 +125,6 @@ public Long apply(final String aggKey, final Long value, final Long aggregate) { static Serde doubleSerde = Serdes.Double(); - static File createDir(final File parent, final String child) { - final File dir = new File(parent, child); - - dir.mkdir(); - - return dir; - } - public static void sleep(final long duration) { try { Thread.sleep(duration); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java index 47a78bdbb9520..52af996edaa2b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java @@ -39,12 +39,22 @@ public static void main(final String[] args) throws IOException { final Properties streamsProperties = Utils.loadProps(propFileName); final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); + final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG); if (kafka == null) { System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); System.exit(1); } + if ("process".equals(command) || "process-complex".equals(command)) { + if 
(!StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee) && + !StreamsConfig.EXACTLY_ONCE_BETA.equals(processingGuarantee)) { + + System.err.println("processingGuarantee must be either " + StreamsConfig.EXACTLY_ONCE + " or " + StreamsConfig.EXACTLY_ONCE_BETA); + System.exit(1); + } + } + System.out.println("StreamsTest instance started"); System.out.println("kafka=" + kafka); System.out.println("props=" + streamsProperties); diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 59f24a04ff6e9..ec3bf0c669a47 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -324,11 +324,20 @@ class StreamsEosTestBaseService(StreamsTestBaseService): clean_node_enabled = True - def __init__(self, test_context, kafka, command): + def __init__(self, test_context, kafka, processing_guarantee, command): super(StreamsEosTestBaseService, self).__init__(test_context, kafka, "org.apache.kafka.streams.tests.StreamsEosTest", command) + self.PROCESSING_GUARANTEES = processing_guarantee + + def prop_file(self): + properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT, + streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(), + streams_property.PROCESSING_GUARANTEES: self.PROCESSING_GUARANTEES} + + cfg = KafkaConfig(**properties) + return cfg.render() def clean_node(self, node): if self.clean_node_enabled: @@ -366,25 +375,25 @@ def __init__(self, test_context, kafka, processing_guarantee, num_threads = 3): class StreamsEosTestDriverService(StreamsEosTestBaseService): def __init__(self, test_context, kafka): - super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "run") + super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "not-required", "run") class StreamsEosTestJobRunnerService(StreamsEosTestBaseService): - def __init__(self, test_context, kafka): - super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process") + def __init__(self, 
test_context, kafka, processing_guarantee): + super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, processing_guarantee, "process") class StreamsComplexEosTestJobRunnerService(StreamsEosTestBaseService): - def __init__(self, test_context, kafka): - super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, "process-complex") + def __init__(self, test_context, kafka, processing_guarantee): + super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, processing_guarantee, "process-complex") class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService): def __init__(self, test_context, kafka): - super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify") + super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "not-required", "verify") class StreamsComplexEosTestVerifyRunnerService(StreamsEosTestBaseService): def __init__(self, test_context, kafka): - super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify-complex") + super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "not-required", "verify-complex") class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService): diff --git a/tests/kafkatest/services/streams_property.py b/tests/kafkatest/services/streams_property.py index 8900adb1812bd..845c9b50e4eed 100644 --- a/tests/kafkatest/services/streams_property.py +++ b/tests/kafkatest/services/streams_property.py @@ -21,3 +21,4 @@ KAFKA_SERVERS = "bootstrap.servers" NUM_THREADS = "num.stream.threads" PROCESSING_GUARANTEE = "processing.guarantee" +PROCESSING_GUARANTEES = "processing.guarantee" diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py index c3d7f51af806a..bf07cb4106246 100644 --- a/tests/kafkatest/tests/streams/streams_eos_test.py +++ b/tests/kafkatest/tests/streams/streams_eos_test.py @@ -13,6 +13,7 @@ # 
See the License for the specific language governing permissions and # limitations under the License. +from ducktape.mark import parametrize from ducktape.mark.resource import cluster from kafkatest.tests.kafka_test import KafkaTest from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \ @@ -37,17 +38,21 @@ def __init__(self, test_context): self.test_context = test_context @cluster(num_nodes=9) - def test_rebalance_simple(self): - self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka), - StreamsEosTestJobRunnerService(self.test_context, self.kafka), - StreamsEosTestJobRunnerService(self.test_context, self.kafka), + @parametrize(processing_guarantee="exactly_once") + @parametrize(processing_guarantee="exactly_once_beta") + def test_rebalance_simple(self, processing_guarantee): + self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), + StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), + StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), StreamsEosTestVerifyRunnerService(self.test_context, self.kafka)) @cluster(num_nodes=9) - def test_rebalance_complex(self): - self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), - StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), - StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), + @parametrize(processing_guarantee="exactly_once") + @parametrize(processing_guarantee="exactly_once_beta") + def test_rebalance_complex(self, processing_guarantee): + self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), + StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), + StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), 
StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka)) def run_rebalance(self, processor1, processor2, processor3, verifier): @@ -77,17 +82,21 @@ def run_rebalance(self, processor1, processor2, processor3, verifier): verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False) @cluster(num_nodes=9) - def test_failure_and_recovery(self): - self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka), - StreamsEosTestJobRunnerService(self.test_context, self.kafka), - StreamsEosTestJobRunnerService(self.test_context, self.kafka), + @parametrize(processing_guarantee="exactly_once") + @parametrize(processing_guarantee="exactly_once_beta") + def test_failure_and_recovery(self, processing_guarantee): + self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), + StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), + StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), StreamsEosTestVerifyRunnerService(self.test_context, self.kafka)) @cluster(num_nodes=9) - def test_failure_and_recovery_complex(self): - self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), - StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), - StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), + @parametrize(processing_guarantee="exactly_once") + @parametrize(processing_guarantee="exactly_once_beta") + def test_failure_and_recovery_complex(self, processing_guarantee): + self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), + StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), + StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), 
StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka)) def run_failure_and_recovery(self, processor1, processor2, processor3, verifier): From 2f426e832506af219e0fba54228f243fa7b2d61c Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 7 Apr 2020 10:33:18 -0700 Subject: [PATCH 02/15] fix verification --- .../org/apache/kafka/streams/tests/EosTestDriver.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java index af28380f3348f..a4faba5f79cf6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java @@ -415,7 +415,7 @@ private static void verifyMin(final Map> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition); final List> partitionMin = partitionRecords.getValue(); - if (partitionInput.size() != partitionMin.size()) { + if (partitionInput.size() != partitionMin.size() - 1) { throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for " + partitionRecords.getKey() + " but received " + partitionMin.size()); } @@ -457,7 +457,7 @@ private static void verifySum(final Map> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition); final List> partitionSum = partitionRecords.getValue(); - if (partitionInput.size() != partitionSum.size()) { + if (partitionInput.size() != partitionSum.size() - 1) { throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for " + partitionRecords.getKey() + " but received " + partitionSum.size()); } @@ -498,7 +498,7 @@ private static void verifyMax(final Map> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition); final List> partitionMax = partitionRecords.getValue(); - if (partitionInput.size() != 
partitionMax.size()) { + if (partitionInput.size() != partitionMax.size() - 1) { throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for " + partitionRecords.getKey() + " but received " + partitionMax.size()); } @@ -539,7 +539,7 @@ private static void verifyCnt(final Map> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition); final List> partitionCnt = partitionRecords.getValue(); - if (partitionInput.size() != partitionCnt.size()) { + if (partitionInput.size() != partitionCnt.size() - 1) { throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for " + partitionRecords.getKey() + " but received " + partitionCnt.size()); } From 5053a5ede7aa142b88e5170bc818e681ec09a693 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 7 Apr 2020 15:21:50 -0700 Subject: [PATCH 03/15] Extend StandbyTask unit test --- .../processor/internals/StandbyTaskTest.java | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 34a1f5fcb8799..c993d1da7a266 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -414,7 +414,7 @@ public void shouldCloseStateManagerOnTaskCreated() { } @Test - public void shouldDeleteStateDirOnTaskCreatedAndEOSUncleanClose() { + public void shouldDeleteStateDirOnTaskCreatedAndEosAlphaUncleanClose() { stateManager.close(); EasyMock.expectLastCall(); @@ -442,6 +442,35 @@ public void shouldDeleteStateDirOnTaskCreatedAndEOSUncleanClose() { assertEquals(Task.State.CLOSED, task.state()); } + @Test + public void shouldDeleteStateDirOnTaskCreatedAndEosBetaUncleanClose() { + stateManager.close(); + 
EasyMock.expectLastCall(); + + EasyMock.expect(stateManager.baseDir()).andReturn(baseDir); + + EasyMock.replay(stateManager); + + final MetricName metricName = setupCloseTaskMetric(); + + config = new StreamsConfig(mkProperties(mkMap( + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId), + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"), + mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA) + ))); + + task = createStandbyTask(); + + task.closeDirty(); + + final double expectedCloseTaskMetric = 1.0; + verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); + + EasyMock.verify(stateManager); + + assertEquals(Task.State.CLOSED, task.state()); + } + private StandbyTask createStandbyTask() { return new StandbyTask(taskId, Collections.singleton(partition), topology, config, streamsMetrics, stateManager, stateDirectory); } From 04bef596b7f032ebe540ed3e8a3ed1028eaafdec Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 7 Apr 2020 15:22:28 -0700 Subject: [PATCH 04/15] Extend StandbyTask integration test --- .../StandbyTaskEOSIntegrationTest.java | 35 ++++++++++++++----- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index b0840188bbd8a..b82d75ac68092 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -35,10 +35,14 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.File; import java.io.IOException; import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import
java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -52,8 +56,21 @@ * An integration test to verify the conversion of a dirty-closed EOS * task towards a standby task is safe across restarts of the application. */ +@RunWith(Parameterized.class) public class StandbyTaskEOSIntegrationTest { + @Parameterized.Parameters(name = "{0}") + public static Collection data() { + return Arrays.asList(new String[][] { + {StreamsConfig.EXACTLY_ONCE}, + {StreamsConfig.EXACTLY_ONCE_BETA} + }); + } + + @Parameterized.Parameter + public String eosConfig; + + private final String appId = "eos-test-app"; private final String inputTopic = "input"; @ClassRule @@ -61,6 +78,7 @@ public class StandbyTaskEOSIntegrationTest { @Before public void createTopics() throws Exception { + CLUSTER.deleteTopicsAndWait(inputTopic, appId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog"); CLUSTER.createTopic(inputTopic, 1, 3); } @@ -77,14 +95,13 @@ public void surviveWithOneTaskAsStandby() throws ExecutionException, Interrupted new Properties()), 10L); - final String appId = "eos-test-app"; final String stateDirPath = TestUtils.tempDirectory(appId).getPath(); final CountDownLatch instanceLatch = new CountDownLatch(1); try ( - final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(appId, stateDirPath + "/" + appId + "-1/", instanceLatch); - final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(appId, stateDirPath + "/" + appId + "-2/", instanceLatch); + final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-1/", instanceLatch); + final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-2/", instanceLatch); ) { @@ -102,17 +119,19 @@ public void surviveWithOneTaskAsStandby() throws ExecutionException, Interrupted streamInstanceOne.close(Duration.ZERO); streamInstanceTwo.close(Duration.ZERO); + + streamInstanceOne.cleanUp(); + streamInstanceTwo.cleanUp(); } } - private 
KafkaStreams buildStreamWithDirtyStateDir(final String appId, - final String stateDirPath, + private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath, final CountDownLatch recordProcessLatch) throws IOException { final StreamsBuilder builder = new StreamsBuilder(); final TaskId taskId = new TaskId(0, 0); - final Properties props = props(appId, stateDirPath); + final Properties props = props(stateDirPath); final StateDirectory stateDirectory = new StateDirectory( new StreamsConfig(props), new MockTime(), true); @@ -133,14 +152,14 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String appId, return new KafkaStreams(builder.build(), props); } - private Properties props(final String appId, final String stateDirPath) { + private Properties props(final String stateDirPath) { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDirPath); streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); - streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); + streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); From 9203d9c5d2fc7188da05340b43d6d54c91c603ab Mon Sep 17 00:00:00 2001 From: "Matthias J. 
Sax" Date: Tue, 7 Apr 2020 15:43:28 -0700 Subject: [PATCH 05/15] Extend TaskManager unit test --- .../processor/internals/TaskManagerTest.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index ab185700abd7a..2c375365ae865 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -1528,6 +1528,28 @@ public void shouldCommitActiveAndStandbyTasks() { assertThat(task01.commitNeeded, is(false)); } + @Test + public void shouldNotCommitOffsetsIfOnlyStandbyTasksAssigned() { + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, false); + + expectRestoreToBeCompleted(consumer, changeLogReader); + expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))) + .andReturn(singletonList(task00)).anyTimes(); + expectLastCall(); + + replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + + taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment); + assertThat(taskManager.tryToCompleteRestoration(), is(true)); + + assertThat(task00.state(), is(Task.State.RUNNING)); + + task00.setCommitNeeded(); + + assertThat(taskManager.commitAll(), equalTo(1)); + assertThat(task00.commitNeeded, is(false)); + } + @Test public void shouldNotCommitActiveAndStandbyTasksWhileRebalanceInProgress() throws IOException { final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true); From fa0f27c62e035778185deb76da2d7d26ba04409e Mon Sep 17 00:00:00 2001 From: "Matthias J.
Sax" Date: Tue, 7 Apr 2020 22:59:47 -0700 Subject: [PATCH 06/15] fix checkstyle --- .../java/org/apache/kafka/streams/tests/EosTestDriver.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java index a4faba5f79cf6..f5a0a928028fd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java @@ -143,7 +143,7 @@ static void generate(final String kafka) { System.out.flush(); // verify offsets - for (Map.Entry> offsetsOfPartition : offsets.entrySet()) { + for (final Map.Entry> offsetsOfPartition : offsets.entrySet()) { offsetsOfPartition.getValue().sort(Long::compareTo); for (int i = 0; i < offsetsOfPartition.getValue().size() - 1; ++i) { if (offsetsOfPartition.getValue().get(i) != i) { @@ -610,12 +610,12 @@ private static void verifyAllTransactionFinished(final KafkaConsumer iterator = partitions.iterator(); - while(iterator.hasNext()) { + while (iterator.hasNext()) { final TopicPartition topicPartition = iterator.next(); if (consumer.position(topicPartition) > endMarkerOffset.get(topicPartition)) { From e48d9d8e2b8f9e2141770aa8fd0581bc0d56f3c6 Mon Sep 17 00:00:00 2001 From: "Matthias J. 
Sax" Date: Thu, 9 Apr 2020 14:42:58 -0700 Subject: [PATCH 07/15] Github comments --- .../kafka/streams/tests/EosTestDriver.java | 184 ++++++++---------- 1 file changed, 81 insertions(+), 103 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java index f5a0a928028fd..4a5ec9b94d65d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java @@ -51,6 +51,7 @@ import java.util.Map; import java.util.Properties; import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -60,7 +61,7 @@ public class EosTestDriver extends SmokeTestUtil { private static final long MAX_IDLE_TIME_MS = 600000L; private volatile static boolean isRunning = true; - private volatile static boolean terminated = false; + private static CountDownLatch terminated = new CountDownLatch(1); private static int numRecordsProduced = 0; @@ -69,76 +70,68 @@ private static synchronized void updateNumRecordsProduces(final int delta) { } static void generate(final String kafka) { - try { - Exit.addShutdownHook("streams-eos-test-driver-shutdown-hook", () -> { - System.out.println("Terminating"); - isRunning = false; - - final long timeout = System.currentTimeMillis() + Duration.ofMinutes(5).toMillis(); - while (!terminated) { - if (System.currentTimeMillis() > timeout) { - System.out.println("Terminating with timeout"); - break; - } - - System.out.println("Waiting for main thread to exit"); - try { - Thread.sleep(1000L); - } catch (final InterruptedException swallow) { - swallow.printStackTrace(System.err); - System.err.flush(); - break; - } + Exit.addShutdownHook("streams-eos-test-driver-shutdown-hook", () -> { + System.out.println("Terminating"); + isRunning = false; + try { + if 
(terminated.await(5L, TimeUnit.MINUTES)) { + System.out.println("Terminated"); + } else { + System.out.println("Terminated with timeout"); } + } catch (final InterruptedException swallow) { + swallow.printStackTrace(System.err); + System.out.println("Terminated with error"); + } + System.err.flush(); + System.out.flush(); + }); - System.out.println("Terminated"); - System.out.flush(); - }); - - final Properties producerProps = new Properties(); - producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "EosTest"); - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); - producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); - producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); - - final KafkaProducer producer = new KafkaProducer<>(producerProps); - - final Random rand = new Random(System.currentTimeMillis()); - final Map> offsets = new HashMap<>(); - - while (isRunning) { - final String key = "" + rand.nextInt(MAX_NUMBER_OF_KEYS); - final int value = rand.nextInt(10000); + final Properties producerProps = new Properties(); + producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "EosTest"); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); - final ProducerRecord record = new ProducerRecord<>("data", key, value); + final Map> offsets = new HashMap<>(); - producer.send(record, (metadata, exception) -> { - if (exception != null) { - exception.printStackTrace(System.err); - System.err.flush(); - if (exception instanceof TimeoutException) { - try { - // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed 
since last attempt plus backoff time - final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]); - updateNumRecordsProduces(-expired); - } catch (final Exception ignore) { + try { + try (final KafkaProducer producer = new KafkaProducer<>(producerProps)) { + final Random rand = new Random(System.currentTimeMillis()); + + while (isRunning) { + final String key = "" + rand.nextInt(MAX_NUMBER_OF_KEYS); + final int value = rand.nextInt(10000); + + final ProducerRecord record = new ProducerRecord<>("data", key, value); + + producer.send(record, (metadata, exception) -> { + if (exception != null) { + exception.printStackTrace(System.err); + System.err.flush(); + if (exception instanceof TimeoutException) { + try { + // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time + final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]); + updateNumRecordsProduces(-expired); + } catch (final Exception ignore) { + } } + } else { + offsets.getOrDefault(metadata.partition(), new LinkedList<>()).add(metadata.offset()); } - } else { - offsets.getOrDefault(metadata.partition(), new LinkedList<>()).add(metadata.offset()); - } - }); + }); - updateNumRecordsProduces(1); - if (numRecordsProduced % 1000 == 0) { - System.out.println(numRecordsProduced + " records produced"); - System.out.flush(); + updateNumRecordsProduces(1); + if (numRecordsProduced % 1000 == 0) { + System.out.println(numRecordsProduced + " records produced"); + System.out.flush(); + } + Utils.sleep(rand.nextInt(10)); } - Utils.sleep(rand.nextInt(10)); } - producer.close(); System.out.println("Producer closed: " + numRecordsProduced + " records produced"); System.out.flush(); @@ -154,7 +147,6 @@ static void generate(final String kafka) { System.out.println("Max offset of partition " + offsetsOfPartition.getKey() + " is " + offsetsOfPartition.getValue().get(offsetsOfPartition.getValue().size() 
- 1)); } - final Properties props = new Properties(); props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier"); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); @@ -176,7 +168,7 @@ static void generate(final String kafka) { } System.out.flush(); } finally { - terminated = true; + terminated.countDown(); } } @@ -188,8 +180,8 @@ public static void verify(final String kafka, final boolean withRepartitioning) props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT)); - try (final KafkaConsumer committedConsumer = new KafkaConsumer<>(props)) { - verifyAllTransactionFinished(committedConsumer, kafka, withRepartitioning); + try (final KafkaConsumer consumer = new KafkaConsumer<>(props)) { + verifyAllTransactionFinished(consumer, kafka, withRepartitioning); } catch (final Exception e) { e.printStackTrace(System.err); System.out.println("FAILED"); @@ -389,7 +381,7 @@ private static void verifyReceivedAllRecords(final Map> expectedRecord = expectedRecords.get(inputTopicPartition).iterator(); - for (final ConsumerRecord receivedRecord : partitionRecords.getValue().subList(0, partitionRecords.getValue().size() - 1)) { + for (final ConsumerRecord receivedRecord : partitionRecords.getValue()) { final ConsumerRecord expected = expectedRecord.next(); final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key()); @@ -415,14 +407,14 @@ private static void verifyMin(final Map> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition); final List> partitionMin = partitionRecords.getValue(); - if (partitionInput.size() != partitionMin.size() - 1) { + if (partitionInput.size() != partitionMin.size()) { throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for " + partitionRecords.getKey() + " but received " + partitionMin.size()); } 
final Iterator> inputRecords = partitionInput.iterator(); - for (final ConsumerRecord receivedRecord : partitionMin.subList(0, partitionMin.size() - 1)) { + for (final ConsumerRecord receivedRecord : partitionMin) { final ConsumerRecord input = inputRecords.next(); final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key()); @@ -457,14 +449,14 @@ private static void verifySum(final Map> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition); final List> partitionSum = partitionRecords.getValue(); - if (partitionInput.size() != partitionSum.size() - 1) { + if (partitionInput.size() != partitionSum.size()) { throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for " + partitionRecords.getKey() + " but received " + partitionSum.size()); } final Iterator> inputRecords = partitionInput.iterator(); - for (final ConsumerRecord receivedRecord : partitionSum.subList(0, partitionSum.size() - 1)) { + for (final ConsumerRecord receivedRecord : partitionSum) { final ConsumerRecord input = inputRecords.next(); final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key()); @@ -498,14 +490,14 @@ private static void verifyMax(final Map> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition); final List> partitionMax = partitionRecords.getValue(); - if (partitionInput.size() != partitionMax.size() - 1) { + if (partitionInput.size() != partitionMax.size()) { throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for " + partitionRecords.getKey() + " but received " + partitionMax.size()); } final Iterator> inputRecords = partitionInput.iterator(); - for (final ConsumerRecord receivedRecord : partitionMax.subList(0, partitionMax.size() - 1)) { + for (final ConsumerRecord receivedRecord : partitionMax) { final ConsumerRecord input = inputRecords.next(); final String receivedKey = 
stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key()); @@ -539,14 +531,14 @@ private static void verifyCnt(final Map> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition); final List> partitionCnt = partitionRecords.getValue(); - if (partitionInput.size() != partitionCnt.size() - 1) { + if (partitionInput.size() != partitionCnt.size()) { throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for " + partitionRecords.getKey() + " but received " + partitionCnt.size()); } final Iterator> inputRecords = partitionInput.iterator(); - for (final ConsumerRecord receivedRecord : partitionCnt.subList(0, partitionCnt.size() - 1)) { + for (final ConsumerRecord receivedRecord : partitionCnt) { final ConsumerRecord input = inputRecords.next(); final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key()); @@ -583,53 +575,39 @@ private static void verifyAllTransactionFinished(final KafkaConsumer endMarkerOffset = new HashMap<>(); + final Properties consumerProps = new Properties(); + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-uncommitted"); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); - try (final KafkaProducer producer = new KafkaProducer<>(producerProps)) { - for (final TopicPartition tp : partitions) { - final ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), "key", "value"); + final Map topicEndOffsets; - producer.send(record, (metadata, exception) -> { - if (exception != null) { - exception.printStackTrace(System.err); - System.err.flush(); - Exit.exit(1); - } else { - endMarkerOffset.put(new TopicPartition(metadata.topic(), metadata.partition()), metadata.offset()); - System.out.println("Appended verification record to topic-partition " + metadata.topic() + "-" + metadata.partition() + " at offset " + metadata.offset()); - System.out.flush(); - } - }); - } + try (final KafkaConsumer 
consumerUncommitted = new KafkaConsumer<>(consumerProps)) { + topicEndOffsets = consumerUncommitted.endOffsets(partitions); } final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; - while (!endMarkerOffset.isEmpty() && System.currentTimeMillis() < maxWaitTime) { + while (!topicEndOffsets.isEmpty() && System.currentTimeMillis() < maxWaitTime) { consumer.seekToEnd(partitions); final Iterator iterator = partitions.iterator(); while (iterator.hasNext()) { final TopicPartition topicPartition = iterator.next(); + final long position = consumer.position(topicPartition); - if (consumer.position(topicPartition) > endMarkerOffset.get(topicPartition)) { + if (position == topicEndOffsets.get(topicPartition)) { iterator.remove(); - endMarkerOffset.remove(topicPartition); - System.out.println("Removing " + topicPartition + " at position " + consumer.position(topicPartition)); + topicEndOffsets.remove(topicPartition); + System.out.println("Removing " + topicPartition + " at position " + position); + } else if (consumer.position(topicPartition) > topicEndOffsets.get(topicPartition)) { + throw new IllegalStateException("Offset for partition " + topicPartition + " is larger than topic endOffset: " + position + " > " + topicEndOffsets.get(topicPartition)); } else { - System.out.println("Retry " + topicPartition + " at position " + consumer.position(topicPartition)); + System.out.println("Retry " + topicPartition + " at position " + position); } } sleep(1000L); } - if (!endMarkerOffset.isEmpty()) { + if (!topicEndOffsets.isEmpty()) { throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000L) + " sec."); } } From 9960c51a6b8ef1d023316b87f35ab01b94fe4279 Mon Sep 17 00:00:00 2001 From: "Matthias J. 
Sax" Date: Thu, 9 Apr 2020 16:00:31 -0700 Subject: [PATCH 08/15] Fix consumer setup --- .../test/java/org/apache/kafka/streams/tests/EosTestDriver.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java index 4a5ec9b94d65d..e95c3541e63ab 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java @@ -578,6 +578,8 @@ private static void verifyAllTransactionFinished(final KafkaConsumer topicEndOffsets; From a7585a23c00af689701e9e67fb557864631dcfde Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 10 Apr 2020 09:38:31 -0700 Subject: [PATCH 09/15] Fix unit tests --- .../processor/internals/TaskManagerTest.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 2c375365ae865..3ba72d11bb45c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -2293,10 +2293,12 @@ private static void expectConsumerAssignmentPaused(final Consumer offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); + task01.setCommittableOffsetsAndMetadata(offsets); task01.setCommitNeeded(); taskManager.tasks().put(taskId01, task01); - consumer.commitSync(Collections.emptyMap()); + consumer.commitSync(offsets); expectLastCall().andThrow(new CommitFailedException()); replay(consumer); @@ -2313,10 +2315,12 @@ public void shouldThrowTaskMigratedExceptionOnCommitFailed() { @Test public void shouldThrowStreamsExceptionOnCommitTimeout() { final StateMachineTask task01 = new 
StateMachineTask(taskId01, taskId01Partitions, true); + final Map offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); + task01.setCommittableOffsetsAndMetadata(offsets); task01.setCommitNeeded(); taskManager.tasks().put(taskId01, task01); - consumer.commitSync(Collections.emptyMap()); + consumer.commitSync(offsets); expectLastCall().andThrow(new TimeoutException()); replay(consumer); @@ -2333,10 +2337,12 @@ public void shouldThrowStreamsExceptionOnCommitTimeout() { @Test public void shouldStreamsExceptionOnCommitError() { final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true); + final Map offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); + task01.setCommittableOffsetsAndMetadata(offsets); task01.setCommitNeeded(); taskManager.tasks().put(taskId01, task01); - consumer.commitSync(Collections.emptyMap()); + consumer.commitSync(offsets); expectLastCall().andThrow(new KafkaException()); replay(consumer); @@ -2353,10 +2359,12 @@ public void shouldStreamsExceptionOnCommitError() { @Test public void shouldFailOnCommitFatal() { final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true); + final Map offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); + task01.setCommittableOffsetsAndMetadata(offsets); task01.setCommitNeeded(); taskManager.tasks().put(taskId01, task01); - consumer.commitSync(Collections.emptyMap()); + consumer.commitSync(offsets); expectLastCall().andThrow(new RuntimeException("KABOOM")); replay(consumer); From b11c0ce54e85db2ab130ce6d7763fca3c036dcf3 Mon Sep 17 00:00:00 2001 From: "Matthias J. 
Sax" Date: Fri, 10 Apr 2020 11:42:52 -0700 Subject: [PATCH 10/15] Bug fix --- .../apache/kafka/streams/processor/internals/StreamThread.java | 1 + 1 file changed, 1 insertion(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 961f01579a48c..b85bedd10d79d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -569,6 +569,7 @@ private void runLoop() { log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " + "Will close the task as dirty and re-create and bootstrap from scratch.", e); + taskManager.commitAll(); taskManager.handleCorruption(e.corruptedTaskWithChangelogs()); } catch (final TaskMigratedException e) { log.warn("Detected that the thread is being fenced. " + From 8ed96485e258a6683c60c614c15b955010ef0e17 Mon Sep 17 00:00:00 2001 From: "Matthias J. 
Sax" Date: Fri, 10 Apr 2020 12:16:46 -0700 Subject: [PATCH 11/15] rebase fix --- tests/kafkatest/services/streams.py | 4 ++-- tests/kafkatest/services/streams_property.py | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index ec3bf0c669a47..e8788829e68bd 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -329,12 +329,12 @@ def __init__(self, test_context, kafka, processing_guarantee, command): kafka, "org.apache.kafka.streams.tests.StreamsEosTest", command) - self.PROCESSING_GUARANTEES = processing_guarantee + self.PROCESSING_GUARANTEE = processing_guarantee def prop_file(self): properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT, streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(), - streams_property.PROCESSING_GUARANTEES: self.PROCESSING_GUARANTEES} + streams_property.PROCESSING_GUARANTEE: self.PROCESSING_GUARANTEE} cfg = KafkaConfig(**properties) return cfg.render() diff --git a/tests/kafkatest/services/streams_property.py b/tests/kafkatest/services/streams_property.py index 845c9b50e4eed..8900adb1812bd 100644 --- a/tests/kafkatest/services/streams_property.py +++ b/tests/kafkatest/services/streams_property.py @@ -21,4 +21,3 @@ KAFKA_SERVERS = "bootstrap.servers" NUM_THREADS = "num.stream.threads" PROCESSING_GUARANTEE = "processing.guarantee" -PROCESSING_GUARANTEES = "processing.guarantee" From 5a56062d4c9efb20110c9cae7255aa1bb4dca097 Mon Sep 17 00:00:00 2001 From: "Matthias J. 
Sax" Date: Mon, 13 Apr 2020 14:06:14 -0700 Subject: [PATCH 12/15] Fix --- .../streams/processor/internals/StreamThread.java | 11 +++++++++-- .../streams/processor/internals/TaskManager.java | 2 +- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index b85bedd10d79d..da4dfacd17fb5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import java.util.concurrent.atomic.AtomicLong; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -57,6 +56,8 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA; @@ -569,7 +570,13 @@ private void runLoop() { log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " + "Will close the task as dirty and re-create and bootstrap from scratch.", e); - taskManager.commitAll(); + taskManager.commitInternal( + taskManager.tasks() + .values() + .stream() + .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id())) + .collect(Collectors.toSet()) + ); taskManager.handleCorruption(e.corruptedTaskWithChangelogs()); } catch (final TaskMigratedException e) { log.warn("Detected that the thread is being fenced. 
" + diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index f0e294a43a228..930a9fedbdecb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -741,7 +741,7 @@ int commitAll() { return commitInternal(tasks.values()); } - private int commitInternal(final Collection tasks) { + int commitInternal(final Collection tasks) { if (rebalanceInProgress) { return -1; From d506697bf9605caebe9b421bb088bddcdbe78c52 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 13 Apr 2020 18:43:29 -0700 Subject: [PATCH 13/15] FIX BROKEN SYSTEM TEST RUN --- tests/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/setup.py b/tests/setup.py index c28fdfddff3fc..1cfd14a32aaac 100644 --- a/tests/setup.py +++ b/tests/setup.py @@ -51,7 +51,7 @@ def run_tests(self): license="apache2.0", packages=find_packages(), include_package_data=True, - install_requires=["ducktape==0.7.6", "requests==2.20.0"], + install_requires=["MarkupSafe<2.0.0", "ducktape==0.7.6", "requests==2.20.0"], tests_require=["pytest", "mock"], cmdclass={'test': PyTest}, ) From 5c76bdcdf357fc7773198b00f2e712e28ab34797 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 14 Apr 2020 15:42:50 -0700 Subject: [PATCH 14/15] Revert "FIX BROKEN SYSTEM TEST RUN" This reverts commit d506697bf9605caebe9b421bb088bddcdbe78c52. 
--- tests/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/setup.py b/tests/setup.py index 1cfd14a32aaac..c28fdfddff3fc 100644 --- a/tests/setup.py +++ b/tests/setup.py @@ -51,7 +51,7 @@ def run_tests(self): license="apache2.0", packages=find_packages(), include_package_data=True, - install_requires=["MarkupSafe<2.0.0", "ducktape==0.7.6", "requests==2.20.0"], + install_requires=["ducktape==0.7.6", "requests==2.20.0"], tests_require=["pytest", "mock"], cmdclass={'test': PyTest}, ) From 15e0b98ef384c15022ee76e6ca88b325567e31cb Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 14 Apr 2020 16:37:29 -0700 Subject: [PATCH 15/15] Add unit tests --- .../processor/internals/StreamThread.java | 4 +- .../processor/internals/TaskManager.java | 7 +- .../processor/internals/StreamThreadTest.java | 83 ++++++++++++++++--- .../processor/internals/TaskManagerTest.java | 57 +++++++++++++ 4 files changed, 133 insertions(+), 18 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index da4dfacd17fb5..1493cdd9bf457 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -548,7 +548,7 @@ public void run() { * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition */ - private void runLoop() { + void runLoop() { subscribeConsumer(); // if the thread is still in the middle of a rebalance, we should keep polling @@ -570,7 +570,7 @@ private void runLoop() { log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. 
" + "Will close the task as dirty and re-create and bootstrap from scratch.", e); - taskManager.commitInternal( + taskManager.commit( taskManager.tasks() .values() .stream() diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 930a9fedbdecb..2f86538a5283f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -738,11 +738,10 @@ private Stream standbyTaskStream() { * @return number of committed offsets, or -1 if we are in the middle of a rebalance and cannot commit */ int commitAll() { - return commitInternal(tasks.values()); + return commit(tasks.values()); } - int commitInternal(final Collection tasks) { - + int commit(final Collection tasks) { if (rebalanceInProgress) { return -1; } else { @@ -783,7 +782,7 @@ int maybeCommitActiveTasksPerUserRequested() { } else { for (final Task task : activeTaskIterable()) { if (task.commitRequested() && task.commitNeeded()) { - return commitInternal(activeTaskIterable()); + return commit(activeTaskIterable()); } } return 0; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index d0488f62d5b3d..a539ec8441b51 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import java.util.concurrent.atomic.AtomicLong; import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -47,6 +46,7 @@ import 
org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; +import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.internals.ConsumedInternal; @@ -93,16 +93,21 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; import static java.util.Collections.singletonMap; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId; import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.startsWith; @@ -465,7 +470,7 @@ public void shouldNotCommitBeforeTheCommitInterval() { thread.setNow(mockTime.milliseconds()); thread.maybeCommit(); - EasyMock.verify(taskManager); + verify(taskManager); } @Test @@ -509,7 +514,7 @@ public void shouldEnforceRebalanceAfterNextScheduledProbingRebalanceTime() throw "Thread never started."); TestUtils.retryOnExceptionWithTimeout( - () -> EasyMock.verify(mockConsumer) + () -> verify(mockConsumer) ); thread.shutdown(); @@ -691,7 +696,7 @@ public void shouldNotCauseExceptionIfNothingCommitted() { 
thread.setNow(mockTime.milliseconds()); thread.maybeCommit(); - EasyMock.verify(taskManager); + verify(taskManager); } @Test @@ -729,7 +734,7 @@ public void shouldCommitAfterTheCommitInterval() { thread.setNow(mockTime.milliseconds()); thread.maybeCommit(); - EasyMock.verify(taskManager); + verify(taskManager); } @Test @@ -920,7 +925,7 @@ public void shouldShutdownTaskManagerOnClose() { } }); thread.run(); - EasyMock.verify(taskManager); + verify(taskManager); } @Test @@ -929,7 +934,7 @@ public void shouldNotReturnDataAfterTaskMigrated() { internalTopologyBuilder = EasyMock.createNiceMock(InternalTopologyBuilder.class); - EasyMock.expect(internalTopologyBuilder.sourceTopicCollection()).andReturn(Collections.singletonList(topic1)).times(2); + expect(internalTopologyBuilder.sourceTopicCollection()).andReturn(Collections.singletonList(topic1)).times(2); final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.LATEST); @@ -977,7 +982,7 @@ public void restore() { final IllegalStateException thrown = assertThrows( IllegalStateException.class, thread::run); - EasyMock.verify(taskManager); + verify(taskManager); // The Mock consumer shall throw as the assignment has been wiped out, but records are assigned. assertEquals("No current assignment for partition topic1-1", thrown.getMessage()); @@ -1011,7 +1016,7 @@ public void shouldShutdownTaskManagerOnCloseWithoutStart() { new AtomicLong(Long.MAX_VALUE) ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); thread.shutdown(); - EasyMock.verify(taskManager); + verify(taskManager); } @Test @@ -1043,7 +1048,7 @@ public void shouldOnlyShutdownOnce() { thread.shutdown(); // Execute the run method. 
Verification of the mock will check that shutdown was only done once thread.run(); - EasyMock.verify(taskManager); + verify(taskManager); } @Test @@ -1905,6 +1910,60 @@ public void shouldThrowTaskMigratedExceptionHandlingRevocation() { assertThrows(TaskMigratedException.class, thread::runOnce); } + @Test + public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() { + final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); + final Consumer consumer = mock(Consumer.class); + final Task task1 = mock(Task.class); + final Task task2 = mock(Task.class); + final TaskId taskId1 = new TaskId(0, 0); + final TaskId taskId2 = new TaskId(0, 2); + + final Map> corruptedTasksWithChangelogs = mkMap( + mkEntry(taskId1, emptySet()) + ); + + expect(task1.id()).andReturn(taskId1).anyTimes(); + expect(task2.id()).andReturn(taskId2).anyTimes(); + + expect(taskManager.tasks()).andReturn(mkMap( + mkEntry(taskId1, task1), + mkEntry(taskId2, task2) + )).anyTimes(); + expect(taskManager.commit(singleton(task2))).andReturn(0); + + EasyMock.replay(task1, task2, taskManager); + + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST); + final StreamThread thread = new StreamThread( + mockTime, + config, + null, + consumer, + consumer, + null, + null, + taskManager, + streamsMetrics, + internalTopologyBuilder, + CLIENT_ID, + new LogContext(""), + new AtomicInteger(), + new AtomicLong(Long.MAX_VALUE) + ) { + @Override + void runOnce() { + setState(State.PENDING_SHUTDOWN); + throw new TaskCorruptedException(corruptedTasksWithChangelogs); + } + }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); + + thread.setState(StreamThread.State.STARTING); + thread.runLoop(); + + verify(taskManager); + } + @Test public void shouldLogAndRecordSkippedRecordsForInvalidTimestampsWithBuiltInMetricsVersion0100To24() { shouldLogAndRecordSkippedRecordsForInvalidTimestamps(StreamsConfig.METRICS_0100_TO_24); @@ -2045,7 
+2104,7 @@ public void shouldTransmitTaskManagerMetrics() { new MockTime()); final Map dummyProducerMetrics = singletonMap(testMetricName, testMetric); - EasyMock.expect(taskManager.producerMetrics()).andReturn(dummyProducerMetrics); + expect(taskManager.producerMetrics()).andReturn(dummyProducerMetrics); EasyMock.replay(taskManager, consumer); final StreamsMetricsImpl streamsMetrics = @@ -2118,7 +2177,7 @@ private TaskManager mockTaskManagerCommit(final Consumer consume final int numberOfCommits, final int commits) { final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - EasyMock.expect(taskManager.commitAll()).andReturn(commits).times(numberOfCommits); + expect(taskManager.commitAll()).andReturn(commits).times(numberOfCommits); EasyMock.replay(taskManager, consumer); return taskManager; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 3ba72d11bb45c..998ff7a89c2a1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -127,6 +127,14 @@ public class TaskManagerTest { private final TopicPartition t1p3 = new TopicPartition(topic1, 3); private final Set taskId03Partitions = mkSet(t1p3); + private final TaskId taskId04 = new TaskId(0, 4); + private final TopicPartition t1p4 = new TopicPartition(topic1, 4); + private final Set taskId04Partitions = mkSet(t1p4); + + private final TaskId taskId05 = new TaskId(0, 5); + private final TopicPartition t1p5 = new TopicPartition(topic1, 5); + private final Set taskId05Partitions = mkSet(t1p5); + private final TaskId taskId10 = new TaskId(1, 0); private final TopicPartition t2p0 = new TopicPartition(topic2, 0); private final Set taskId10Partitions = mkSet(t2p0); @@ -1528,6 +1536,55 @@ public void 
shouldCommitActiveAndStandbyTasks() { assertThat(task01.commitNeeded, is(false)); } + @Test + public void shouldCommitProvidedTasksIfNeeded() { + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true); + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true); + final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true); + final StateMachineTask task03 = new StateMachineTask(taskId03, taskId03Partitions, false); + final StateMachineTask task04 = new StateMachineTask(taskId04, taskId04Partitions, false); + final StateMachineTask task05 = new StateMachineTask(taskId05, taskId05Partitions, false); + + final Map> assignmentActive = mkMap( + mkEntry(taskId00, taskId00Partitions), + mkEntry(taskId01, taskId01Partitions), + mkEntry(taskId02, taskId02Partitions) + ); + final Map> assignmentStandby = mkMap( + mkEntry(taskId03, taskId03Partitions), + mkEntry(taskId04, taskId04Partitions), + mkEntry(taskId05, taskId05Partitions) + ); + + expectRestoreToBeCompleted(consumer, changeLogReader); + expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))) + .andReturn(Arrays.asList(task00, task01, task02)).anyTimes(); + expect(standbyTaskCreator.createTasks(eq(assignmentStandby))) + .andReturn(Arrays.asList(task03, task04, task05)).anyTimes(); + expectLastCall(); + + replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + + taskManager.handleAssignment(assignmentActive, assignmentStandby); + assertThat(taskManager.tryToCompleteRestoration(), is(true)); + + assertThat(task00.state(), is(Task.State.RUNNING)); + assertThat(task01.state(), is(Task.State.RUNNING)); + + task00.setCommitNeeded(); + task01.setCommitNeeded(); + task03.setCommitNeeded(); + task04.setCommitNeeded(); + + assertThat(taskManager.commit(Arrays.asList(task00, task02, task03, task05)), equalTo(2)); + assertThat(task00.commitNeeded, is(false)); + assertThat(task01.commitNeeded, 
is(true)); + assertThat(task02.commitNeeded, is(false)); + assertThat(task03.commitNeeded, is(false)); + assertThat(task04.commitNeeded, is(true)); + assertThat(task05.commitNeeded, is(false)); + } + @Test public void shouldNotCommitOffsetsIfOnlyStandbyTasksAssigned() { final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, false);