@@ -22,7 +22,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
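
A note on the import swap above: InvalidOffsetException is the common parent of NoOffsetForPartitionException and OffsetOutOfRangeException, so a single catch now covers both the "no committed offset" and the "committed offset out of range" cases. Below is a minimal stand-alone sketch of that behaviour, not code from this PR; the broker address, group id, and topic name are placeholders.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.InvalidOffsetException;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public final class InvalidOffsetDemo {
        public static void main(final String[] args) {
            final Properties config = new Properties();
            config.put("bootstrap.servers", "localhost:9092");  // placeholder address
            config.put("group.id", "invalid-offset-demo");      // placeholder group id
            // "none" disables the consumer's built-in reset handling, so poll() throws
            // instead of silently seeking; Streams relies on exactly this behaviour.
            config.put("auto.offset.reset", "none");
            config.put("key.deserializer", ByteArrayDeserializer.class.getName());
            config.put("value.deserializer", ByteArrayDeserializer.class.getName());

            try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(config)) {
                consumer.subscribe(Collections.singletonList("some-topic"));  // placeholder topic
                try {
                    consumer.poll(100);
                } catch (final InvalidOffsetException e) {
                    // Catches NoOffsetForPartitionException and OffsetOutOfRangeException alike;
                    // e.partitions() lists every affected partition, which is what the new
                    // resetInvalidOffsets() in this PR iterates over.
                    System.err.println("offsets must be reset for: " + e.partitions());
                }
            }
        }
    }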
@@ -529,38 +529,55 @@ private ConsumerRecords<byte[], byte[]> pollRequests(final long pollTimeMs) {
 
         try {
             records = consumer.poll(pollTimeMs);
-        } catch (NoOffsetForPartitionException ex) {
-            TopicPartition partition = ex.partition();
+        } catch (final InvalidOffsetException e) {
+            resetInvalidOffsets(e);
+        }
+
+        return records;
+    }
+
+    private void resetInvalidOffsets(final InvalidOffsetException e) {
+        final Set<TopicPartition> partitions = e.partitions();
+        final Set<String> loggedTopics = new HashSet<>();
+        final Set<TopicPartition> seekToBeginning = new HashSet<>();
+        final Set<TopicPartition> seekToEnd = new HashSet<>();
+
+        for (final TopicPartition partition : partitions) {
             if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
-                log.info(String.format("stream-thread [%s] setting topic to consume from earliest offset %s", this.getName(), partition.topic()));
-                consumer.seekToBeginning(ex.partitions());
+                addToResetList(partition, seekToBeginning, "stream-thread [%s] setting topic %s to consume from %s offset", "earliest", loggedTopics);
             } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
-                consumer.seekToEnd(ex.partitions());
-                log.info(String.format("stream-thread [%s] setting topic to consume from latest offset %s", this.getName(), partition.topic()));
+                addToResetList(partition, seekToEnd, "stream-thread [%s] setting topic %s to consume from %s offset", "latest", loggedTopics);
             } else {
 
                 if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) {
                     setState(State.PENDING_SHUTDOWN);
-                    String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." +
+                    final String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." +
                         " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " +
                         "policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)";
-                    throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), ex);
+                    throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), e);
                 }
 
                 if (originalReset.equals("earliest")) {
-                    consumer.seekToBeginning(ex.partitions());
+                    addToResetList(partition, seekToBeginning, "stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", "earliest", loggedTopics);
                 } else if (originalReset.equals("latest")) {
-                    consumer.seekToEnd(ex.partitions());
+                    addToResetList(partition, seekToEnd, "stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", "latest", loggedTopics);
                 }
-                log.info(String.format("stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", this.getName(), partition.topic(), originalReset));

[Review thread on the removed log.info line]

Member: Why did we remove this log line? And what is the behaviour if there's no reset policy defined?

Contributor: It is logged in addToResetList. In StreamsConfig we default the reset policy to earliest.

Member: Having a second look, it seems that we throw an exception if the reset policy is not one of the expected values:

    if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) {

So the code can never reach this line.

Member Author: We can reach this line: there are two places the config can be set, globally (called originalReset here) and locally per topic. Thus, this exception only happens if both are invalid. We check originalReset only if no topic-local setting was specified.

Member: I am not sure you understood my comment. I am suggesting that the line where the log.info was removed can never be reached. Are you saying that it can be reached?

Member Author: Yes. The old line (555) was hit when no topic-level offset strategy was specified but a global one was. Note that in Streams we "remember" the global config but don't provide it to the consumer -- we always set the consumer's reset strategy to "none" to allow for different topic-level reset strategies in the first place (i.e., we apply the global one "manually" within Streams).

Contributor: @mjsax I think @ijuma is right in that line 555 was never reached. I guess we're removing it anyway now.

Member: Thanks all, I think I now understand the misunderstanding.

             }
 
         }
 
-        if (rebalanceException != null)
-            throw new StreamsException(logPrefix + " Failed to rebalance", rebalanceException);
+        if (!seekToBeginning.isEmpty()) {
+            consumer.seekToBeginning(seekToBeginning);
+        }
+        if (!seekToEnd.isEmpty()) {
+            consumer.seekToEnd(seekToEnd);
+        }
     }
 
-        return records;
+    private void addToResetList(final TopicPartition partition, final Set<TopicPartition> partitions, final String logMessage, final String resetPolicy, final Set<String> loggedTopics) {
+        final String topic = partition.topic();
+        if (loggedTopics.add(topic)) {
+            log.info(String.format(logMessage, getName(), topic, resetPolicy));
+        }
+        partitions.add(partition);
+    }
 
     /**
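As a sketch of the control flow discussed in the thread above (not code from this PR): Streams configures its internal consumer with auto.offset.reset = "none", remembers the application-level default (originalReset), and on an InvalidOffsetException applies any topic-level override first, falling back to that remembered default. The helper below restates the flow in isolation; resetPartitions and its Pattern parameters are hypothetical stand-ins for resetInvalidOffsets() and builder.earliestResetTopicsPattern()/latestResetTopicsPattern().

    import java.util.HashSet;
    import java.util.Set;
    import java.util.regex.Pattern;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    final class ResetFlowSketch {
        // Hypothetical restatement of resetInvalidOffsets(): a topic-level override wins,
        // otherwise the remembered global default applies; seeks are batched per direction.
        static void resetPartitions(final Consumer<?, ?> consumer,
                                    final Set<TopicPartition> invalidPartitions,
                                    final Pattern earliestOverrides, // stand-in for earliestResetTopicsPattern()
                                    final Pattern latestOverrides,   // stand-in for latestResetTopicsPattern()
                                    final String globalReset) {      // stand-in for originalReset
            final Set<TopicPartition> seekToBeginning = new HashSet<>();
            final Set<TopicPartition> seekToEnd = new HashSet<>();

            for (final TopicPartition partition : invalidPartitions) {
                if (earliestOverrides.matcher(partition.topic()).matches()) {
                    seekToBeginning.add(partition);
                } else if (latestOverrides.matcher(partition.topic()).matches()) {
                    seekToEnd.add(partition);
                } else if ("earliest".equals(globalReset)) {
                    seekToBeginning.add(partition);  // global-default branch; the PR moves its logging into addToResetList()
                } else if ("latest".equals(globalReset)) {
                    seekToEnd.add(partition);
                } else {
                    // both levels invalid: the real code throws a StreamsException instead
                    throw new IllegalStateException("no valid reset policy for " + partition);
                }
            }

            // One seek per direction; the old code issued a seek per affected partition
            // and always passed the full ex.partitions() set.
            if (!seekToBeginning.isEmpty()) {
                consumer.seekToBeginning(seekToBeginning);
            }
            if (!seekToEnd.isEmpty()) {
                consumer.seekToEnd(seekToEnd);
            }
        }
    }
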
@@ -19,12 +19,16 @@
 
 import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -43,7 +47,9 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;

@@ -56,35 +62,71 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
 
     private static final int NUM_BROKERS = 1;
     private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
+    private static final String OUTPUT_TOPIC_0 = "outputTopic_0";
+    private static final String OUTPUT_TOPIC_1 = "outputTopic_1";
+    private static final String OUTPUT_TOPIC_2 = "outputTopic_2";
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
     private final MockTime mockTime = CLUSTER.time;
 
-    private static final String TOPIC_1 = "topic-1";
-    private static final String TOPIC_2 = "topic-2";
-    private static final String TOPIC_A = "topic-A";
-    private static final String TOPIC_C = "topic-C";
-    private static final String TOPIC_Y = "topic-Y";
-    private static final String TOPIC_Z = "topic-Z";
+    private static final String TOPIC_1_0 = "topic-1_0";
+    private static final String TOPIC_2_0 = "topic-2_0";
+    private static final String TOPIC_A_0 = "topic-A_0";
+    private static final String TOPIC_C_0 = "topic-C_0";
+    private static final String TOPIC_Y_0 = "topic-Y_0";
+    private static final String TOPIC_Z_0 = "topic-Z_0";
+    private static final String TOPIC_1_1 = "topic-1_1";
+    private static final String TOPIC_2_1 = "topic-2_1";
+    private static final String TOPIC_A_1 = "topic-A_1";
+    private static final String TOPIC_C_1 = "topic-C_1";
+    private static final String TOPIC_Y_1 = "topic-Y_1";
+    private static final String TOPIC_Z_1 = "topic-Z_1";
+    private static final String TOPIC_1_2 = "topic-1_2";
+    private static final String TOPIC_2_2 = "topic-2_2";
+    private static final String TOPIC_A_2 = "topic-A_2";
+    private static final String TOPIC_C_2 = "topic-C_2";
+    private static final String TOPIC_Y_2 = "topic-Y_2";
+    private static final String TOPIC_Z_2 = "topic-Z_2";
     private static final String NOOP = "noop";
     private final Serde<String> stringSerde = Serdes.String();
 
     private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
     private Properties streamsConfiguration;
 
+    private final String topic1TestMessage = "topic-1 test";
+    private final String topic2TestMessage = "topic-2 test";
+    private final String topicATestMessage = "topic-A test";
+    private final String topicCTestMessage = "topic-C test";
+    private final String topicYTestMessage = "topic-Y test";
+    private final String topicZTestMessage = "topic-Z test";
 
 
     @BeforeClass
     public static void startKafkaCluster() throws Exception {
-        CLUSTER.createTopic(TOPIC_1);
-        CLUSTER.createTopic(TOPIC_2);
-        CLUSTER.createTopic(TOPIC_A);
-        CLUSTER.createTopic(TOPIC_C);
-        CLUSTER.createTopic(TOPIC_Y);
-        CLUSTER.createTopic(TOPIC_Z);
+        CLUSTER.createTopic(TOPIC_1_0);
+        CLUSTER.createTopic(TOPIC_2_0);
+        CLUSTER.createTopic(TOPIC_A_0);
+        CLUSTER.createTopic(TOPIC_C_0);
+        CLUSTER.createTopic(TOPIC_Y_0);
+        CLUSTER.createTopic(TOPIC_Z_0);
+        CLUSTER.createTopic(TOPIC_1_1);
+        CLUSTER.createTopic(TOPIC_2_1);
+        CLUSTER.createTopic(TOPIC_A_1);
+        CLUSTER.createTopic(TOPIC_C_1);
+        CLUSTER.createTopic(TOPIC_Y_1);
+        CLUSTER.createTopic(TOPIC_Z_1);
+        CLUSTER.createTopic(TOPIC_1_2);
+        CLUSTER.createTopic(TOPIC_2_2);
+        CLUSTER.createTopic(TOPIC_A_2);
+        CLUSTER.createTopic(TOPIC_C_2);
+        CLUSTER.createTopic(TOPIC_Y_2);
+        CLUSTER.createTopic(TOPIC_Z_2);
         CLUSTER.createTopic(NOOP);
         CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
+
+        CLUSTER.createTopic(OUTPUT_TOPIC_0);
+        CLUSTER.createTopic(OUTPUT_TOPIC_1);
+        CLUSTER.createTopic(OUTPUT_TOPIC_2);
     }
 
     @Before
@@ -105,41 +147,67 @@ public void setUp() throws Exception {
     }
 
     @Test
-    public void shouldOnlyReadRecordsWhereEarliestSpecified() throws Exception {
+    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithGlobalAutoOffsetResetLatest() throws Exception {
+        streamsConfiguration.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
+
+        final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage);
+        shouldOnlyReadForEarliest("_0", TOPIC_1_0, TOPIC_2_0, TOPIC_A_0, TOPIC_C_0, TOPIC_Y_0, TOPIC_Z_0, OUTPUT_TOPIC_0, expectedReceivedValues);
+    }
+
+    @Test
+    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithDefaultGlobalAutoOffsetResetEarliest() throws Exception {
+        final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage);
+        shouldOnlyReadForEarliest("_1", TOPIC_1_1, TOPIC_2_1, TOPIC_A_1, TOPIC_C_1, TOPIC_Y_1, TOPIC_Z_1, OUTPUT_TOPIC_1, expectedReceivedValues);
+    }
+
+    @Test
+    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets() throws Exception {
+        commitInvalidOffsets();
+
+        final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage);
+        shouldOnlyReadForEarliest("_2", TOPIC_1_2, TOPIC_2_2, TOPIC_A_2, TOPIC_C_2, TOPIC_Y_2, TOPIC_Z_2, OUTPUT_TOPIC_2, expectedReceivedValues);
+    }
+
+    private void shouldOnlyReadForEarliest(
+        final String topicSuffix,
+        final String topic1,
+        final String topic2,
+        final String topicA,
+        final String topicC,
+        final String topicY,
+        final String topicZ,
+        final String outputTopic,
+        final List<String> expectedReceivedValues) throws Exception {
 
         final KStreamBuilder builder = new KStreamBuilder();
 
-        final KStream<String, String> pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d"));
-        final KStream<String, String> pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]"));
-        final KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z);
+        final KStream<String, String> pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d" + topicSuffix));
+        final KStream<String, String> pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]" + topicSuffix));
+        final KStream<String, String> namedTopicsStream = builder.stream(topicY, topicZ);
 
-        pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
-        pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
-        namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+        pattern1Stream.print();
+        pattern2Stream.print();
+        namedTopicsStream.print();

[Review thread on the print() calls]

Member: Is this intentional? cc @enothereska

Contributor: Looks like it's for debugging. @mjsax ?

Member: OK, will remove before merging.

Member Author: Thx. Missed removing it...

+        pattern1Stream.to(stringSerde, stringSerde, outputTopic);
+        pattern2Stream.to(stringSerde, stringSerde, outputTopic);
+        namedTopicsStream.to(stringSerde, stringSerde, outputTopic);
 
         final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
 
-        final String topic1TestMessage = "topic-1 test";
-        final String topic2TestMessage = "topic-2 test";
-        final String topicATestMessage = "topic-A test";
-        final String topicCTestMessage = "topic-C test";
-        final String topicYTestMessage = "topic-Y test";
-        final String topicZTestMessage = "topic-Z test";
-
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Collections.singletonList(topic1TestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Collections.singletonList(topic2TestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Collections.singletonList(topicATestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Collections.singletonList(topicCTestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Collections.singletonList(topicYTestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Collections.singletonList(topicZTestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topic1, Collections.singletonList(topic1TestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topic2, Collections.singletonList(topic2TestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topicA, Collections.singletonList(topicATestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topicC, Collections.singletonList(topicCTestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topicY, Collections.singletonList(topicYTestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topicZ, Collections.singletonList(topicZTestMessage), producerConfig, mockTime);
 
         final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
 
         final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
 
-        final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage);
-        final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 4);
-        final List<String> actualValues = new ArrayList<>(4);
+        final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedReceivedValues.size());
+        final List<String> actualValues = new ArrayList<>(expectedReceivedValues.size());
 
         for (final KeyValue<String, String> receivedKeyValue : receivedKeyValues) {
             actualValues.add(receivedKeyValue.value);
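
A side note on the config line in the first new test (not code from this PR): StreamsConfig.consumerPrefix namespaces a plain consumer property so it is routed to the consumers Streams creates internally; per the review thread above, Streams keeps this value as its own default (originalReset) rather than handing it to the consumer directly. The application id and bootstrap address below are placeholders.

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public final class ConsumerPrefixDemo {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "prefix-demo");        // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

            // Prints "consumer.auto.offset.reset": the prefixed key under which the
            // global reset default travels through StreamsConfig.
            System.out.println(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
            props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
        }
    }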
@@ -149,35 +217,50 @@ public void shouldOnlyReadRecordsWhereEarliestSpecified() throws Exception {
 
         Collections.sort(actualValues);
         Collections.sort(expectedReceivedValues);
         assertThat(actualValues, equalTo(expectedReceivedValues));
 
     }
 
+    private void commitInvalidOffsets() {
+        final KafkaConsumer consumer = new KafkaConsumer(TestUtils.consumerConfig(
+            CLUSTER.bootstrapServers(),
+            streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
+            StringDeserializer.class,
+            StringDeserializer.class));
+
+        final Map<TopicPartition, OffsetAndMetadata> invalidOffsets = new HashMap<>();
+        invalidOffsets.put(new TopicPartition(TOPIC_1_2, 0), new OffsetAndMetadata(5, null));
+        invalidOffsets.put(new TopicPartition(TOPIC_2_2, 0), new OffsetAndMetadata(5, null));
+        invalidOffsets.put(new TopicPartition(TOPIC_A_2, 0), new OffsetAndMetadata(5, null));
+        invalidOffsets.put(new TopicPartition(TOPIC_C_2, 0), new OffsetAndMetadata(5, null));
+        invalidOffsets.put(new TopicPartition(TOPIC_Y_2, 0), new OffsetAndMetadata(5, null));
+        invalidOffsets.put(new TopicPartition(TOPIC_Z_2, 0), new OffsetAndMetadata(5, null));
+
+        consumer.commitSync(invalidOffsets);
+
+        consumer.close();
+    }
+
     @Test(expected = TopologyBuilderException.class)
     public void shouldThrowExceptionOverlappingPattern() throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
         //NOTE this would realistically get caught when building topology, the test is for completeness
-        final KStream<String, String> pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]"));
-        final KStream<String, String> pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]"));
-        final KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z);
+        builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
+        builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]_1"));
+        builder.stream(TOPIC_Y_1, TOPIC_Z_1);
 
         builder.earliestResetTopicsPattern();
 
     }
 
     @Test(expected = TopologyBuilderException.class)
     public void shouldThrowExceptionOverlappingTopic() throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
         //NOTE this would realistically get caught when building topology, the test is for completeness
-        final KStream<String, String> pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]"));
-        final KStream<String, String> pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-\\d]"));
-        final KStream<String, String> namedTopicsStream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, TOPIC_A, TOPIC_Z);
+        builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
+        builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-\\d_1"));
+        builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, TOPIC_A_1, TOPIC_Z_1);
 
         builder.latestResetTopicsPattern();
 
     }
 
 
     @Test
     public void shouldThrowStreamsExceptionNoResetSpecified() throws Exception {
         Properties props = new Properties();
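
How commitInvalidOffsets() above provokes the reset path, as a stand-alone sketch (not code from this PR; broker address, group id, and topic are placeholders): committing offset 5 for an empty partition leaves the group's position past the log end, so the next fetch is out of range and, with auto.offset.reset = "none", poll() surfaces OffsetOutOfRangeException -- an InvalidOffsetException -- which is what shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets drives through resetInvalidOffsets().

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public final class InvalidCommitDemo {
        public static void main(final String[] args) {
            final Properties config = new Properties();
            config.put("bootstrap.servers", "localhost:9092");  // placeholder address
            config.put("group.id", "invalid-commit-demo");      // placeholder group id
            config.put("auto.offset.reset", "none");            // throw instead of auto-resetting
            config.put("key.deserializer", ByteArrayDeserializer.class.getName());
            config.put("value.deserializer", ByteArrayDeserializer.class.getName());

            final TopicPartition partition = new TopicPartition("empty-topic", 0);  // placeholder topic

            try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(config)) {
                // Offset 5 is past the log end offset of an empty partition, so the committed
                // position is invalid -- the same setup as commitInvalidOffsets() above.
                final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                offsets.put(partition, new OffsetAndMetadata(5, null));
                consumer.assign(Collections.singletonList(partition));
                consumer.commitSync(offsets);

                try {
                    consumer.poll(100);
                } catch (final OffsetOutOfRangeException e) {
                    // The InvalidOffsetException subtype the integration test exercises.
                    System.err.println("out-of-range committed offsets: " + e.offsetOutOfRangePartitions());
                }
            }
        }
    }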