diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java index d830ef2aadc8c..5eb00deb0697a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java @@ -35,7 +35,7 @@ public class ListOffsetsResult { private final Map> futures; - ListOffsetsResult(Map> futures) { + public ListOffsetsResult(Map> futures) { this.futures = futures; } @@ -80,7 +80,7 @@ public static class ListOffsetsResultInfo { private final long timestamp; private final Optional leaderEpoch; - ListOffsetsResultInfo(long offset, long timestamp, Optional leaderEpoch) { + public ListOffsetsResultInfo(long offset, long timestamp, Optional leaderEpoch) { this.offset = offset; this.timestamp = timestamp; this.leaderEpoch = leaderEpoch; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java index 2517985a00cde..339e9cf815e87 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java @@ -23,9 +23,9 @@ */ public class OffsetSpec { - static class EarliestSpec extends OffsetSpec { } - static class LatestSpec extends OffsetSpec { } - static class TimestampSpec extends OffsetSpec { + public static class EarliestSpec extends OffsetSpec { } + public static class LatestSpec extends OffsetSpec { } + public static class TimestampSpec extends OffsetSpec { private final long timestamp; TimestampSpec(long timestamp) { diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index e5ad5754a68e0..bf0a9e9609073 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -64,6 +64,8 @@ public class MockAdminClient extends AdminClient { new HashMap<>(); private final Map replicaMoves = new HashMap<>(); + private final Map beginningOffsets; + private final Map endOffsets; private final String clusterId; private final List> brokerLogDirs; private final List> brokerConfigs; @@ -145,8 +147,11 @@ public MockAdminClient build() { } } - public MockAdminClient(List brokers, - Node controller) { + public MockAdminClient() { + this(Collections.singletonList(Node.noNode()), Node.noNode()); + } + + public MockAdminClient(List brokers, Node controller) { this(brokers, controller, DEFAULT_CLUSTER_ID, 1, brokers.size(), Collections.nCopies(brokers.size(), DEFAULT_LOG_DIRS)); } @@ -167,6 +172,8 @@ private MockAdminClient(List brokers, for (int i = 0; i < brokers.size(); i++) { this.brokerConfigs.add(new HashMap<>()); } + this.beginningOffsets = new HashMap<>(); + this.endOffsets = new HashMap<>(); } synchronized public void controller(Node controller) { @@ -789,7 +796,24 @@ synchronized public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(St @Override synchronized public ListOffsetsResult listOffsets(Map topicPartitionOffsets, ListOffsetsOptions options) { - throw new UnsupportedOperationException("Not implement yet"); + Map> futures = new HashMap<>(); + + for (Map.Entry entry : topicPartitionOffsets.entrySet()) { + TopicPartition tp = entry.getKey(); + OffsetSpec spec = entry.getValue(); + KafkaFutureImpl future = new KafkaFutureImpl<>(); + + if (spec instanceof OffsetSpec.TimestampSpec) + throw new UnsupportedOperationException("Not implement yet"); + else if (spec instanceof OffsetSpec.EarliestSpec) + future.complete(new ListOffsetsResult.ListOffsetsResultInfo(beginningOffsets.get(tp), -1, Optional.empty())); + else + future.complete(new ListOffsetsResult.ListOffsetsResultInfo(endOffsets.get(tp), -1, Optional.empty())); + + futures.put(tp, future); + } + + return new ListOffsetsResult(futures); } @Override @@ -805,6 +829,14 @@ public AlterClientQuotasResult alterClientQuotas(Collection newOffsets) { + beginningOffsets.putAll(newOffsets); + } + + public synchronized void updateEndOffsets(final Map newOffsets) { + endOffsets.putAll(newOffsets); + } + private final static class TopicMetadata { final boolean isInternalTopic; final List partitions; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java index 7403aad57f0db..3f5f977db7549 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java @@ -22,7 +22,7 @@ /** * See {@link StoreChangelogReader}. */ -interface ChangelogRegister { +public interface ChangelogRegister { /** * Register a state store for restoration. * diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index ad5cce0908076..c6ee4d3fbbe48 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -16,10 +16,15 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ListOffsetsOptions; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.InvalidOffsetException; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; @@ -43,6 +48,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets; @@ -199,6 +206,9 @@ int bufferedLimitIndex() { // to update offset limit for standby tasks; private Consumer mainConsumer; + // the changelog reader needs the admin client to list end offsets + private final Admin adminClient; + private long lastUpdateOffsetTime; void setMainConsumer(final Consumer consumer) { @@ -208,11 +218,13 @@ void setMainConsumer(final Consumer consumer) { public StoreChangelogReader(final Time time, final StreamsConfig config, final LogContext logContext, + final Admin adminClient, final Consumer restoreConsumer, final StateRestoreListener stateRestoreListener) { this.time = time; this.log = logContext.logger(StoreChangelogReader.class); this.state = ChangelogReaderState.ACTIVE_RESTORING; + this.adminClient = adminClient; this.restoreConsumer = restoreConsumer; this.stateRestoreListener = stateRestoreListener; @@ -564,8 +576,13 @@ private Map endOffsetForChangelogs(final Set OffsetSpec.latest())), + new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED) + ); + return result.all().get().entrySet().stream().collect( + Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset())); + } catch (final TimeoutException | InterruptedException | ExecutionException e) { // if timeout exception gets thrown we just give up this time and retry in the next run loop log.debug("Could not fetch all end offsets for {}, will retry in the next run loop", partitions); return Collections.emptyMap(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 91c483c1498b2..09be1f2f1cfc2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -273,14 +273,12 @@ public boolean isRunning() { private volatile ThreadMetadata threadMetadata; private StreamThread.StateListener stateListener; - private final Admin adminClient; private final ChangelogReader changelogReader; - - // package-private for testing - final ConsumerRebalanceListener rebalanceListener; - final Consumer mainConsumer; - final Consumer restoreConsumer; - final InternalTopologyBuilder builder; + private final ConsumerRebalanceListener rebalanceListener; + private final Consumer mainConsumer; + private final Consumer restoreConsumer; + private final Admin adminClient; + private final InternalTopologyBuilder builder; public static StreamThread create(final InternalTopologyBuilder builder, final StreamsConfig config, @@ -309,6 +307,7 @@ public static StreamThread create(final InternalTopologyBuilder builder, time, config, logContext, + adminClient, restoreConsumer, userStateRestoreListener ); @@ -1020,4 +1019,23 @@ int currentNumIterations() { return numIterations; } + ConsumerRebalanceListener rebalanceListener() { + return rebalanceListener; + } + + Consumer mainConsumer() { + return mainConsumer; + } + + Consumer restoreConsumer() { + return restoreConsumer; + }; + + Admin adminClient() { + return adminClient; + } + + InternalTopologyBuilder internalTopologyBuilder() { + return builder; + }; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 6769f4a96fcd4..ad16cff17a205 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -16,6 +16,10 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.admin.ListOffsetsOptions; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -46,7 +50,6 @@ import org.junit.runners.Parameterized; import java.time.Duration; -import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Properties; @@ -129,8 +132,9 @@ public void onRestoreEnd(final TopicPartition tp, final String store, final long }; private final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + private final MockAdminClient adminClient = new MockAdminClient(); private final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); @Before public void setUp() { @@ -185,15 +189,10 @@ public void shouldInitializeChangelogAndCheckForCompletion() { EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); EasyMock.replay(stateManager, storeMetadata, store); - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { - @Override - public Map endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L)); - } - }; + adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); changelogReader.register(tp, stateManager); changelogReader.restore(); @@ -226,16 +225,11 @@ public void shouldPollWithRightTimeout() { EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes(); EasyMock.replay(stateManager, storeMetadata, store); - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { - @Override - public Map endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 11L)); - } - }; consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L)); + adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L)); final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); changelogReader.register(tp, stateManager); @@ -257,15 +251,10 @@ public void shouldRestoreFromPositionAndCheckForCompletion() { EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes(); EasyMock.replay(stateManager, storeMetadata, store); - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { - @Override - public Map endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L)); - } - }; + adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); changelogReader.register(tp, stateManager); @@ -328,16 +317,11 @@ public void shouldRestoreFromBeginningAndCheckCompletion() { EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes(); EasyMock.replay(stateManager, storeMetadata, store); - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { - @Override - public Map endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 11L)); - } - }; consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L)); + adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L)); final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); changelogReader.register(tp, stateManager); @@ -404,15 +388,10 @@ public void shouldCheckCompletionIfPositionLargerThanEndOffset() { EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes(); EasyMock.replay(activeStateManager, storeMetadata, store); - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { - @Override - public Map endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 0L)); - } - }; + adminClient.updateEndOffsets(Collections.singletonMap(tp, 0L)); final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); changelogReader.register(tp, activeStateManager); changelogReader.restore(); @@ -444,15 +423,12 @@ public long position(final TopicPartition partition) { throw new TimeoutException("KABOOM!"); } } - - @Override - public Map endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L)); - } }; + adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); + final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); changelogReader.register(tp, activeStateManager); changelogReader.restore(); @@ -480,15 +456,12 @@ public void shouldThrowIfPositionFail() { public long position(final TopicPartition partition) { throw kaboom; } - - @Override - public Map endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L)); - } }; + adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); + final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); changelogReader.register(tp, activeStateManager); @@ -502,17 +475,22 @@ public void shouldRequestEndOffsetsAndHandleTimeoutException() { EasyMock.replay(activeStateManager, storeMetadata, store); final AtomicBoolean functionCalled = new AtomicBoolean(false); - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + + final MockAdminClient adminClient = new MockAdminClient() { @Override - public Map endOffsets(final Collection partitions) { + public ListOffsetsResult listOffsets(final Map topicPartitionOffsets, + final ListOffsetsOptions options) { if (functionCalled.get()) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L)); + return super.listOffsets(topicPartitionOffsets, options); } else { functionCalled.set(true); throw new TimeoutException("KABOOM!"); } } + }; + adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); + final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override public Map committed(final Set partitions) { throw new AssertionError("Should not trigger this function"); @@ -520,7 +498,7 @@ public Map committed(final Set consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + final MockAdminClient adminClient = new MockAdminClient() { @Override - public Map endOffsets(final Collection partitions) { + public ListOffsetsResult listOffsets(final Map topicPartitionOffsets, + final ListOffsetsOptions options) { throw kaboom; } }; + adminClient.updateEndOffsets(Collections.singletonMap(tp, 0L)); final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); changelogReader.register(tp, activeStateManager); @@ -576,15 +556,12 @@ public Map committed(final Set endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 20L)); - } }; + adminClient.updateEndOffsets(Collections.singletonMap(tp, 20L)); + final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); changelogReader.setMainConsumer(consumer); changelogReader.register(tp, stateManager); @@ -617,18 +594,16 @@ public void shouldThrowIfCommittedOffsetsFail() { EasyMock.replay(stateManager, storeMetadata, store); final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { - @Override - public Map endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L)); - } - @Override public Map committed(final Set partitions) { throw kaboom; } }; + + adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); + final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); changelogReader.setMainConsumer(consumer); changelogReader.register(tp, stateManager); @@ -648,7 +623,7 @@ public void unsubscribe() { } }; final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); final StreamsException thrown = assertThrows(StreamsException.class, changelogReader::clear); assertEquals(kaboom, thrown.getCause()); @@ -705,7 +680,7 @@ public Map committed(final Set committed(final Set consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { - @Override - public Map endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L)); - } - }; - final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, consumer, callback); + adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); + adminClient.updateEndOffsets(Collections.singletonMap(tp1, 10L)); + adminClient.updateEndOffsets(Collections.singletonMap(tp2, 10L)); + final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); assertEquals(ACTIVE_RESTORING, changelogReader.state()); changelogReader.register(tp, activeStateManager); @@ -983,14 +955,10 @@ public void shouldThrowIfRestoreCallbackThrows() { EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes(); EasyMock.replay(activeStateManager, storeMetadata, store); - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { - @Override - public Map endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L)); - } - }; + adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); + final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, exceptionCallback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, exceptionCallback); changelogReader.register(tp, activeStateManager); @@ -1052,6 +1020,7 @@ private void assignPartition(final long messages, null))); consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L)); consumer.updateEndOffsets(Collections.singletonMap(topicPartition, Math.max(0, messages) + 1)); + adminClient.updateEndOffsets(Collections.singletonMap(topicPartition, Math.max(0, messages) + 1)); consumer.assign(Collections.singletonList(topicPartition)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 6b15cfc550809..c17dd69305c67 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -252,7 +252,7 @@ public void shouldChangeStateInRebalanceListener() { thread.setStateListener(stateListener); assertEquals(thread.state(), StreamThread.State.CREATED); - final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener; + final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener(); final List revokedPartitions; final List assignedPartitions; @@ -267,7 +267,7 @@ public void shouldChangeStateInRebalanceListener() { // assign single partition assignedPartitions = Collections.singletonList(t1p1); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(assignedPartitions); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); rebalanceListener.onPartitionsAssigned(assignedPartitions); @@ -582,10 +582,10 @@ public void shouldRespectNumIterationsInMainLoop() { thread.taskManager().handleAssignment(Collections.singletonMap(task1, assignedPartitions), emptyMap()); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(Collections.singleton(t1p1)); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); thread.runOnce(); // processed one record, punctuated after the first record, and hence num.iterations is still 1 @@ -751,7 +751,7 @@ public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEo final StreamThread thread = createStreamThread(CLIENT_ID, config, false); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList()); final Map> activeTasks = new HashMap<>(); final List assignedPartitions = new ArrayList<>(); @@ -764,21 +764,21 @@ public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEo thread.taskManager().handleAssignment(activeTasks, emptyMap()); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(assignedPartitions); final Map beginOffsets = new HashMap<>(); beginOffsets.put(t1p1, 0L); beginOffsets.put(t1p2, 0L); mockConsumer.updateBeginningOffsets(beginOffsets); - thread.rebalanceListener.onPartitionsAssigned(new HashSet<>(assignedPartitions)); + thread.rebalanceListener().onPartitionsAssigned(new HashSet<>(assignedPartitions)); assertEquals(1, clientSupplier.producers.size()); final Producer globalProducer = clientSupplier.producers.get(0); for (final Task task : thread.activeTasks()) { assertSame(globalProducer, ((RecordCollectorImpl) ((StreamTask) task).recordCollector()).producer()); } - assertSame(clientSupplier.consumer, thread.mainConsumer); - assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer); + assertSame(clientSupplier.consumer, thread.mainConsumer()); + assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer()); } @Test @@ -788,7 +788,7 @@ public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList()); final Map> activeTasks = new HashMap<>(); final List assignedPartitions = new ArrayList<>(); @@ -801,19 +801,19 @@ public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() thread.taskManager().handleAssignment(activeTasks, emptyMap()); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(assignedPartitions); final Map beginOffsets = new HashMap<>(); beginOffsets.put(t1p1, 0L); beginOffsets.put(t1p2, 0L); mockConsumer.updateBeginningOffsets(beginOffsets); - thread.rebalanceListener.onPartitionsAssigned(new HashSet<>(assignedPartitions)); + thread.rebalanceListener().onPartitionsAssigned(new HashSet<>(assignedPartitions)); thread.runOnce(); assertEquals(thread.activeTasks().size(), clientSupplier.producers.size()); - assertSame(clientSupplier.consumer, thread.mainConsumer); - assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer); + assertSame(clientSupplier.consumer, thread.mainConsumer()); + assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer()); } @Test @@ -828,7 +828,7 @@ public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress() throws Inter 10 * 1000, "Thread never started."); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList()); thread.taskManager().handleRebalanceStart(Collections.singleton(topic1)); final Map> activeTasks = new HashMap<>(); @@ -852,7 +852,7 @@ public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress() throws Inter assertEquals(Utils.mkSet(task1, task2), thread.taskManager().activeTaskIds()); assertEquals(StreamThread.State.PENDING_SHUTDOWN, thread.state()); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); TestUtils.waitForCondition( () -> thread.state() == StreamThread.State.DEAD, @@ -873,7 +873,7 @@ public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedE 10 * 1000, "Thread never started."); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList()); final Map> activeTasks = new HashMap<>(); final List assignedPartitions = new ArrayList<>(); @@ -885,7 +885,7 @@ public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedE activeTasks.put(task2, Collections.singleton(t1p2)); thread.taskManager().handleAssignment(activeTasks, emptyMap()); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); thread.shutdown(); TestUtils.waitForCondition( @@ -1064,7 +1064,7 @@ public void shouldNotThrowWhenStandbyTasksAssignedAndNoStateStoresForTopology() final StreamThread thread = createStreamThread(CLIENT_ID, config, false); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList()); final Map> standbyTasks = new HashMap<>(); @@ -1073,7 +1073,7 @@ public void shouldNotThrowWhenStandbyTasksAssignedAndNoStateStoresForTopology() thread.taskManager().handleAssignment(emptyMap(), standbyTasks); - thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList()); + thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList()); } @Test @@ -1088,7 +1088,7 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerWasFencedWhilePr consumer.updatePartitions(topic1, Collections.singletonList(new PartitionInfo(topic1, 1, null, null, null))); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> activeTasks = new HashMap<>(); final List assignedPartitions = new ArrayList<>(); @@ -1099,10 +1099,10 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerWasFencedWhilePr thread.taskManager().handleAssignment(activeTasks, emptyMap()); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(assignedPartitions); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); thread.runOnce(); assertThat(thread.activeTasks().size(), equalTo(1)); @@ -1146,7 +1146,7 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommi internalTopologyBuilder.addSink("out", "output", null, null, null, "name"); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> activeTasks = new HashMap<>(); final List assignedPartitions = new ArrayList<>(); @@ -1157,10 +1157,10 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommi thread.taskManager().handleAssignment(activeTasks, emptyMap()); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(assignedPartitions); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); thread.runOnce(); @@ -1171,7 +1171,7 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommi thread.runOnce(); clientSupplier.producers.get(0).commitTransactionException = new ProducerFencedException("Producer is fenced"); - assertThrows(TaskMigratedException.class, () -> thread.rebalanceListener.onPartitionsRevoked(assignedPartitions)); + assertThrows(TaskMigratedException.class, () -> thread.rebalanceListener().onPartitionsRevoked(assignedPartitions)); assertFalse(clientSupplier.producers.get(0).transactionCommitted()); assertFalse(clientSupplier.producers.get(0).closed()); assertEquals(1, thread.activeTasks().size()); @@ -1189,7 +1189,7 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommi consumer.updatePartitions(topic1, Collections.singletonList(new PartitionInfo(topic1, 1, null, null, null))); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> activeTasks = new HashMap<>(); final List assignedPartitions = new ArrayList<>(); @@ -1200,10 +1200,10 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommi thread.taskManager().handleAssignment(activeTasks, emptyMap()); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(assignedPartitions); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); thread.runOnce(); assertThat(thread.activeTasks().size(), equalTo(1)); @@ -1237,7 +1237,7 @@ public void shouldNotCloseTaskProducerWhenSuspending() { internalTopologyBuilder.addSink("out", "output", null, null, null, "name"); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> activeTasks = new HashMap<>(); final List assignedPartitions = new ArrayList<>(); @@ -1248,10 +1248,10 @@ public void shouldNotCloseTaskProducerWhenSuspending() { thread.taskManager().handleAssignment(activeTasks, emptyMap()); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(assignedPartitions); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); thread.runOnce(); @@ -1261,7 +1261,7 @@ public void shouldNotCloseTaskProducerWhenSuspending() { addRecord(mockConsumer, 0L); thread.runOnce(); - thread.rebalanceListener.onPartitionsRevoked(assignedPartitions); + thread.rebalanceListener().onPartitionsRevoked(assignedPartitions); assertTrue(clientSupplier.producers.get(0).transactionCommitted()); assertFalse(clientSupplier.producers.get(0).closed()); assertEquals(1, thread.activeTasks().size()); @@ -1298,7 +1298,7 @@ public void shouldReturnActiveTaskMetadataWhileRunningState() { ); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> activeTasks = new HashMap<>(); final List assignedPartitions = new ArrayList<>(); @@ -1309,10 +1309,10 @@ public void shouldReturnActiveTaskMetadataWhileRunningState() { thread.taskManager().handleAssignment(activeTasks, emptyMap()); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(assignedPartitions); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); thread.runOnce(); @@ -1356,7 +1356,7 @@ public void shouldReturnStandbyTaskMetadataWhileRunningState() { restoreConsumer.updateBeginningOffsets(offsets); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> standbyTasks = new HashMap<>(); @@ -1365,7 +1365,7 @@ public void shouldReturnStandbyTaskMetadataWhileRunningState() { thread.taskManager().handleAssignment(emptyMap(), standbyTasks); - thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList()); + thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList()); thread.runOnce(); @@ -1408,7 +1408,7 @@ public void shouldUpdateStandbyTask() throws Exception { checkpoint.write(Collections.singletonMap(partition2, 5L)); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> standbyTasks = new HashMap<>(); @@ -1419,7 +1419,7 @@ public void shouldUpdateStandbyTask() throws Exception { thread.taskManager().handleAssignment(emptyMap(), standbyTasks); thread.taskManager().tryToCompleteRestoration(); - thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList()); + thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList()); thread.runOnce(); @@ -1505,7 +1505,7 @@ public void close() {} final StreamThread thread = createStreamThread(CLIENT_ID, config, false); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final List assignedPartitions = new ArrayList<>(); final Map> activeTasks = new HashMap<>(); @@ -1518,7 +1518,7 @@ public void close() {} clientSupplier.consumer.assign(assignedPartitions); clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); thread.runOnce(); @@ -1600,7 +1600,7 @@ public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNot final List assignedPartitions = new ArrayList<>(); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(assignedPartitions); + thread.rebalanceListener().onPartitionsRevoked(assignedPartitions); assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_REVOKED); final Map> activeTasks = new HashMap<>(); @@ -1613,7 +1613,7 @@ public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNot thread.taskManager().handleAssignment(activeTasks, standbyTasks); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_ASSIGNED); } @@ -1632,8 +1632,9 @@ public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() t internalStreamsBuilder.buildAndOptimizeTopology(); final StreamThread thread = createStreamThread("clientId", config, false); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; - final MockConsumer mockRestoreConsumer = (MockConsumer) thread.restoreConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); + final MockConsumer mockRestoreConsumer = (MockConsumer) thread.restoreConsumer(); + final MockAdminClient mockAdminClient = (MockAdminClient) thread.adminClient(); final TopicPartition topicPartition = new TopicPartition("topic", 0); final Set topicPartitionSet = Collections.singleton(topicPartition); @@ -1674,11 +1675,11 @@ public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() t final TopicPartition changelogPartition = new TopicPartition("stream-thread-test-count-changelog", 0); final Set changelogPartitionSet = Collections.singleton(changelogPartition); mockRestoreConsumer.updateBeginningOffsets(Collections.singletonMap(changelogPartition, 0L)); - mockRestoreConsumer.updateEndOffsets(Collections.singletonMap(changelogPartition, 2L)); + mockAdminClient.updateEndOffsets(Collections.singletonMap(changelogPartition, 2L)); mockConsumer.schedulePollTask(() -> { thread.setState(StreamThread.State.PARTITIONS_REVOKED); - thread.rebalanceListener.onPartitionsAssigned(topicPartitionSet); + thread.rebalanceListener().onPartitionsAssigned(topicPartitionSet); }); try { @@ -1686,7 +1687,7 @@ public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() t TestUtils.waitForCondition( () -> mockRestoreConsumer.assignment().size() == 1, - "Never restore first record"); + "Never get the assignment"); mockRestoreConsumer.addRecord(new ConsumerRecord<>( "stream-thread-test-count-changelog", @@ -1771,10 +1772,10 @@ private void shouldLogAndRecordSkippedMetricForDeserializationException(final St Collections.singletonMap(task1, assignedPartitions), emptyMap()); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(Collections.singleton(t1p1)); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); thread.runOnce(); if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) { @@ -1863,7 +1864,7 @@ public void shouldThrowTaskMigratedExceptionHandlingTaskLost() { consumer.schedulePollTask(() -> { thread.setState(StreamThread.State.PARTITIONS_REVOKED); - thread.rebalanceListener.onPartitionsLost(assignedPartitions); + thread.rebalanceListener().onPartitionsLost(assignedPartitions); }); thread.setState(StreamThread.State.STARTING); @@ -1906,7 +1907,7 @@ public void shouldThrowTaskMigratedExceptionHandlingRevocation() { consumer.schedulePollTask(() -> { thread.setState(StreamThread.State.PARTITIONS_REVOKED); - thread.rebalanceListener.onPartitionsRevoked(assignedPartitions); + thread.rebalanceListener().onPartitionsRevoked(assignedPartitions); }); thread.setState(StreamThread.State.STARTING); @@ -2108,10 +2109,10 @@ private void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(final String b assignedPartitions), emptyMap()); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(Collections.singleton(t1p1)); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); thread.runOnce(); final MetricName skippedTotalMetric = metrics.metricName( diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index a171a4618f1b6..9dac5341d150d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -135,8 +135,8 @@ public void before() { final StreamsConfig streamsConfig = new StreamsConfig(properties); final MockClientSupplier clientSupplier = new MockClientSupplier(); - configureRestoreConsumer(clientSupplier, "applicationId-kv-store-changelog"); - configureRestoreConsumer(clientSupplier, "applicationId-window-store-changelog"); + configureClients(clientSupplier, "applicationId-kv-store-changelog"); + configureClients(clientSupplier, "applicationId-window-store-changelog"); final InternalTopologyBuilder internalTopologyBuilder = topology.getInternalBuilder(applicationId); final ProcessorTopology processorTopology = internalTopologyBuilder.buildTopology(); @@ -364,6 +364,7 @@ private StreamTask createStreamsTask(final StreamsConfig streamsConfig, new MockTime(), streamsConfig, logContext, + clientSupplier.adminClient, clientSupplier.restoreConsumer, new MockStateRestoreListener()), topology.storeToChangelogTopic(), partitions); @@ -414,8 +415,7 @@ private void mockThread(final boolean initialized) { EasyMock.replay(threadMock); } - private void configureRestoreConsumer(final MockClientSupplier clientSupplier, - final String topic) { + private void configureClients(final MockClientSupplier clientSupplier, final String topic) { final List partitions = Arrays.asList( new PartitionInfo(topic, 0, null, null, null), new PartitionInfo(topic, 1, null, null, null) @@ -432,5 +432,8 @@ private void configureRestoreConsumer(final MockClientSupplier clientSupplier, clientSupplier.restoreConsumer.updateBeginningOffsets(offsets); clientSupplier.restoreConsumer.updateEndOffsets(offsets); + + clientSupplier.adminClient.updateBeginningOffsets(offsets); + clientSupplier.adminClient.updateEndOffsets(offsets); } } diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java index 641f6e528e2b0..880f2cb3bf5d1 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java @@ -41,11 +41,10 @@ public class MockClientSupplier implements KafkaClientSupplier { private static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer(); private Cluster cluster; - private String applicationId; + public MockAdminClient adminClient = new MockAdminClient(); public final List> producers = new LinkedList<>(); - public final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); public final MockConsumer restoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); @@ -55,11 +54,12 @@ public void setApplicationIdForProducer(final String applicationId) { public void setCluster(final Cluster cluster) { this.cluster = cluster; + this.adminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(-1)); } @Override public Admin getAdmin(final Map config) { - return new MockAdminClient(cluster.nodes(), cluster.nodeById(-1)); + return adminClient; } @Override diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 2ca187bd8362a..7615612fe16c7 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -51,6 +51,7 @@ import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.ChangelogRegister; import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl; import org.apache.kafka.streams.processor.internals.GlobalStateManager; import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl; @@ -64,7 +65,6 @@ import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; import org.apache.kafka.streams.processor.internals.StateDirectory; -import org.apache.kafka.streams.processor.internals.StoreChangelogReader; import org.apache.kafka.streams.processor.internals.StreamTask; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.Task; @@ -90,7 +90,6 @@ import java.io.IOException; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -474,12 +473,7 @@ private void setupTask(final StreamsConfig streamsConfig, StreamsConfig.EXACTLY_ONCE.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)), logContext, stateDirectory, - new StoreChangelogReader( - mockWallClockTime, - streamsConfig, - logContext, - createRestoreConsumer(processorTopology.storeToChangelogTopic()), - stateRestoreListener), + new MockChangelogRegister(), processorTopology.storeToChangelogTopic(), new HashSet<>(partitionsByInputTopic.values()) ); @@ -1202,6 +1196,20 @@ public void close() { stateDirectory.clean(); } + static class MockChangelogRegister implements ChangelogRegister { + private final Set restoringPartitions = new HashSet<>(); + + @Override + public void register(final TopicPartition partition, final ProcessorStateManager stateManager) { + restoringPartitions.add(partition); + } + + @Override + public void unregister(final Collection partitions) { + restoringPartitions.removeAll(partitions); + } + } + static class MockTime implements Time { private final AtomicLong timeMs; private final AtomicLong highResTimeNs; @@ -1239,34 +1247,5 @@ public void sleep(final long ms) { public void waitObject(final Object obj, final Supplier condition, final long timeoutMs) { throw new UnsupportedOperationException(); } - - } - - private MockConsumer createRestoreConsumer(final Map storeToChangelogTopic) { - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.LATEST) { - @Override - public synchronized void seekToEnd(final Collection partitions) {} - - @Override - public synchronized void seekToBeginning(final Collection partitions) {} - - @Override - public synchronized long position(final TopicPartition partition) { - return 0L; - } - }; - - // for each store - for (final Map.Entry storeAndTopic : storeToChangelogTopic.entrySet()) { - final String topicName = storeAndTopic.getValue(); - // Set up the restore-state topic ... - // consumer.subscribe(new TopicPartition(topicName, 0)); - // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ... - final List partitionInfos = new ArrayList<>(); - partitionInfos.add(new PartitionInfo(topicName, PARTITION_ID, null, null, null)); - consumer.updatePartitions(topicName, partitionInfos); - consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, PARTITION_ID), 0L)); - } - return consumer; } }