Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<suppress checks="CyclomaticComplexity|BooleanExpressionComplexity"
files="(SchemaGenerator|MessageDataGenerator|FieldSpec).java"/>
<suppress checks="NPathComplexity"
files="(MessageDataGenerator|FieldSpec).java"/>
files="(MessageDataGenerator|FieldSpec|AssignorConfiguration).java"/>
<suppress checks="JavaNCSS"
files="(ApiMessageType).java|MessageDataGenerator.java"/>
<suppress checks="MethodLength"
Expand Down
18 changes: 17 additions & 1 deletion clients/src/main/java/org/apache/kafka/common/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.common.utils;

import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -660,7 +662,7 @@ public static ByteBuffer ensureCapacity(ByteBuffer existingBuffer, int newLength
return existingBuffer;
}

/*
/**
* Creates a set
* @param elems the elements
* @param <T> the type of element
Expand All @@ -674,6 +676,20 @@ public static <T> Set<T> mkSet(T... elems) {
return result;
}

/**
 * Builds a {@link TreeSet} containing the supplied elements, ordered by their
 * natural ordering (duplicates collapse to a single entry).
 *
 * @param elems the elements to place in the set
 * @param <T> the element type; must be comparable so the set can order it
 * @return a mutable SortedSet holding every distinct supplied element
 */
@SafeVarargs
public static <T extends Comparable<T>> SortedSet<T> mkSortedSet(T... elems) {
    final SortedSet<T> sorted = new TreeSet<>();
    for (int i = 0; i < elems.length; i++) {
        sorted.add(elems[i]);
    }
    return sorted;
}

/**
* Creates a map entry (for use with {@link Utils#mkMap(java.util.Map.Entry[])})
*
Expand Down
38 changes: 28 additions & 10 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.LinkedList;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
Expand All @@ -27,6 +28,7 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -1217,17 +1219,9 @@ public Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLags() {
}

log.debug("Current changelog positions: {}", allChangelogPositions);
final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets;
try {
allEndOffsets = adminClient.listOffsets(
allPartitions.stream()
.collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()))
).all().get();
} catch (final RuntimeException | InterruptedException | ExecutionException e) {
throw new StreamsException("Unable to obtain end offsets from kafka", e);
}

final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets = fetchEndOffsetsWithoutTimeout(allPartitions, adminClient);
log.debug("Current end offsets :{}", allEndOffsets);

for (final Map.Entry<TopicPartition, ListOffsetsResultInfo> entry : allEndOffsets.entrySet()) {
// Avoiding an extra admin API lookup by computing lags for not-yet-started restorations
// from zero instead of the real "earliest offset" for the changelog.
Expand All @@ -1244,4 +1238,28 @@ public Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLags() {

return Collections.unmodifiableMap(localStorePartitionLags);
}

/**
 * Fetches the latest (end) offsets for the given changelog partitions, blocking
 * without any client-side timeout.
 *
 * <p>Delegates to {@code fetchEndOffsets} with a {@code null} timeout, which makes
 * the underlying {@code KafkaFuture#get()} wait indefinitely for the admin response.
 *
 * @param partitions the partitions whose end offsets should be looked up
 * @param adminClient the admin client used to issue the listOffsets request
 * @return a map from partition to its latest-offset result
 */
static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsetsWithoutTimeout(final Collection<TopicPartition> partitions,
                                                                                final Admin adminClient) {
    return fetchEndOffsets(partitions, adminClient, null);
}

/**
 * Fetches the latest (end) offsets for the given partitions via the admin client.
 *
 * @param partitions the partitions whose end offsets should be looked up
 * @param adminClient the admin client used to issue the listOffsets request
 * @param timeout maximum time to wait for the response, or {@code null} to wait
 *                indefinitely
 * @return a map from partition to its latest-offset result
 * @throws StreamsException if the lookup fails, times out, or the calling thread
 *                          is interrupted (interrupt status is restored first)
 */
public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsets(final Collection<TopicPartition> partitions,
                                                                         final Admin adminClient,
                                                                         final Duration timeout) {
    try {
        final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> future = adminClient.listOffsets(
            partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())))
            .all();
        return timeout == null
            ? future.get()
            : future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    } catch (final InterruptedException e) {
        // Restore the interrupt status so callers higher up the stack can still
        // observe the interruption after we translate it to a StreamsException.
        Thread.currentThread().interrupt();
        throw new StreamsException("Unable to obtain end offsets from kafka", e);
    } catch (final TimeoutException | RuntimeException | ExecutionException e) {
        throw new StreamsException("Unable to obtain end offsets from kafka", e);
    }
}
Comment on lines +1242 to +1264
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that this is static and has nothing in particular to do with the KafkaStreams class indicates that it probably belongs in a util class for use by KafkaStreams and StreamsPartitionAssignor.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #8328

}
Loading