From d1a4a0a148d3eda948fb69f4cda19daa04d4a92a Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 27 Mar 2017 18:01:55 -0700 Subject: [PATCH 1/5] MINOR: StreamThread should catch InvalidOffsetException --- .../processor/internals/StreamThread.java | 67 ++++++++++++------- 1 file changed, 41 insertions(+), 26 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 9791a0a3bea8c..9397f6906b39e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -22,7 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; +import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; @@ -529,37 +529,43 @@ private ConsumerRecords pollRequests(final long pollTimeMs) { try { records = consumer.poll(pollTimeMs); - } catch (NoOffsetForPartitionException ex) { - TopicPartition partition = ex.partition(); - if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) { - log.info(String.format("stream-thread [%s] setting topic to consume from earliest offset %s", this.getName(), partition.topic())); - consumer.seekToBeginning(ex.partitions()); - } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) { - consumer.seekToEnd(ex.partitions()); - log.info(String.format("stream-thread [%s] setting topic to consume from latest offset %s", this.getName(), partition.topic())); - } else { - - if (originalReset == 
null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) { - setState(State.PENDING_SHUTDOWN); - String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." + - " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " + - "policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)"; - throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), ex); - } + } catch (final InvalidOffsetException e) { + final Set partitions = e.partitions(); + final Set loggedTopics = new HashSet<>(); + final Set seekToBeginning = new HashSet<>(); + final Set seekToEnd = new HashSet<>(); + + for (final TopicPartition partition : partitions) { + if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) { + reset(partition, seekToBeginning, "stream-thread [%s] setting topic %s to consume from %s offset", "earliest", loggedTopics); + } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) { + reset(partition, seekToEnd, "stream-thread [%s] setting topic %s to consume from %s offset", "latest", loggedTopics); + } else { + if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) { + setState(State.PENDING_SHUTDOWN); + final String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." + + " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " + + "policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) 
or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)"; + throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), e); + } - if (originalReset.equals("earliest")) { - consumer.seekToBeginning(ex.partitions()); - } else if (originalReset.equals("latest")) { - consumer.seekToEnd(ex.partitions()); + log.info(String.format("stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", getName(), partition.topic(), originalReset)); + if (originalReset.equals("earliest")) { + reset(partition, seekToBeginning, "stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", "earliest", loggedTopics); + } else if (originalReset.equals("latest")) { + reset(partition, seekToEnd, "stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", "latest", loggedTopics); + } } - log.info(String.format("stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", this.getName(), partition.topic(), originalReset)); } + if (!seekToBeginning.isEmpty()) { + consumer.seekToBeginning(seekToBeginning); + } + if (!seekToEnd.isEmpty()) { + consumer.seekToEnd(seekToEnd); + } } - if (rebalanceException != null) - throw new StreamsException(logPrefix + " Failed to rebalance", rebalanceException); - return records; } @@ -687,6 +693,15 @@ private void runLoop() { log.info("{} Shutting down at user request", logPrefix); } + private void reset(final TopicPartition partition, final Set partitions, final String logMessage, final String resetPolicy, final Set loggedTopics) { + final String topic = partition.topic(); + if (!loggedTopics.contains(topic)) { + loggedTopics.add(topic); + log.info(String.format(logMessage, getName(), topic, resetPolicy)); + } + partitions.add(partition); + } + private void maybeUpdateStandbyTasks() { if (!standbyTasks.isEmpty()) { if (processStandbyRecords) { 
From e846ada5dc98361cdcb0b5e705407dd2f18bc298 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Thu, 30 Mar 2017 18:28:25 -0700 Subject: [PATCH 2/5] Github comments and rebasing --- .../processor/internals/StreamThread.java | 27 ++-- ...msFineGrainedAutoResetIntegrationTest.java | 145 ++++++++++++++---- 2 files changed, 126 insertions(+), 46 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 9397f6906b39e..8f465e34adb86 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -537,9 +537,9 @@ private ConsumerRecords pollRequests(final long pollTimeMs) { for (final TopicPartition partition : partitions) { if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) { - reset(partition, seekToBeginning, "stream-thread [%s] setting topic %s to consume from %s offset", "earliest", loggedTopics); + addToResetList(partition, seekToBeginning, "stream-thread [%s] setting topic %s to consume from %s offset", "earliest", loggedTopics); } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) { - reset(partition, seekToEnd, "stream-thread [%s] setting topic %s to consume from %s offset", "latest", loggedTopics); + addToResetList(partition, seekToEnd, "stream-thread [%s] setting topic %s to consume from %s offset", "latest", loggedTopics); } else { if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) { setState(State.PENDING_SHUTDOWN); @@ -549,11 +549,10 @@ private ConsumerRecords pollRequests(final long pollTimeMs) { throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), e); } - log.info(String.format("stream-thread [%s] no custom setting defined for 
topic %s using original config %s for offset reset", getName(), partition.topic(), originalReset)); if (originalReset.equals("earliest")) { - reset(partition, seekToBeginning, "stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", "earliest", loggedTopics); + addToResetList(partition, seekToBeginning, "stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", "earliest", loggedTopics); } else if (originalReset.equals("latest")) { - reset(partition, seekToEnd, "stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", "latest", loggedTopics); + addToResetList(partition, seekToEnd, "stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", "latest", loggedTopics); } } } @@ -569,6 +568,15 @@ private ConsumerRecords pollRequests(final long pollTimeMs) { return records; } + private void addToResetList(final TopicPartition partition, final Set partitions, final String logMessage, final String resetPolicy, final Set loggedTopics) { + final String topic = partition.topic(); + if (!loggedTopics.contains(topic)) { + loggedTopics.add(topic); + log.info(String.format(logMessage, getName(), topic, resetPolicy)); + } + partitions.add(partition); + } + /** * Take records and add them to each respective task * @param records Records, can be null @@ -693,15 +701,6 @@ private void runLoop() { log.info("{} Shutting down at user request", logPrefix); } - private void reset(final TopicPartition partition, final Set partitions, final String logMessage, final String resetPolicy, final Set loggedTopics) { - final String topic = partition.topic(); - if (!loggedTopics.contains(topic)) { - loggedTopics.add(topic); - log.info(String.format(logMessage, getName(), topic, resetPolicy)); - } - partitions.add(partition); - } - private void maybeUpdateStandbyTasks() { if (!standbyTasks.isEmpty()) { if 
(processStandbyRecords) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java index 3028b6b2d7288..9fc3531bee3a5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java @@ -19,12 +19,16 @@ import kafka.utils.MockTime; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; @@ -43,7 +47,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.regex.Pattern; @@ -61,12 +67,18 @@ public class KStreamsFineGrainedAutoResetIntegrationTest { public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); private final MockTime mockTime = CLUSTER.time; - private static final String TOPIC_1 = "topic-1"; - private static final String TOPIC_2 = "topic-2"; - private static final String TOPIC_A = "topic-A"; - private static 
final String TOPIC_C = "topic-C"; - private static final String TOPIC_Y = "topic-Y"; - private static final String TOPIC_Z = "topic-Z"; + private static final String TOPIC_1_1 = "topic-1_1"; + private static final String TOPIC_2_1 = "topic-2_1"; + private static final String TOPIC_A_1 = "topic-A_1"; + private static final String TOPIC_C_1 = "topic-C_1"; + private static final String TOPIC_Y_1 = "topic-Y_1"; + private static final String TOPIC_Z_1 = "topic-Z_1"; + private static final String TOPIC_1_2 = "topic-1_2"; + private static final String TOPIC_2_2 = "topic-2_2"; + private static final String TOPIC_A_2 = "topic-A_2"; + private static final String TOPIC_C_2 = "topic-C_2"; + private static final String TOPIC_Y_2 = "topic-Y_2"; + private static final String TOPIC_Z_2 = "topic-Z_2"; private static final String NOOP = "noop"; private final Serde stringSerde = Serdes.String(); @@ -76,15 +88,20 @@ public class KStreamsFineGrainedAutoResetIntegrationTest { @BeforeClass public static void startKafkaCluster() throws Exception { - CLUSTER.createTopic(TOPIC_1); - CLUSTER.createTopic(TOPIC_2); - CLUSTER.createTopic(TOPIC_A); - CLUSTER.createTopic(TOPIC_C); - CLUSTER.createTopic(TOPIC_Y); - CLUSTER.createTopic(TOPIC_Z); + CLUSTER.createTopic(TOPIC_1_1); + CLUSTER.createTopic(TOPIC_2_1); + CLUSTER.createTopic(TOPIC_A_1); + CLUSTER.createTopic(TOPIC_C_1); + CLUSTER.createTopic(TOPIC_Y_1); + CLUSTER.createTopic(TOPIC_Z_1); + CLUSTER.createTopic(TOPIC_1_2); + CLUSTER.createTopic(TOPIC_2_2); + CLUSTER.createTopic(TOPIC_A_2); + CLUSTER.createTopic(TOPIC_C_2); + CLUSTER.createTopic(TOPIC_Y_2); + CLUSTER.createTopic(TOPIC_Z_2); CLUSTER.createTopic(NOOP); CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC); - } @Before @@ -105,12 +122,12 @@ public void setUp() throws Exception { } @Test - public void shouldOnlyReadRecordsWhereEarliestSpecified() throws Exception { + public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsets() throws Exception { final KStreamBuilder builder 
= new KStreamBuilder(); - final KStream pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d")); - final KStream pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]")); - final KStream namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z); + final KStream pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d_1")); + final KStream pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]_1")); + final KStream namedTopicsStream = builder.stream(TOPIC_Y_1, TOPIC_Z_1); pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); @@ -125,12 +142,12 @@ public void shouldOnlyReadRecordsWhereEarliestSpecified() throws Exception { final String topicYTestMessage = "topic-Y test"; final String topicZTestMessage = "topic-Z test"; - IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Collections.singletonList(topic1TestMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Collections.singletonList(topic2TestMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Collections.singletonList(topicATestMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Collections.singletonList(topicCTestMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Collections.singletonList(topicYTestMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Collections.singletonList(topicZTestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_1_1, Collections.singletonList(topic1TestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_2_1, 
Collections.singletonList(topic2TestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_A_1, Collections.singletonList(topicATestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_C_1, Collections.singletonList(topicCTestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y_1, Collections.singletonList(topicYTestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z_1, Collections.singletonList(topicZTestMessage), producerConfig, mockTime); final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); @@ -149,35 +166,99 @@ public void shouldOnlyReadRecordsWhereEarliestSpecified() throws Exception { Collections.sort(actualValues); Collections.sort(expectedReceivedValues); assertThat(actualValues, equalTo(expectedReceivedValues)); + } + + @Test + public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets() throws Exception { + commitInvalidOffsets(); + + final KStreamBuilder builder = new KStreamBuilder(); + + final KStream pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d_2")); + final KStream pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]_2")); + final KStream namedTopicsStream = builder.stream(TOPIC_Y_2, TOPIC_Z_2); + + pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); + pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); + namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); + + final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class); + + final String topic1TestMessage = "topic-1 test"; + final String topic2TestMessage = "topic-2 test"; + final String topicATestMessage = 
"topic-A test"; + final String topicCTestMessage = "topic-C test"; + final String topicYTestMessage = "topic-Y test"; + final String topicZTestMessage = "topic-Z test"; + + IntegrationTestUtils.produceValuesSynchronously(TOPIC_1_2, Collections.singletonList(topic1TestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_2_2, Collections.singletonList(topic2TestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_A_2, Collections.singletonList(topicATestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_C_2, Collections.singletonList(topicCTestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y_2, Collections.singletonList(topicYTestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z_2, Collections.singletonList(topicZTestMessage), producerConfig, mockTime); + + final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); + final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); + streams.start(); + + final List expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage); + final List> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 4); + final List actualValues = new ArrayList<>(4); + + for (final KeyValue receivedKeyValue : receivedKeyValues) { + actualValues.add(receivedKeyValue.value); + } + + streams.close(); + Collections.sort(actualValues); + Collections.sort(expectedReceivedValues); + assertThat(actualValues, equalTo(expectedReceivedValues)); } + private void commitInvalidOffsets() { + final KafkaConsumer consumer = new KafkaConsumer(TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + 
streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), + StringDeserializer.class, + StringDeserializer.class)); + + final Map invalidOffsets = new HashMap<>(); + invalidOffsets.put(new TopicPartition(TOPIC_1_2, 0), new OffsetAndMetadata(5, null)); + invalidOffsets.put(new TopicPartition(TOPIC_2_2, 0), new OffsetAndMetadata(5, null)); + invalidOffsets.put(new TopicPartition(TOPIC_A_2, 0), new OffsetAndMetadata(5, null)); + invalidOffsets.put(new TopicPartition(TOPIC_C_2, 0), new OffsetAndMetadata(5, null)); + invalidOffsets.put(new TopicPartition(TOPIC_Y_2, 0), new OffsetAndMetadata(5, null)); + invalidOffsets.put(new TopicPartition(TOPIC_Z_2, 0), new OffsetAndMetadata(5, null)); + + consumer.commitSync(invalidOffsets); + + consumer.close(); + } @Test(expected = TopologyBuilderException.class) public void shouldThrowExceptionOverlappingPattern() throws Exception { final KStreamBuilder builder = new KStreamBuilder(); //NOTE this would realistically get caught when building topology, the test is for completeness - final KStream pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]")); - final KStream pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]")); - final KStream namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z); + builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1")); + builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]_1")); + builder.stream(TOPIC_Y_1, TOPIC_Z_1); builder.earliestResetTopicsPattern(); - } @Test(expected = TopologyBuilderException.class) public void shouldThrowExceptionOverlappingTopic() throws Exception { final KStreamBuilder builder = new KStreamBuilder(); //NOTE this would realistically get caught when building topology, the test is for completeness - final KStream pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, 
Pattern.compile("topic-[A-D]")); - final KStream pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-\\d]")); - final KStream namedTopicsStream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, TOPIC_A, TOPIC_Z); + builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1")); + builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-\\d_1")); + builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, TOPIC_A_1, TOPIC_Z_1); builder.latestResetTopicsPattern(); - } - @Test public void shouldThrowStreamsExceptionNoResetSpecified() throws Exception { Properties props = new Properties(); From a4bdf4dff6085e12c804a3069f3f5f061d5adb22 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 31 Mar 2017 13:48:13 -0700 Subject: [PATCH 3/5] Damian's comment --- .../apache/kafka/streams/processor/internals/StreamThread.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 8f465e34adb86..fdaae5fff52c1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -570,8 +570,7 @@ private ConsumerRecords pollRequests(final long pollTimeMs) { private void addToResetList(final TopicPartition partition, final Set partitions, final String logMessage, final String resetPolicy, final Set loggedTopics) { final String topic = partition.topic(); - if (!loggedTopics.contains(topic)) { - loggedTopics.add(topic); + if (loggedTopics.add(topic)) { log.info(String.format(logMessage, getName(), topic, resetPolicy)); } partitions.add(partition); From ac3c9eea72d1d5f38adae7d641ba286c0e43c0ab Mon Sep 17 00:00:00 2001 From: "Matthias J. 
Sax" Date: Mon, 3 Apr 2017 09:28:01 -0700 Subject: [PATCH 4/5] Damian's comment --- .../processor/internals/StreamThread.java | 64 ++++++++++--------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index fdaae5fff52c1..a4f294917638b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -530,42 +530,46 @@ private ConsumerRecords pollRequests(final long pollTimeMs) { try { records = consumer.poll(pollTimeMs); } catch (final InvalidOffsetException e) { - final Set partitions = e.partitions(); - final Set loggedTopics = new HashSet<>(); - final Set seekToBeginning = new HashSet<>(); - final Set seekToEnd = new HashSet<>(); - - for (final TopicPartition partition : partitions) { - if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) { - addToResetList(partition, seekToBeginning, "stream-thread [%s] setting topic %s to consume from %s offset", "earliest", loggedTopics); - } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) { - addToResetList(partition, seekToEnd, "stream-thread [%s] setting topic %s to consume from %s offset", "latest", loggedTopics); - } else { - if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) { - setState(State.PENDING_SHUTDOWN); - final String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." + - " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " + - "policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) 
or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)"; - throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), e); - } + resetInvalidOffsets(e); + } - if (originalReset.equals("earliest")) { - addToResetList(partition, seekToBeginning, "stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", "earliest", loggedTopics); - } else if (originalReset.equals("latest")) { - addToResetList(partition, seekToEnd, "stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", "latest", loggedTopics); - } + return records; + } + + private void resetInvalidOffsets(final InvalidOffsetException e) { + final Set partitions = e.partitions(); + final Set loggedTopics = new HashSet<>(); + final Set seekToBeginning = new HashSet<>(); + final Set seekToEnd = new HashSet<>(); + + for (final TopicPartition partition : partitions) { + if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) { + addToResetList(partition, seekToBeginning, "stream-thread [%s] setting topic %s to consume from %s offset", "earliest", loggedTopics); + } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) { + addToResetList(partition, seekToEnd, "stream-thread [%s] setting topic %s to consume from %s offset", "latest", loggedTopics); + } else { + if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) { + setState(State.PENDING_SHUTDOWN); + final String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." + + " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " + + "policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) 
or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)"; + throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), e); } - } - if (!seekToBeginning.isEmpty()) { - consumer.seekToBeginning(seekToBeginning); - } - if (!seekToEnd.isEmpty()) { - consumer.seekToEnd(seekToEnd); + if (originalReset.equals("earliest")) { + addToResetList(partition, seekToBeginning, "stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", "earliest", loggedTopics); + } else if (originalReset.equals("latest")) { + addToResetList(partition, seekToEnd, "stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", "latest", loggedTopics); + } } } - return records; + if (!seekToBeginning.isEmpty()) { + consumer.seekToBeginning(seekToBeginning); + } + if (!seekToEnd.isEmpty()) { + consumer.seekToEnd(seekToEnd); + } } private void addToResetList(final TopicPartition partition, final Set partitions, final String logMessage, final String resetPolicy, final Set loggedTopics) { From fee0a4f83b1eb47655b4276f0504d6899cd538a1 Mon Sep 17 00:00:00 2001 From: "Matthias J. 
Sax" Date: Wed, 5 Apr 2017 17:49:52 -0700 Subject: [PATCH 5/5] added one more test --- ...msFineGrainedAutoResetIntegrationTest.java | 128 +++++++++--------- 1 file changed, 65 insertions(+), 63 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java index 9fc3531bee3a5..1c15a98ae026b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java @@ -62,11 +62,20 @@ public class KStreamsFineGrainedAutoResetIntegrationTest { private static final int NUM_BROKERS = 1; private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic"; + private static final String OUTPUT_TOPIC_0 = "outputTopic_0"; + private static final String OUTPUT_TOPIC_1 = "outputTopic_1"; + private static final String OUTPUT_TOPIC_2 = "outputTopic_2"; @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); private final MockTime mockTime = CLUSTER.time; + private static final String TOPIC_1_0 = "topic-1_0"; + private static final String TOPIC_2_0 = "topic-2_0"; + private static final String TOPIC_A_0 = "topic-A_0"; + private static final String TOPIC_C_0 = "topic-C_0"; + private static final String TOPIC_Y_0 = "topic-Y_0"; + private static final String TOPIC_Z_0 = "topic-Z_0"; private static final String TOPIC_1_1 = "topic-1_1"; private static final String TOPIC_2_1 = "topic-2_1"; private static final String TOPIC_A_1 = "topic-A_1"; @@ -85,9 +94,22 @@ public class KStreamsFineGrainedAutoResetIntegrationTest { private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName(); private Properties streamsConfiguration; + private final String topic1TestMessage = "topic-1 test"; + 
private final String topic2TestMessage = "topic-2 test"; + private final String topicATestMessage = "topic-A test"; + private final String topicCTestMessage = "topic-C test"; + private final String topicYTestMessage = "topic-Y test"; + private final String topicZTestMessage = "topic-Z test"; + @BeforeClass public static void startKafkaCluster() throws Exception { + CLUSTER.createTopic(TOPIC_1_0); + CLUSTER.createTopic(TOPIC_2_0); + CLUSTER.createTopic(TOPIC_A_0); + CLUSTER.createTopic(TOPIC_C_0); + CLUSTER.createTopic(TOPIC_Y_0); + CLUSTER.createTopic(TOPIC_Z_0); CLUSTER.createTopic(TOPIC_1_1); CLUSTER.createTopic(TOPIC_2_1); CLUSTER.createTopic(TOPIC_A_1); @@ -102,6 +124,9 @@ public static void startKafkaCluster() throws Exception { CLUSTER.createTopic(TOPIC_Z_2); CLUSTER.createTopic(NOOP); CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC); + CLUSTER.createTopic(OUTPUT_TOPIC_0); + CLUSTER.createTopic(OUTPUT_TOPIC_1); + CLUSTER.createTopic(OUTPUT_TOPIC_2); } @Before @@ -122,90 +147,67 @@ public void setUp() throws Exception { } @Test - public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsets() throws Exception { - final KStreamBuilder builder = new KStreamBuilder(); - - final KStream pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d_1")); - final KStream pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]_1")); - final KStream namedTopicsStream = builder.stream(TOPIC_Y_1, TOPIC_Z_1); - - pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); - pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); - namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); - - final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class); - - final String topic1TestMessage = "topic-1 test"; - final String topic2TestMessage = "topic-2 test"; - final String 
topicATestMessage = "topic-A test"; - final String topicCTestMessage = "topic-C test"; - final String topicYTestMessage = "topic-Y test"; - final String topicZTestMessage = "topic-Z test"; - - IntegrationTestUtils.produceValuesSynchronously(TOPIC_1_1, Collections.singletonList(topic1TestMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_2_1, Collections.singletonList(topic2TestMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_A_1, Collections.singletonList(topicATestMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_C_1, Collections.singletonList(topicCTestMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y_1, Collections.singletonList(topicYTestMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z_1, Collections.singletonList(topicZTestMessage), producerConfig, mockTime); + public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithGlobalAutoOffsetResetLatest() throws Exception { + streamsConfiguration.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest"); - final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); - - final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); - streams.start(); + final List expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage); + shouldOnlyReadForEarliest("_0", TOPIC_1_0, TOPIC_2_0, TOPIC_A_0, TOPIC_C_0, TOPIC_Y_0, TOPIC_Z_0, OUTPUT_TOPIC_0, expectedReceivedValues); + } + @Test + public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithDefaultGlobalAutoOffsetResetEarliest() throws Exception { final List expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage); - final 
List> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 4); - final List actualValues = new ArrayList<>(4); - - for (final KeyValue receivedKeyValue : receivedKeyValues) { - actualValues.add(receivedKeyValue.value); - } - - streams.close(); - Collections.sort(actualValues); - Collections.sort(expectedReceivedValues); - assertThat(actualValues, equalTo(expectedReceivedValues)); + shouldOnlyReadForEarliest("_1", TOPIC_1_1, TOPIC_2_1, TOPIC_A_1, TOPIC_C_1, TOPIC_Y_1, TOPIC_Z_1, OUTPUT_TOPIC_1, expectedReceivedValues); } @Test public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets() throws Exception { commitInvalidOffsets(); + final List expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage); + shouldOnlyReadForEarliest("_2", TOPIC_1_2, TOPIC_2_2, TOPIC_A_2, TOPIC_C_2, TOPIC_Y_2, TOPIC_Z_2, OUTPUT_TOPIC_2, expectedReceivedValues); + } + + private void shouldOnlyReadForEarliest( + final String topicSuffix, + final String topic1, + final String topic2, + final String topicA, + final String topicC, + final String topicY, + final String topicZ, + final String outputTopic, + final List expectedReceivedValues) throws Exception { + final KStreamBuilder builder = new KStreamBuilder(); - final KStream pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d_2")); - final KStream pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]_2")); - final KStream namedTopicsStream = builder.stream(TOPIC_Y_2, TOPIC_Z_2); + final KStream pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d" + topicSuffix)); + final KStream pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]" + topicSuffix)); + final KStream namedTopicsStream = builder.stream(topicY, topicZ); 
- pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); - pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); - namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); + pattern1Stream.print(); + pattern2Stream.print(); + namedTopicsStream.print(); + pattern1Stream.to(stringSerde, stringSerde, outputTopic); + pattern2Stream.to(stringSerde, stringSerde, outputTopic); + namedTopicsStream.to(stringSerde, stringSerde, outputTopic); final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class); - final String topic1TestMessage = "topic-1 test"; - final String topic2TestMessage = "topic-2 test"; - final String topicATestMessage = "topic-A test"; - final String topicCTestMessage = "topic-C test"; - final String topicYTestMessage = "topic-Y test"; - final String topicZTestMessage = "topic-Z test"; - - IntegrationTestUtils.produceValuesSynchronously(TOPIC_1_2, Collections.singletonList(topic1TestMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_2_2, Collections.singletonList(topic2TestMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_A_2, Collections.singletonList(topicATestMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_C_2, Collections.singletonList(topicCTestMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y_2, Collections.singletonList(topicYTestMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z_2, Collections.singletonList(topicZTestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(topic1, Collections.singletonList(topic1TestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(topic2, Collections.singletonList(topic2TestMessage), producerConfig, mockTime); + 
IntegrationTestUtils.produceValuesSynchronously(topicA, Collections.singletonList(topicATestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(topicC, Collections.singletonList(topicCTestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(topicY, Collections.singletonList(topicYTestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(topicZ, Collections.singletonList(topicZTestMessage), producerConfig, mockTime); final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.start(); - final List expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage); - final List> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 4); - final List actualValues = new ArrayList<>(4); + final List> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedReceivedValues.size()); + final List actualValues = new ArrayList<>(expectedReceivedValues.size()); for (final KeyValue receivedKeyValue : receivedKeyValues) { actualValues.add(receivedKeyValue.value);