diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 8487268b4b551..d04aa0b85dd1c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -18,17 +18,11 @@ import java.util.LinkedList; import java.util.TreeMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; -import java.util.function.Function; -import java.util.stream.Collectors; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; -import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; @@ -54,6 +48,7 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.ThreadMetadata; +import org.apache.kafka.streams.processor.internals.ClientUtils; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.GlobalStreamThread; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; @@ -93,6 +88,7 @@ import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG; import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; +import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsWithoutTimeout; /** * A Kafka client that allows for performing continuous computation on input coming from one or more input topics and @@ -743,7 +739,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, } // use client id instead of thread client id since this admin client may be shared among threads - adminClient = clientSupplier.getAdmin(config.getAdminConfigs(StreamThread.getSharedAdminClientId(clientId))); + adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId))); final Map threadState = new HashMap<>(threads.length); final ArrayList storeProviders = new ArrayList<>(); @@ -1238,28 +1234,4 @@ public Map> allLocalStorePartitionLags() { return Collections.unmodifiableMap(localStorePartitionLags); } - - static Map fetchEndOffsetsWithoutTimeout(final Collection partitions, - final Admin adminClient) { - return fetchEndOffsets(partitions, adminClient, null); - } - - public static Map fetchEndOffsets(final Collection partitions, - final Admin adminClient, - final Duration timeout) { - final Map endOffsets; - try { - final KafkaFuture> future = adminClient.listOffsets( - partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()))) - .all(); - if (timeout == null) { - endOffsets = future.get(); - } else { - endOffsets = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); - } - } catch (final TimeoutException | RuntimeException | InterruptedException | ExecutionException e) { - throw new StreamsException("Unable to obtain end offsets from kafka", e); - } - return endOffsets; - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index c8c2aea2f0107..60ace63f9a6c7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -37,13 +37,15 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import static org.apache.kafka.streams.processor.internals.ClientUtils.getTaskProducerClientId; +import static org.apache.kafka.streams.processor.internals.ClientUtils.getThreadProducerClientId; + class ActiveTaskCreator { private final InternalTopologyBuilder builder; private final StreamsConfig config; @@ -61,14 +63,6 @@ class ActiveTaskCreator { private final Map taskProducers; private final StreamThread.ProcessingMode processingMode; - private static String getThreadProducerClientId(final String threadClientId) { - return threadClientId + "-producer"; - } - - private static String getTaskProducerClientId(final String threadClientId, final TaskId taskId) { - return threadClientId + "-" + taskId + "-producer"; - } - ActiveTaskCreator(final InternalTopologyBuilder builder, final StreamsConfig config, final StreamThread.ProcessingMode processingMode, @@ -227,22 +221,13 @@ void closeAndRemoveTaskProducerIfNeeded(final TaskId id) { } Map producerMetrics() { - final Map result = new LinkedHashMap<>(); - if (threadProducer != null) { - final Map producerMetrics = threadProducer.kafkaProducer().metrics(); - if (producerMetrics != null) { - result.putAll(producerMetrics); - } - } else { - // When EOS is turned on, each task will have its own producer client - // and the producer object passed in here will be null. We would then iterate through - // all the active tasks and add their metrics to the output metrics map. - for (final Map.Entry entry : taskProducers.entrySet()) { - final Map taskProducerMetrics = entry.getValue().kafkaProducer().metrics(); - result.putAll(taskProducerMetrics); - } - } - return result; + // When EOS is turned on, each task will have its own producer client + // and the producer object passed in here will be null. We would then iterate through + // all the active tasks and add their metrics to the output metrics map. + final Collection producers = threadProducer != null ? + Collections.singleton(threadProducer) : + taskProducers.values(); + return ClientUtils.producerMetrics(producers); } Set producerClientIds() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java new file mode 100644 index 0000000000000..21a750d898bad --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import java.time.Duration; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.TaskId; + +public class ClientUtils { + + // currently admin client is shared among all threads + public static String getSharedAdminClientId(final String clientId) { + return clientId + "-admin"; + } + + public static String getConsumerClientId(final String threadClientId) { + return threadClientId + "-consumer"; + } + + public static String getRestoreConsumerClientId(final String threadClientId) { + return threadClientId + "-restore-consumer"; + } + + public static String getThreadProducerClientId(final String threadClientId) { + return threadClientId + "-producer"; + } + + public static String getTaskProducerClientId(final String threadClientId, final TaskId taskId) { + return threadClientId + "-" + taskId + "-producer"; + } + + public static Map consumerMetrics(final Consumer mainConsumer, + final Consumer restoreConsumer) { + final Map consumerMetrics = mainConsumer.metrics(); + final Map restoreConsumerMetrics = restoreConsumer.metrics(); + final LinkedHashMap result = new LinkedHashMap<>(); + result.putAll(consumerMetrics); + result.putAll(restoreConsumerMetrics); + return result; + } + + public static Map adminClientMetrics(final Admin adminClient) { + final Map adminClientMetrics = adminClient.metrics(); + return new LinkedHashMap<>(adminClientMetrics); + } + + public static Map producerMetrics(final Collection producers) { + final Map result = new LinkedHashMap<>(); + for (final StreamsProducer producer : producers) { + final Map producerMetrics = producer.kafkaProducer().metrics(); + if (producerMetrics != null) { + result.putAll(producerMetrics); + } + } + return result; + } + + public static Map fetchEndOffsetsWithoutTimeout(final Collection partitions, + final Admin adminClient) { + return fetchEndOffsets(partitions, adminClient, null); + } + + public static Map fetchEndOffsets(final Collection partitions, + final Admin adminClient, + final Duration timeout) { + final Map endOffsets; + try { + final KafkaFuture> future = adminClient.listOffsets( + partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()))) + .all(); + if (timeout == null) { + endOffsets = future.get(); + } else { + endOffsets = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } + } catch (final TimeoutException | RuntimeException | InterruptedException | ExecutionException e) { + throw new StreamsException("Unable to obtain end offsets from kafka", e); + } + return endOffsets; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index cfaeb608f2cc9..5931572b33137 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -50,7 +50,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -59,11 +58,12 @@ import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA; +import static org.apache.kafka.streams.processor.internals.ClientUtils.getConsumerClientId; +import static org.apache.kafka.streams.processor.internals.ClientUtils.getRestoreConsumerClientId; +import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId; public class StreamThread extends Thread { - private final Admin adminClient; - /** * Stream thread states are the possible states that a stream thread can be in. * A thread must only be in one state at a time @@ -270,6 +270,7 @@ int getAssignmentErrorCode() { private volatile ThreadMetadata threadMetadata; private StreamThread.StateListener stateListener; + private final Admin adminClient; private final ChangelogReader changelogReader; // package-private for testing @@ -482,19 +483,6 @@ private InternalConsumerConfig(final Map props) { } } - private static String getConsumerClientId(final String threadClientId) { - return threadClientId + "-consumer"; - } - - private static String getRestoreConsumerClientId(final String threadClientId) { - return threadClientId + "-restore-consumer"; - } - - // currently admin client is shared among all threads - public static String getSharedAdminClientId(final String clientId) { - return clientId + "-admin"; - } - /** * Execute the stream processors * @@ -983,17 +971,11 @@ public Map producerMetrics() { } public Map consumerMetrics() { - final Map consumerMetrics = mainConsumer.metrics(); - final Map restoreConsumerMetrics = restoreConsumer.metrics(); - final LinkedHashMap result = new LinkedHashMap<>(); - result.putAll(consumerMetrics); - result.putAll(restoreConsumerMetrics); - return result; + return ClientUtils.consumerMetrics(mainConsumer, restoreConsumer); } public Map adminClientMetrics() { - final Map adminClientMetrics = adminClient.metrics(); - return new LinkedHashMap<>(adminClientMetrics); + return ClientUtils.adminClientMetrics(adminClient); } // the following are for testing only @@ -1008,4 +990,5 @@ TaskManager taskManager() { int currentNumIterations() { return numIterations; } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 2d1d3de8c0899..c3532a4aab641 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -66,7 +66,7 @@ import java.util.stream.Collectors; import static java.util.UUID.randomUUID; -import static org.apache.kafka.streams.KafkaStreams.fetchEndOffsets; +import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.EARLIEST_PROBEABLE_VERSION; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN; diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index a5e0e32957a84..8805ed10e5521 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -16,17 +16,13 @@ */ package org.apache.kafka.streams; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.metrics.MetricConfig; @@ -37,7 +33,6 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.internals.metrics.ClientMetrics; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.AbstractProcessor; @@ -88,21 +83,16 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; -import static org.apache.kafka.streams.KafkaStreams.fetchEndOffsets; -import static org.apache.kafka.streams.KafkaStreams.fetchEndOffsetsWithoutTimeout; import static org.easymock.EasyMock.anyInt; import static org.easymock.EasyMock.anyLong; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.anyString; import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -220,9 +210,6 @@ private void prepareStreams() throws Exception { anyObject(StateRestoreListener.class), anyInt() )).andReturn(streamThreadOne).andReturn(streamThreadTwo); - EasyMock.expect(StreamThread.getSharedAdminClientId( - anyString() - )).andReturn("admin").anyTimes(); EasyMock.expect(StreamThread.eosEnabled(anyObject(StreamsConfig.class))).andReturn(false).anyTimes(); EasyMock.expect(streamThreadOne.getId()).andReturn(0L).anyTimes(); @@ -894,60 +881,6 @@ public void statefulTopologyShouldCreateStateDirectory() throws Exception { startStreamsAndCheckDirExists(topology, true); } - @Test - public void fetchEndOffsetsShouldRethrowRuntimeExceptionAsStreamsException() { - final Admin adminClient = EasyMock.createMock(AdminClient.class); - EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andThrow(new RuntimeException()); - replay(adminClient); - assertThrows(StreamsException.class, () -> fetchEndOffsetsWithoutTimeout(emptyList(), adminClient)); - verify(adminClient); - } - - @Test - public void fetchEndOffsetsShouldRethrowInterruptedExceptionAsStreamsException() throws InterruptedException, ExecutionException { - final Admin adminClient = EasyMock.createMock(AdminClient.class); - final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class); - final KafkaFuture> allFuture = EasyMock.createMock(KafkaFuture.class); - - EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result); - EasyMock.expect(result.all()).andStubReturn(allFuture); - EasyMock.expect(allFuture.get()).andThrow(new InterruptedException()); - replay(adminClient, result, allFuture); - - assertThrows(StreamsException.class, () -> fetchEndOffsetsWithoutTimeout(emptyList(), adminClient)); - verify(adminClient); - } - - @Test - public void fetchEndOffsetsShouldRethrowExecutionExceptionAsStreamsException() throws InterruptedException, ExecutionException { - final Admin adminClient = EasyMock.createMock(AdminClient.class); - final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class); - final KafkaFuture> allFuture = EasyMock.createMock(KafkaFuture.class); - - EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result); - EasyMock.expect(result.all()).andStubReturn(allFuture); - EasyMock.expect(allFuture.get()).andThrow(new ExecutionException(new RuntimeException())); - replay(adminClient, result, allFuture); - - assertThrows(StreamsException.class, () -> fetchEndOffsetsWithoutTimeout(emptyList(), adminClient)); - verify(adminClient); - } - - @Test - public void fetchEndOffsetsWithTimeoutShouldRethrowTimeoutExceptionAsStreamsException() throws InterruptedException, ExecutionException, TimeoutException { - final Admin adminClient = EasyMock.createMock(AdminClient.class); - final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class); - final KafkaFuture> allFuture = EasyMock.createMock(KafkaFuture.class); - - EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result); - EasyMock.expect(result.all()).andStubReturn(allFuture); - EasyMock.expect(allFuture.get(1L, TimeUnit.MILLISECONDS)).andThrow(new TimeoutException()); - replay(adminClient, result, allFuture); - - assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient, Duration.ofMillis(1))); - verify(adminClient); - } - @SuppressWarnings("unchecked") private Topology getStatefulTopology(final String inputTopic, final String outputTopic, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java new file mode 100644 index 0000000000000..96b6b62378fc5 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import static java.util.Collections.emptyList; +import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets; +import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsWithoutTimeout; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertThrows; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.errors.StreamsException; +import org.easymock.EasyMock; +import org.junit.Test; + +public class ClientUtilsTest { + + @Test + public void fetchEndOffsetsShouldRethrowRuntimeExceptionAsStreamsException() { + final Admin adminClient = EasyMock.createMock(AdminClient.class); + EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andThrow(new RuntimeException()); + replay(adminClient); + assertThrows(StreamsException.class, () -> fetchEndOffsetsWithoutTimeout(emptyList(), adminClient)); + verify(adminClient); + } + + @Test + public void fetchEndOffsetsShouldRethrowInterruptedExceptionAsStreamsException() throws InterruptedException, ExecutionException { + final Admin adminClient = EasyMock.createMock(AdminClient.class); + final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class); + final KafkaFuture> allFuture = EasyMock.createMock(KafkaFuture.class); + + EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result); + EasyMock.expect(result.all()).andStubReturn(allFuture); + EasyMock.expect(allFuture.get()).andThrow(new InterruptedException()); + replay(adminClient, result, allFuture); + + assertThrows(StreamsException.class, () -> fetchEndOffsetsWithoutTimeout(emptyList(), adminClient)); + verify(adminClient); + } + + @Test + public void fetchEndOffsetsShouldRethrowExecutionExceptionAsStreamsException() throws InterruptedException, ExecutionException { + final Admin adminClient = EasyMock.createMock(AdminClient.class); + final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class); + final KafkaFuture> allFuture = EasyMock.createMock(KafkaFuture.class); + + EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result); + EasyMock.expect(result.all()).andStubReturn(allFuture); + EasyMock.expect(allFuture.get()).andThrow(new ExecutionException(new RuntimeException())); + replay(adminClient, result, allFuture); + + assertThrows(StreamsException.class, () -> fetchEndOffsetsWithoutTimeout(emptyList(), adminClient)); + verify(adminClient); + } + + @Test + public void fetchEndOffsetsWithTimeoutShouldRethrowTimeoutExceptionAsStreamsException() throws InterruptedException, ExecutionException, TimeoutException { + final Admin adminClient = EasyMock.createMock(AdminClient.class); + final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class); + final KafkaFuture> allFuture = EasyMock.createMock(KafkaFuture.class); + + EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result); + EasyMock.expect(result.all()).andStubReturn(allFuture); + EasyMock.expect(allFuture.get(1L, TimeUnit.MILLISECONDS)).andThrow(new TimeoutException()); + replay(adminClient, result, allFuture); + + assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient, Duration.ofMillis(1))); + verify(adminClient); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index c66874a0b25c5..c3f150e3f45a8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -100,8 +100,8 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId; import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; -import static org.apache.kafka.streams.processor.internals.StreamThread.getSharedAdminClientId; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.startsWith;