diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java index 2f31ad136626a..7fe93c8bcad54 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java @@ -30,11 +30,6 @@ public interface ChangelogReader extends ChangelogRegister { */ void restore(); - /** - * Update offset limit of a given changelog partition - */ - void updateLimitOffsets(); - /** * Transit to restore active changelogs mode */ diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index b813961682306..8b73388b35579 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskMigratedException; @@ -99,11 +100,18 @@ static class ChangelogMetadata { // only for active restoring tasks (for standby changelog it is null) // NOTE we do not book keep the current offset since we leverage state manager as its source of truth - private Long restoreEndOffset; - // only for standby tasks that use source topics as changelogs (for active it is null); - // if it is not on source topics it is also null - private Long restoreLimitOffset; + + // the end offset beyond which records should not be applied (yet) to restore the states + // + // for both active restoring tasks and standby updating tasks, it is defined as: + // * log-end-offset if the changelog is not piggy-backed with source topic + // * min(log-end-offset, committed-offset) if the changelog is piggy-backed with source topic + // + // the log-end-offset only needs to be updated once and only need to be for active tasks since for standby + // tasks it would never "complete" based on the end-offset; + // the committed-offset needs to be updated periodically for those standby tasks + private Long restoreEndOffset; // buffer records polled by the restore consumer; private final List> bufferedRecords; @@ -113,14 +121,13 @@ static class ChangelogMetadata { private int bufferedLimitIndex; private ChangelogMetadata(final StateStoreMetadata storeMetadata, final ProcessorStateManager stateManager) { + this.changelogState = ChangelogState.REGISTERED; this.storeMetadata = storeMetadata; this.stateManager = stateManager; - this.changelogState = ChangelogState.REGISTERED; this.restoreEndOffset = null; this.totalRestored = 0L; this.bufferedRecords = new ArrayList<>(); - this.restoreLimitOffset = null; this.bufferedLimitIndex = 0; } @@ -139,7 +146,7 @@ private void transitTo(final ChangelogState newState) { public String toString() { final Long currentOffset = storeMetadata.offset(); return changelogState + " " + stateManager.taskType() + - " (currentOffset " + currentOffset + ", endOffset " + restoreEndOffset + ", limitOffset " + restoreLimitOffset + ")"; + " (currentOffset " + currentOffset + ", endOffset " + restoreEndOffset + ")"; } // for testing only below @@ -155,10 +162,6 @@ Long endOffset() { return restoreEndOffset; } - Long limitOffset() { - return restoreLimitOffset; - } - List> bufferedRecords() { return bufferedRecords; } @@ -168,10 +171,14 @@ int bufferedLimitIndex() { } } + private final static long DEFAULT_OFFSET_UPDATE_MS = 5 * 60 * 1000; // five minutes + private ChangelogReaderState state; + private final Time time; private final Logger log; private final Duration pollTime; + private final long updateOffsetIntervalMs; // 1) we keep adding partitions to restore consumer whenever new tasks are registered with the state manager; // 2) we do not unassign partitions when we switch between standbys and actives, we just pause / resume them; @@ -188,18 +195,18 @@ int bufferedLimitIndex() { // to update offset limit for standby tasks; private Consumer mainConsumer; - // the flag indicating limit offsets could be updated --- this is only needed for standby tasks that have limit - // offsets enabled - private boolean updateLimitOffset; + private long lastUpdateOffsetTime; void setMainConsumer(final Consumer consumer) { this.mainConsumer = consumer; } - public StoreChangelogReader(final StreamsConfig config, + public StoreChangelogReader(final Time time, + final StreamsConfig config, final LogContext logContext, final Consumer restoreConsumer, final StateRestoreListener stateRestoreListener) { + this.time = time; this.log = logContext.logger(StoreChangelogReader.class); this.state = ChangelogReaderState.ACTIVE_RESTORING; this.restoreConsumer = restoreConsumer; @@ -209,9 +216,11 @@ public StoreChangelogReader(final StreamsConfig config, // in order to make sure we call the main consumer#poll in time. // TODO: once both of these are moved to a separate thread this may no longer be a concern this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)); + this.updateOffsetIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) == Long.MAX_VALUE ? + DEFAULT_OFFSET_UPDATE_MS : config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); + this.lastUpdateOffsetTime = 0L; this.changelogs = new HashMap<>(); - this.updateLimitOffset = false; } private static String recordEndOffset(final Long endOffset) { @@ -311,7 +320,7 @@ public void register(final TopicPartition partition, final ProcessorStateManager // initializing limit offset to 0L for standby changelog to effectively disable any restoration until it is updated if (stateManager.taskType() == Task.TaskType.STANDBY && stateManager.changelogAsSource(partition)) { - changelogMetadata.restoreLimitOffset = 0L; + changelogMetadata.restoreEndOffset = 0L; } if (changelogs.putIfAbsent(partition, changelogMetadata) != null) { @@ -391,10 +400,6 @@ public void restore() { return; } - if (updateLimitOffset) { - updateLimitOffsets(); - } - final Set restoringChangelogs = restoringChangelogs(); if (!restoringChangelogs.isEmpty()) { final ConsumerRecords polledRecords; @@ -420,28 +425,30 @@ public void restore() { // small batches; this can be optimized in the future, e.g. wait longer for larger batches. restoreChangelog(changelogs.get(partition)); } + + + maybeUpdateLimitOffsetsForStandbyChangelogs(); } + } - // for standby changelogs, if there are buffered records not applicable, it means that the limit offset - // is there to prevent so, we can try to update the limit offset next time. - final Set standbyChangelogs = changelogs.values().stream() - .filter(metadata -> metadata.stateManager.taskType() == Task.TaskType.STANDBY) - .collect(Collectors.toSet()); - for (final ChangelogMetadata metadata : standbyChangelogs) { - if (!metadata.bufferedRecords().isEmpty()) { - updateLimitOffset = true; - break; + private void maybeUpdateLimitOffsetsForStandbyChangelogs() { + // for standby changelogs, if the interval has elapsed and there are buffered records not applicable, + // we can try to update the limit offset next time. + if (updateOffsetIntervalMs < time.milliseconds() - lastUpdateOffsetTime) { + final Set standbyChangelogs = changelogs.values().stream() + .filter(metadata -> metadata.stateManager.taskType() == Task.TaskType.STANDBY) + .collect(Collectors.toSet()); + for (final ChangelogMetadata metadata : standbyChangelogs) { + if (!metadata.bufferedRecords().isEmpty()) { + updateLimitOffsets(); + break; + } } } } private void bufferChangelogRecords(final ChangelogMetadata changelogMetadata, final List> records) { // update the buffered records and limit index with the fetched records - final long limitOffset = Math.min( - changelogMetadata.restoreEndOffset == null ? Long.MAX_VALUE : changelogMetadata.restoreEndOffset, - changelogMetadata.restoreLimitOffset == null ? Long.MAX_VALUE : changelogMetadata.restoreLimitOffset - ); - for (final ConsumerRecord record : records) { // filter polled records for null-keys and also possibly update buffer limit index if (record.key() == null) { @@ -450,7 +457,7 @@ private void bufferChangelogRecords(final ChangelogMetadata changelogMetadata, f } else { changelogMetadata.bufferedRecords.add(record); final long offset = record.offset(); - if (offset < limitOffset) + if (changelogMetadata.restoreEndOffset == null || offset < changelogMetadata.restoreEndOffset) changelogMetadata.bufferedLimitIndex = changelogMetadata.bufferedRecords.size(); } } @@ -517,9 +524,10 @@ private Map committedOffsetForChangelogs(final Set committedOffsets; try { // those do not have a committed offset would default to 0 - return mainConsumer.committed(partitions).entrySet().stream() + committedOffsets = mainConsumer.committed(partitions).entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset())); } catch (final TimeoutException e) { // if it timed out we just retry next time. @@ -527,6 +535,10 @@ private Map committedOffsetForChangelogs(final Set endOffsetForChangelogs(final Set partitions) { @@ -544,7 +556,10 @@ private Map endOffsetForChangelogs(final Set committedOffsets) { @@ -568,18 +581,18 @@ private void updateLimitOffsetsForStandbyChangelogs(final Map newLimit) { throw new IllegalStateException("Offset limit should monotonically increase, but was reduced for partition " + partition + ". New limit: " + newLimit + ". Previous limit: " + previousLimit); } - metadata.restoreLimitOffset = newLimit; + metadata.restoreEndOffset = newLimit; // update the limit index for buffered records while (metadata.bufferedLimitIndex < metadata.bufferedRecords.size() && - metadata.bufferedRecords.get(metadata.bufferedLimitIndex).offset() < metadata.restoreLimitOffset) + metadata.bufferedRecords.get(metadata.bufferedLimitIndex).offset() < metadata.restoreEndOffset) metadata.bufferedLimitIndex++; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index a1b058e97deab..4600ca1debd5d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -545,7 +545,7 @@ public static StreamThread create(final InternalTopologyBuilder builder, final Map restoreConsumerConfigs = config.getRestoreConsumerConfigs(getRestoreConsumerClientId(threadId)); final Consumer restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs); - final StoreChangelogReader changelogReader = new StoreChangelogReader(config, logContext, restoreConsumer, userStateRestoreListener); + final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, restoreConsumer, userStateRestoreListener); final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java index bcba49a2037ef..93ebeda0e0232 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java @@ -52,11 +52,6 @@ public void transitToUpdateStandby() { // do nothing } - @Override - public void updateLimitOffsets() { - // do nothing - } - @Override public Set completedChangelogs() { // assuming all restoring partitions are completed diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 5a577aa7365fa..e9784eaea10b0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateStore; @@ -46,6 +47,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -102,6 +104,7 @@ public static Object[] data() { private final TopicPartition tp1 = new TopicPartition("one", 0); private final TopicPartition tp2 = new TopicPartition("two", 0); private final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig("test-reader")); + private final MockTime time = new MockTime(); private final MockStateRestoreListener callback = new MockStateRestoreListener(); private final KafkaException kaboom = new KafkaException("KABOOM!"); private final MockStateRestoreListener exceptionCallback = new MockStateRestoreListener() { @@ -127,7 +130,7 @@ public void onRestoreEnd(final TopicPartition tp, final String store, final long }; private final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); - private final StoreChangelogReader changelogReader = new StoreChangelogReader(config, logContext, consumer, callback); + private final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, consumer, callback); @Before public void setUp() { @@ -156,7 +159,6 @@ public void shouldNotRegisterSameStoreMultipleTimes() { assertEquals(StoreChangelogReader.ChangelogState.REGISTERED, changelogReader.changelogMetadata(tp).state()); assertNull(changelogReader.changelogMetadata(tp).endOffset()); - assertNull(changelogReader.changelogMetadata(tp).limitOffset()); assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored()); assertThrows(IllegalStateException.class, () -> changelogReader.register(tp, stateManager)); @@ -182,7 +184,7 @@ public Map endOffsets(final Collection par } }; - final StoreChangelogReader changelogReader = new StoreChangelogReader(config, logContext, consumer, callback); + final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, consumer, callback); changelogReader.register(tp, stateManager); changelogReader.restore(); @@ -192,7 +194,6 @@ public Map endOffsets(final Collection par assertEquals(type == ACTIVE ? 10L : null, changelogReader.changelogMetadata(tp).endOffset()); assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored()); assertEquals(type == ACTIVE ? Collections.singleton(tp) : Collections.emptySet(), changelogReader.completedChangelogs()); - assertNull(changelogReader.changelogMetadata(tp).limitOffset()); assertEquals(10L, consumer.position(tp)); assertEquals(Collections.singleton(tp), consumer.paused()); @@ -216,7 +217,7 @@ public Map endOffsets(final Collection par } }; - final StoreChangelogReader changelogReader = new StoreChangelogReader(config, logContext, consumer, callback); + final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, consumer, callback); changelogReader.register(tp, stateManager); @@ -229,7 +230,6 @@ public Map endOffsets(final Collection par assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state()); assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored()); assertTrue(changelogReader.completedChangelogs().isEmpty()); - assertNull(changelogReader.changelogMetadata(tp).limitOffset()); assertEquals(6L, consumer.position(tp)); assertEquals(Collections.emptySet(), consumer.paused()); @@ -288,7 +288,7 @@ public Map endOffsets(final Collection par }; consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L)); - final StoreChangelogReader changelogReader = new StoreChangelogReader(config, logContext, consumer, callback); + final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, consumer, callback); changelogReader.register(tp, stateManager); @@ -300,7 +300,6 @@ public Map endOffsets(final Collection par assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state()); assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored()); - assertNull(changelogReader.changelogMetadata(tp).limitOffset()); assertEquals(5L, consumer.position(tp)); assertEquals(Collections.emptySet(), consumer.paused()); @@ -363,7 +362,7 @@ public Map endOffsets(final Collection par } }; - final StoreChangelogReader changelogReader = new StoreChangelogReader(config, logContext, consumer, callback); + final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, consumer, callback); changelogReader.register(tp, activeStateManager); changelogReader.restore(); @@ -372,7 +371,6 @@ public Map endOffsets(final Collection par assertEquals(0L, (long) changelogReader.changelogMetadata(tp).endOffset()); assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored()); assertEquals(Collections.singleton(tp), changelogReader.completedChangelogs()); - assertNull(changelogReader.changelogMetadata(tp).limitOffset()); assertEquals(6L, consumer.position(tp)); assertEquals(Collections.singleton(tp), consumer.paused()); assertEquals(tp, callback.restoreTopicPartition); @@ -403,7 +401,7 @@ public Map endOffsets(final Collection par } }; - final StoreChangelogReader changelogReader = new StoreChangelogReader(config, logContext, consumer, callback); + final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, consumer, callback); changelogReader.register(tp, activeStateManager); changelogReader.restore(); @@ -411,14 +409,12 @@ public Map endOffsets(final Collection par assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state()); assertTrue(changelogReader.completedChangelogs().isEmpty()); assertEquals(10L, (long) changelogReader.changelogMetadata(tp).endOffset()); - assertNull(changelogReader.changelogMetadata(tp).limitOffset()); clearException.set(true); changelogReader.restore(); assertEquals(StoreChangelogReader.ChangelogState.COMPLETED, changelogReader.changelogMetadata(tp).state()); assertEquals(10L, (long) changelogReader.changelogMetadata(tp).endOffset()); - assertNull(changelogReader.changelogMetadata(tp).limitOffset()); assertEquals(Collections.singleton(tp), changelogReader.completedChangelogs()); assertEquals(10L, consumer.position(tp)); } @@ -440,7 +436,7 @@ public Map endOffsets(final Collection par } }; - final StoreChangelogReader changelogReader = new StoreChangelogReader(config, logContext, consumer, callback); + final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, consumer, callback); changelogReader.register(tp, activeStateManager); @@ -471,21 +467,19 @@ public Map committed(final Set endOffsets(final Collection par } }; - final StoreChangelogReader changelogReader = new StoreChangelogReader(config, logContext, consumer, callback); + final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, consumer, callback); changelogReader.register(tp, activeStateManager); @@ -533,7 +527,7 @@ public Map endOffsets(final Collection par } }; - final StoreChangelogReader changelogReader = new StoreChangelogReader(config, logContext, consumer, callback); + final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, consumer, callback); changelogReader.setMainConsumer(consumer); changelogReader.register(tp, stateManager); @@ -541,15 +535,17 @@ public Map endOffsets(final Collection par assertEquals(type == ACTIVE ? StoreChangelogReader.ChangelogState.REGISTERED : StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state()); - assertNull(changelogReader.changelogMetadata(tp).endOffset()); - assertEquals(type == ACTIVE ? null : 0L, changelogReader.changelogMetadata(tp).limitOffset()); + if (type == ACTIVE) { + assertNull(changelogReader.changelogMetadata(tp).endOffset()); + } else { + assertEquals(0L, (long) changelogReader.changelogMetadata(tp).endOffset()); + } assertTrue(functionCalled.get()); changelogReader.restore(); assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state()); - assertEquals(type == ACTIVE ? 10L : null, changelogReader.changelogMetadata(tp).endOffset()); - assertEquals(type == ACTIVE ? null : 0L, changelogReader.changelogMetadata(tp).limitOffset()); + assertEquals(type == ACTIVE ? 10L : 0L, (long) changelogReader.changelogMetadata(tp).endOffset()); assertEquals(6L, consumer.position(tp)); } @@ -570,7 +566,7 @@ public Map committed(final Set(topicName, 0, 6L, "key".getBytes(), "value".getBytes())); @@ -645,7 +640,11 @@ public Map committed(final Set committed(final Set(topicName, 0, 5L, "key".getBytes(), "value".getBytes())); @@ -671,8 +669,7 @@ public Map committed(final Set committed(final Set committed(final Set(topicName, 0, 15L, "key".getBytes(), "value".getBytes())); changelogReader.restore(); - assertEquals(15L, (long) changelogReader.changelogMetadata(tp).limitOffset()); + assertEquals(15L, (long) changelogReader.changelogMetadata(tp).endOffset()); assertEquals(9L, changelogReader.changelogMetadata(tp).totalRestored()); assertEquals(1, changelogReader.changelogMetadata(tp).bufferedRecords().size()); assertEquals(0, changelogReader.changelogMetadata(tp).bufferedLimitIndex()); @@ -776,7 +793,7 @@ public Map endOffsets(final Collection par return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L)); } }; - final StoreChangelogReader changelogReader = new StoreChangelogReader(config, logContext, consumer, callback); + final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, consumer, callback); assertEquals(ACTIVE_RESTORING, changelogReader.state()); changelogReader.register(tp, activeStateManager); @@ -843,7 +860,7 @@ public Map endOffsets(final Collection par return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L)); } }; - final StoreChangelogReader changelogReader = new StoreChangelogReader(config, logContext, consumer, exceptionCallback); + final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, consumer, exceptionCallback); changelogReader.register(tp, activeStateManager); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 5c6da5c4b82e9..e16b778be7246 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -318,6 +318,7 @@ private StreamTask createStreamsTask(final StreamsConfig streamsConfig, stateDirectory, topology.storeToChangelogTopic(), new StoreChangelogReader( + new MockTime(), streamsConfig, logContext, clientSupplier.restoreConsumer, diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java index 588c0172c5982..50e3de74f9a09 100644 --- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java @@ -74,12 +74,16 @@ public static Properties getStreamsConfig(final Serde keyDeserializer, } public static Properties getStreamsConfig(final String applicationId) { + return getStreamsConfig(applicationId, new Properties()); + } + + public static Properties getStreamsConfig(final String applicationId, final Properties additional) { return getStreamsConfig( applicationId, "localhost:9091", Serdes.ByteArraySerde.class.getName(), Serdes.ByteArraySerde.class.getName(), - new Properties()); + additional); } public static Properties getStreamsConfig() { diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 78f03a6f6dd52..c7ae313698ff5 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -390,6 +390,7 @@ public List partitionsFor(final String topic) { stateDirectory, processorTopology.storeToChangelogTopic(), new StoreChangelogReader( + mockWallClockTime, streamsConfig, logContext, createRestoreConsumer(processorTopology.storeToChangelogTopic()),