Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 3 additions & 31 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,11 @@

import java.util.LinkedList;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
Expand All @@ -54,6 +48,7 @@
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
Expand Down Expand Up @@ -93,6 +88,7 @@

import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsWithoutTimeout;

/**
* A Kafka client that allows for performing continuous computation on input coming from one or more input topics and
Expand Down Expand Up @@ -743,7 +739,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
}

// use client id instead of thread client id since this admin client may be shared among threads
adminClient = clientSupplier.getAdmin(config.getAdminConfigs(StreamThread.getSharedAdminClientId(clientId)));
adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));

final Map<Long, StreamThread.State> threadState = new HashMap<>(threads.length);
final ArrayList<StreamThreadStateStoreProvider> storeProviders = new ArrayList<>();
Expand Down Expand Up @@ -1238,28 +1234,4 @@ public Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLags() {

return Collections.unmodifiableMap(localStorePartitionLags);
}

static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsetsWithoutTimeout(final Collection<TopicPartition> partitions,
final Admin adminClient) {
return fetchEndOffsets(partitions, adminClient, null);
}

public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsets(final Collection<TopicPartition> partitions,
final Admin adminClient,
final Duration timeout) {
final Map<TopicPartition, ListOffsetsResultInfo> endOffsets;
try {
final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> future = adminClient.listOffsets(
partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())))
.all();
if (timeout == null) {
endOffsets = future.get();
} else {
endOffsets = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
}
} catch (final TimeoutException | RuntimeException | InterruptedException | ExecutionException e) {
throw new StreamsException("Unable to obtain end offsets from kafka", e);
}
return endOffsets;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.kafka.streams.processor.internals.ClientUtils.getTaskProducerClientId;
import static org.apache.kafka.streams.processor.internals.ClientUtils.getThreadProducerClientId;

class ActiveTaskCreator {
private final InternalTopologyBuilder builder;
private final StreamsConfig config;
Expand All @@ -61,14 +63,6 @@ class ActiveTaskCreator {
private final Map<TaskId, StreamsProducer> taskProducers;
private final StreamThread.ProcessingMode processingMode;

private static String getThreadProducerClientId(final String threadClientId) {
return threadClientId + "-producer";
}

private static String getTaskProducerClientId(final String threadClientId, final TaskId taskId) {
return threadClientId + "-" + taskId + "-producer";
}

ActiveTaskCreator(final InternalTopologyBuilder builder,
final StreamsConfig config,
final StreamThread.ProcessingMode processingMode,
Expand Down Expand Up @@ -227,22 +221,13 @@ void closeAndRemoveTaskProducerIfNeeded(final TaskId id) {
}

Map<MetricName, Metric> producerMetrics() {
final Map<MetricName, Metric> result = new LinkedHashMap<>();
if (threadProducer != null) {
final Map<MetricName, ? extends Metric> producerMetrics = threadProducer.kafkaProducer().metrics();
if (producerMetrics != null) {
result.putAll(producerMetrics);
}
} else {
// When EOS is turned on, each task will have its own producer client
// and the producer object passed in here will be null. We would then iterate through
// all the active tasks and add their metrics to the output metrics map.
for (final Map.Entry<TaskId, StreamsProducer> entry : taskProducers.entrySet()) {
final Map<MetricName, ? extends Metric> taskProducerMetrics = entry.getValue().kafkaProducer().metrics();
result.putAll(taskProducerMetrics);
}
}
return result;
// When EOS is turned on, each task will have its own producer client
// and the producer object passed in here will be null. We would then iterate through
// all the active tasks and add their metrics to the output metrics map.
final Collection<StreamsProducer> producers = threadProducer != null ?
Collections.singleton(threadProducer) :
taskProducers.values();
return ClientUtils.producerMetrics(producers);
}

Set<String> producerClientIds() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;

/**
 * Static helpers shared by Kafka Streams internals for naming the clients owned by a
 * stream thread, aggregating their metrics, and querying end offsets via the admin client.
 */
public class ClientUtils {

    // Utility class: static members only, not meant to be instantiated.
    private ClientUtils() {}

    // currently admin client is shared among all threads
    public static String getSharedAdminClientId(final String clientId) {
        return clientId + "-admin";
    }

    public static String getConsumerClientId(final String threadClientId) {
        return threadClientId + "-consumer";
    }

    public static String getRestoreConsumerClientId(final String threadClientId) {
        return threadClientId + "-restore-consumer";
    }

    public static String getThreadProducerClientId(final String threadClientId) {
        return threadClientId + "-producer";
    }

    public static String getTaskProducerClientId(final String threadClientId, final TaskId taskId) {
        return threadClientId + "-" + taskId + "-producer";
    }

    /**
     * Merges the metrics of the main and restore consumers into one map.
     * Insertion order is preserved; on duplicate {@link MetricName} keys the restore
     * consumer's entry overwrites the main consumer's.
     */
    public static Map<MetricName, Metric> consumerMetrics(final Consumer<byte[], byte[]> mainConsumer,
                                                          final Consumer<byte[], byte[]> restoreConsumer) {
        final Map<MetricName, ? extends Metric> consumerMetrics = mainConsumer.metrics();
        final Map<MetricName, ? extends Metric> restoreConsumerMetrics = restoreConsumer.metrics();
        final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>();
        result.putAll(consumerMetrics);
        result.putAll(restoreConsumerMetrics);
        return result;
    }

