From b9a27a8f8c57336cac9a1ad551f3c142049c07a2 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Mon, 15 Jun 2020 17:43:37 -0700
Subject: [PATCH 1/8] check

---
 .../kafka/clients/admin/MockAdminClient.java  | 30 ++++++++++++++++++-
 .../internals/StoreChangelogReader.java       | 23 ++++++++++++--
 .../processor/internals/StreamThread.java     |  3 +-
 .../processor/internals/StreamThreadTest.java |  5 ++--
 4 files changed, 55 insertions(+), 6 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 2b86d4fb2740b..0dd0ffecb38af 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -64,6 +64,8 @@ public class MockAdminClient extends AdminClient {
         new HashMap<>();
     private final Map replicaMoves = new HashMap<>();

+    private final Map<TopicPartition, Long> beginningOffsets;
+    private final Map<TopicPartition, Long> endOffsets;
     private final String clusterId;
     private final List<List<String>> brokerLogDirs;
     private final List<Map<String, String>> brokerConfigs;
@@ -167,6 +169,8 @@ private MockAdminClient(List<Node> brokers,
         for (int i = 0; i < brokers.size(); i++) {
             this.brokerConfigs.add(new HashMap<>());
         }
+        this.beginningOffsets = new HashMap<>();
+        this.endOffsets = new HashMap<>();
     }

     synchronized public void controller(Node controller) {
@@ -818,7 +822,24 @@ synchronized public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(St
     @Override
     synchronized public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets,
                                                       ListOffsetsOptions options) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<TopicPartition, KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>> futures = new HashMap<>();
+
+        for (Map.Entry<TopicPartition, OffsetSpec> entry : topicPartitionOffsets.entrySet()) {
+            TopicPartition tp = entry.getKey();
+            OffsetSpec spec = entry.getValue();
+            KafkaFutureImpl<ListOffsetsResult.ListOffsetsResultInfo> future = new KafkaFutureImpl<>();
+
+            if (spec instanceof OffsetSpec.TimestampSpec)
+                throw new UnsupportedOperationException("Not implemented yet");
+            else if (spec instanceof OffsetSpec.EarliestSpec)
+                future.complete(new ListOffsetsResult.ListOffsetsResultInfo(beginningOffsets.get(tp), -1, Optional.empty()));
+            else
+                future.complete(new ListOffsetsResult.ListOffsetsResultInfo(endOffsets.get(tp), -1, Optional.empty()));
+
+            futures.put(tp, future);
+        }
+
+        return new ListOffsetsResult(futures);
     }

     @Override
@@ -834,6 +855,13 @@ public AlterClientQuotasResult alterClientQuotas(Collection
     @Override
     synchronized public void close(Duration timeout) {}

+    public synchronized void updateBeginningOffsets(Map<TopicPartition, Long> newOffsets) {
+        beginningOffsets.putAll(newOffsets);
+    }
+    public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOffsets) {
+        endOffsets.putAll(newOffsets);
+    }
+
     private final static class TopicMetadata {
         final boolean isInternalTopic;
         final List<TopicPartitionInfo> partitions;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index ad5cce0908076..758c1f9b82f68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.streams.processor.internals;

+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -43,6 +46,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
 import java.util.stream.Collectors;

 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
@@ -199,12 +204,19 @@ int bufferedLimitIndex() {
     // to update offset limit for standby tasks;
     private Consumer<byte[], byte[]> mainConsumer;

+    // the changelog reader needs the admin client to list end offsets
+    private Admin adminClient;
+
     private long lastUpdateOffsetTime;

     void setMainConsumer(final Consumer<byte[], byte[]> consumer) {
         this.mainConsumer = consumer;
     }

+    void setAdminClient(final Admin adminClient) {
+        this.adminClient = adminClient;
+    }
+
     public StoreChangelogReader(final Time time,
                                 final StreamsConfig config,
                                 final LogContext logContext,
@@ -564,8 +576,15 @@ private Map<TopicPartition, Long> endOffsetForChangelogs(final Set<TopicPartition> partitions) {
         }
         try {
-            return restoreConsumer.endOffsets(partitions);
-        } catch (final TimeoutException e) {
+            if (adminClient != null) {
+                final ListOffsetsResult result = adminClient.listOffsets(
+                    partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> OffsetSpec.latest())));
+                return result.all().get().entrySet().stream().collect(
+                    Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset()));
+            } else {
+                return restoreConsumer.endOffsets(partitions);
+            }
+        } catch (final TimeoutException | InterruptedException | ExecutionException e) {
             // if timeout exception gets thrown we just give up this time and retry in the next run loop
             log.debug("Could not fetch all end offsets for {}, will retry in the next run loop", partitions);
             return Collections.emptyMap();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 91c483c1498b2..a9d3d13413cac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -273,13 +273,13 @@ public boolean isRunning() {
     private volatile ThreadMetadata threadMetadata;
     private StreamThread.StateListener stateListener;

-    private final Admin adminClient;
     private final ChangelogReader changelogReader;

     // package-private for testing
     final ConsumerRebalanceListener rebalanceListener;
     final Consumer<byte[], byte[]> mainConsumer;
     final Consumer<byte[], byte[]> restoreConsumer;
+    final Admin adminClient;
     final InternalTopologyBuilder builder;

     public static StreamThread create(final InternalTopologyBuilder builder,
@@ -369,6 +369,7 @@ public static StreamThread create(final InternalTopologyBuilder builder,
         final Consumer<byte[], byte[]> mainConsumer = clientSupplier.getConsumer(consumerConfigs);
         changelogReader.setMainConsumer(mainConsumer);
+        changelogReader.setAdminClient(adminClient);
         taskManager.setMainConsumer(mainConsumer);

         final StreamThread streamThread = new StreamThread(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 6b15cfc550809..7f74cc3059733 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1634,6 +1634,7 @@ public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() t
         final StreamThread thread = createStreamThread("clientId", config, false);
         final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer;
         final MockConsumer<byte[], byte[]> mockRestoreConsumer = (MockConsumer<byte[], byte[]>) thread.restoreConsumer;
+        final MockAdminClient mockAdminClient = (MockAdminClient) thread.adminClient;

         final TopicPartition topicPartition = new TopicPartition("topic", 0);
         final Set<TopicPartition> topicPartitionSet = Collections.singleton(topicPartition);
@@ -1674,7 +1675,7 @@ public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() t
         final TopicPartition changelogPartition = new TopicPartition("stream-thread-test-count-changelog", 0);
         final Set<TopicPartition> changelogPartitionSet = Collections.singleton(changelogPartition);
         mockRestoreConsumer.updateBeginningOffsets(Collections.singletonMap(changelogPartition, 0L));
-        mockRestoreConsumer.updateEndOffsets(Collections.singletonMap(changelogPartition, 2L));
+        mockAdminClient.updateEndOffsets(Collections.singletonMap(changelogPartition, 2L));

         mockConsumer.schedulePollTask(() -> {
             thread.setState(StreamThread.State.PARTITIONS_REVOKED);
@@ -1686,7 +1687,7 @@ public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() t

         TestUtils.waitForCondition(
             () -> mockRestoreConsumer.assignment().size() == 1,
-            "Never restore first record");
+            "Never got the assignment");

         mockRestoreConsumer.addRecord(new ConsumerRecord<>(
             "stream-thread-test-count-changelog",

From b4589e6502121bc76fc3ac9633974c1763b2da60 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Tue, 16 Jun 2020 16:31:11 -0700
Subject: [PATCH 2/8] continue revamping

---
 .../apache/kafka/clients/admin/MockAdminClient.java    |  1 +
 .../processor/internals/StoreChangelogReader.java      |  8 +++-----
 .../streams/processor/internals/StreamThread.java      |  1 -
 .../internals/StreamThreadStateStoreProviderTest.java  | 10 ++++++----
 .../java/org/apache/kafka/test/MockClientSupplier.java |  4 ++--
 5 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 0dd0ffecb38af..b95e8aa0e9ac7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -858,6 +858,7 @@ synchronized public void close(Duration timeout) {}
     public synchronized void updateBeginningOffsets(Map<TopicPartition, Long> newOffsets) {
         beginningOffsets.putAll(newOffsets);
     }
+
     public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOffsets) {
         endOffsets.putAll(newOffsets);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 758c1f9b82f68..3aa3460928690 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -205,7 +205,7 @@ int bufferedLimitIndex() {
     private Consumer<byte[], byte[]> mainConsumer;

     // the changelog reader needs the admin client to list end offsets
-    private Admin adminClient;
+    private final Admin adminClient;

     private long lastUpdateOffsetTime;

@@ -213,18 +213,16 @@ void setMainConsumer(final Consumer<byte[], byte[]> consumer) {
         this.mainConsumer = consumer;
     }

-    void setAdminClient(final Admin adminClient) {
-        this.adminClient = adminClient;
-    }
-
     public StoreChangelogReader(final Time time,
                                 final StreamsConfig config,
                                 final LogContext logContext,
+                                final Admin adminClient,
                                 final Consumer<byte[], byte[]> restoreConsumer,
                                 final StateRestoreListener stateRestoreListener) {
         this.time = time;
         this.log = logContext.logger(StoreChangelogReader.class);
         this.state = ChangelogReaderState.ACTIVE_RESTORING;
+        this.adminClient = adminClient;
         this.restoreConsumer = restoreConsumer;
         this.stateRestoreListener = stateRestoreListener;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a9d3d13413cac..7e84d5b92a61f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -369,7 +369,6 @@ public static StreamThread create(final InternalTopologyBuilder builder,
         final Consumer<byte[], byte[]> mainConsumer = clientSupplier.getConsumer(consumerConfigs);
         changelogReader.setMainConsumer(mainConsumer);
-        changelogReader.setAdminClient(adminClient);
         taskManager.setMainConsumer(mainConsumer);

         final StreamThread streamThread = new StreamThread(
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index a171a4618f1b6..a1a089e352c43 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -135,8 +135,8 @@ public void before() {
         final StreamsConfig streamsConfig = new StreamsConfig(properties);
         final MockClientSupplier clientSupplier = new MockClientSupplier();
-        configureRestoreConsumer(clientSupplier, "applicationId-kv-store-changelog");
-        configureRestoreConsumer(clientSupplier, "applicationId-window-store-changelog");
+        configureClients(clientSupplier, "applicationId-kv-store-changelog");
+        configureClients(clientSupplier, "applicationId-window-store-changelog");

         final InternalTopologyBuilder internalTopologyBuilder = topology.getInternalBuilder(applicationId);
         final ProcessorTopology processorTopology = internalTopologyBuilder.buildTopology();
@@ -364,6 +364,7 @@ private StreamTask createStreamsTask(final StreamsConfig streamsConfig,
                 new MockTime(),
                 streamsConfig,
                 logContext,
+                clientSupplier.getAdmin(new HashMap<>()),
                 clientSupplier.restoreConsumer,
                 new MockStateRestoreListener()),
             topology.storeToChangelogTopic(),
             partitions);
@@ -414,8 +415,7 @@ private void mockThread(final boolean initialized) {
         EasyMock.replay(threadMock);
     }

-    private void configureRestoreConsumer(final MockClientSupplier clientSupplier,
-                                          final String topic) {
+    private void configureClients(final MockClientSupplier clientSupplier, final String topic) {
         final List<PartitionInfo> partitions = Arrays.asList(
             new PartitionInfo(topic, 0, null, null, null),
             new PartitionInfo(topic, 1, null, null, null)
         );
@@ -432,5 +432,7 @@ private void configureRestoreConsumer(final MockClientSupplier clientSupplier,

         clientSupplier.restoreConsumer.updateBeginningOffsets(offsets);
         clientSupplier.restoreConsumer.updateEndOffsets(offsets);
+
+
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
index 641f6e528e2b0..0ee0de33e83f4 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
@@ -29,6 +29,7 @@
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.streams.KafkaClientSupplier;

+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -41,11 +42,10 @@ public class MockClientSupplier implements KafkaClientSupplier {
     private static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer();

     private Cluster cluster;
     private String applicationId;
-
+    public MockAdminClient adminClient = new MockAdminClient(Collections.emptyList(), );
     public final List<MockProducer<byte[], byte[]>> producers = new LinkedList<>();
-
     public final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     public final MockConsumer<byte[], byte[]> restoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
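[Editor's sketch between patches 2 and 3. The behavioral core of the series so far is the lookup introduced in patch 1 and threaded through the constructor in patch 2: StoreChangelogReader derives changelog end offsets from Admin#listOffsets with OffsetSpec.latest() instead of Consumer#endOffsets. The following self-contained Java sketch shows that same lookup pattern in isolation; it is illustrative only and not part of the patch series, and the class name, changelog topic, and bootstrap address are hypothetical placeholders.]

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class EndOffsetLookupSketch {

    // Mirrors the shape of endOffsetForChangelogs in the patched reader:
    // request OffsetSpec.latest() for every partition, then collapse the
    // per-partition futures into a plain offset map.
    static Map<TopicPartition, Long> endOffsets(final Admin admin,
                                                final Set<TopicPartition> partitions) throws Exception {
        final ListOffsetsResult result = admin.listOffsets(
            partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> OffsetSpec.latest())));
        // all() resolves once every per-partition lookup has completed
        return result.all().get().entrySet().stream().collect(
            Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset()));
    }

    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        try (final Admin admin = Admin.create(props)) {
            // hypothetical changelog partition, named after the Streams convention
            final TopicPartition changelog = new TopicPartition("app-store-changelog", 0);
            System.out.println(endOffsets(admin, Collections.singleton(changelog)));
        }
    }
}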
From 7f816995d3a2c4e4eddae6c9db87a80f645389a8 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Tue, 16 Jun 2020 18:28:25 -0700
Subject: [PATCH 3/8] github comments

---
 .../kafka/clients/admin/MockAdminClient.java  |   7 +-
 .../internals/StoreChangelogReader.java       |   1 +
 .../processor/internals/StreamThread.java     |  32 ++++-
 .../internals/StoreChangelogReaderTest.java   | 135 +++++++-----------
 .../processor/internals/StreamThreadTest.java | 110 +++++++-------
 .../StreamThreadStateStoreProviderTest.java   |   5 +-
 .../apache/kafka/test/MockClientSupplier.java |   6 +-
 .../kafka/streams/TopologyTestDriver.java     |   2 +-
 8 files changed, 143 insertions(+), 155 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index b95e8aa0e9ac7..4ae714835e88b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -147,8 +147,11 @@ public MockAdminClient build() {
         }
     }

-    public MockAdminClient(List<Node> brokers,
-                           Node controller) {
+    public MockAdminClient() {
+        this(Collections.singletonList(Node.noNode()), Node.noNode());
+    }
+
+    public MockAdminClient(List<Node> brokers, Node controller) {
         this(brokers, controller, DEFAULT_CLUSTER_ID, 1, brokers.size(),
             Collections.nCopies(brokers.size(), DEFAULT_LOG_DIRS));
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 3aa3460928690..b1dca51776b19 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -580,6 +580,7 @@ private Map<TopicPartition, Long> endOffsetForChangelogs(final Set<TopicPartition> partitions) {
                 return result.all().get().entrySet().stream().collect(
                     Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset()));
             } else {
+                // we only fall back to use restore consumer if admin client is not set in TTD
                 return restoreConsumer.endOffsets(partitions);
             }
         } catch (final TimeoutException | InterruptedException | ExecutionException e) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7e84d5b92a61f..09be1f2f1cfc2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -274,13 +274,11 @@ public boolean isRunning() {
     private StreamThread.StateListener stateListener;

     private final ChangelogReader changelogReader;
-
private final ConsumerRebalanceListener rebalanceListener; + private final Consumer mainConsumer; + private final Consumer restoreConsumer; + private final Admin adminClient; + private final InternalTopologyBuilder builder; public static StreamThread create(final InternalTopologyBuilder builder, final StreamsConfig config, @@ -309,6 +307,7 @@ public static StreamThread create(final InternalTopologyBuilder builder, time, config, logContext, + adminClient, restoreConsumer, userStateRestoreListener ); @@ -1020,4 +1019,23 @@ int currentNumIterations() { return numIterations; } + ConsumerRebalanceListener rebalanceListener() { + return rebalanceListener; + } + + Consumer mainConsumer() { + return mainConsumer; + } + + Consumer restoreConsumer() { + return restoreConsumer; + }; + + Admin adminClient() { + return adminClient; + } + + InternalTopologyBuilder internalTopologyBuilder() { + return builder; + }; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 6769f4a96fcd4..98e76d0e2fcfb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -46,7 +49,6 @@ import org.junit.runners.Parameterized; import java.time.Duration; -import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Properties; @@ -129,8 +131,9 @@ public void onRestoreEnd(final TopicPartition tp, final String store, final long }; private final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + private final MockAdminClient adminClient = new MockAdminClient(); private final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); @Before public void setUp() { @@ -185,15 +188,10 @@ public void shouldInitializeChangelogAndCheckForCompletion() { EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); EasyMock.replay(stateManager, storeMetadata, store); - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { - @Override - public Map endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L)); - } - }; + adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); changelogReader.register(tp, stateManager); changelogReader.restore(); @@ -226,16 +224,11 @@ public void shouldPollWithRightTimeout() { EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes(); EasyMock.replay(stateManager, storeMetadata, store); - final MockConsumer consumer = new 
MockConsumer(OffsetResetStrategy.EARLIEST) { - @Override - public Map endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 11L)); - } - }; consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L)); + adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L)); final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); changelogReader.register(tp, stateManager); @@ -257,15 +250,10 @@ public void shouldRestoreFromPositionAndCheckForCompletion() { EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes(); EasyMock.replay(stateManager, storeMetadata, store); - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { - @Override - public Map endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L)); - } - }; + adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); changelogReader.register(tp, stateManager); @@ -328,16 +316,11 @@ public void shouldRestoreFromBeginningAndCheckCompletion() { EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes(); EasyMock.replay(stateManager, storeMetadata, store); - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { - @Override - public Map endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 11L)); - } - }; consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L)); + adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L)); final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); changelogReader.register(tp, stateManager); @@ -404,15 +387,10 @@ public void shouldCheckCompletionIfPositionLargerThanEndOffset() { EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes(); EasyMock.replay(activeStateManager, storeMetadata, store); - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { - @Override - public Map endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 0L)); - } - }; + adminClient.updateEndOffsets(Collections.singletonMap(tp, 0L)); final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); changelogReader.register(tp, activeStateManager); changelogReader.restore(); @@ -444,15 +422,12 @@ public long position(final TopicPartition partition) { throw new TimeoutException("KABOOM!"); } } - - @Override - public Map endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L)); - } }; + adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); + final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, 
config, logContext, adminClient, consumer, callback); changelogReader.register(tp, activeStateManager); changelogReader.restore(); @@ -480,15 +455,12 @@ public void shouldThrowIfPositionFail() { public long position(final TopicPartition partition) { throw kaboom; } - - @Override - public Map endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L)); - } }; + adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); + final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); changelogReader.register(tp, activeStateManager); @@ -502,17 +474,21 @@ public void shouldRequestEndOffsetsAndHandleTimeoutException() { EasyMock.replay(activeStateManager, storeMetadata, store); final AtomicBoolean functionCalled = new AtomicBoolean(false); - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + + final MockAdminClient adminClient = new MockAdminClient() { @Override - public Map endOffsets(final Collection partitions) { + public ListOffsetsResult listOffsets(final Map topicPartitionOffsets) { if (functionCalled.get()) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L)); + return super.listOffsets(topicPartitionOffsets); } else { functionCalled.set(true); throw new TimeoutException("KABOOM!"); } } + }; + adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); + final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override public Map committed(final Set partitions) { throw new AssertionError("Should not trigger this function"); @@ -520,7 +496,7 @@ public Map committed(final Set consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + final MockAdminClient adminClient = new MockAdminClient() { @Override - public Map endOffsets(final Collection partitions) { + public ListOffsetsResult listOffsets(final Map topicPartitionOffsets) { throw kaboom; } }; final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); changelogReader.register(tp, activeStateManager); @@ -576,15 +552,12 @@ public Map committed(final Set endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 20L)); - } }; + adminClient.updateEndOffsets(Collections.singletonMap(tp, 20L)); + final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); changelogReader.setMainConsumer(consumer); changelogReader.register(tp, stateManager); @@ -617,18 +590,16 @@ public void shouldThrowIfCommittedOffsetsFail() { EasyMock.replay(stateManager, storeMetadata, store); final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { - @Override - public Map endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L)); - } - @Override public Map committed(final Set partitions) { throw kaboom; } }; + + adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); + final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, 
callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); changelogReader.setMainConsumer(consumer); changelogReader.register(tp, stateManager); @@ -648,7 +619,7 @@ public void unsubscribe() { } }; final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); final StreamsException thrown = assertThrows(StreamsException.class, changelogReader::clear); assertEquals(kaboom, thrown.getCause()); @@ -705,7 +676,7 @@ public Map committed(final Set committed(final Set consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { - @Override - public Map endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L)); - } - }; - final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, consumer, callback); + adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); + adminClient.updateEndOffsets(Collections.singletonMap(tp1, 10L)); + adminClient.updateEndOffsets(Collections.singletonMap(tp2, 10L)); + final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback); assertEquals(ACTIVE_RESTORING, changelogReader.state()); changelogReader.register(tp, activeStateManager); @@ -983,14 +951,10 @@ public void shouldThrowIfRestoreCallbackThrows() { EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes(); EasyMock.replay(activeStateManager, storeMetadata, store); - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { - @Override - public Map endOffsets(final Collection partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L)); - } - }; + adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); + final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, exceptionCallback); + new StoreChangelogReader(time, config, logContext, adminClient, consumer, exceptionCallback); changelogReader.register(tp, activeStateManager); @@ -1052,6 +1016,7 @@ private void assignPartition(final long messages, null))); consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L)); consumer.updateEndOffsets(Collections.singletonMap(topicPartition, Math.max(0, messages) + 1)); + adminClient.updateEndOffsets(Collections.singletonMap(topicPartition, Math.max(0, messages) + 1)); consumer.assign(Collections.singletonList(topicPartition)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 7f74cc3059733..c17dd69305c67 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -252,7 +252,7 @@ public void shouldChangeStateInRebalanceListener() { thread.setStateListener(stateListener); assertEquals(thread.state(), StreamThread.State.CREATED); - final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener; + final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener(); final List revokedPartitions; final List assignedPartitions; @@ -267,7 +267,7 @@ public void shouldChangeStateInRebalanceListener() { // 
assign single partition assignedPartitions = Collections.singletonList(t1p1); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(assignedPartitions); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); rebalanceListener.onPartitionsAssigned(assignedPartitions); @@ -582,10 +582,10 @@ public void shouldRespectNumIterationsInMainLoop() { thread.taskManager().handleAssignment(Collections.singletonMap(task1, assignedPartitions), emptyMap()); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(Collections.singleton(t1p1)); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); thread.runOnce(); // processed one record, punctuated after the first record, and hence num.iterations is still 1 @@ -751,7 +751,7 @@ public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEo final StreamThread thread = createStreamThread(CLIENT_ID, config, false); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList()); final Map> activeTasks = new HashMap<>(); final List assignedPartitions = new ArrayList<>(); @@ -764,21 +764,21 @@ public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEo thread.taskManager().handleAssignment(activeTasks, emptyMap()); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(assignedPartitions); final Map beginOffsets = new HashMap<>(); beginOffsets.put(t1p1, 0L); beginOffsets.put(t1p2, 0L); mockConsumer.updateBeginningOffsets(beginOffsets); - thread.rebalanceListener.onPartitionsAssigned(new HashSet<>(assignedPartitions)); + thread.rebalanceListener().onPartitionsAssigned(new HashSet<>(assignedPartitions)); assertEquals(1, clientSupplier.producers.size()); final Producer globalProducer = clientSupplier.producers.get(0); for (final Task task : thread.activeTasks()) { assertSame(globalProducer, ((RecordCollectorImpl) ((StreamTask) task).recordCollector()).producer()); } - assertSame(clientSupplier.consumer, thread.mainConsumer); - assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer); + assertSame(clientSupplier.consumer, thread.mainConsumer()); + assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer()); } @Test @@ -788,7 +788,7 @@ public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList()); final Map> activeTasks = new HashMap<>(); final List assignedPartitions = new ArrayList<>(); @@ -801,19 +801,19 @@ public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() thread.taskManager().handleAssignment(activeTasks, emptyMap()); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) 
thread.mainConsumer(); mockConsumer.assign(assignedPartitions); final Map beginOffsets = new HashMap<>(); beginOffsets.put(t1p1, 0L); beginOffsets.put(t1p2, 0L); mockConsumer.updateBeginningOffsets(beginOffsets); - thread.rebalanceListener.onPartitionsAssigned(new HashSet<>(assignedPartitions)); + thread.rebalanceListener().onPartitionsAssigned(new HashSet<>(assignedPartitions)); thread.runOnce(); assertEquals(thread.activeTasks().size(), clientSupplier.producers.size()); - assertSame(clientSupplier.consumer, thread.mainConsumer); - assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer); + assertSame(clientSupplier.consumer, thread.mainConsumer()); + assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer()); } @Test @@ -828,7 +828,7 @@ public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress() throws Inter 10 * 1000, "Thread never started."); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList()); thread.taskManager().handleRebalanceStart(Collections.singleton(topic1)); final Map> activeTasks = new HashMap<>(); @@ -852,7 +852,7 @@ public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress() throws Inter assertEquals(Utils.mkSet(task1, task2), thread.taskManager().activeTaskIds()); assertEquals(StreamThread.State.PENDING_SHUTDOWN, thread.state()); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); TestUtils.waitForCondition( () -> thread.state() == StreamThread.State.DEAD, @@ -873,7 +873,7 @@ public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedE 10 * 1000, "Thread never started."); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList()); final Map> activeTasks = new HashMap<>(); final List assignedPartitions = new ArrayList<>(); @@ -885,7 +885,7 @@ public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedE activeTasks.put(task2, Collections.singleton(t1p2)); thread.taskManager().handleAssignment(activeTasks, emptyMap()); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); thread.shutdown(); TestUtils.waitForCondition( @@ -1064,7 +1064,7 @@ public void shouldNotThrowWhenStandbyTasksAssignedAndNoStateStoresForTopology() final StreamThread thread = createStreamThread(CLIENT_ID, config, false); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList()); final Map> standbyTasks = new HashMap<>(); @@ -1073,7 +1073,7 @@ public void shouldNotThrowWhenStandbyTasksAssignedAndNoStateStoresForTopology() thread.taskManager().handleAssignment(emptyMap(), standbyTasks); - thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList()); + thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList()); } @Test @@ -1088,7 +1088,7 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerWasFencedWhilePr consumer.updatePartitions(topic1, Collections.singletonList(new PartitionInfo(topic1, 1, null, null, null))); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> 
activeTasks = new HashMap<>(); final List assignedPartitions = new ArrayList<>(); @@ -1099,10 +1099,10 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerWasFencedWhilePr thread.taskManager().handleAssignment(activeTasks, emptyMap()); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(assignedPartitions); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); thread.runOnce(); assertThat(thread.activeTasks().size(), equalTo(1)); @@ -1146,7 +1146,7 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommi internalTopologyBuilder.addSink("out", "output", null, null, null, "name"); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> activeTasks = new HashMap<>(); final List assignedPartitions = new ArrayList<>(); @@ -1157,10 +1157,10 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommi thread.taskManager().handleAssignment(activeTasks, emptyMap()); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(assignedPartitions); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); thread.runOnce(); @@ -1171,7 +1171,7 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommi thread.runOnce(); clientSupplier.producers.get(0).commitTransactionException = new ProducerFencedException("Producer is fenced"); - assertThrows(TaskMigratedException.class, () -> thread.rebalanceListener.onPartitionsRevoked(assignedPartitions)); + assertThrows(TaskMigratedException.class, () -> thread.rebalanceListener().onPartitionsRevoked(assignedPartitions)); assertFalse(clientSupplier.producers.get(0).transactionCommitted()); assertFalse(clientSupplier.producers.get(0).closed()); assertEquals(1, thread.activeTasks().size()); @@ -1189,7 +1189,7 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommi consumer.updatePartitions(topic1, Collections.singletonList(new PartitionInfo(topic1, 1, null, null, null))); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> activeTasks = new HashMap<>(); final List assignedPartitions = new ArrayList<>(); @@ -1200,10 +1200,10 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommi thread.taskManager().handleAssignment(activeTasks, emptyMap()); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(assignedPartitions); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); thread.runOnce(); assertThat(thread.activeTasks().size(), equalTo(1)); @@ 
-1237,7 +1237,7 @@ public void shouldNotCloseTaskProducerWhenSuspending() { internalTopologyBuilder.addSink("out", "output", null, null, null, "name"); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> activeTasks = new HashMap<>(); final List assignedPartitions = new ArrayList<>(); @@ -1248,10 +1248,10 @@ public void shouldNotCloseTaskProducerWhenSuspending() { thread.taskManager().handleAssignment(activeTasks, emptyMap()); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(assignedPartitions); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); thread.runOnce(); @@ -1261,7 +1261,7 @@ public void shouldNotCloseTaskProducerWhenSuspending() { addRecord(mockConsumer, 0L); thread.runOnce(); - thread.rebalanceListener.onPartitionsRevoked(assignedPartitions); + thread.rebalanceListener().onPartitionsRevoked(assignedPartitions); assertTrue(clientSupplier.producers.get(0).transactionCommitted()); assertFalse(clientSupplier.producers.get(0).closed()); assertEquals(1, thread.activeTasks().size()); @@ -1298,7 +1298,7 @@ public void shouldReturnActiveTaskMetadataWhileRunningState() { ); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> activeTasks = new HashMap<>(); final List assignedPartitions = new ArrayList<>(); @@ -1309,10 +1309,10 @@ public void shouldReturnActiveTaskMetadataWhileRunningState() { thread.taskManager().handleAssignment(activeTasks, emptyMap()); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(assignedPartitions); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); thread.runOnce(); @@ -1356,7 +1356,7 @@ public void shouldReturnStandbyTaskMetadataWhileRunningState() { restoreConsumer.updateBeginningOffsets(offsets); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> standbyTasks = new HashMap<>(); @@ -1365,7 +1365,7 @@ public void shouldReturnStandbyTaskMetadataWhileRunningState() { thread.taskManager().handleAssignment(emptyMap(), standbyTasks); - thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList()); + thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList()); thread.runOnce(); @@ -1408,7 +1408,7 @@ public void shouldUpdateStandbyTask() throws Exception { checkpoint.write(Collections.singletonMap(partition2, 5L)); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> standbyTasks = new HashMap<>(); @@ -1419,7 +1419,7 @@ public void shouldUpdateStandbyTask() throws Exception { 
thread.taskManager().handleAssignment(emptyMap(), standbyTasks); thread.taskManager().tryToCompleteRestoration(); - thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList()); + thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList()); thread.runOnce(); @@ -1505,7 +1505,7 @@ public void close() {} final StreamThread thread = createStreamThread(CLIENT_ID, config, false); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet()); + thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final List assignedPartitions = new ArrayList<>(); final Map> activeTasks = new HashMap<>(); @@ -1518,7 +1518,7 @@ public void close() {} clientSupplier.consumer.assign(assignedPartitions); clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); thread.runOnce(); @@ -1600,7 +1600,7 @@ public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNot final List assignedPartitions = new ArrayList<>(); thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(assignedPartitions); + thread.rebalanceListener().onPartitionsRevoked(assignedPartitions); assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_REVOKED); final Map> activeTasks = new HashMap<>(); @@ -1613,7 +1613,7 @@ public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNot thread.taskManager().handleAssignment(activeTasks, standbyTasks); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_ASSIGNED); } @@ -1632,9 +1632,9 @@ public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() t internalStreamsBuilder.buildAndOptimizeTopology(); final StreamThread thread = createStreamThread("clientId", config, false); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; - final MockConsumer mockRestoreConsumer = (MockConsumer) thread.restoreConsumer; - final MockAdminClient mockAdminClient = (MockAdminClient) thread.adminClient; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); + final MockConsumer mockRestoreConsumer = (MockConsumer) thread.restoreConsumer(); + final MockAdminClient mockAdminClient = (MockAdminClient) thread.adminClient(); final TopicPartition topicPartition = new TopicPartition("topic", 0); final Set topicPartitionSet = Collections.singleton(topicPartition); @@ -1679,7 +1679,7 @@ public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() t mockConsumer.schedulePollTask(() -> { thread.setState(StreamThread.State.PARTITIONS_REVOKED); - thread.rebalanceListener.onPartitionsAssigned(topicPartitionSet); + thread.rebalanceListener().onPartitionsAssigned(topicPartitionSet); }); try { @@ -1772,10 +1772,10 @@ private void shouldLogAndRecordSkippedMetricForDeserializationException(final St Collections.singletonMap(task1, assignedPartitions), emptyMap()); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(Collections.singleton(t1p1)); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L)); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); thread.runOnce(); if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) { @@ -1864,7 +1864,7 @@ public void shouldThrowTaskMigratedExceptionHandlingTaskLost() { consumer.schedulePollTask(() -> { thread.setState(StreamThread.State.PARTITIONS_REVOKED); - thread.rebalanceListener.onPartitionsLost(assignedPartitions); + thread.rebalanceListener().onPartitionsLost(assignedPartitions); }); thread.setState(StreamThread.State.STARTING); @@ -1907,7 +1907,7 @@ public void shouldThrowTaskMigratedExceptionHandlingRevocation() { consumer.schedulePollTask(() -> { thread.setState(StreamThread.State.PARTITIONS_REVOKED); - thread.rebalanceListener.onPartitionsRevoked(assignedPartitions); + thread.rebalanceListener().onPartitionsRevoked(assignedPartitions); }); thread.setState(StreamThread.State.STARTING); @@ -2109,10 +2109,10 @@ private void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(final String b assignedPartitions), emptyMap()); - final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer; + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); mockConsumer.assign(Collections.singleton(t1p1)); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); thread.runOnce(); final MetricName skippedTotalMetric = metrics.metricName( diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index a1a089e352c43..9dac5341d150d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -364,7 +364,7 @@ private StreamTask createStreamsTask(final StreamsConfig streamsConfig, new MockTime(), streamsConfig, logContext, - clientSupplier.getAdmin(new HashMap<>()), + clientSupplier.adminClient, clientSupplier.restoreConsumer, new MockStateRestoreListener()), topology.storeToChangelogTopic(), partitions); @@ -433,6 +433,7 @@ private void configureClients(final MockClientSupplier clientSupplier, final Str clientSupplier.restoreConsumer.updateBeginningOffsets(offsets); clientSupplier.restoreConsumer.updateEndOffsets(offsets); - + clientSupplier.adminClient.updateBeginningOffsets(offsets); + clientSupplier.adminClient.updateEndOffsets(offsets); } } diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java index 0ee0de33e83f4..880f2cb3bf5d1 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.streams.KafkaClientSupplier; -import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -44,7 +43,7 @@ public class MockClientSupplier implements KafkaClientSupplier { private Cluster cluster; private String applicationId; - public MockAdminClient adminClient = new MockAdminClient(Collections.emptyList(), ); + public 
MockAdminClient adminClient = new MockAdminClient(); public final List> producers = new LinkedList<>(); public final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); public final MockConsumer restoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); @@ -55,11 +54,12 @@ public void setApplicationIdForProducer(final String applicationId) { public void setCluster(final Cluster cluster) { this.cluster = cluster; + this.adminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(-1)); } @Override public Admin getAdmin(final Map config) { - return new MockAdminClient(cluster.nodes(), cluster.nodeById(-1)); + return adminClient; } @Override diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 2ca187bd8362a..678ca21b580da 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -478,6 +478,7 @@ private void setupTask(final StreamsConfig streamsConfig, mockWallClockTime, streamsConfig, logContext, + null, createRestoreConsumer(processorTopology.storeToChangelogTopic()), stateRestoreListener), processorTopology.storeToChangelogTopic(), @@ -1239,7 +1240,6 @@ public void sleep(final long ms) { public void waitObject(final Object obj, final Supplier condition, final long timeoutMs) { throw new UnsupportedOperationException(); } - } private MockConsumer createRestoreConsumer(final Map storeToChangelogTopic) { From 2fd0ea54d729aa3904c4f96f29653f03601f64e4 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Wed, 17 Jun 2020 11:01:27 -0700 Subject: [PATCH 4/8] add the mock-admin client to ttd --- .../clients/admin/ListOffsetsResult.java | 4 +- .../kafka/clients/admin/OffsetSpec.java | 6 +- .../kafka/streams/TopologyTestDriver.java | 12 +- .../streams/internals/MockAdminClient.java | 341 ++++++++++++++++++ 4 files changed, 357 insertions(+), 6 deletions(-) create mode 100644 streams/test-utils/src/main/java/org/apache/kafka/streams/internals/MockAdminClient.java diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java index d830ef2aadc8c..5eb00deb0697a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java @@ -35,7 +35,7 @@ public class ListOffsetsResult { private final Map> futures; - ListOffsetsResult(Map> futures) { + public ListOffsetsResult(Map> futures) { this.futures = futures; } @@ -80,7 +80,7 @@ public static class ListOffsetsResultInfo { private final long timestamp; private final Optional leaderEpoch; - ListOffsetsResultInfo(long offset, long timestamp, Optional leaderEpoch) { + public ListOffsetsResultInfo(long offset, long timestamp, Optional leaderEpoch) { this.offset = offset; this.timestamp = timestamp; this.leaderEpoch = leaderEpoch; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java index 2517985a00cde..339e9cf815e87 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java @@ -23,9 +23,9 @@ */ public class OffsetSpec { - static class EarliestSpec extends OffsetSpec { } - 
static class LatestSpec extends OffsetSpec { } - static class TimestampSpec extends OffsetSpec { + public static class EarliestSpec extends OffsetSpec { } + public static class LatestSpec extends OffsetSpec { } + public static class TimestampSpec extends OffsetSpec { private final long timestamp; TimestampSpec(long timestamp) { diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 678ca21b580da..47c26e3930985 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -43,6 +43,7 @@ import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.internals.KeyValueStoreFacade; +import org.apache.kafka.streams.internals.MockAdminClient; import org.apache.kafka.streams.internals.QuietStreamsConfig; import org.apache.kafka.streams.internals.WindowStoreFacade; import org.apache.kafka.streams.processor.ProcessorContext; @@ -478,7 +479,7 @@ private void setupTask(final StreamsConfig streamsConfig, mockWallClockTime, streamsConfig, logContext, - null, + createAdminClient(processorTopology.storeToChangelogTopic()), createRestoreConsumer(processorTopology.storeToChangelogTopic()), stateRestoreListener), processorTopology.storeToChangelogTopic(), @@ -1269,4 +1270,13 @@ public synchronized long position(final TopicPartition partition) { } return consumer; } + + private MockAdminClient createAdminClient(final Map storeToChangelogTopic) { + final MockAdminClient adminClient = new MockAdminClient(); + for (final Map.Entry storeAndTopic : storeToChangelogTopic.entrySet()) { + final String topicName = storeAndTopic.getValue(); + adminClient.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, PARTITION_ID), 0L)); + } + return adminClient; + } } diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/MockAdminClient.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/MockAdminClient.java new file mode 100644 index 0000000000000..754d25b193d75 --- /dev/null +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/MockAdminClient.java @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.internals; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AlterClientQuotasOptions; +import org.apache.kafka.clients.admin.AlterClientQuotasResult; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.AlterConfigsOptions; +import org.apache.kafka.clients.admin.AlterConfigsResult; +import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsOptions; +import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult; +import org.apache.kafka.clients.admin.AlterPartitionReassignmentsOptions; +import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult; +import org.apache.kafka.clients.admin.AlterReplicaLogDirsOptions; +import org.apache.kafka.clients.admin.AlterReplicaLogDirsResult; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.CreateAclsOptions; +import org.apache.kafka.clients.admin.CreateAclsResult; +import org.apache.kafka.clients.admin.CreateDelegationTokenOptions; +import org.apache.kafka.clients.admin.CreateDelegationTokenResult; +import org.apache.kafka.clients.admin.CreatePartitionsOptions; +import org.apache.kafka.clients.admin.CreatePartitionsResult; +import org.apache.kafka.clients.admin.CreateTopicsOptions; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.DeleteAclsOptions; +import org.apache.kafka.clients.admin.DeleteAclsResult; +import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsOptions; +import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult; +import org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions; +import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult; +import org.apache.kafka.clients.admin.DeleteRecordsOptions; +import org.apache.kafka.clients.admin.DeleteRecordsResult; +import org.apache.kafka.clients.admin.DeleteTopicsOptions; +import org.apache.kafka.clients.admin.DeleteTopicsResult; +import org.apache.kafka.clients.admin.DescribeAclsOptions; +import org.apache.kafka.clients.admin.DescribeAclsResult; +import org.apache.kafka.clients.admin.DescribeClientQuotasOptions; +import org.apache.kafka.clients.admin.DescribeClientQuotasResult; +import org.apache.kafka.clients.admin.DescribeClusterOptions; +import org.apache.kafka.clients.admin.DescribeClusterResult; +import org.apache.kafka.clients.admin.DescribeConfigsOptions; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions; +import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult; +import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions; +import org.apache.kafka.clients.admin.DescribeDelegationTokenResult; +import org.apache.kafka.clients.admin.DescribeLogDirsOptions; +import org.apache.kafka.clients.admin.DescribeLogDirsResult; +import org.apache.kafka.clients.admin.DescribeReplicaLogDirsOptions; +import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult; +import org.apache.kafka.clients.admin.DescribeTopicsOptions; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.ElectLeadersOptions; +import org.apache.kafka.clients.admin.ElectLeadersResult; +import org.apache.kafka.clients.admin.ElectPreferredLeadersOptions; +import org.apache.kafka.clients.admin.ElectPreferredLeadersResult; +import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions; +import 
org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListPartitionReassignmentsOptions;
+import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewPartitionReassignment;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;
+import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupResult;
+import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
+import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Minimal admin client mock implementation needed for TTD
+ */
+public class MockAdminClient extends AdminClient {
+
+    private final Map beginningOffsets = new HashMap<>();
+    private final Map endOffsets = new HashMap<>();
+
+    @Override
+    synchronized public ListOffsetsResult listOffsets(Map topicPartitionOffsets, ListOffsetsOptions options) {
+        Map> futures = new HashMap<>();
+
+        for (Map.Entry entry : topicPartitionOffsets.entrySet()) {
+            TopicPartition tp = entry.getKey();
+            OffsetSpec spec = entry.getValue();
+            KafkaFutureImpl future = new KafkaFutureImpl<>();
+
+            if (spec instanceof OffsetSpec.EarliestSpec)
+                future.complete(new ListOffsetsResult.ListOffsetsResultInfo(beginningOffsets.get(tp), -1, Optional.empty()));
+            else if (spec instanceof OffsetSpec.LatestSpec)
+                future.complete(new ListOffsetsResult.ListOffsetsResultInfo(endOffsets.get(tp), -1, Optional.empty()));
+            else
+                throw new UnsupportedOperationException("Not implemented yet");
+
+            futures.put(tp, future);
+        }
+
+        return new ListOffsetsResult(futures);
+    }
+
+    public synchronized void updateBeginningOffsets(Map newOffsets) {
+        beginningOffsets.putAll(newOffsets);
+    }
+
+    public synchronized void updateEndOffsets(final Map newOffsets) {
+        endOffsets.putAll(newOffsets);
+    }
+
+    // ----------- APIs below are not used by TTD --------
+
+    @Override
+    synchronized public DescribeClusterResult describeCluster(DescribeClusterOptions options) {
+        throw
new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public CreateTopicsResult createTopics(Collection newTopics, CreateTopicsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public ListTopicsResult listTopics(ListTopicsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public DescribeTopicsResult describeTopics(Collection topicNames, DescribeTopicsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public DeleteTopicsResult deleteTopics(Collection topicsToDelete, DeleteTopicsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public CreatePartitionsResult createPartitions(Map newPartitions, CreatePartitionsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public DeleteRecordsResult deleteRecords(Map recordsToDelete, DeleteRecordsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public DescribeConsumerGroupsResult describeConsumerGroups(Collection groupIds, DescribeConsumerGroupsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public DeleteConsumerGroupsResult deleteConsumerGroups(Collection groupIds, DeleteConsumerGroupsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String groupId, Set partitions, DeleteConsumerGroupOffsetsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Deprecated
+    @Override
+    synchronized public ElectPreferredLeadersResult electPreferredLeaders(Collection partitions, ElectPreferredLeadersOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public ElectLeadersResult electLeaders(
+            ElectionType electionType,
+            Set partitions,
+            ElectLeadersOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId, RemoveMembersFromConsumerGroupOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public CreateAclsResult createAcls(Collection acls, CreateAclsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public DescribeAclsResult describeAcls(AclBindingFilter filter, DescribeAclsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public DeleteAclsResult deleteAcls(Collection filters, DeleteAclsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public DescribeConfigsResult describeConfigs(Collection resources, DescribeConfigsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    @Deprecated
+    synchronized public AlterConfigsResult alterConfigs(Map configs, AlterConfigsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public AlterConfigsResult incrementalAlterConfigs(
+            Map> configs,
+            AlterConfigsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public AlterReplicaLogDirsResult alterReplicaLogDirs(
+            Map replicaAssignment,
+            AlterReplicaLogDirsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public DescribeLogDirsResult describeLogDirs(Collection brokers,
+                                                              DescribeLogDirsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection replicas,
+                                                                            DescribeReplicaLogDirsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public AlterPartitionReassignmentsResult alterPartitionReassignments(
+            Map> newReassignments,
+            AlterPartitionReassignmentsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public ListPartitionReassignmentsResult listPartitionReassignments(
+            Optional> partitions,
+            ListPartitionReassignmentsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, Map offsets, AlterConsumerGroupOffsetsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    synchronized public void close(Duration timeout) {}
+
+    @Override
+    synchronized public Map metrics() {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+}

From 67bf6935761e48590527d0bfaf5488c7e6c43ce9 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Wed, 17 Jun 2020 13:33:07 -0700
Subject: [PATCH 5/8] copy-paste the mock admin client

---
 .../internals/StoreChangelogReader.java
| 13 +- .../streams/internals/MockAdminClient.java | 112 +++++++++--------- 2 files changed, 59 insertions(+), 66 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index b1dca51776b19..060e2a6c3cfd1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -574,15 +574,10 @@ private Map endOffsetForChangelogs(final Set OffsetSpec.latest()))); - return result.all().get().entrySet().stream().collect( - Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset())); - } else { - // we only fall back to use restore consumer if admin client is not set in TTD - return restoreConsumer.endOffsets(partitions); - } + final ListOffsetsResult result = adminClient.listOffsets(partitions.stream().collect( + Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()))); + return result.all().get().entrySet().stream().collect( + Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset())); } catch (final TimeoutException | InterruptedException | ExecutionException e) { // if timeout exception gets thrown we just give up this time and retry in the next run loop log.debug("Could not fetch all end offsets for {}, will retry in the next run loop", partitions); diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/MockAdminClient.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/MockAdminClient.java index 754d25b193d75..c134ab30592a6 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/MockAdminClient.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/MockAdminClient.java @@ -67,8 +67,6 @@ import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.admin.ElectLeadersOptions; import org.apache.kafka.clients.admin.ElectLeadersResult; -import org.apache.kafka.clients.admin.ElectPreferredLeadersOptions; -import org.apache.kafka.clients.admin.ElectPreferredLeadersResult; import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions; import org.apache.kafka.clients.admin.ExpireDelegationTokenResult; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions; @@ -120,13 +118,14 @@ public class MockAdminClient extends AdminClient { private final Map endOffsets = new HashMap<>(); @Override - synchronized public ListOffsetsResult listOffsets(Map topicPartitionOffsets, ListOffsetsOptions options) { - Map> futures = new HashMap<>(); + synchronized public ListOffsetsResult listOffsets(final Map topicPartitionOffsets, + final ListOffsetsOptions options) { + final Map> futures = new HashMap<>(); - for (Map.Entry entry : topicPartitionOffsets.entrySet()) { - TopicPartition tp = entry.getKey(); - OffsetSpec spec = entry.getValue(); - KafkaFutureImpl future = new KafkaFutureImpl<>(); + for (final Map.Entry entry : topicPartitionOffsets.entrySet()) { + final TopicPartition tp = entry.getKey(); + final OffsetSpec spec = entry.getValue(); + final KafkaFutureImpl future = new KafkaFutureImpl<>(); if (spec instanceof OffsetSpec.EarliestSpec) future.complete(new ListOffsetsResult.ListOffsetsResultInfo(beginningOffsets.get(tp), -1, Optional.empty())); @@ -141,7 +140,7 @@ else if (spec instanceof OffsetSpec.LatestSpec) return new 
ListOffsetsResult(futures); } - public synchronized void updateBeginningOffsets(Map newOffsets) { + public synchronized void updateBeginningOffsets(final Map newOffsets) { beginningOffsets.putAll(newOffsets); } @@ -152,187 +151,186 @@ public synchronized void updateEndOffsets(final Map newOff // ----------- APIs below are not used by TTD -------- @Override - synchronized public DescribeClusterResult describeCluster(DescribeClusterOptions options) { + synchronized public DescribeClusterResult describeCluster(final DescribeClusterOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override - synchronized public CreateTopicsResult createTopics(Collection newTopics, CreateTopicsOptions options) { + synchronized public CreateTopicsResult createTopics(final Collection newTopics, final CreateTopicsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override - synchronized public ListTopicsResult listTopics(ListTopicsOptions options) { + synchronized public ListTopicsResult listTopics(final ListTopicsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override - synchronized public DescribeTopicsResult describeTopics(Collection topicNames, DescribeTopicsOptions options) { + synchronized public DescribeTopicsResult describeTopics(final Collection topicNames, final DescribeTopicsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override - synchronized public DeleteTopicsResult deleteTopics(Collection topicsToDelete, DeleteTopicsOptions options) { + synchronized public DeleteTopicsResult deleteTopics(final Collection topicsToDelete, final DeleteTopicsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override - synchronized public CreatePartitionsResult createPartitions(Map newPartitions, CreatePartitionsOptions options) { + synchronized public CreatePartitionsResult createPartitions(final Map newPartitions, + final CreatePartitionsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override - synchronized public DeleteRecordsResult deleteRecords(Map recordsToDelete, DeleteRecordsOptions options) { + synchronized public DeleteRecordsResult deleteRecords(final Map recordsToDelete, + final DeleteRecordsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override - synchronized public CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options) { + synchronized public CreateDelegationTokenResult createDelegationToken(final CreateDelegationTokenOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override - synchronized public RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options) { + synchronized public RenewDelegationTokenResult renewDelegationToken(final byte[] hmac, final RenewDelegationTokenOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override - synchronized public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options) { + synchronized public ExpireDelegationTokenResult expireDelegationToken(final byte[] hmac, final ExpireDelegationTokenOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override - synchronized public DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options) { + synchronized public 
DescribeDelegationTokenResult describeDelegationToken(final DescribeDelegationTokenOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override - synchronized public DescribeConsumerGroupsResult describeConsumerGroups(Collection groupIds, DescribeConsumerGroupsOptions options) { + synchronized public DescribeConsumerGroupsResult describeConsumerGroups(final Collection groupIds, final DescribeConsumerGroupsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override - synchronized public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) { + synchronized public ListConsumerGroupsResult listConsumerGroups(final ListConsumerGroupsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override - synchronized public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) { + synchronized public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override - synchronized public DeleteConsumerGroupsResult deleteConsumerGroups(Collection groupIds, DeleteConsumerGroupsOptions options) { + synchronized public DeleteConsumerGroupsResult deleteConsumerGroups(final Collection groupIds, final DeleteConsumerGroupsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override - synchronized public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String groupId, Set partitions, DeleteConsumerGroupOffsetsOptions options) { - throw new UnsupportedOperationException("Not implemented yet"); - } - - @Deprecated - @Override - synchronized public ElectPreferredLeadersResult electPreferredLeaders(Collection partitions, ElectPreferredLeadersOptions options) { + synchronized public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(final String groupId, + final Set partitions, + final DeleteConsumerGroupOffsetsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override synchronized public ElectLeadersResult electLeaders( - ElectionType electionType, - Set partitions, - ElectLeadersOptions options) { + final ElectionType electionType, + final Set partitions, + final ElectLeadersOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override - synchronized public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId, RemoveMembersFromConsumerGroupOptions options) { + synchronized public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(final String groupId, final RemoveMembersFromConsumerGroupOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override - synchronized public CreateAclsResult createAcls(Collection acls, CreateAclsOptions options) { + synchronized public CreateAclsResult createAcls(final Collection acls, final CreateAclsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override - synchronized public DescribeAclsResult describeAcls(AclBindingFilter filter, DescribeAclsOptions options) { + synchronized public DescribeAclsResult describeAcls(final AclBindingFilter filter, final DescribeAclsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override - synchronized public DeleteAclsResult deleteAcls(Collection filters, 
DeleteAclsOptions options) {
+    synchronized public DeleteAclsResult deleteAcls(final Collection filters, final DeleteAclsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
     @Override
-    synchronized public DescribeConfigsResult describeConfigs(Collection resources, DescribeConfigsOptions options) {
+    synchronized public DescribeConfigsResult describeConfigs(final Collection resources, final DescribeConfigsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
     @Override
     @Deprecated
-    synchronized public AlterConfigsResult alterConfigs(Map configs, AlterConfigsOptions options) {
+    synchronized public AlterConfigsResult alterConfigs(final Map configs, final AlterConfigsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
     @Override
-    synchronized public AlterConfigsResult incrementalAlterConfigs(
-            Map> configs,
-            AlterConfigsOptions options) {
+    synchronized public AlterConfigsResult incrementalAlterConfigs(final Map> configs,
+                                                                   final AlterConfigsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
     @Override
     synchronized public AlterReplicaLogDirsResult alterReplicaLogDirs(
-            Map replicaAssignment,
-            AlterReplicaLogDirsOptions options) {
+            final Map replicaAssignment,
+            final AlterReplicaLogDirsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
     @Override
-    synchronized public DescribeLogDirsResult describeLogDirs(Collection brokers,
-                                                              DescribeLogDirsOptions options) {
+    synchronized public DescribeLogDirsResult describeLogDirs(final Collection brokers,
+                                                              final DescribeLogDirsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
     @Override
-    synchronized public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection replicas,
-                                                                            DescribeReplicaLogDirsOptions options) {
+    synchronized public DescribeReplicaLogDirsResult describeReplicaLogDirs(final Collection replicas,
+                                                                            final DescribeReplicaLogDirsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
     @Override
     synchronized public AlterPartitionReassignmentsResult alterPartitionReassignments(
-            Map> newReassignments,
-            AlterPartitionReassignmentsOptions options) {
+            final Map> newReassignments,
+            final AlterPartitionReassignmentsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
     @Override
     synchronized public ListPartitionReassignmentsResult listPartitionReassignments(
-            Optional> partitions,
-            ListPartitionReassignmentsOptions options) {
+            final Optional> partitions,
+            final ListPartitionReassignmentsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
     @Override
-    synchronized public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, Map offsets, AlterConsumerGroupOffsetsOptions options) {
+    synchronized public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(final String groupId,
+                                                                                  final Map offsets,
+                                                                                  final AlterConsumerGroupOffsetsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
     @Override
-    public DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions options) {
+    public DescribeClientQuotasResult describeClientQuotas(final ClientQuotaFilter filter, final DescribeClientQuotasOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
     @Override
-    public AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options) {
+    public AlterClientQuotasResult alterClientQuotas(final Collection entries, final AlterClientQuotasOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
     @Override
-    synchronized public void close(Duration timeout) {}
+    synchronized public void close(final Duration timeout) {}
 
     @Override
     synchronized public Map metrics() {

From 6934cd6209d2db22d9ebc4a1fe8e0d296aac0072 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Wed, 17 Jun 2020 14:02:31 -0700
Subject: [PATCH 6/8] address GitHub review comments

---
 .../streams/processor/internals/StoreChangelogReader.java | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 060e2a6c3cfd1..c6ee4d3fbbe48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -17,12 +17,14 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -574,8 +576,10 @@ private Map endOffsetForChangelogs(final Set OffsetSpec.latest())));
+            final ListOffsetsResult result = adminClient.listOffsets(
+                partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())),
+                new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED)
+            );
             return result.all().get().entrySet().stream().collect(
                 Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset()));
         } catch (final TimeoutException | InterruptedException | ExecutionException e) {

From ebe86f98c385eacf6112290880b71a8b725f9c96 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Wed, 17 Jun 2020 18:11:49 -0700
Subject: [PATCH 7/8] do not use the changelog reader at all

---
 .../internals/ChangelogRegister.java | 2 +-
 .../kafka/streams/TopologyTestDriver.java | 63 +---
 .../streams/internals/MockAdminClient.java | 339 ------------------
 3 files changed, 17 insertions(+), 387 deletions(-)
 delete mode 100644 streams/test-utils/src/main/java/org/apache/kafka/streams/internals/MockAdminClient.java

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java
index 7403aad57f0db..3f5f977db7549 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java
@@ -22,7 +22,7 @@
 /**
  * See {@link StoreChangelogReader}.
  */
-interface ChangelogRegister {
+public interface ChangelogRegister {
 
     /**
      * Register a state store for restoration.
* diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 47c26e3930985..7615612fe16c7 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -43,7 +43,6 @@ import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.internals.KeyValueStoreFacade; -import org.apache.kafka.streams.internals.MockAdminClient; import org.apache.kafka.streams.internals.QuietStreamsConfig; import org.apache.kafka.streams.internals.WindowStoreFacade; import org.apache.kafka.streams.processor.ProcessorContext; @@ -52,6 +51,7 @@ import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.ChangelogRegister; import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl; import org.apache.kafka.streams.processor.internals.GlobalStateManager; import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl; @@ -65,7 +65,6 @@ import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; import org.apache.kafka.streams.processor.internals.StateDirectory; -import org.apache.kafka.streams.processor.internals.StoreChangelogReader; import org.apache.kafka.streams.processor.internals.StreamTask; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.Task; @@ -91,7 +90,6 @@ import java.io.IOException; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -475,13 +473,7 @@ private void setupTask(final StreamsConfig streamsConfig, StreamsConfig.EXACTLY_ONCE.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)), logContext, stateDirectory, - new StoreChangelogReader( - mockWallClockTime, - streamsConfig, - logContext, - createAdminClient(processorTopology.storeToChangelogTopic()), - createRestoreConsumer(processorTopology.storeToChangelogTopic()), - stateRestoreListener), + new MockChangelogRegister(), processorTopology.storeToChangelogTopic(), new HashSet<>(partitionsByInputTopic.values()) ); @@ -1204,6 +1196,20 @@ public void close() { stateDirectory.clean(); } + static class MockChangelogRegister implements ChangelogRegister { + private final Set restoringPartitions = new HashSet<>(); + + @Override + public void register(final TopicPartition partition, final ProcessorStateManager stateManager) { + restoringPartitions.add(partition); + } + + @Override + public void unregister(final Collection partitions) { + restoringPartitions.removeAll(partitions); + } + } + static class MockTime implements Time { private final AtomicLong timeMs; private final AtomicLong highResTimeNs; @@ -1242,41 +1248,4 @@ public void waitObject(final Object obj, final Supplier condition, fina throw new UnsupportedOperationException(); } } - - private MockConsumer createRestoreConsumer(final Map storeToChangelogTopic) { - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.LATEST) { - @Override - public synchronized void 
seekToEnd(final Collection partitions) {} - - @Override - public synchronized void seekToBeginning(final Collection partitions) {} - - @Override - public synchronized long position(final TopicPartition partition) { - return 0L; - } - }; - - // for each store - for (final Map.Entry storeAndTopic : storeToChangelogTopic.entrySet()) { - final String topicName = storeAndTopic.getValue(); - // Set up the restore-state topic ... - // consumer.subscribe(new TopicPartition(topicName, 0)); - // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ... - final List partitionInfos = new ArrayList<>(); - partitionInfos.add(new PartitionInfo(topicName, PARTITION_ID, null, null, null)); - consumer.updatePartitions(topicName, partitionInfos); - consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, PARTITION_ID), 0L)); - } - return consumer; - } - - private MockAdminClient createAdminClient(final Map storeToChangelogTopic) { - final MockAdminClient adminClient = new MockAdminClient(); - for (final Map.Entry storeAndTopic : storeToChangelogTopic.entrySet()) { - final String topicName = storeAndTopic.getValue(); - adminClient.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, PARTITION_ID), 0L)); - } - return adminClient; - } } diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/MockAdminClient.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/MockAdminClient.java deleted file mode 100644 index c134ab30592a6..0000000000000 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/MockAdminClient.java +++ /dev/null @@ -1,339 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.streams.internals; - -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.AlterClientQuotasOptions; -import org.apache.kafka.clients.admin.AlterClientQuotasResult; -import org.apache.kafka.clients.admin.AlterConfigOp; -import org.apache.kafka.clients.admin.AlterConfigsOptions; -import org.apache.kafka.clients.admin.AlterConfigsResult; -import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsOptions; -import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult; -import org.apache.kafka.clients.admin.AlterPartitionReassignmentsOptions; -import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult; -import org.apache.kafka.clients.admin.AlterReplicaLogDirsOptions; -import org.apache.kafka.clients.admin.AlterReplicaLogDirsResult; -import org.apache.kafka.clients.admin.Config; -import org.apache.kafka.clients.admin.CreateAclsOptions; -import org.apache.kafka.clients.admin.CreateAclsResult; -import org.apache.kafka.clients.admin.CreateDelegationTokenOptions; -import org.apache.kafka.clients.admin.CreateDelegationTokenResult; -import org.apache.kafka.clients.admin.CreatePartitionsOptions; -import org.apache.kafka.clients.admin.CreatePartitionsResult; -import org.apache.kafka.clients.admin.CreateTopicsOptions; -import org.apache.kafka.clients.admin.CreateTopicsResult; -import org.apache.kafka.clients.admin.DeleteAclsOptions; -import org.apache.kafka.clients.admin.DeleteAclsResult; -import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsOptions; -import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult; -import org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions; -import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult; -import org.apache.kafka.clients.admin.DeleteRecordsOptions; -import org.apache.kafka.clients.admin.DeleteRecordsResult; -import org.apache.kafka.clients.admin.DeleteTopicsOptions; -import org.apache.kafka.clients.admin.DeleteTopicsResult; -import org.apache.kafka.clients.admin.DescribeAclsOptions; -import org.apache.kafka.clients.admin.DescribeAclsResult; -import org.apache.kafka.clients.admin.DescribeClientQuotasOptions; -import org.apache.kafka.clients.admin.DescribeClientQuotasResult; -import org.apache.kafka.clients.admin.DescribeClusterOptions; -import org.apache.kafka.clients.admin.DescribeClusterResult; -import org.apache.kafka.clients.admin.DescribeConfigsOptions; -import org.apache.kafka.clients.admin.DescribeConfigsResult; -import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions; -import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult; -import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions; -import org.apache.kafka.clients.admin.DescribeDelegationTokenResult; -import org.apache.kafka.clients.admin.DescribeLogDirsOptions; -import org.apache.kafka.clients.admin.DescribeLogDirsResult; -import org.apache.kafka.clients.admin.DescribeReplicaLogDirsOptions; -import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult; -import org.apache.kafka.clients.admin.DescribeTopicsOptions; -import org.apache.kafka.clients.admin.DescribeTopicsResult; -import org.apache.kafka.clients.admin.ElectLeadersOptions; -import org.apache.kafka.clients.admin.ElectLeadersResult; -import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions; -import org.apache.kafka.clients.admin.ExpireDelegationTokenResult; -import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions; -import 
org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
-import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
-import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
-import org.apache.kafka.clients.admin.ListOffsetsOptions;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
-import org.apache.kafka.clients.admin.ListPartitionReassignmentsOptions;
-import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult;
-import org.apache.kafka.clients.admin.ListTopicsOptions;
-import org.apache.kafka.clients.admin.ListTopicsResult;
-import org.apache.kafka.clients.admin.NewPartitionReassignment;
-import org.apache.kafka.clients.admin.NewPartitions;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.admin.OffsetSpec;
-import org.apache.kafka.clients.admin.RecordsToDelete;
-import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;
-import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupResult;
-import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
-import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.ElectionType;
-import org.apache.kafka.common.TopicPartitionReplica;
-import org.apache.kafka.common.acl.AclBinding;
-import org.apache.kafka.common.acl.AclBindingFilter;
-import org.apache.kafka.common.config.ConfigResource;
-import org.apache.kafka.common.internals.KafkaFutureImpl;
-import org.apache.kafka.common.quota.ClientQuotaAlteration;
-import org.apache.kafka.common.quota.ClientQuotaFilter;
-
-import java.time.Duration;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
-/**
- * Minimal admin client mock implementation needed for TTD
- */
-public class MockAdminClient extends AdminClient {
-
-    private final Map beginningOffsets = new HashMap<>();
-    private final Map endOffsets = new HashMap<>();
-
-    @Override
-    synchronized public ListOffsetsResult listOffsets(final Map topicPartitionOffsets,
-                                                      final ListOffsetsOptions options) {
-        final Map> futures = new HashMap<>();
-
-        for (final Map.Entry entry : topicPartitionOffsets.entrySet()) {
-            final TopicPartition tp = entry.getKey();
-            final OffsetSpec spec = entry.getValue();
-            final KafkaFutureImpl future = new KafkaFutureImpl<>();
-
-            if (spec instanceof OffsetSpec.EarliestSpec)
-                future.complete(new ListOffsetsResult.ListOffsetsResultInfo(beginningOffsets.get(tp), -1, Optional.empty()));
-            else if (spec instanceof OffsetSpec.LatestSpec)
-                future.complete(new ListOffsetsResult.ListOffsetsResultInfo(endOffsets.get(tp), -1, Optional.empty()));
-            else
-                throw new UnsupportedOperationException("Not implemented yet");
-
-            futures.put(tp, future);
-        }
-
-        return new ListOffsetsResult(futures);
-    }
-
-    public synchronized void updateBeginningOffsets(final Map newOffsets) {
-        beginningOffsets.putAll(newOffsets);
-    }
-
-    public synchronized void updateEndOffsets(final Map newOffsets) {
-        endOffsets.putAll(newOffsets);
-    }
-
-    // ----------- APIs below are not used by TTD --------
-
-    @Override
-    synchronized public DescribeClusterResult describeCluster(final DescribeClusterOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-
synchronized public CreateTopicsResult createTopics(final Collection newTopics, final CreateTopicsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public ListTopicsResult listTopics(final ListTopicsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public DescribeTopicsResult describeTopics(final Collection topicNames, final DescribeTopicsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public DeleteTopicsResult deleteTopics(final Collection topicsToDelete, final DeleteTopicsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public CreatePartitionsResult createPartitions(final Map newPartitions,
-                                                                final CreatePartitionsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public DeleteRecordsResult deleteRecords(final Map recordsToDelete,
-                                                          final DeleteRecordsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public CreateDelegationTokenResult createDelegationToken(final CreateDelegationTokenOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public RenewDelegationTokenResult renewDelegationToken(final byte[] hmac, final RenewDelegationTokenOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public ExpireDelegationTokenResult expireDelegationToken(final byte[] hmac, final ExpireDelegationTokenOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public DescribeDelegationTokenResult describeDelegationToken(final DescribeDelegationTokenOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public DescribeConsumerGroupsResult describeConsumerGroups(final Collection groupIds, final DescribeConsumerGroupsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public ListConsumerGroupsResult listConsumerGroups(final ListConsumerGroupsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public DeleteConsumerGroupsResult deleteConsumerGroups(final Collection groupIds, final DeleteConsumerGroupsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(final String groupId,
-                                                                                    final Set partitions,
-                                                                                    final DeleteConsumerGroupOffsetsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public ElectLeadersResult electLeaders(
-            final ElectionType electionType,
-            final Set partitions,
-            final ElectLeadersOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(final String
groupId, final RemoveMembersFromConsumerGroupOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public CreateAclsResult createAcls(final Collection acls, final CreateAclsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public DescribeAclsResult describeAcls(final AclBindingFilter filter, final DescribeAclsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public DeleteAclsResult deleteAcls(final Collection filters, final DeleteAclsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public DescribeConfigsResult describeConfigs(final Collection resources, final DescribeConfigsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    @Deprecated
-    synchronized public AlterConfigsResult alterConfigs(final Map configs, final AlterConfigsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public AlterConfigsResult incrementalAlterConfigs(final Map> configs,
-                                                                   final AlterConfigsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public AlterReplicaLogDirsResult alterReplicaLogDirs(
-            final Map replicaAssignment,
-            final AlterReplicaLogDirsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public DescribeLogDirsResult describeLogDirs(final Collection brokers,
-                                                              final DescribeLogDirsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public DescribeReplicaLogDirsResult describeReplicaLogDirs(final Collection replicas,
-                                                                            final DescribeReplicaLogDirsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public AlterPartitionReassignmentsResult alterPartitionReassignments(
-            final Map> newReassignments,
-            final AlterPartitionReassignmentsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public ListPartitionReassignmentsResult listPartitionReassignments(
-            final Optional> partitions,
-            final ListPartitionReassignmentsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(final String groupId,
-                                                                                  final Map offsets,
-                                                                                  final AlterConsumerGroupOffsetsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    public DescribeClientQuotasResult describeClientQuotas(final ClientQuotaFilter filter, final DescribeClientQuotasOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    public AlterClientQuotasResult alterClientQuotas(final Collection entries, final AlterClientQuotasOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-
-    @Override
-    synchronized public void close(final Duration timeout) {}
-
-    @Override
-    synchronized public Map metrics() {
-        throw new UnsupportedOperationException("Not implemented yet");
-    }
-}

From 8d80cce0917522e766c0cec0ae93acb4ca90ffdf Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Wed, 17 Jun 2020 19:27:19 -0700
Subject: [PATCH 8/8] address GitHub review comments
---
 .../processor/internals/StoreChangelogReaderTest.java | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 98e76d0e2fcfb..ad16cff17a205 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.admin.OffsetSpec;
@@ -477,9 +478,10 @@ public void shouldRequestEndOffsetsAndHandleTimeoutException() {
         final MockAdminClient adminClient = new MockAdminClient() {
             @Override
-            public ListOffsetsResult listOffsets(final Map topicPartitionOffsets) {
+            public ListOffsetsResult listOffsets(final Map topicPartitionOffsets,
+                                                 final ListOffsetsOptions options) {
                 if (functionCalled.get()) {
-                    return super.listOffsets(topicPartitionOffsets);
+                    return super.listOffsets(topicPartitionOffsets, options);
                 } else {
                     functionCalled.set(true);
                     throw new TimeoutException("KABOOM!");
@@ -519,10 +521,12 @@ public void shouldThrowIfEndOffsetsFail() {
         final MockAdminClient adminClient = new MockAdminClient() {
             @Override
-            public ListOffsetsResult listOffsets(final Map topicPartitionOffsets) {
+            public ListOffsetsResult listOffsets(final Map topicPartitionOffsets,
+                                                 final ListOffsetsOptions options) {
                 throw kaboom;
             }
         };
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 0L));
 
         final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
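To close the loop on the series above, here is a minimal, hypothetical sketch (not part of the patches themselves) of how the seeded MockAdminClient answers the end-offset query that StoreChangelogReader now issues through the Admin client instead of the restore consumer. It uses only APIs visible in these patches: the no-arg MockAdminClient constructor and updateEndOffsets seen in the test changes, OffsetSpec.latest(), and ListOffsetsOptions with read-uncommitted isolation from patch 6/8. The class name, topic name, and offset value are illustrative assumptions only.

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;

public class ListOffsetsMockExample {
    public static void main(final String[] args) throws Exception {
        // Hypothetical changelog partition; the topic name and offset are illustrative.
        final TopicPartition tp = new TopicPartition("app-store-changelog", 0);

        // Seed the mock the same way the updated tests seed it before restoration.
        final MockAdminClient adminClient = new MockAdminClient();
        adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));

        // Mirror the call StoreChangelogReader#endOffsetForChangelogs now makes:
        // OffsetSpec.latest() per partition, under read-uncommitted isolation.
        final ListOffsetsResult result = adminClient.listOffsets(
            Collections.singletonMap(tp, OffsetSpec.latest()),
            new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED));

        // Blocking on the future yields the seeded end offset with no leader epoch.
        final Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets = result.all().get();
        System.out.println("end offset = " + offsets.get(tp).offset()); // prints: end offset = 10
    }
}

Seeding every changelog's end offset to 0L, as TopologyTestDriver's short-lived createAdminClient helper did before patch 7/8 removed it, makes each store appear fully restored from the reader's point of view, which is presumably why the driver could ultimately drop StoreChangelogReader in favor of the no-op MockChangelogRegister.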