Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion clients/src/test/java/org/apache/kafka/test/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ public static Properties producerConfig(final String bootstrapServers,
final Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, 0);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
properties.putAll(additional);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ public class PartitionGroup {
private boolean allBuffered;


public static class RecordInfo {
static class RecordInfo {
RecordQueue queue;

public ProcessorNode node() {
ProcessorNode node() {
return queue.source();
}

public TopicPartition partition() {
TopicPartition partition() {
return queue.partition();
}

Expand All @@ -88,15 +88,15 @@ RecordQueue queue() {
long partitionTimestamp(final TopicPartition partition) {
final RecordQueue queue = partitionQueues.get(partition);
if (queue == null) {
throw new NullPointerException("Partition " + partition + " not found.");
throw new IllegalStateException("Partition " + partition + " not found.");
}
return queue.partitionTime();
}

void setPartitionTime(final TopicPartition partition, final long partitionTime) {
final RecordQueue queue = partitionQueues.get(partition);
if (queue == null) {
throw new NullPointerException("Partition " + partition + " not found.");
throw new IllegalStateException("Partition " + partition + " not found.");
}
if (streamTime < partitionTime) {
streamTime = partitionTime;
Expand Down Expand Up @@ -152,6 +152,10 @@ record = queue.poll();
int addRawRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
final RecordQueue recordQueue = partitionQueues.get(partition);

if (recordQueue == null) {
throw new IllegalStateException("Partition " + partition + " not found.");
}

final int oldSize = recordQueue.size();
final int newSize = recordQueue.addRawRecords(rawRecords);

Expand All @@ -172,25 +176,35 @@ int addRawRecords(final TopicPartition partition, final Iterable<ConsumerRecord<
return newSize;
}

public Set<TopicPartition> partitions() {
Set<TopicPartition> partitions() {
return Collections.unmodifiableSet(partitionQueues.keySet());
}

/**
* Return the stream-time of this partition group defined as the largest timestamp seen across all partitions
*/
public long streamTime() {
long streamTime() {
return streamTime;
}

Long headRecordOffset(final TopicPartition partition) {
final RecordQueue recordQueue = partitionQueues.get(partition);

if (recordQueue == null) {
throw new IllegalStateException("Partition " + partition + " not found.");
}

return recordQueue.headRecordOffset();
}

/**
* @throws IllegalStateException if the record's partition does not belong to this partition group
*/
int numBuffered(final TopicPartition partition) {
final RecordQueue recordQueue = partitionQueues.get(partition);

if (recordQueue == null) {
throw new IllegalStateException(String.format("Record's partition %s does not belong to this partition-group.", partition));
throw new IllegalStateException("Partition " + partition + " not found.");
}

return recordQueue.size();
Expand All @@ -204,14 +218,15 @@ boolean allPartitionsBuffered() {
return allBuffered;
}

public void close() {
void close() {
clear();
partitionQueues.clear();
streamTime = RecordQueue.UNKNOWN;
}

public void clear() {
void clear() {
nonEmptyQueuesByTime.clear();
totalBuffered = 0;
for (final RecordQueue queue : partitionQueues.values()) {
queue.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class RecordQueue {
);
this.log = logContext.logger(RecordQueue.class);
}

void setPartitionTime(final long partitionTime) {
this.partitionTime = partitionTime;
}
Expand Down Expand Up @@ -156,6 +156,10 @@ public long headRecordTimestamp() {
return headRecord == null ? UNKNOWN : headRecord.timestamp;
}

public Long headRecordOffset() {
return headRecord == null ? null : headRecord.offset();
}

/**
* Clear the fifo queue of its elements
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
Expand Down Expand Up @@ -336,7 +337,21 @@ private void commitState() {
final Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
final TopicPartition partition = entry.getKey();
final long offset = entry.getValue() + 1L;
Long offset = partitionGroup.headRecordOffset(partition);
if (offset == null) {
try {
offset = consumer.position(partition);
} catch (final TimeoutException error) {
// the `consumer.position()` call should never block, because we know that we did process data
// for the requested partition and thus the consumer should have a valid local position
// that it can return immediately

// hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
throw new IllegalStateException(error);
} catch (final KafkaException fatal) {
throw new StreamsException(fatal);
}
}
final long partitionTime = partitionTimes.get(partition);
consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset, encodeTimestamp(partitionTime)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,13 @@ void runOnce() {
throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration");
}

final long pollLatency = advanceNowAndComputeLatency();

if (records != null && !records.isEmpty()) {
pollSensor.record(pollLatency, now);
addRecordsToTasks(records);
}

// Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests().
// The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned().
// Should only proceed when the thread is still running after #pollRequests(), because no external state mutation
Expand All @@ -811,13 +818,6 @@ void runOnce() {
return;
}

final long pollLatency = advanceNowAndComputeLatency();

if (records != null && !records.isEmpty()) {
pollSensor.record(pollLatency, now);
addRecordsToTasks(records);
}

// we can always let changelog reader to try restoring in order to initialize the changelogs;
// if there's no active restoring or standby updating it would not try to fetch any data
changelogReader.restore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import kafka.tools.StreamsResetter;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
Expand All @@ -42,7 +41,6 @@
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
Expand All @@ -62,9 +60,9 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

Expand Down Expand Up @@ -170,25 +168,11 @@ private void prepareConfigs() {
private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
private static final int TIMEOUT_MULTIPLIER = 15;

private class ConsumerGroupInactiveCondition implements TestCondition {
@Override
public boolean conditionMet() {
try {
final ConsumerGroupDescription groupDescription = adminClient.describeConsumerGroups(Collections.singletonList(appID)).describedGroups().get(appID).get();
return groupDescription.members().isEmpty();
} catch (final ExecutionException | InterruptedException e) {
return false;
}
}
}

void prepareTest() throws Exception {
prepareConfigs();
prepareEnvironment();

// busy wait until cluster (ie, ConsumerGroupCoordinator) is available
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Test consumer group " + appID + " still active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT);

cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);

Expand Down Expand Up @@ -286,15 +270,13 @@ void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);

streams.close();
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);

// RESET
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.cleanUp();
cleanGlobal(false, null, null);
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);

assertInternalTopicsGotDeleted(null);

Expand All @@ -305,8 +287,7 @@ void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws

assertThat(resultRerun, equalTo(result));

TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
cleanGlobal(false, null, null);
}

Expand All @@ -325,8 +306,7 @@ void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exc
final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2, 40);

streams.close();
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);

// insert bad record to make sure intermediate user topic gets seekToEnd()
mockTime.sleep(1);
Expand All @@ -341,8 +321,7 @@ void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exc
streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfig);
streams.cleanUp();
cleanGlobal(true, null, null);
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);

assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC);

Expand All @@ -363,8 +342,7 @@ void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exc
}
assertThat(resultIntermediate.get(10), equalTo(badMessage));

TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
cleanGlobal(true, null, null);

cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
Expand All @@ -380,8 +358,7 @@ void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exc
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);

streams.close();
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);

// RESET
final File resetFile = File.createTempFile("reset", ".csv");
Expand All @@ -393,8 +370,7 @@ void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exc
streams.cleanUp();

cleanGlobal(false, "--from-file", resetFile.getAbsolutePath());
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);

assertInternalTopicsGotDeleted(null);

Expand All @@ -408,8 +384,7 @@ void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exc
result.remove(0);
assertThat(resultRerun, equalTo(result));

TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
cleanGlobal(false, null, null);
}

Expand All @@ -423,8 +398,7 @@ void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);

streams.close();
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);

// RESET
final File resetFile = File.createTempFile("reset", ".csv");
Expand All @@ -441,8 +415,7 @@ void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws
calendar.add(Calendar.DATE, -1);

cleanGlobal(false, "--to-datetime", format.format(calendar.getTime()));
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);

assertInternalTopicsGotDeleted(null);

Expand All @@ -455,8 +428,7 @@ void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws

assertThat(resultRerun, equalTo(result));

TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
cleanGlobal(false, null, null);
}

Expand All @@ -470,8 +442,7 @@ void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws E
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);

streams.close();
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);

// RESET
final File resetFile = File.createTempFile("reset", ".csv");
Expand All @@ -483,8 +454,7 @@ void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws E
streams.cleanUp();
cleanGlobal(false, "--by-duration", "PT1M");

TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);

assertInternalTopicsGotDeleted(null);

Expand All @@ -497,8 +467,7 @@ void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws E

assertThat(resultRerun, equalTo(result));

TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
cleanGlobal(false, null, null);
}

Expand Down
Loading