    /** Returns a mutable, insertion-ordered copy of the admin client's metrics. */
    public static Map<MetricName, Metric> adminClientMetrics(final Admin adminClient) {
        final Map<MetricName, ? extends Metric> adminClientMetrics = adminClient.metrics();
        return new LinkedHashMap<>(adminClientMetrics);
    }

    /**
     * Aggregates the metrics of every producer in {@code producers} into one map.
     * A producer whose {@code metrics()} returns {@code null} is skipped.
     */
    public static Map<MetricName, Metric> producerMetrics(final Collection<StreamsProducer> producers) {
        final Map<MetricName, Metric> result = new LinkedHashMap<>();
        for (final StreamsProducer producer : producers) {
            final Map<MetricName, ? extends Metric> producerMetrics = producer.kafkaProducer().metrics();
            if (producerMetrics != null) {
                result.putAll(producerMetrics);
            }
        }
        return result;
    }

    /**
     * Fetches the latest (end) offsets for the given partitions, blocking without a deadline.
     *
     * @throws StreamsException if the offsets cannot be obtained
     */
    public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsetsWithoutTimeout(final Collection<TopicPartition> partitions,
                                                                                           final Admin adminClient) {
        return fetchEndOffsets(partitions, adminClient, null);
    }

    /**
     * Fetches the latest (end) offsets for the given partitions via {@code Admin#listOffsets}.
     *
     * @param partitions  partitions to query
     * @param adminClient admin client used to issue the request
     * @param timeout     maximum time to block, or {@code null} to wait indefinitely
     * @return end offset info per partition
     * @throws StreamsException if the request fails, times out, or the thread is interrupted
     */
    public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsets(final Collection<TopicPartition> partitions,
                                                                             final Admin adminClient,
                                                                             final Duration timeout) {
        final Map<TopicPartition, ListOffsetsResultInfo> endOffsets;
        try {
            final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> future = adminClient.listOffsets(
                partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())))
                .all();
            if (timeout == null) {
                endOffsets = future.get();
            } else {
                endOffsets = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
        } catch (final InterruptedException e) {
            // Restore the interrupt status so callers up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            throw new StreamsException("Unable to obtain end offsets from kafka", e);
        } catch (final TimeoutException | RuntimeException | ExecutionException e) {
            throw new StreamsException("Unable to obtain end offsets from kafka", e);
        }
        return endOffsets;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -59,11 +58,12 @@

import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA;
import static org.apache.kafka.streams.processor.internals.ClientUtils.getConsumerClientId;
import static org.apache.kafka.streams.processor.internals.ClientUtils.getRestoreConsumerClientId;
import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId;

public class StreamThread extends Thread {

private final Admin adminClient;

/**
* Stream thread states are the possible states that a stream thread can be in.
* A thread must only be in one state at a time
Expand Down Expand Up @@ -270,6 +270,7 @@ int getAssignmentErrorCode() {
private volatile ThreadMetadata threadMetadata;
private StreamThread.StateListener stateListener;

private final Admin adminClient;
private final ChangelogReader changelogReader;

// package-private for testing
Expand Down Expand Up @@ -482,19 +483,6 @@ private InternalConsumerConfig(final Map<String, Object> props) {
}
}

private static String getConsumerClientId(final String threadClientId) {
return threadClientId + "-consumer";
}

private static String getRestoreConsumerClientId(final String threadClientId) {
return threadClientId + "-restore-consumer";
}

// currently admin client is shared among all threads
public static String getSharedAdminClientId(final String clientId) {
return clientId + "-admin";
}

/**
* Execute the stream processors
*
Expand Down Expand Up @@ -983,17 +971,11 @@ public Map<MetricName, Metric> producerMetrics() {
}

public Map<MetricName, Metric> consumerMetrics() {
final Map<MetricName, ? extends Metric> consumerMetrics = mainConsumer.metrics();
final Map<MetricName, ? extends Metric> restoreConsumerMetrics = restoreConsumer.metrics();
final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>();
result.putAll(consumerMetrics);
result.putAll(restoreConsumerMetrics);
return result;
return ClientUtils.consumerMetrics(mainConsumer, restoreConsumer);
}

public Map<MetricName, Metric> adminClientMetrics() {
final Map<MetricName, ? extends Metric> adminClientMetrics = adminClient.metrics();
return new LinkedHashMap<>(adminClientMetrics);
return ClientUtils.adminClientMetrics(adminClient);
}

// the following are for testing only
Expand All @@ -1008,4 +990,5 @@ TaskManager taskManager() {
int currentNumIterations() {
return numIterations;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
import java.util.stream.Collectors;

import static java.util.UUID.randomUUID;
import static org.apache.kafka.streams.KafkaStreams.fetchEndOffsets;
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.EARLIEST_PROBEABLE_VERSION;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN;
Expand Down
Loading