diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index a71c593f811b2..c185b72ae80fb 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -163,7 +163,7 @@
files="(KStreamImpl|KTableImpl).java"/>
+ files="(StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup).java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 23ef941ef500c..b8a213cf70cbe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -134,15 +134,16 @@
@SuppressWarnings("deprecation")
public class StreamsConfig extends AbstractConfig {
- private final static Logger log = LoggerFactory.getLogger(StreamsConfig.class);
+ private static final Logger log = LoggerFactory.getLogger(StreamsConfig.class);
private static final ConfigDef CONFIG;
private final boolean eosEnabled;
- private final static long DEFAULT_COMMIT_INTERVAL_MS = 30000L;
- private final static long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L;
+ private static final long DEFAULT_COMMIT_INTERVAL_MS = 30000L;
+ private static final long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L;
- public final static int DUMMY_THREAD_INDEX = 1;
+ public static final int DUMMY_THREAD_INDEX = 1;
+ public static final long MAX_TASK_IDLE_MS_DISABLED = -1;
/**
* Prefix used to provide default topic configs to be applied when creating internal topics.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 482a2c5d9d175..86b76e3d84ed7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -242,7 +242,8 @@ private StreamTask createActiveTask(final TaskId taskId,
time,
stateManager,
recordCollector,
- context
+ context,
+ logContext
);
log.trace("Created task {} with assigned partitions {}", taskId, inputPartitions);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 559e3b1838486..46b3429f226e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -17,16 +17,21 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.StreamsConfig;
+import org.slf4j.Logger;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
-import java.util.Iterator;
-import java.util.HashSet;
import java.util.function.Function;
/**
@@ -53,14 +58,18 @@
*/
public class PartitionGroup {
+ private final Logger logger;
private final Map<TopicPartition, RecordQueue> partitionQueues;
+ private final Sensor enforcedProcessingSensor;
+ private final long maxTaskIdleMs;
private final Sensor recordLatenessSensor;
private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;
private long streamTime;
private int totalBuffered;
private boolean allBuffered;
-
+ private final Map<TopicPartition, Long> fetchedLags = new HashMap<>();
+ private final Map<TopicPartition, Long> idlePartitionDeadlines = new HashMap<>();
static class RecordInfo {
RecordQueue queue;
@@ -78,15 +87,144 @@ RecordQueue queue() {
}
}
- PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) {
+ PartitionGroup(final LogContext logContext,
+ final Map<TopicPartition, RecordQueue> partitionQueues,
+ final Sensor recordLatenessSensor,
+ final Sensor enforcedProcessingSensor,
+ final long maxTaskIdleMs) {
+ this.logger = logContext.logger(PartitionGroup.class);
nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
this.partitionQueues = partitionQueues;
+ this.enforcedProcessingSensor = enforcedProcessingSensor;
+ this.maxTaskIdleMs = maxTaskIdleMs;
this.recordLatenessSensor = recordLatenessSensor;
totalBuffered = 0;
allBuffered = false;
streamTime = RecordQueue.UNKNOWN;
}
+ public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {
+ final Long lag = metadata.lag();
+ if (lag != null) {
+ logger.trace("added fetched lag {}: {}", partition, lag);
+ fetchedLags.put(partition, lag);
+ }
+ }
+
+ public boolean readyToProcess(final long wallClockTime) {
+ if (logger.isTraceEnabled()) {
+ for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
+ logger.trace(
+ "buffered/lag {}: {}/{}",
+ entry.getKey(),
+ entry.getValue().size(),
+ fetchedLags.get(entry.getKey())
+ );
+ }
+ }
+ // Log-level strategy:
+ // TRACE for messages that don't wait for fetches
+ // TRACE when we waited for a fetch and decided to wait some more, as configured
+ // TRACE when we are ready for processing and didn't have to enforce processing
+ // INFO when we enforce processing, since this has to wait for fetches AND may result in disorder
+
+ if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) {
+ if (logger.isTraceEnabled() && !allBuffered && totalBuffered > 0) {
+ final Set<TopicPartition> bufferedPartitions = new HashSet<>();
+ final Set<TopicPartition> emptyPartitions = new HashSet<>();
+ for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
+ if (entry.getValue().isEmpty()) {
+ emptyPartitions.add(entry.getKey());
+ } else {
+ bufferedPartitions.add(entry.getKey());
+ }
+ }
+ logger.trace("Ready for processing because max.task.idle.ms is disabled." +
+ "\n\tThere may be out-of-order processing for this task as a result." +
+ "\n\tBuffered partitions: {}" +
+ "\n\tNon-buffered partitions: {}",
+ bufferedPartitions,
+ emptyPartitions);
+ }
+ return true;
+ }
+
+ final Set<TopicPartition> queued = new HashSet<>();
+ Map<TopicPartition, Long> enforced = null;
+
+ for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
+ final TopicPartition partition = entry.getKey();
+ final RecordQueue queue = entry.getValue();
+
+ final Long nullableFetchedLag = fetchedLags.get(partition);
+
+ if (!queue.isEmpty()) {
+ // this partition is ready for processing
+ idlePartitionDeadlines.remove(partition);
+ queued.add(partition);
+ } else if (nullableFetchedLag == null) {
+ // must wait to fetch metadata for the partition
+ idlePartitionDeadlines.remove(partition);
+ logger.trace("Waiting to fetch data for {}", partition);
+ return false;
+ } else if (nullableFetchedLag > 0L) {
+ // must wait to poll the data we know to be on the broker
+ idlePartitionDeadlines.remove(partition);
+ logger.trace(
+ "Lag for {} is currently {}, but no data is buffered locally. Waiting to buffer some records.",
+ partition,
+ nullableFetchedLag
+ );
+ return false;
+ } else {
+ // p is known to have zero lag. wait for maxTaskIdleMs to see if more data shows up.
+ // One alternative would be to set the deadline to nullableMetadata.receivedTimestamp + maxTaskIdleMs
+ // instead. That way, we would start the idling timer as of the freshness of our knowledge about the zero
+ // lag instead of when we happen to run this method, but realistically it's probably a small difference
+ // and using wall clock time seems more intuitive for users,
+ // since the log message will be as of wallClockTime.
+ idlePartitionDeadlines.putIfAbsent(partition, wallClockTime + maxTaskIdleMs);
+ final long deadline = idlePartitionDeadlines.get(partition);
+ if (wallClockTime < deadline) {
+ logger.trace(
+ "Lag for {} is currently 0 and current time is {}. Waiting for new data to be produced for configured idle time {} (deadline is {}).",
+ partition,
+ wallClockTime,
+ maxTaskIdleMs,
+ deadline
+ );
+ return false;
+ } else {
+ // this partition is ready for processing due to the task idling deadline passing
+ if (enforced == null) {
+ enforced = new HashMap<>();
+ }
+ enforced.put(partition, deadline);
+ }
+ }
+ }
+ if (enforced == null) {
+ logger.trace("All partitions were buffered locally, so this task is ready for processing.");
+ return true;
+ } else if (queued.isEmpty()) {
+ logger.trace("No partitions were buffered locally, so this task is not ready for processing.");
+ return false;
+ } else {
+ enforcedProcessingSensor.record(1.0d, wallClockTime);
+ logger.info("Continuing to process although some partition timestamps were not buffered locally." +
+ "\n\tThere may be out-of-order processing for this task as a result." +
+ "\n\tPartitions with local data: {}." +
+ "\n\tPartitions we gave up waiting for, with their corresponding deadlines: {}." +
+ "\n\tConfigured max.task.idle.ms: {}." +
+ "\n\tCurrent wall-clock time: {}.",
+ queued,
+ enforced,
+ maxTaskIdleMs,
+ wallClockTime);
+ return true;
+ }
+ }
+
// visible for testing
long partitionTimestamp(final TopicPartition partition) {
final RecordQueue queue = partitionQueues.get(partition);
@@ -239,7 +377,7 @@ int numBuffered() {
return totalBuffered;
}
- boolean allPartitionsBuffered() {
+ boolean allPartitionsBufferedLocally() {
return allBuffered;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 4efb10eb566cf..d866954668618 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
@@ -286,6 +287,11 @@ public void addRecords(final TopicPartition partition, final Iterable e2eLatencySensors = new HashMap<>();
private final InternalProcessorContext processorContext;
private final RecordQueueCreator recordQueueCreator;
- private long idleStartTimeMs;
private boolean commitNeeded = false;
private boolean commitRequested = false;
private boolean hasPendingTxCommit = false;
@@ -116,7 +114,8 @@ public StreamTask(final TaskId id,
final Time time,
final ProcessorStateManager stateMgr,
final RecordCollector recordCollector,
- final InternalProcessorContext processorContext) {
+ final InternalProcessorContext processorContext,
+ final LogContext logContext) {
super(
id,
topology,
@@ -140,12 +139,6 @@ public StreamTask(final TaskId id,
this.streamsMetrics = streamsMetrics;
closeTaskSensor = ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
final String taskId = id.toString();
- if (streamsMetrics.version() == Version.FROM_0100_TO_24) {
- final Sensor parent = ThreadMetrics.commitOverTasksSensor(threadId, streamsMetrics);
- enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics, parent);
- } else {
- enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics);
- }
processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, taskId, streamsMetrics);
processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics);
punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
@@ -168,17 +161,30 @@ public StreamTask(final TaskId id,
streamTimePunctuationQueue = new PunctuationQueue();
systemTimePunctuationQueue = new PunctuationQueue();
- maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
// initialize the consumed and committed offset cache
consumedOffsets = new HashMap<>();
- recordQueueCreator = new RecordQueueCreator(logContext, config.defaultTimestampExtractor(), config.defaultDeserializationExceptionHandler());
+ recordQueueCreator = new RecordQueueCreator(this.logContext, config.defaultTimestampExtractor(), config.defaultDeserializationExceptionHandler());
recordInfo = new PartitionGroup.RecordInfo();
- partitionGroup = new PartitionGroup(createPartitionQueues(),
- TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics));
+
+ final Sensor enforcedProcessingSensor;
+ if (streamsMetrics.version() == Version.FROM_0100_TO_24) {
+ final Sensor parent = ThreadMetrics.commitOverTasksSensor(threadId, streamsMetrics);
+ enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics, parent);
+ } else {
+ enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics);
+ }
+ final long maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
+ partitionGroup = new PartitionGroup(
+ logContext,
+ createPartitionQueues(),
+ TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics),
+ enforcedProcessingSensor,
+ maxTaskIdleMs
+ );
stateMgr.registerGlobalStateStores(topology.globalStateStores());
}
@@ -234,7 +240,6 @@ public void completeRestoration() {
initializeMetadata();
initializeTopology();
processorContext.initialize();
- idleStartTimeMs = RecordQueue.UNKNOWN;
transitionTo(State.RUNNING);
@@ -624,7 +629,12 @@ private void close(final boolean clean) {
/**
* An active task is processable if its buffer contains data for all of its input
- * source topic partitions, or if it is enforced to be processable
+ * source topic partitions, or if it is enforced to be processable.
+ *
+ * Note that this method is _NOT_ idempotent, because the internal bookkeeping
+ * consumes the partition metadata. For example, unit tests may have to invoke
+ * {@link #addFetchedMetadata(TopicPartition, ConsumerRecords.Metadata)} again
+ * before invoking this method.
*/
public boolean isProcessable(final long wallClockTime) {
if (state() == State.CLOSED) {
@@ -642,26 +652,7 @@ public boolean isProcessable(final long wallClockTime) {
return false;
}
- if (partitionGroup.allPartitionsBuffered()) {
- idleStartTimeMs = RecordQueue.UNKNOWN;
- return true;
- } else if (partitionGroup.numBuffered() > 0) {
- if (idleStartTimeMs == RecordQueue.UNKNOWN) {
- idleStartTimeMs = wallClockTime;
- }
-
- if (wallClockTime - idleStartTimeMs >= maxTaskIdleMs) {
- enforcedProcessingSensor.record(1.0d, wallClockTime);
- return true;
- } else {
- return false;
- }
- } else {
- // there's no data in any of the topics; we should reset the enforced
- // processing timer
- idleStartTimeMs = RecordQueue.UNKNOWN;
- return false;
- }
+ return partitionGroup.readyToProcess(wallClockTime);
}
/**
@@ -910,6 +901,11 @@ public void addRecords(final TopicPartition partition, final Iterable 0) {
- log.debug("Main Consumer poll completed in {} ms and fetched {} records", pollLatency, numRecords);
- }
+
+ log.debug(
+ "Main Consumer poll completed in {} ms and fetched {} records and {} metadata",
+ pollLatency,
+ numRecords,
+ records.metadata().size()
+ );
pollSensor.record(pollLatency, now);
- if (!records.isEmpty()) {
+ if (!records.isEmpty() || !records.metadata().isEmpty()) {
pollRecordsSensor.record(numRecords, now);
taskManager.addRecordsToTasks(records);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index fd4b7b705ae63..ac1d97bc2c5d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -417,10 +417,12 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
return new GroupAssignment(assignment);
} catch (final MissingSourceTopicException e) {
+ log.error("Caught an error in the task assignment. Returning an error assignment.", e);
return new GroupAssignment(
errorAssignment(clientMetadataMap, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
);
} catch (final TaskAssignmentException e) {
+ log.error("Caught an error in the task assignment. Returning an error assignment.", e);
return new GroupAssignment(
errorAssignment(clientMetadataMap, AssignorError.ASSIGNMENT_ERROR.code())
);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 206a5efd6a7de..8e3aa0b41e854 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
@@ -157,6 +158,11 @@ enum TaskType {
void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
+ /**
+ * Add to this task any metadata returned from the poll.
+ */
+ void addFetchedMetadata(TopicPartition partition, ConsumerRecords.Metadata metadata);
+
default boolean process(final long wallClockTime) {
return false;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 6ae0e642997be..1972202e0d194 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -389,7 +389,6 @@ private void handleCloseAndRecycle(final Set tasksToRecycle,
tasksToRecycle.removeAll(tasksToCloseDirty);
for (final Task oldTask : tasksToRecycle) {
- final Task newTask;
try {
if (oldTask.isActive()) {
final Set partitions = standbyTasksToCreate.remove(oldTask.id());
@@ -971,8 +970,7 @@ int commitAll() {
* @param records Records, can be null
*/
void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
- for (final TopicPartition partition : records.partitions()) {
- // TODO: change type to `StreamTask`
+ for (final TopicPartition partition : union(HashSet::new, records.partitions(), records.metadata().keySet())) {
final Task activeTask = tasks.activeTasksForInputPartition(partition);
if (activeTask == null) {
@@ -982,6 +980,7 @@ void addRecordsToTasks(final ConsumerRecords records) {
}
activeTask.addRecords(partition, records.records(partition));
+ activeTask.addFetchedMetadata(partition, records.metadata().get(partition));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index e3999982cfcd5..58b9c19cd4aec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -128,6 +128,7 @@ public void setUp() throws InterruptedException {
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ properties.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 0L);
streamsConfiguration = StreamsTestUtils.getStreamsConfig(
IntegrationTestUtils.safeUniqueTestName(RegexSourceIntegrationTest.class, new TestName()),
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index ea21d55947fe5..c35139b823276 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -23,7 +23,6 @@
import org.apache.kafka.streams.tests.SmokeTestClient;
import org.apache.kafka.streams.tests.SmokeTestDriver;
import org.apache.kafka.test.IntegrationTest;
-
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index db659cf41dedb..4d2ca1fb9d4ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -728,7 +728,8 @@ private static List waitUntilFinalKeyValueRecordsReceived(final Pro
return finalAccumData.equals(finalExpected);
};
- final String conditionDetails = "Did not receive all " + expectedRecords + " records from topic " + topic;
+ final String conditionDetails = "Did not receive all " + expectedRecords + " records from topic " +
+ topic + " (got " + accumData + ")";
TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
}
return accumData;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 3425b30b24c24..09558d23854e8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
@@ -29,34 +30,40 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
-import org.junit.Before;
+import org.hamcrest.Matchers;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
+import java.util.UUID;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.hamcrest.CoreMatchers.is;
import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class PartitionGroupTest {
- private final LogContext logContext = new LogContext();
+
+ private final long maxTaskIdleMs = StreamsConfig.MAX_TASK_IDLE_MS_DISABLED;
+ private final LogContext logContext = new LogContext("[test] ");
private final Time time = new MockTime();
private final Serializer<Integer> intSerializer = new IntegerSerializer();
private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
@@ -73,9 +80,9 @@ public class PartitionGroupTest {
private final byte[] recordKey = intSerializer.serialize(null, 1);
private final Metrics metrics = new Metrics();
+ private final Sensor enforcedProcessingSensor = metrics.sensor(UUID.randomUUID().toString());
private final MetricName lastLatenessValue = new MetricName("record-lateness-last-value", "", "", mkMap());
- private PartitionGroup group;
private static Sensor getValueSensor(final Metrics metrics, final MetricName metricName) {
final Sensor lastRecordedValue = metrics.sensor(metricName.name());
@@ -83,18 +90,12 @@ private static Sensor getValueSensor(final Metrics metrics, final MetricName met
return lastRecordedValue;
}
- @Before
- public void setUp() {
- group = new PartitionGroup(
- mkMap(mkEntry(partition1, queue1), mkEntry(partition2, queue2)),
- getValueSensor(metrics, lastLatenessValue)
- );
- }
-
@Test
public void testTimeTracking() {
- testFirstBatch();
- testSecondBatch();
+ final PartitionGroup group = getBasicGroup();
+
+ testFirstBatch(group);
+ testSecondBatch(group);
}
private RecordQueue createQueue1() {
@@ -127,7 +128,7 @@ private TopicPartition createPartition2() {
return new TopicPartition(topics[0], 2);
}
- private void testFirstBatch() {
+ private void testFirstBatch(final PartitionGroup group) {
StampedRecord record;
final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
assertThat(group.numBuffered(), is(0));
@@ -151,7 +152,7 @@ private void testFirstBatch() {
// 2:[2, 4, 6]
// st: -1 since no records was being processed yet
- verifyBuffered(6, 3, 3);
+ verifyBuffered(6, 3, 3, group);
assertThat(group.partitionTimestamp(partition1), is(RecordQueue.UNKNOWN));
assertThat(group.partitionTimestamp(partition2), is(RecordQueue.UNKNOWN));
assertThat(group.headRecordOffset(partition1), is(1L));
@@ -169,7 +170,7 @@ record = group.nextRecord(info, time.milliseconds());
assertThat(group.partitionTimestamp(partition2), is(RecordQueue.UNKNOWN));
assertThat(group.headRecordOffset(partition1), is(3L));
assertThat(group.headRecordOffset(partition2), is(2L));
- verifyTimes(record, 1L, 1L);
+ verifyTimes(record, 1L, 1L, group);
assertThat(metrics.metric(lastLatenessValue).metricValue(), is(0.0));
// get one record, now the time should be advanced
@@ -182,12 +183,12 @@ record = group.nextRecord(info, time.milliseconds());
assertThat(group.partitionTimestamp(partition2), is(2L));
assertThat(group.headRecordOffset(partition1), is(3L));
assertThat(group.headRecordOffset(partition2), is(4L));
- verifyTimes(record, 2L, 2L);
- verifyBuffered(4, 2, 2);
+ verifyTimes(record, 2L, 2L, group);
+ verifyBuffered(4, 2, 2, group);
assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
}
- private void testSecondBatch() {
+ private void testSecondBatch(final PartitionGroup group) {
StampedRecord record;
final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
@@ -200,7 +201,7 @@ private void testSecondBatch() {
// 1:[3, 5, 2, 4]
// 2:[4, 6]
// st: 2 (just adding records shouldn't change it)
- verifyBuffered(6, 4, 2);
+ verifyBuffered(6, 4, 2, group);
assertThat(group.partitionTimestamp(partition1), is(1L));
assertThat(group.partitionTimestamp(partition2), is(2L));
assertThat(group.headRecordOffset(partition1), is(3L));
@@ -218,8 +219,8 @@ record = group.nextRecord(info, time.milliseconds());
assertThat(group.partitionTimestamp(partition2), is(2L));
assertThat(group.headRecordOffset(partition1), is(5L));
assertThat(group.headRecordOffset(partition2), is(4L));
- verifyTimes(record, 3L, 3L);
- verifyBuffered(5, 3, 2);
+ verifyTimes(record, 3L, 3L, group);
+ verifyBuffered(5, 3, 2, group);
assertThat(metrics.metric(lastLatenessValue).metricValue(), is(0.0));
// get one record, time should be advanced
@@ -232,8 +233,8 @@ record = group.nextRecord(info, time.milliseconds());
assertThat(group.partitionTimestamp(partition2), is(4L));
assertThat(group.headRecordOffset(partition1), is(5L));
assertThat(group.headRecordOffset(partition2), is(6L));
- verifyTimes(record, 4L, 4L);
- verifyBuffered(4, 3, 1);
+ verifyTimes(record, 4L, 4L, group);
+ verifyBuffered(4, 3, 1, group);
assertThat(metrics.metric(lastLatenessValue).metricValue(), is(0.0));
// get one more record, time should be advanced
@@ -246,8 +247,8 @@ record = group.nextRecord(info, time.milliseconds());
assertThat(group.partitionTimestamp(partition2), is(4L));
assertThat(group.headRecordOffset(partition1), is(2L));
assertThat(group.headRecordOffset(partition2), is(6L));
- verifyTimes(record, 5L, 5L);
- verifyBuffered(3, 2, 1);
+ verifyTimes(record, 5L, 5L, group);
+ verifyBuffered(3, 2, 1, group);
assertThat(metrics.metric(lastLatenessValue).metricValue(), is(0.0));
// get one more record, time should not be advanced
@@ -260,8 +261,8 @@ record = group.nextRecord(info, time.milliseconds());
assertThat(group.partitionTimestamp(partition2), is(4L));
assertThat(group.headRecordOffset(partition1), is(4L));
assertThat(group.headRecordOffset(partition2), is(6L));
- verifyTimes(record, 2L, 5L);
- verifyBuffered(2, 1, 1);
+ verifyTimes(record, 2L, 5L, group);
+ verifyBuffered(2, 1, 1, group);
assertThat(metrics.metric(lastLatenessValue).metricValue(), is(3.0));
// get one more record, time should not be advanced
@@ -274,8 +275,8 @@ record = group.nextRecord(info, time.milliseconds());
assertThat(group.partitionTimestamp(partition2), is(4L));
assertNull(group.headRecordOffset(partition1));
assertThat(group.headRecordOffset(partition2), is(6L));
- verifyTimes(record, 4L, 5L);
- verifyBuffered(1, 0, 1);
+ verifyTimes(record, 4L, 5L, group);
+ verifyBuffered(1, 0, 1, group);
assertThat(metrics.metric(lastLatenessValue).metricValue(), is(1.0));
// get one more record, time should be advanced
@@ -288,13 +289,15 @@ record = group.nextRecord(info, time.milliseconds());
assertThat(group.partitionTimestamp(partition2), is(6L));
assertNull(group.headRecordOffset(partition1));
assertNull(group.headRecordOffset(partition2));
- verifyTimes(record, 6L, 6L);
- verifyBuffered(0, 0, 0);
+ verifyTimes(record, 6L, 6L, group);
+ verifyBuffered(0, 0, 0, group);
assertThat(metrics.metric(lastLatenessValue).metricValue(), is(0.0));
}
@Test
public void shouldChooseNextRecordBasedOnHeadTimestamp() {
+ final PartitionGroup group = getBasicGroup();
+
assertEquals(0, group.numBuffered());
// add three 3 records with timestamp 1, 5, 3 to partition-1
@@ -305,7 +308,7 @@ public void shouldChooseNextRecordBasedOnHeadTimestamp() {
group.addRawRecords(partition1, list1);
- verifyBuffered(3, 3, 0);
+ verifyBuffered(3, 3, 0, group);
assertEquals(-1L, group.streamTime());
assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
@@ -341,12 +344,18 @@ record = group.nextRecord(info, time.milliseconds());
assertEquals(record.timestamp, 3L);
}
- private void verifyTimes(final StampedRecord record, final long recordTime, final long streamTime) {
+ private void verifyTimes(final StampedRecord record,
+ final long recordTime,
+ final long streamTime,
+ final PartitionGroup group) {
assertThat(record.timestamp, is(recordTime));
assertThat(group.streamTime(), is(streamTime));
}
- private void verifyBuffered(final int totalBuffered, final int partitionOneBuffered, final int partitionTwoBuffered) {
+ private void verifyBuffered(final int totalBuffered,
+ final int partitionOneBuffered,
+ final int partitionTwoBuffered,
+ final PartitionGroup group) {
assertEquals(totalBuffered, group.numBuffered());
assertEquals(partitionOneBuffered, group.numBuffered(partition1));
assertEquals(partitionTwoBuffered, group.numBuffered(partition2));
@@ -354,6 +363,8 @@ private void verifyBuffered(final int totalBuffered, final int partitionOneBuffe
@Test
public void shouldSetPartitionTimestampAndStreamTime() {
+ final PartitionGroup group = getBasicGroup();
+
group.setPartitionTime(partition1, 100L);
assertEquals(100L, group.partitionTimestamp(partition1));
assertEquals(100L, group.streamTime());
@@ -364,6 +375,8 @@ public void shouldSetPartitionTimestampAndStreamTime() {
@Test
public void shouldThrowIllegalStateExceptionUponAddRecordsIfPartitionUnknown() {
+ final PartitionGroup group = getBasicGroup();
+
final IllegalStateException exception = assertThrows(
IllegalStateException.class,
() -> group.addRawRecords(unknownPartition, null));
@@ -372,6 +385,8 @@ public void shouldThrowIllegalStateExceptionUponAddRecordsIfPartitionUnknown() {
@Test
public void shouldThrowIllegalStateExceptionUponNumBufferedIfPartitionUnknown() {
+ final PartitionGroup group = getBasicGroup();
+
final IllegalStateException exception = assertThrows(
IllegalStateException.class,
() -> group.numBuffered(unknownPartition));
@@ -380,6 +395,8 @@ public void shouldThrowIllegalStateExceptionUponNumBufferedIfPartitionUnknown()
@Test
public void shouldThrowIllegalStateExceptionUponSetPartitionTimestampIfPartitionUnknown() {
+ final PartitionGroup group = getBasicGroup();
+
final IllegalStateException exception = assertThrows(
IllegalStateException.class,
() -> group.setPartitionTime(unknownPartition, 0L));
@@ -388,6 +405,8 @@ public void shouldThrowIllegalStateExceptionUponSetPartitionTimestampIfPartition
@Test
public void shouldThrowIllegalStateExceptionUponGetPartitionTimestampIfPartitionUnknown() {
+ final PartitionGroup group = getBasicGroup();
+
final IllegalStateException exception = assertThrows(
IllegalStateException.class,
() -> group.partitionTimestamp(unknownPartition));
@@ -396,6 +415,8 @@ public void shouldThrowIllegalStateExceptionUponGetPartitionTimestampIfPartition
@Test
public void shouldThrowIllegalStateExceptionUponGetHeadRecordOffsetIfPartitionUnknown() {
+ final PartitionGroup group = getBasicGroup();
+
final IllegalStateException exception = assertThrows(
IllegalStateException.class,
() -> group.headRecordOffset(unknownPartition));
@@ -404,6 +425,8 @@ public void shouldThrowIllegalStateExceptionUponGetHeadRecordOffsetIfPartitionUn
@Test
public void shouldEmptyPartitionsOnClear() {
+ final PartitionGroup group = getBasicGroup();
+
final List<ConsumerRecord<byte[], byte[]>> list = Arrays.asList(
new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
new ConsumerRecord<>("topic", 1, 3L, recordKey, recordValue),
@@ -424,6 +447,8 @@ public void shouldEmptyPartitionsOnClear() {
@Test
public void shouldUpdatePartitionQueuesShrink() {
+ final PartitionGroup group = getBasicGroup();
+
final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
@@ -434,7 +459,7 @@ public void shouldUpdatePartitionQueuesShrink() {
new ConsumerRecord<>("topic", 2, 6L, recordKey, recordValue));
group.addRawRecords(partition2, list2);
assertEquals(list1.size() + list2.size(), group.numBuffered());
- assertTrue(group.allPartitionsBuffered());
+ assertTrue(group.allPartitionsBufferedLocally());
group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds());
// shrink list of queues
@@ -443,7 +468,7 @@ public void shouldUpdatePartitionQueuesShrink() {
return null;
});
- assertTrue(group.allPartitionsBuffered()); // because didn't add any new partitions
+ assertTrue(group.allPartitionsBufferedLocally()); // because didn't add any new partitions
assertEquals(list2.size(), group.numBuffered());
assertEquals(1, group.streamTime());
assertThrows(IllegalStateException.class, () -> group.partitionTimestamp(partition1));
@@ -453,9 +478,12 @@ public void shouldUpdatePartitionQueuesShrink() {
@Test
public void shouldUpdatePartitionQueuesExpand() {
- group = new PartitionGroup(
- mkMap(mkEntry(partition1, queue1)),
- getValueSensor(metrics, lastLatenessValue)
+ final PartitionGroup group = new PartitionGroup(
+ logContext,
+ mkMap(mkEntry(partition1, queue1)),
+ getValueSensor(metrics, lastLatenessValue),
+ enforcedProcessingSensor,
+ maxTaskIdleMs
);
final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
@@ -463,7 +491,7 @@ public void shouldUpdatePartitionQueuesExpand() {
group.addRawRecords(partition1, list1);
assertEquals(list1.size(), group.numBuffered());
- assertTrue(group.allPartitionsBuffered());
+ assertTrue(group.allPartitionsBufferedLocally());
group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds());
// expand list of queues
@@ -472,7 +500,7 @@ public void shouldUpdatePartitionQueuesExpand() {
return createQueue2();
});
- assertFalse(group.allPartitionsBuffered()); // because added new partition
+ assertFalse(group.allPartitionsBufferedLocally()); // because added new partition
assertEquals(1, group.numBuffered());
assertEquals(1, group.streamTime());
assertThat(group.partitionTimestamp(partition1), equalTo(1L));
@@ -482,16 +510,19 @@ public void shouldUpdatePartitionQueuesExpand() {
@Test
public void shouldUpdatePartitionQueuesShrinkAndExpand() {
- group = new PartitionGroup(
- mkMap(mkEntry(partition1, queue1)),
- getValueSensor(metrics, lastLatenessValue)
+ final PartitionGroup group = new PartitionGroup(
+ logContext,
+ mkMap(mkEntry(partition1, queue1)),
+ getValueSensor(metrics, lastLatenessValue),
+ enforcedProcessingSensor,
+ maxTaskIdleMs
);
final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
group.addRawRecords(partition1, list1);
assertEquals(list1.size(), group.numBuffered());
- assertTrue(group.allPartitionsBuffered());
+ assertTrue(group.allPartitionsBufferedLocally());
group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds());
// expand and shrink list of queues
@@ -500,11 +531,240 @@ public void shouldUpdatePartitionQueuesShrinkAndExpand() {
return createQueue2();
});
- assertFalse(group.allPartitionsBuffered()); // because added new partition
+ assertFalse(group.allPartitionsBufferedLocally()); // because added new partition
assertEquals(0, group.numBuffered());
assertEquals(1, group.streamTime());
assertThrows(IllegalStateException.class, () -> group.partitionTimestamp(partition1));
assertThat(group.partitionTimestamp(partition2), equalTo(RecordQueue.UNKNOWN));
assertThat(group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds()), nullValue()); // all available records removed
}
+
+ @Test
+ public void shouldNeverWaitIfIdlingIsDisabled() {
+ final PartitionGroup group = new PartitionGroup(
+ logContext,
+ mkMap(
+ mkEntry(partition1, queue1),
+ mkEntry(partition2, queue2)
+ ),
+ getValueSensor(metrics, lastLatenessValue),
+ enforcedProcessingSensor,
+ StreamsConfig.MAX_TASK_IDLE_MS_DISABLED
+ );
+
+ final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+ new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
+ group.addRawRecords(partition1, list1);
+
+ assertThat(group.allPartitionsBufferedLocally(), is(false));
+ try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
+ LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+ assertThat(group.readyToProcess(0L), is(true));
+ assertThat(
+ appender.getEvents(),
+ hasItem(Matchers.allOf(
+ Matchers.hasProperty("level", equalTo("TRACE")),
+ Matchers.hasProperty("message", equalTo(
+ "[test] Ready for processing because max.task.idle.ms is disabled.\n" +
+ "\tThere may be out-of-order processing for this task as a result.\n" +
+ "\tBuffered partitions: [topic-1]\n" +
+ "\tNon-buffered partitions: [topic-2]"
+ ))
+ ))
+ );
+ }
+ }
+
+ @Test
+ public void shouldBeReadyIfAllPartitionsAreBuffered() {
+ final PartitionGroup group = new PartitionGroup(
+ logContext,
+ mkMap(
+ mkEntry(partition1, queue1),
+ mkEntry(partition2, queue2)
+ ),
+ getValueSensor(metrics, lastLatenessValue),
+ enforcedProcessingSensor,
+ 0L
+ );
+
+ final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+ new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
+ group.addRawRecords(partition1, list1);
+
+ final List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
+ new ConsumerRecord<>("topic", 2, 1L, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 2, 5L, recordKey, recordValue));
+ group.addRawRecords(partition2, list2);
+
+ assertThat(group.allPartitionsBufferedLocally(), is(true));
+ try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
+ LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+ assertThat(group.readyToProcess(0L), is(true));
+ assertThat(
+ appender.getEvents(),
+ hasItem(Matchers.allOf(
+ Matchers.hasProperty("level", equalTo("TRACE")),
+ Matchers.hasProperty("message", equalTo("[test] All partitions were buffered locally, so this task is ready for processing."))
+ ))
+ );
+ }
+ }
+
+ @Test
+ public void shouldWaitForFetchesWhenMetadataIsIncomplete() {
+ final PartitionGroup group = new PartitionGroup(
+ logContext,
+ mkMap(
+ mkEntry(partition1, queue1),
+ mkEntry(partition2, queue2)
+ ),
+ getValueSensor(metrics, lastLatenessValue),
+ enforcedProcessingSensor,
+ 0L
+ );
+
+ final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+ new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
+ group.addRawRecords(partition1, list1);
+
+ assertThat(group.allPartitionsBufferedLocally(), is(false));
+ try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
+ LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+ assertThat(group.readyToProcess(0L), is(false));
+ assertThat(
+ appender.getEvents(),
+ hasItem(Matchers.allOf(
+ Matchers.hasProperty("level", equalTo("TRACE")),
+ Matchers.hasProperty("message", equalTo("[test] Waiting to fetch data for topic-2"))
+ ))
+ );
+ }
+ group.addFetchedMetadata(partition2, new ConsumerRecords.Metadata(0L, 0L, 0L));
+ assertThat(group.readyToProcess(0L), is(true));
+ }
+
+ @Test
+ public void shouldWaitForPollWhenLagIsNonzero() {
+ final PartitionGroup group = new PartitionGroup(
+ logContext,
+ mkMap(
+ mkEntry(partition1, queue1),
+ mkEntry(partition2, queue2)
+ ),
+ getValueSensor(metrics, lastLatenessValue),
+ enforcedProcessingSensor,
+ 0L
+ );
+
+ final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+ new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
+ group.addRawRecords(partition1, list1);
+ group.addFetchedMetadata(partition2, new ConsumerRecords.Metadata(0L, 0L, 1L));
+
+ assertThat(group.allPartitionsBufferedLocally(), is(false));
+
+ try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
+ LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+ assertThat(group.readyToProcess(0L), is(false));
+ assertThat(
+ appender.getEvents(),
+ hasItem(Matchers.allOf(
+ Matchers.hasProperty("level", equalTo("TRACE")),
+ Matchers.hasProperty("message", equalTo("[test] Lag for topic-2 is currently 1, but no data is buffered locally. Waiting to buffer some records."))
+ ))
+ );
+ }
+ }
+
+ @Test
+ public void shouldIdleAsSpecifiedWhenLagIsZero() {
+ final PartitionGroup group = new PartitionGroup(
+ logContext,
+ mkMap(
+ mkEntry(partition1, queue1),
+ mkEntry(partition2, queue2)
+ ),
+ getValueSensor(metrics, lastLatenessValue),
+ enforcedProcessingSensor,
+ 1L
+ );
+
+ final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+ new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
+ group.addRawRecords(partition1, list1);
+ group.addFetchedMetadata(partition2, new ConsumerRecords.Metadata(0L, 0L, 0L));
+
+ assertThat(group.allPartitionsBufferedLocally(), is(false));
+
+ try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
+ LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+ assertThat(group.readyToProcess(0L), is(false));
+ assertThat(
+ appender.getEvents(),
+ hasItem(Matchers.allOf(
+ Matchers.hasProperty("level", equalTo("TRACE")),
+ Matchers.hasProperty("message", equalTo("[test] Lag for topic-2 is currently 0 and current time is 0. Waiting for new data to be produced for configured idle time 1 (deadline is 1)."))
+ ))
+ );
+ }
+
+ group.addFetchedMetadata(partition2, new ConsumerRecords.Metadata(0L, 0L, 0L));
+ try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
+ LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+ assertThat(group.readyToProcess(1L), is(true));
+ assertThat(
+ appender.getEvents(),
+ hasItem(Matchers.allOf(
+ Matchers.hasProperty("level", equalTo("INFO")),
+ Matchers.hasProperty("message", equalTo(
+ "[test] Continuing to process although some partition timestamps were not buffered locally.\n" +
+ "\tThere may be out-of-order processing for this task as a result.\n" +
+ "\tPartitions with local data: [topic-1].\n" +
+ "\tPartitions we gave up waiting for, with their corresponding deadlines: {topic-2=1}.\n" +
+ "\tConfigured max.task.idle.ms: 1.\n" +
+ "\tCurrent wall-clock time: 1."
+ ))
+ ))
+ );
+ }
+
+ group.addFetchedMetadata(partition2, new ConsumerRecords.Metadata(0L, 0L, 0L));
+ try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
+ LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+ assertThat(group.readyToProcess(2L), is(true));
+ assertThat(
+ appender.getEvents(),
+ hasItem(Matchers.allOf(
+ Matchers.hasProperty("level", equalTo("INFO")),
+ Matchers.hasProperty("message", equalTo(
+ "[test] Continuing to process although some partition timestamps were not buffered locally.\n" +
+ "\tThere may be out-of-order processing for this task as a result.\n" +
+ "\tPartitions with local data: [topic-1].\n" +
+ "\tPartitions we gave up waiting for, with their corresponding deadlines: {topic-2=1}.\n" +
+ "\tConfigured max.task.idle.ms: 1.\n" +
+ "\tCurrent wall-clock time: 2."
+ ))
+ ))
+ );
+ }
+ }
+
+ private PartitionGroup getBasicGroup() {
+ return new PartitionGroup(
+ logContext,
+ mkMap(
+ mkEntry(partition1, queue1),
+ mkEntry(partition2, queue2)
+ ),
+ getValueSensor(metrics, lastLatenessValue),
+ enforcedProcessingSensor,
+ maxTaskIdleMs
+ );
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 56022b286f7cc..07d0bdc3e8b56 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -18,6 +18,7 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -39,6 +40,7 @@
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
@@ -74,7 +76,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
-import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashSet;
@@ -94,6 +95,7 @@
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
import static org.apache.kafka.streams.processor.internals.StreamTask.encodeTimestamp;
import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
import static org.apache.kafka.streams.processor.internals.Task.State.RESTORING;
@@ -123,6 +125,7 @@ public class StreamTaskTest {
private static final File BASE_DIR = TestUtils.tempDirectory();
private static final long DEFAULT_TIMESTAMP = 1000;
+ private final LogContext logContext = new LogContext("[test] ");
private final String topic1 = "topic1";
private final String topic2 = "topic2";
private final TopicPartition partition1 = new TopicPartition(topic1, 1);
@@ -184,10 +187,10 @@ private static ProcessorTopology withRepartitionTopics(final List repartitionTopics) {
return new ProcessorTopology(processorNodes,
sourcesByTopic,
- Collections.emptyMap(),
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyMap(),
+ emptyMap(),
+ emptyList(),
+ emptyList(),
+ emptyMap(),
repartitionTopics);
}
@@ -195,10 +198,10 @@ private static ProcessorTopology withSources(final List> sourcesByTopic) {
return new ProcessorTopology(processorNodes,
sourcesByTopic,
- Collections.emptyMap(),
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyMap(),
+ emptyMap(),
+ emptyList(),
+ emptyList(),
+ emptyMap(),
Collections.emptySet());
}
@@ -207,7 +210,7 @@ private static StreamsConfig createConfig() {
}
private static StreamsConfig createConfig(final String enforcedProcessingValue) {
- return createConfig(StreamsConfig.AT_LEAST_ONCE, enforcedProcessingValue);
+ return createConfig(AT_LEAST_ONCE, enforcedProcessingValue);
}
private static StreamsConfig createConfig(final String eosConfig, final String enforcedProcessingValue) {
@@ -290,7 +293,7 @@ public void shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() thr
EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE);
stateDirectory = ctrl.createMock(StateDirectory.class);
- stateManager.registerGlobalStateStores(Collections.emptyList());
+ stateManager.registerGlobalStateStores(emptyList());
EasyMock.expectLastCall();
EasyMock.expect(stateManager.taskId()).andReturn(taskId);
@@ -339,16 +342,16 @@ public void shouldReadCommittedStreamTimeOnInitialize() {
public void shouldTransitToRestoringThenRunningAfterCreation() throws IOException {
stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition));
- EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(changelogPartition, 10L));
+ EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(changelogPartition));
+ EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(changelogPartition, 10L));
stateManager.registerStore(stateStore, stateStore.stateRestoreCallback);
EasyMock.expectLastCall();
- EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
+ EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
EasyMock.replay(stateDirectory, stateManager, recordCollector);
task = createStatefulTask(createConfig("100"), true);
- assertEquals(Task.State.CREATED, task.state());
+ assertEquals(CREATED, task.state());
task.initializeIfNeeded();
@@ -363,7 +366,7 @@ public void shouldTransitToRestoringThenRunningAfterCreation() throws IOExceptio
task.completeRestoration();
- assertEquals(Task.State.RUNNING, task.state());
+ assertEquals(RUNNING, task.state());
assertTrue(source1.initialized);
assertTrue(source2.initialized);
@@ -375,58 +378,75 @@ public void shouldProcessInOrder() {
task = createStatelessTask(createConfig(), StreamsConfig.METRICS_LATEST);
task.addRecords(partition1, asList(
- getConsumerRecord(partition1, 10),
- getConsumerRecord(partition1, 20),
- getConsumerRecord(partition1, 30)
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 10, 101),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 20, 102),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 30, 103)
));
task.addRecords(partition2, asList(
- getConsumerRecord(partition2, 25),
- getConsumerRecord(partition2, 35),
- getConsumerRecord(partition2, 45)
+ getConsumerRecordWithOffsetAsTimestamp(partition2, 25, 201),
+ getConsumerRecordWithOffsetAsTimestamp(partition2, 35, 202),
+ getConsumerRecordWithOffsetAsTimestamp(partition2, 45, 203)
));
assertTrue(task.process(0L));
assertEquals(5, task.numBuffered());
assertEquals(1, source1.numReceived);
assertEquals(0, source2.numReceived);
+ assertEquals(singletonList(101), source1.values);
+ assertEquals(emptyList(), source2.values);
assertTrue(task.process(0L));
assertEquals(4, task.numBuffered());
assertEquals(2, source1.numReceived);
assertEquals(0, source2.numReceived);
+ assertEquals(asList(101, 102), source1.values);
+ assertEquals(emptyList(), source2.values);
assertTrue(task.process(0L));
assertEquals(3, task.numBuffered());
assertEquals(2, source1.numReceived);
assertEquals(1, source2.numReceived);
+ assertEquals(asList(101, 102), source1.values);
+ assertEquals(singletonList(201), source2.values);
assertTrue(task.process(0L));
assertEquals(2, task.numBuffered());
assertEquals(3, source1.numReceived);
assertEquals(1, source2.numReceived);
+ assertEquals(asList(101, 102, 103), source1.values);
+ assertEquals(singletonList(201), source2.values);
+
+ // tell the task that it doesn't need to wait for more records on partition1
+ task.addFetchedMetadata(partition1, new ConsumerRecords.Metadata(0L, 30L, 30L));
assertTrue(task.process(0L));
assertEquals(1, task.numBuffered());
assertEquals(3, source1.numReceived);
assertEquals(2, source2.numReceived);
+ assertEquals(asList(101, 102, 103), source1.values);
+ assertEquals(asList(201, 202), source2.values);
+ // tell the task that it doesn't need to wait for more records on partition1
+ task.addFetchedMetadata(partition1, new ConsumerRecords.Metadata(0L, 30L, 30L));
assertTrue(task.process(0L));
assertEquals(0, task.numBuffered());
assertEquals(3, source1.numReceived);
assertEquals(3, source2.numReceived);
+ assertEquals(asList(101, 102, 103), source1.values);
+ assertEquals(asList(201, 202, 203), source2.values);
}
@Test
public void shouldProcessRecordsAfterPrepareCommitWhenEosDisabled() {
- task = createStatelessTask(createConfig(), StreamsConfig.METRICS_LATEST);
+ task = createSingleSourceStateless(createConfig(), StreamsConfig.METRICS_LATEST);
assertFalse(task.process(time.milliseconds()));
task.addRecords(partition1, asList(
- getConsumerRecord(partition1, 10),
- getConsumerRecord(partition1, 20),
- getConsumerRecord(partition1, 30)
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 10),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 20),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 30)
));
assertTrue(task.process(time.milliseconds()));
@@ -440,14 +460,14 @@ public void shouldProcessRecordsAfterPrepareCommitWhenEosDisabled() {
@Test
public void shouldNotProcessRecordsAfterPrepareCommitWhenEosAlphaEnabled() {
- task = createStatelessTask(createConfig(StreamsConfig.EXACTLY_ONCE, "0"), StreamsConfig.METRICS_LATEST);
+ task = createSingleSourceStateless(createConfig(StreamsConfig.EXACTLY_ONCE, "0"), StreamsConfig.METRICS_LATEST);
assertFalse(task.process(time.milliseconds()));
task.addRecords(partition1, asList(
- getConsumerRecord(partition1, 10),
- getConsumerRecord(partition1, 20),
- getConsumerRecord(partition1, 30)
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 10),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 20),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 30)
));
assertTrue(task.process(time.milliseconds()));
@@ -462,14 +482,14 @@ public void shouldNotProcessRecordsAfterPrepareCommitWhenEosAlphaEnabled() {
@Test
public void shouldNotProcessRecordsAfterPrepareCommitWhenEosBetaEnabled() {
- task = createStatelessTask(createConfig(StreamsConfig.EXACTLY_ONCE_BETA, "0"), StreamsConfig.METRICS_LATEST);
+ task = createSingleSourceStateless(createConfig(StreamsConfig.EXACTLY_ONCE_BETA, "0"), StreamsConfig.METRICS_LATEST);
assertFalse(task.process(time.milliseconds()));
task.addRecords(partition1, asList(
- getConsumerRecord(partition1, 10),
- getConsumerRecord(partition1, 20),
- getConsumerRecord(partition1, 30)
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 10),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 20),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 30)
));
assertTrue(task.process(time.milliseconds()));
@@ -484,21 +504,21 @@ public void shouldNotProcessRecordsAfterPrepareCommitWhenEosBetaEnabled() {
@Test
public void shouldRecordBufferedRecords() {
- task = createStatelessTask(createConfig(), StreamsConfig.METRICS_LATEST);
+ task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST);
final KafkaMetric metric = getMetric("active-buffer", "%s-count", task.id().toString(), StreamsConfig.METRICS_LATEST);
assertThat(metric.metricValue(), equalTo(0.0));
task.addRecords(partition1, asList(
- getConsumerRecord(partition1, 10),
- getConsumerRecord(partition1, 20)
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 10),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 20)
));
task.recordProcessTimeRatioAndBufferSize(100L, time.milliseconds());
assertThat(metric.metricValue(), equalTo(2.0));
- task.process(0L);
+ assertTrue(task.process(0L));
task.recordProcessTimeRatioAndBufferSize(100L, time.milliseconds());
assertThat(metric.metricValue(), equalTo(1.0));
@@ -567,7 +587,7 @@ public void process(final Record record) {
final Metric terminalMax = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), terminalNodeName, StreamsConfig.METRICS_LATEST);
// e2e latency = 10
- task.addRecords(partition1, singletonList(getConsumerRecord(0, 0L)));
+ task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(0, 0L)));
task.process(10L);
assertThat(sourceAvg.metricValue(), equalTo(10.0));
@@ -581,7 +601,7 @@ public void process(final Record record) {
// e2e latency = 15
- task.addRecords(partition1, singletonList(getConsumerRecord(1, 0L)));
+ task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(1, 0L)));
task.process(15L);
assertThat(sourceAvg.metricValue(), equalTo(12.5));
@@ -595,7 +615,7 @@ public void process(final Record record) {
// e2e latency = 23
- task.addRecords(partition1, singletonList(getConsumerRecord(2, 0L)));
+ task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(2, 0L)));
task.process(23L);
assertThat(sourceAvg.metricValue(), equalTo(16.0));
@@ -609,7 +629,7 @@ public void process(final Record record) {
// e2e latency = 5
- task.addRecords(partition1, singletonList(getConsumerRecord(3, 0L)));
+ task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(3, 0L)));
task.process(5L);
assertThat(sourceAvg.metricValue(), equalTo(13.25));
@@ -779,15 +799,15 @@ public void shouldPauseAndResumeBasedOnBufferedRecords() {
task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
task.addRecords(partition1, asList(
- getConsumerRecord(partition1, 10),
- getConsumerRecord(partition1, 20)
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 10),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 20)
));
task.addRecords(partition2, asList(
- getConsumerRecord(partition2, 35),
- getConsumerRecord(partition2, 45),
- getConsumerRecord(partition2, 55),
- getConsumerRecord(partition2, 65)
+ getConsumerRecordWithOffsetAsTimestamp(partition2, 35),
+ getConsumerRecordWithOffsetAsTimestamp(partition2, 45),
+ getConsumerRecordWithOffsetAsTimestamp(partition2, 55),
+ getConsumerRecordWithOffsetAsTimestamp(partition2, 65)
));
assertTrue(task.process(0L));
@@ -798,9 +818,9 @@ public void shouldPauseAndResumeBasedOnBufferedRecords() {
assertTrue(consumer.paused().contains(partition2));
task.addRecords(partition1, asList(
- getConsumerRecord(partition1, 30),
- getConsumerRecord(partition1, 40),
- getConsumerRecord(partition1, 50)
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 30),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 40),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 50)
));
assertEquals(2, consumer.paused().size());
@@ -835,17 +855,17 @@ public void shouldPunctuateOnceStreamTimeAfterGap() {
task.completeRestoration();
task.addRecords(partition1, asList(
- getConsumerRecord(partition1, 20),
- getConsumerRecord(partition1, 142),
- getConsumerRecord(partition1, 155),
- getConsumerRecord(partition1, 160)
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 20),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 142),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 155),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 160)
));
task.addRecords(partition2, asList(
- getConsumerRecord(partition2, 25),
- getConsumerRecord(partition2, 145),
- getConsumerRecord(partition2, 159),
- getConsumerRecord(partition2, 161)
+ getConsumerRecordWithOffsetAsTimestamp(partition2, 25),
+ getConsumerRecordWithOffsetAsTimestamp(partition2, 145),
+ getConsumerRecordWithOffsetAsTimestamp(partition2, 159),
+ getConsumerRecordWithOffsetAsTimestamp(partition2, 161)
));
// st: -1
@@ -903,6 +923,9 @@ public void shouldPunctuateOnceStreamTimeAfterGap() {
assertEquals(3, source2.numReceived);
assertTrue(task.maybePunctuateStreamTime());
+ // tell the task that it doesn't need to wait for new data on partition1
+ task.addFetchedMetadata(partition1, new ConsumerRecords.Metadata(0L, 160L, 160L));
+
// st: 161
assertTrue(task.process(0L));
assertEquals(0, task.numBuffered());
@@ -920,15 +943,15 @@ public void shouldRespectPunctuateCancellationStreamTime() {
task.completeRestoration();
task.addRecords(partition1, asList(
- getConsumerRecord(partition1, 20),
- getConsumerRecord(partition1, 30),
- getConsumerRecord(partition1, 40)
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 20),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 30),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 40)
));
task.addRecords(partition2, asList(
- getConsumerRecord(partition2, 25),
- getConsumerRecord(partition2, 35),
- getConsumerRecord(partition2, 45)
+ getConsumerRecordWithOffsetAsTimestamp(partition2, 25),
+ getConsumerRecordWithOffsetAsTimestamp(partition2, 35),
+ getConsumerRecordWithOffsetAsTimestamp(partition2, 45)
));
assertFalse(task.maybePunctuateStreamTime());
@@ -969,13 +992,13 @@ public void shouldRespectPunctuateCancellationSystemTime() {
@Test
public void shouldRespectCommitNeeded() {
- task = createStatelessTask(createConfig(), StreamsConfig.METRICS_LATEST);
+ task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
assertFalse(task.commitNeeded());
- task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0)));
+ task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, 0)));
assertTrue(task.process(0L));
assertTrue(task.commitNeeded());
@@ -1007,14 +1030,14 @@ public void shouldRespectCommitNeeded() {
@Test
public void shouldCommitNextOffsetFromQueueIfAvailable() {
- task = createStatelessTask(createConfig(), StreamsConfig.METRICS_LATEST);
+ task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
- task.addRecords(partition1, Arrays.asList(
- getConsumerRecord(partition1, 0L),
- getConsumerRecord(partition1, 3L),
- getConsumerRecord(partition1, 5L)));
+ task.addRecords(partition1, asList(
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 0L),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 3L),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 5L)));
task.process(0L);
task.process(0L);
@@ -1030,12 +1053,14 @@ public void shouldCommitConsumerPositionIfRecordQueueIsEmpty() {
task.initializeIfNeeded();
task.completeRestoration();
- consumer.addRecord(getConsumerRecord(partition1, 0L));
- consumer.addRecord(getConsumerRecord(partition1, 1L));
- consumer.addRecord(getConsumerRecord(partition1, 2L));
+ consumer.addRecord(getConsumerRecordWithOffsetAsTimestamp(partition1, 0L));
+ consumer.addRecord(getConsumerRecordWithOffsetAsTimestamp(partition1, 1L));
+ consumer.addRecord(getConsumerRecordWithOffsetAsTimestamp(partition1, 2L));
+ consumer.updateEndOffsets(mkMap(mkEntry(partition2, 0L)));
consumer.poll(Duration.ZERO);
- task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0L)));
+ task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, 0L)));
+ task.addRecords(partition2, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition2, 0L)));
task.process(0L);
final Map offsetsAndMetadata = task.prepareCommit();
@@ -1098,116 +1123,15 @@ public void shouldBeProcessableIfAllPartitionsBuffered() {
final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
- task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes)));
+ task.addRecords(partition1, singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes)));
assertFalse(task.process(0L));
- task.addRecords(partition2, Collections.singleton(new ConsumerRecord<>(topic2, 1, 0, bytes, bytes)));
+ task.addRecords(partition2, singleton(new ConsumerRecord<>(topic2, 1, 0, bytes, bytes)));
assertTrue(task.process(0L));
}
- @Test
- public void shouldBeProcessableIfWaitedForTooLong() {
- // max idle time is 100ms
- task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
- task.initializeIfNeeded();
- task.completeRestoration();
-
- final MetricName enforcedProcessMetric = metrics.metricName(
- "enforced-processing-total",
- "stream-task-metrics",
- mkMap(mkEntry("thread-id", Thread.currentThread().getName()), mkEntry("task-id", taskId.toString()))
- );
-
- assertFalse(task.process(time.milliseconds()));
- assertEquals(0.0, metrics.metric(enforcedProcessMetric).metricValue());
-
- final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
-
- task.addRecords(partition1,
- asList(
- new ConsumerRecord<>(topic1, 1, 0, bytes, bytes),
- new ConsumerRecord<>(topic1, 1, 1, bytes, bytes),
- new ConsumerRecord<>(topic1, 1, 2, bytes, bytes)
- )
- );
-
- assertFalse(task.process(time.milliseconds()));
-
- assertFalse(task.process(time.milliseconds() + 99L));
-
- assertTrue(task.process(time.milliseconds() + 100L));
- assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue());
-
- // once decided to enforce, continue doing that
- assertTrue(task.process(time.milliseconds() + 101L));
- assertEquals(2.0, metrics.metric(enforcedProcessMetric).metricValue());
-
- task.addRecords(partition2, Collections.singleton(new ConsumerRecord<>(topic2, 1, 0, bytes, bytes)));
-
- assertTrue(task.process(time.milliseconds() + 130L));
- assertEquals(2.0, metrics.metric(enforcedProcessMetric).metricValue());
-
- // one resumed to normal processing, the timer should be reset
-
- assertFalse(task.process(time.milliseconds() + 150L));
- assertEquals(2.0, metrics.metric(enforcedProcessMetric).metricValue());
-
- assertFalse(task.process(time.milliseconds() + 249L));
- assertEquals(2.0, metrics.metric(enforcedProcessMetric).metricValue());
-
- assertTrue(task.process(time.milliseconds() + 250L));
- assertEquals(3.0, metrics.metric(enforcedProcessMetric).metricValue());
- }
-
- @Test
- public void shouldNotBeProcessableIfNoDataAvailable() {
- task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
- task.initializeIfNeeded();
- task.completeRestoration();
-
- final MetricName enforcedProcessMetric = metrics.metricName(
- "enforced-processing-total",
- "stream-task-metrics",
- mkMap(mkEntry("thread-id", Thread.currentThread().getName()), mkEntry("task-id", taskId.toString()))
- );
-
- assertFalse(task.process(0L));
- assertEquals(0.0, metrics.metric(enforcedProcessMetric).metricValue());
-
- final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
-
- task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes)));
-
- assertFalse(task.process(time.milliseconds()));
-
- assertFalse(task.process(time.milliseconds() + 99L));
-
- assertTrue(task.process(time.milliseconds() + 100L));
- assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue());
-
- // once the buffer is drained and no new records coming, the timer should be reset
-
- assertFalse(task.process(time.milliseconds() + 110L));
- assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue());
-
- // check that after time is reset, we only falls into enforced processing after the
- // whole timeout has elapsed again
- task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes)));
-
- assertFalse(task.process(time.milliseconds() + 150L));
- assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue());
-
- assertFalse(task.process(time.milliseconds() + 249L));
- assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue());
-
- assertTrue(task.process(time.milliseconds() + 250L));
- assertEquals(2.0, metrics.metric(enforcedProcessMetric).metricValue());
- }
-
-
- @Test
public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
@@ -1323,8 +1247,8 @@ public void shouldNotShareHeadersBetweenPunctuateIterations() {
@Test
public void shouldWrapKafkaExceptionWithStreamsExceptionWhenProcess() {
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
- EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
- EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
+ EasyMock.expect(stateManager.changelogOffsets()).andReturn(emptyMap()).anyTimes();
+ EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
task = createFaultyStatefulTask(createConfig("100"));
@@ -1333,14 +1257,14 @@ public void shouldWrapKafkaExceptionWithStreamsExceptionWhenProcess() {
task.completeRestoration();
task.addRecords(partition1, asList(
- getConsumerRecord(partition1, 10),
- getConsumerRecord(partition1, 20),
- getConsumerRecord(partition1, 30)
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 10),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 20),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 30)
));
task.addRecords(partition2, asList(
- getConsumerRecord(partition2, 5), // this is the first record to process
- getConsumerRecord(partition2, 35),
- getConsumerRecord(partition2, 45)
+ getConsumerRecordWithOffsetAsTimestamp(partition2, 5), // this is the first record to process
+ getConsumerRecordWithOffsetAsTimestamp(partition2, 35),
+ getConsumerRecordWithOffsetAsTimestamp(partition2, 45)
));
assertThrows(StreamsException.class, () -> task.process(0L));
@@ -1351,7 +1275,7 @@ public void shouldReadCommittedOffsetAndRethrowTimeoutWhenCompleteRestoration()
stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
- EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
+ EasyMock.expect(stateManager.changelogOffsets()).andReturn(emptyMap()).anyTimes();
EasyMock.replay(recordCollector, stateDirectory, stateManager);
@@ -1377,7 +1301,7 @@ public void shouldReInitializeTopologyWhenResuming() throws IOException {
task.suspend();
- assertEquals(Task.State.SUSPENDED, task.state());
+ assertEquals(SUSPENDED, task.state());
assertFalse(source1.initialized);
assertFalse(source2.initialized);
@@ -1389,14 +1313,14 @@ public void shouldReInitializeTopologyWhenResuming() throws IOException {
task.completeRestoration();
- assertEquals(Task.State.RUNNING, task.state());
+ assertEquals(RUNNING, task.state());
assertTrue(source1.initialized);
assertTrue(source2.initialized);
EasyMock.verify(stateManager, recordCollector);
EasyMock.reset(recordCollector);
- EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap());
+ EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap());
EasyMock.replay(recordCollector);
}
@@ -1404,12 +1328,12 @@ public void shouldReInitializeTopologyWhenResuming() throws IOException {
public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() {
final Long offset = 543L;
- EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, offset)).anyTimes();
+ EasyMock.expect(recordCollector.offsets()).andReturn(singletonMap(changelogPartition, offset)).anyTimes();
stateManager.checkpoint();
EasyMock.expectLastCall().once();
EasyMock.expect(stateManager.changelogOffsets())
- .andReturn(Collections.singletonMap(changelogPartition, 10L))
- .andReturn(Collections.singletonMap(changelogPartition, 20L));
+ .andReturn(singletonMap(changelogPartition, 10L))
+ .andReturn(singletonMap(changelogPartition, 20L));
EasyMock.expectLastCall();
EasyMock.replay(stateManager, recordCollector);
@@ -1431,14 +1355,14 @@ public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() {
public void shouldCheckpointOffsetsOnCommitIfSnapshotMuchChanged() {
final Long offset = 543L;
- EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, offset)).anyTimes();
+ EasyMock.expect(recordCollector.offsets()).andReturn(singletonMap(changelogPartition, offset)).anyTimes();
stateManager.checkpoint();
EasyMock.expectLastCall().times(2);
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition));
+ EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(changelogPartition));
EasyMock.expect(stateManager.changelogOffsets())
- .andReturn(Collections.singletonMap(changelogPartition, 0L))
- .andReturn(Collections.singletonMap(changelogPartition, 10L))
- .andReturn(Collections.singletonMap(changelogPartition, 12000L));
+ .andReturn(singletonMap(changelogPartition, 0L))
+ .andReturn(singletonMap(changelogPartition, 10L))
+ .andReturn(singletonMap(changelogPartition, 12000L));
stateManager.registerStore(stateStore, stateStore.stateRestoreCallback);
EasyMock.expectLastCall();
EasyMock.replay(stateManager, recordCollector);
@@ -1458,10 +1382,10 @@ public void shouldCheckpointOffsetsOnCommitIfSnapshotMuchChanged() {
@Test
public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition));
+ EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(changelogPartition));
stateManager.registerStore(stateStore, stateStore.stateRestoreCallback);
EasyMock.expectLastCall();
- EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
+ EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
task = createStatefulTask(createConfig(StreamsConfig.EXACTLY_ONCE, "100"), true);
@@ -1531,7 +1455,7 @@ public void shouldCloseStateManagerEvenDuringFailureOnUncleanTaskClose() {
EasyMock.expectLastCall();
stateManager.close();
EasyMock.expectLastCall();
- EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
+ EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
task = createFaultyStatefulTask(createConfig("100"));
@@ -1552,13 +1476,13 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
final ProcessorTopology topology = withRepartitionTopics(
asList(source1, source2),
mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
- Collections.singleton(repartition.topic())
+ singleton(repartition.topic())
);
consumer.assign(asList(partition1, repartition));
consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L)));
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
- EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
+ EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
final StreamsConfig config = createConfig();
@@ -1582,27 +1506,29 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
time,
stateManager,
recordCollector,
- context);
+ context,
+ logContext);
task.initializeIfNeeded();
task.completeRestoration();
- task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 5L)));
- task.addRecords(repartition, singletonList(getConsumerRecord(repartition, 10L)));
+ task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, 5L)));
+ task.addRecords(repartition, singletonList(getConsumerRecordWithOffsetAsTimestamp(repartition, 10L)));
assertTrue(task.process(0L));
+ task.addFetchedMetadata(partition1, new ConsumerRecords.Metadata(0L, 5L, 5L));
assertTrue(task.process(0L));
task.prepareCommit();
final Map map = task.purgeableOffsets();
- assertThat(map, equalTo(Collections.singletonMap(repartition, 11L)));
+ assertThat(map, equalTo(singletonMap(repartition, 11L)));
}
@Test
public void shouldThrowStreamsExceptionWhenFetchCommittedFailed() {
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition1));
+ EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(partition1));
EasyMock.replay(stateManager);
final Consumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
@@ -1623,7 +1549,7 @@ public Map committed(final Set task.postCommit(true));
}
@@ -1653,7 +1579,7 @@ public void shouldCheckpointForSuspendedTask() {
stateManager.checkpoint();
EasyMock.expectLastCall().once();
EasyMock.expect(stateManager.changelogOffsets())
- .andReturn(Collections.singletonMap(partition1, 1L));
+ .andReturn(singletonMap(partition1, 1L));
EasyMock.replay(stateManager);
task = createStatefulTask(createConfig("100"), true);
@@ -1666,8 +1592,8 @@ public void shouldCheckpointForSuspendedTask() {
@Test
public void shouldNotCheckpointForSuspendedRunningTaskWithSmallProgress() {
EasyMock.expect(stateManager.changelogOffsets())
- .andReturn(Collections.singletonMap(partition1, 1L))
- .andReturn(Collections.singletonMap(partition1, 2L));
+ .andReturn(singletonMap(partition1, 1L))
+ .andReturn(singletonMap(partition1, 2L));
stateManager.checkpoint();
EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
EasyMock.replay(stateManager);
@@ -1687,8 +1613,8 @@ public void shouldNotCheckpointForSuspendedRunningTaskWithSmallProgress() {
@Test
public void shouldCheckpointForSuspendedRunningTaskWithLargeProgress() {
EasyMock.expect(stateManager.changelogOffsets())
- .andReturn(Collections.singletonMap(partition1, 12000L))
- .andReturn(Collections.singletonMap(partition1, 24000L));
+ .andReturn(singletonMap(partition1, 12000L))
+ .andReturn(singletonMap(partition1, 24000L));
stateManager.checkpoint();
EasyMock.expectLastCall().times(2);
EasyMock.replay(stateManager);
@@ -1720,7 +1646,8 @@ public void shouldCheckpointWhileUpdateSnapshotWithTheConsumedOffsetsForSuspende
task = createStatefulTask(createConfig(), true);
task.initializeIfNeeded();
task.completeRestoration();
- task.addRecords(partition1, singleton(getConsumerRecord(partition1, 10)));
+ task.addRecords(partition1, singleton(getConsumerRecordWithOffsetAsTimestamp(partition1, 10)));
+ task.addRecords(partition2, singleton(getConsumerRecordWithOffsetAsTimestamp(partition2, 10)));
task.process(100L);
assertTrue(task.commitNeeded());
@@ -1731,20 +1658,20 @@ public void shouldCheckpointWhileUpdateSnapshotWithTheConsumedOffsetsForSuspende
@Test
public void shouldReturnStateManagerChangelogOffsets() {
- EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition1, 50L)).anyTimes();
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition1)).anyTimes();
- EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
+ EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(partition1, 50L)).anyTimes();
+ EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(partition1)).anyTimes();
+ EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
task = createOptimizedStatefulTask(createConfig("100"), consumer);
task.initializeIfNeeded();
- assertEquals(Collections.singletonMap(partition1, 50L), task.changelogOffsets());
+ assertEquals(singletonMap(partition1, 50L), task.changelogOffsets());
task.completeRestoration();
- assertEquals(Collections.singletonMap(partition1, Task.LATEST_OFFSET), task.changelogOffsets());
+ assertEquals(singletonMap(partition1, Task.LATEST_OFFSET), task.changelogOffsets());
}
@Test
@@ -1753,7 +1680,7 @@ public void shouldNotCheckpointOnCloseCreated() {
EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes();
stateManager.checkpoint();
EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
- EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
+ EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
final MetricName metricName = setupCloseTaskMetric();
@@ -1780,8 +1707,8 @@ public void shouldCheckpointOnCloseRestoringIfNoProgress() {
stateManager.checkpoint();
EasyMock.expectLastCall().once();
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
- EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
- EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
+ EasyMock.expect(stateManager.changelogOffsets()).andReturn(emptyMap()).anyTimes();
+ EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
task = createOptimizedStatefulTask(createConfig("100"), consumer);
@@ -1803,20 +1730,20 @@ public void shouldCheckpointOffsetsOnPostCommit() {
final long offset = 543L;
final long consumedOffset = 345L;
- EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, offset)).anyTimes();
+ EasyMock.expect(recordCollector.offsets()).andReturn(singletonMap(changelogPartition, offset)).anyTimes();
EasyMock.expectLastCall();
stateManager.checkpoint();
EasyMock.expectLastCall().once();
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(stateManager.changelogOffsets())
- .andReturn(Collections.singletonMap(partition1, offset + 12000L));
+ .andReturn(singletonMap(partition1, offset + 12000L));
EasyMock.replay(recordCollector, stateManager);
task = createOptimizedStatefulTask(createConfig(), consumer);
task.initializeIfNeeded();
task.completeRestoration();
- task.addRecords(partition1, singletonList(getConsumerRecord(partition1, consumedOffset)));
+ task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, consumedOffset)));
task.process(100L);
assertTrue(task.commitNeeded());
@@ -1824,7 +1751,7 @@ public void shouldCheckpointOffsetsOnPostCommit() {
task.prepareCommit();
task.postCommit(false);
- assertEquals(Task.State.SUSPENDED, task.state());
+ assertEquals(SUSPENDED, task.state());
EasyMock.verify(stateManager);
}
@@ -1836,8 +1763,8 @@ public void shouldThrowExceptionOnCloseCleanError() {
EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
stateManager.checkpoint();
EasyMock.expectLastCall().once();
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition)).anyTimes();
- EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(changelogPartition, offset)).anyTimes();
+ EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(changelogPartition)).anyTimes();
+ EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(changelogPartition, offset)).anyTimes();
stateManager.close();
EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
EasyMock.replay(recordCollector, stateManager);
@@ -1847,7 +1774,7 @@ public void shouldThrowExceptionOnCloseCleanError() {
task.initializeIfNeeded();
task.completeRestoration();
- task.addRecords(partition1, singletonList(getConsumerRecord(partition1, offset)));
+ task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, offset)));
task.process(100L);
assertTrue(task.commitNeeded());
@@ -1861,7 +1788,7 @@ public void shouldThrowExceptionOnCloseCleanError() {
EasyMock.verify(stateManager);
EasyMock.reset(stateManager);
- EasyMock.expect(stateManager.changelogPartitions()).andStubReturn(Collections.singleton(changelogPartition));
+ EasyMock.expect(stateManager.changelogPartitions()).andStubReturn(singleton(changelogPartition));
stateManager.close();
EasyMock.expectLastCall();
EasyMock.replay(stateManager);
@@ -1871,7 +1798,7 @@ public void shouldThrowExceptionOnCloseCleanError() {
public void shouldThrowOnCloseCleanFlushError() {
final long offset = 543L;
- EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, offset));
+ EasyMock.expect(recordCollector.offsets()).andReturn(singletonMap(changelogPartition, offset));
stateManager.flushCache();
EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
stateManager.flush();
@@ -1881,7 +1808,7 @@ public void shouldThrowOnCloseCleanFlushError() {
stateManager.close();
EasyMock.expectLastCall().andThrow(new AssertionError("Close should not be called!")).anyTimes();
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
- EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
+ EasyMock.expect(stateManager.changelogOffsets()).andReturn(emptyMap()).anyTimes();
EasyMock.replay(recordCollector, stateManager);
final MetricName metricName = setupCloseTaskMetric();
@@ -1890,7 +1817,7 @@ public void shouldThrowOnCloseCleanFlushError() {
task.completeRestoration();
// process one record to make commit needed
- task.addRecords(partition1, singletonList(getConsumerRecord(partition1, offset)));
+ task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, offset)));
task.process(100L);
assertThrows(ProcessorStateException.class, task::prepareCommit);
@@ -1916,14 +1843,14 @@ public void shouldThrowOnCloseCleanCheckpointError() {
EasyMock.expectLastCall().andThrow(new AssertionError("Close should not be called!")).anyTimes();
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(stateManager.changelogOffsets())
- .andReturn(Collections.singletonMap(partition1, offset));
+ .andReturn(singletonMap(partition1, offset));
EasyMock.replay(recordCollector, stateManager);
final MetricName metricName = setupCloseTaskMetric();
task = createOptimizedStatefulTask(createConfig("100"), consumer);
task.initializeIfNeeded();
- task.addRecords(partition1, singletonList(getConsumerRecord(partition1, offset)));
+ task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, offset)));
task.process(100L);
assertTrue(task.commitNeeded());
@@ -2037,12 +1964,12 @@ public void shouldUpdatePartitions() {
@Test
public void shouldThrowIfCleanClosingDirtyTask() {
- task = createStatelessTask(createConfig(), StreamsConfig.METRICS_LATEST);
+ task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
- task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0)));
- task.process(0L);
+ task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, 0)));
+ assertTrue(task.process(0L));
assertTrue(task.commitNeeded());
assertThrows(TaskMigratedException.class, () -> task.closeClean());
@@ -2054,7 +1981,8 @@ public void shouldThrowIfRecyclingDirtyTask() {
task.initializeIfNeeded();
task.completeRestoration();
- task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0)));
+ task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, 0)));
+ task.addRecords(partition2, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition2, 0)));
task.process(0L);
assertTrue(task.commitNeeded());
@@ -2145,8 +2073,8 @@ public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() {
time,
stateManager,
recordCollector,
- context
- )
+ context,
+ logContext)
);
assertThat(exception.getMessage(), equalTo("Invalid topology: " +
@@ -2210,7 +2138,7 @@ private StreamTask createOptimizedStatefulTask(final StreamsConfig config, final
time,
stateManager,
recordCollector,
- context);
+ context, logContext);
}
private StreamTask createDisconnectedTask(final StreamsConfig config) {
@@ -2220,7 +2148,7 @@ private StreamTask createDisconnectedTask(final StreamsConfig config) {
asList(source1, source2),
mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2)),
singletonList(stateStore),
- Collections.emptyMap());
+ emptyMap());
final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
@Override
@@ -2249,7 +2177,7 @@ public Map committed(final Set sourceNode) {
@@ -2392,10 +2358,28 @@ private StreamTask createStatelessTaskWithForwardingTopology(final SourceNode getConsumerRecordWithOffsetAsTimestamp(final TopicPartition topicPartition,
+ final long offset,
+ final int value) {
+ return new ConsumerRecord<>(
+ topicPartition.topic(),
+ topicPartition.partition(),
+ offset,
+ offset, // use the offset as the timestamp
+ TimestampType.CREATE_TIME,
+ 0L,
+ 0,
+ 0,
+ recordKey,
+ intSerializer.serialize(null, value)
+ );
}
- private ConsumerRecord getConsumerRecord(final TopicPartition topicPartition, final long offset) {
+ private ConsumerRecord getConsumerRecordWithOffsetAsTimestamp(final TopicPartition topicPartition,
+ final long offset) {
return new ConsumerRecord<>(
topicPartition.topic(),
topicPartition.partition(),
@@ -2410,7 +2394,7 @@ private ConsumerRecord getConsumerRecord(final TopicPartition to
);
}
- private ConsumerRecord getConsumerRecord(final Integer key, final long offset) {
+ private ConsumerRecord getConsumerRecordWithOffsetAsTimestamp(final Integer key, final long offset) {
return new ConsumerRecord<>(
topic1,
1,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 1ba1151510f36..219a182851bb4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -24,6 +24,7 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
@@ -3057,6 +3058,11 @@ public void addRecords(final TopicPartition partition, final Iterable clazz) {
Logger.getLogger(clazz).setLevel(Level.DEBUG);
}
+ public static void setClassLoggerToTrace(final Class> clazz) {
+ Logger.getLogger(clazz).setLevel(Level.TRACE);
+ }
+
public static void unregister(final LogCaptureAppender logCaptureAppender) {
Logger.getRootLogger().removeAppender(logCaptureAppender);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 20cccf8b2ebf6..5f85803812974 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -447,7 +447,7 @@ private StreamTask createStreamsTask(final StreamsConfig streamsConfig,
new MockTime(),
stateManager,
recordCollector,
- context);
+ context, logContext);
}
private void mockThread(final boolean initialized) {
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 8a9007651b8a8..5349d353ad253 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -19,6 +19,7 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -525,11 +526,21 @@ private void setupTask(final StreamsConfig streamsConfig,
mockWallClockTime,
stateManager,
recordCollector,
- context
- );
+ context,
+ logContext);
task.initializeIfNeeded();
task.completeRestoration();
task.processorContext().setRecordContext(null);
+
+ // initialize the task metadata so that all topics have zero lag
+ for (final Map.Entry entry : startOffsets.entrySet()) {
+ final ConsumerRecords.Metadata metadata = new ConsumerRecords.Metadata(
+ mockWallClockTime.milliseconds(),
+ 0L,
+ 0L
+ );
+ task.addFetchedMetadata(entry.getKey(), metadata);
+ }
} else {
task = null;
}
@@ -591,10 +602,11 @@ private void enqueueTaskRecord(final String inputTopic,
final byte[] key,
final byte[] value,
final Headers headers) {
+ final long offset = offsetsByTopicOrPatternPartition.get(topicOrPatternPartition).incrementAndGet() - 1;
task.addRecords(topicOrPatternPartition, Collections.singleton(new ConsumerRecord<>(
inputTopic,
topicOrPatternPartition.partition(),
- offsetsByTopicOrPatternPartition.get(topicOrPatternPartition).incrementAndGet() - 1,
+ offset,
timestamp,
TimestampType.CREATE_TIME,
(long) ConsumerRecord.NULL_CHECKSUM,
@@ -604,6 +616,12 @@ private void enqueueTaskRecord(final String inputTopic,
value,
headers))
);
+ final ConsumerRecords.Metadata metadata = new ConsumerRecords.Metadata(
+ mockWallClockTime.milliseconds(),
+ offset,
+ offset
+ );
+ task.addFetchedMetadata(topicOrPatternPartition, metadata);
}
private void completeAllProcessableWork() {