From 918d14ab81fdf2b3e6ef9de522d4e40a320a9d8e Mon Sep 17 00:00:00 2001 From: dengziming Date: Fri, 2 Apr 2021 16:25:28 +0800 Subject: [PATCH 1/2] check KAFKA-13002 rebase on new code rewrite listoffsets using new class --- .../kafka/clients/admin/KafkaAdminClient.java | 240 ++------------ .../clients/admin/ListOffsetsResult.java | 21 +- .../admin/internals/AdminApiDriver.java | 4 + .../admin/internals/AdminApiHandler.java | 23 ++ .../admin/internals/ListOffsetsHandler.java | 195 ++++++++++++ .../internals/MetadataOperationContext.java | 97 ------ .../kafka/common/utils/CollectionUtils.java | 19 ++ .../admin/internals/AdminApiDriverTest.java | 56 ++++ .../internals/ListOffsetsHandlerTest.java | 300 ++++++++++++++++++ 9 files changed, 629 insertions(+), 326 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java delete mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java create mode 100644 clients/src/test/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandlerTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 4fae1cca1b0ee..3291ace310d67 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -47,8 +47,9 @@ import org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler; import org.apache.kafka.clients.admin.internals.ListConsumerGroupOffsetsHandler; import org.apache.kafka.clients.admin.internals.ListTransactionsHandler; -import org.apache.kafka.clients.admin.internals.MetadataOperationContext; import org.apache.kafka.clients.admin.internals.RemoveMembersFromConsumerGroupHandler; +import org.apache.kafka.clients.admin.internals.ListOffsetsHandler; +import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.common.Cluster; @@ -139,10 +140,6 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; import org.apache.kafka.common.message.ListGroupsRequestData; import org.apache.kafka.common.message.ListGroupsResponseData; -import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition; -import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic; -import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse; -import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.MetadataRequestData; import org.apache.kafka.common.message.RenewDelegationTokenRequestData; @@ -214,7 +211,6 @@ import org.apache.kafka.common.requests.ListGroupsRequest; import org.apache.kafka.common.requests.ListGroupsResponse; import org.apache.kafka.common.requests.ListOffsetsRequest; -import org.apache.kafka.common.requests.ListOffsetsResponse; import org.apache.kafka.common.requests.ListPartitionReassignmentsRequest; import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse; import org.apache.kafka.common.requests.MetadataRequest; @@ -3163,14 +3159,6 @@ void handleFailure(Throwable throwable) { return new DescribeDelegationTokenResult(tokensFuture); } - private void rescheduleMetadataTask(MetadataOperationContext context, Supplier> nextCalls) { - log.info("Retrying to fetch metadata."); - // Requeue the task so that we can re-attempt fetching metadata - context.setResponse(Optional.empty()); - Call metadataCall = getMetadataCall(context, nextCalls); - runnable.call(metadataCall, time.milliseconds()); - } - @Override public 
DescribeConsumerGroupsResult describeConsumerGroups(final Collection groupIds, final DescribeConsumerGroupsOptions options) { @@ -3181,46 +3169,6 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection The type of return value of the KafkaFuture, like ListOffsetsResultInfo, etc. - * @param The type of configuration option, like ListOffsetsOptions, etc - */ - private > Call getMetadataCall(MetadataOperationContext context, - Supplier> nextCalls) { - return new Call("metadata", context.deadline(), new LeastLoadedNodeProvider()) { - @Override - MetadataRequest.Builder createRequest(int timeoutMs) { - return new MetadataRequest.Builder(new MetadataRequestData() - .setTopics(convertToMetadataRequestTopic(context.topics())) - .setAllowAutoTopicCreation(false)); - } - - @Override - void handleResponse(AbstractResponse abstractResponse) { - MetadataResponse response = (MetadataResponse) abstractResponse; - MetadataOperationContext.handleMetadataErrors(response); - - context.setResponse(Optional.of(response)); - - for (Call call : nextCalls.get()) { - runnable.call(call, time.milliseconds()); - } - } - - @Override - void handleFailure(Throwable throwable) { - for (KafkaFutureImpl future : context.futures().values()) { - future.completeExceptionally(throwable); - } - } - }; - } - private Set validAclOperations(final int authorizedOperations) { if (authorizedOperations == MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) { return null; @@ -3739,169 +3687,6 @@ public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets( return new AlterConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId))); } - @Override - public ListOffsetsResult listOffsets(Map topicPartitionOffsets, - ListOffsetsOptions options) { - - // preparing topics list for asking metadata about them - final Map> futures = new HashMap<>(topicPartitionOffsets.size()); - final Set topics = new HashSet<>(); - for (TopicPartition topicPartition : topicPartitionOffsets.keySet()) 
{ - topics.add(topicPartition.topic()); - futures.put(topicPartition, new KafkaFutureImpl<>()); - } - - final long nowMetadata = time.milliseconds(); - final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs()); - - MetadataOperationContext context = - new MetadataOperationContext<>(topics, options, deadline, futures); - - Call metadataCall = getMetadataCall(context, - () -> KafkaAdminClient.this.getListOffsetsCalls(context, topicPartitionOffsets, futures)); - runnable.call(metadataCall, nowMetadata); - - return new ListOffsetsResult(new HashMap<>(futures)); - } - - // visible for benchmark - List getListOffsetsCalls(MetadataOperationContext context, - Map topicPartitionOffsets, - Map> futures) { - - MetadataResponse mr = context.response().orElseThrow(() -> new IllegalStateException("No Metadata response")); - Cluster clusterSnapshot = mr.buildCluster(); - List calls = new ArrayList<>(); - // grouping topic partitions per leader - Map> leaders = new HashMap<>(); - - for (Map.Entry entry: topicPartitionOffsets.entrySet()) { - - OffsetSpec offsetSpec = entry.getValue(); - TopicPartition tp = entry.getKey(); - KafkaFutureImpl future = futures.get(tp); - long offsetQuery = getOffsetFromOffsetSpec(offsetSpec); - // avoid sending listOffsets request for topics with errors - if (!mr.errors().containsKey(tp.topic())) { - Node node = clusterSnapshot.leaderFor(tp); - if (node != null) { - Map leadersOnNode = leaders.computeIfAbsent(node, k -> new HashMap<>()); - ListOffsetsTopic topic = leadersOnNode.computeIfAbsent(tp.topic(), k -> new ListOffsetsTopic().setName(tp.topic())); - topic.partitions().add(new ListOffsetsPartition().setPartitionIndex(tp.partition()).setTimestamp(offsetQuery)); - } else { - future.completeExceptionally(Errors.LEADER_NOT_AVAILABLE.exception()); - } - } else { - future.completeExceptionally(mr.errors().get(tp.topic()).exception()); - } - } - - for (final Map.Entry> entry : leaders.entrySet()) { - final int brokerId = 
entry.getKey().id(); - - calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) { - - final List partitionsToQuery = new ArrayList<>(entry.getValue().values()); - - private boolean supportsMaxTimestamp = partitionsToQuery.stream() - .flatMap(t -> t.partitions().stream()) - .anyMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP); - - @Override - ListOffsetsRequest.Builder createRequest(int timeoutMs) { - return ListOffsetsRequest.Builder - .forConsumer(true, context.options().isolationLevel(), supportsMaxTimestamp) - .setTargetTimes(partitionsToQuery); - } - - @Override - void handleResponse(AbstractResponse abstractResponse) { - ListOffsetsResponse response = (ListOffsetsResponse) abstractResponse; - Map retryTopicPartitionOffsets = new HashMap<>(); - - for (ListOffsetsTopicResponse topic : response.topics()) { - for (ListOffsetsPartitionResponse partition : topic.partitions()) { - TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); - KafkaFutureImpl future = futures.get(tp); - Errors error = Errors.forCode(partition.errorCode()); - OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp); - if (offsetRequestSpec == null) { - log.warn("Server response mentioned unknown topic partition {}", tp); - } else if (MetadataOperationContext.shouldRefreshMetadata(error)) { - retryTopicPartitionOffsets.put(tp, offsetRequestSpec); - } else if (error == Errors.NONE) { - Optional leaderEpoch = (partition.leaderEpoch() == ListOffsetsResponse.UNKNOWN_EPOCH) - ? Optional.empty() - : Optional.of(partition.leaderEpoch()); - future.complete(new ListOffsetsResultInfo(partition.offset(), partition.timestamp(), leaderEpoch)); - } else { - future.completeExceptionally(error.exception()); - } - } - } - - if (retryTopicPartitionOffsets.isEmpty()) { - // The server should send back a response for every topic partition. But do a sanity check anyway. 
- for (ListOffsetsTopic topic : partitionsToQuery) { - for (ListOffsetsPartition partition : topic.partitions()) { - TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); - ApiException error = new ApiException("The response from broker " + brokerId + - " did not contain a result for topic partition " + tp); - futures.get(tp).completeExceptionally(error); - } - } - } else { - Set retryTopics = retryTopicPartitionOffsets.keySet().stream().map( - TopicPartition::topic).collect(Collectors.toSet()); - MetadataOperationContext retryContext = - new MetadataOperationContext<>(retryTopics, context.options(), context.deadline(), futures); - rescheduleMetadataTask(retryContext, () -> getListOffsetsCalls(retryContext, retryTopicPartitionOffsets, futures)); - } - } - - @Override - void handleFailure(Throwable throwable) { - for (ListOffsetsTopic topic : entry.getValue().values()) { - for (ListOffsetsPartition partition : topic.partitions()) { - TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); - KafkaFutureImpl future = futures.get(tp); - future.completeExceptionally(throwable); - } - } - } - - @Override - boolean handleUnsupportedVersionException(UnsupportedVersionException exception) { - if (supportsMaxTimestamp) { - supportsMaxTimestamp = false; - - // fail any unsupported futures and remove partitions from the downgraded retry - Iterator topicIterator = partitionsToQuery.iterator(); - while (topicIterator.hasNext()) { - ListOffsetsTopic topic = topicIterator.next(); - Iterator partitionIterator = topic.partitions().iterator(); - while (partitionIterator.hasNext()) { - ListOffsetsPartition partition = partitionIterator.next(); - if (partition.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) { - futures.get(new TopicPartition(topic.name(), partition.partitionIndex())) - .completeExceptionally(new UnsupportedVersionException( - "Broker " + brokerId + " does not support MAX_TIMESTAMP offset spec")); - 
partitionIterator.remove(); - } - } - if (topic.partitions().isEmpty()) { - topicIterator.remove(); - } - } - return !partitionsToQuery.isEmpty(); - } - return false; - } - }); - } - return calls; - } - @Override public DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions options) { KafkaFutureImpl>> future = new KafkaFutureImpl<>(); @@ -4366,6 +4151,27 @@ public ListTransactionsResult listTransactions(ListTransactionsOptions options) return new ListTransactionsResult(future.all()); } + @Override + public ListOffsetsResult listOffsets(Map topicPartitionOffsets, + ListOffsetsOptions options) { + Function offsetQueryFunction = this::getOffsetFromOffsetSpec; + + Map topicPartitionLongOffsets = new HashMap<>(topicPartitionOffsets.size()); + for (Map.Entry entry: topicPartitionOffsets.entrySet()) { + topicPartitionLongOffsets.put(entry.getKey(), offsetQueryFunction.apply(entry.getValue())); + } + AdminApiFuture.SimpleAdminApiFuture future = + ListOffsetsHandler.newFuture(topicPartitionOffsets.keySet()); + ListOffsetsHandler handler = new ListOffsetsHandler( + topicPartitionLongOffsets, + logContext, + options.isolationLevel() + ); + + invokeDriver(handler, future, options.timeoutMs); + return new ListOffsetsResult(new HashMap<>(future.all())); + } + private void invokeDriver( AdminApiHandler handler, AdminApiFuture future, diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java index 5eb00deb0697a..552b2559958aa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java @@ -57,20 +57,17 @@ public KafkaFuture partitionResult(final TopicPartition p */ public KafkaFuture> all() { return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])) - .thenApply(new KafkaFuture.BaseFunction>() { - 
@Override - public Map apply(Void v) { - Map offsets = new HashMap<>(futures.size()); - for (Map.Entry> entry : futures.entrySet()) { - try { - offsets.put(entry.getKey(), entry.getValue().get()); - } catch (InterruptedException | ExecutionException e) { - // This should be unreachable, because allOf ensured that all the futures completed successfully. - throw new RuntimeException(e); - } + .thenApply(v -> { + Map offsets = new HashMap<>(futures.size()); + for (Map.Entry> entry : futures.entrySet()) { + try { + offsets.put(entry.getKey(), entry.getValue().get()); + } catch (InterruptedException | ExecutionException e) { + // This should be unreachable, because allOf ensured that all the futures completed successfully. + throw new RuntimeException(e); } - return offsets; } + return offsets; }); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java index b5c9ff32f264b..b784af9ffd7f0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.FindCoordinatorRequest.NoBatchedFindCoordinatorsException; @@ -259,6 +260,9 @@ public void onFailure( .filter(future.lookupKeys()::contains) .collect(Collectors.toSet()); retryLookup(keysToUnmap); + } else if (t instanceof UnsupportedVersionException) { + Map failed = handler.handleUnsupportedVersion(spec, (UnsupportedVersionException) t); + completeExceptionally(failed); } else { Map errors = spec.keys.stream().collect(Collectors.toMap( 
Function.identity(), diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java index 9f8d0ac5f07f2..c968231c5ecc6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.admin.internals; import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; @@ -24,6 +25,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; public interface AdminApiHandler { @@ -66,6 +69,26 @@ public interface AdminApiHandler { */ ApiResult handleResponse(Node broker, Set keys, AbstractResponse response); + /** + * Callback that is invoked when a request returns an {@link UnsupportedVersionException}. + * The handler should decide which keys (if any) can retry with lower version and + * which keys can not, keys which are not retriable should be put into the result, + * and others are left out of the result which will be retried automatically. + * + * @param spec information about the request + * @param t the error response received from the broker + * + * @return result indicating key completion, failure, and unmapping + */ + default Map handleUnsupportedVersion( + AdminApiDriver.RequestSpec spec, + UnsupportedVersionException t + ) { + return spec.keys.stream().collect(Collectors.toMap( + Function.identity(), + key -> t + )); + } /** * Get the lookup strategy that is responsible for finding the brokerId * which will handle each respective key. 
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java new file mode 100644 index 0000000000000..e6c61a2c4bd11 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidMetadataException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.ListOffsetsRequestData; +import org.apache.kafka.common.message.ListOffsetsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.utils.CollectionUtils; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.kafka.common.protocol.Errors.LEADER_NOT_AVAILABLE; + +public class ListOffsetsHandler implements AdminApiHandler { + private final LogContext logContext; + private final Logger log; + private final Map topicPartitionOffsets; + private final IsolationLevel isolationLevel; + private boolean supportsMaxTimestamp = true; + + public ListOffsetsHandler( + Map topicPartitionOffsets, + LogContext logContext, + IsolationLevel isolationLevel + ) { + this.topicPartitionOffsets = Collections.unmodifiableMap(topicPartitionOffsets); + this.log = logContext.logger(ListOffsetsHandler.class); + this.logContext = logContext; + this.isolationLevel = isolationLevel; + this.supportsMaxTimestamp = topicPartitionOffsets.values().stream() + 
.anyMatch(timestamp -> timestamp == ListOffsetsRequest.MAX_TIMESTAMP); + } + + public static AdminApiFuture.SimpleAdminApiFuture newFuture( + Collection topicPartitions + ) { + return AdminApiFuture.forKeys(new HashSet<>(topicPartitions)); + } + + @Override + public String apiName() { + return "listOffsets"; + } + + @Override + public AdminApiLookupStrategy lookupStrategy() { + return new PartitionLeaderStrategy(logContext); + } + + @Override + public ListOffsetsRequest.Builder buildRequest( + int brokerId, + Set topicPartitions + ) { + + List listOffsetsTopics = new ArrayList<>(CollectionUtils.groupTopicPartitionsByTopic( + topicPartitions, + topic -> new ListOffsetsRequestData.ListOffsetsTopic().setName(topic), + (topicRequest, tp) -> topicRequest.partitions() + .add(new ListOffsetsRequestData.ListOffsetsPartition() + .setPartitionIndex(tp.partition()) + .setTimestamp(topicPartitionOffsets.get(tp)) + ) + ).values()); + + return ListOffsetsRequest.Builder + .forConsumer(true, isolationLevel, supportsMaxTimestamp) + .setTargetTimes(listOffsetsTopics); + } + + private void handlePartitionError( + TopicPartition topicPartition, + ApiError apiError, + Map failed, + List unmapped + ) { + Errors error = Errors.forCode(apiError.error().code()); + if (error.exception() instanceof InvalidMetadataException) { + log.debug("Invalid metadata error in `ListOffsets` response for partition {}. 
" + + "Will retry later.", topicPartition); + if (error == Errors.NOT_LEADER_OR_FOLLOWER || error == LEADER_NOT_AVAILABLE) + unmapped.add(topicPartition); + } else { + log.error("Unexpected error in `ListOffsets` response for partition {}", + topicPartition, apiError.exception()); + failed.put(topicPartition, apiError.error().exception("Failed to list offsets " + + "for partition " + topicPartition + " due to unexpected error")); + } + } + + @Override + public ApiResult handleResponse( + Node broker, + Set keys, + AbstractResponse abstractResponse + ) { + ListOffsetsResponse response = (ListOffsetsResponse) abstractResponse; + Map completed = new HashMap<>(); + Map failed = new HashMap<>(); + List unmapped = new ArrayList<>(); + + for (ListOffsetsResponseData.ListOffsetsTopicResponse topicResponse : response.data().topics()) { + for (ListOffsetsResponseData.ListOffsetsPartitionResponse partitionResponse : topicResponse.partitions()) { + TopicPartition topicPartition = new TopicPartition( + topicResponse.name(), partitionResponse.partitionIndex()); + + Errors error = Errors.forCode(partitionResponse.errorCode()); + if (error != Errors.NONE) { + ApiError apiError = new ApiError(error); + handlePartitionError(topicPartition, apiError, failed, unmapped); + continue; + } + + Optional leaderEpoch = (partitionResponse.leaderEpoch() == ListOffsetsResponse.UNKNOWN_EPOCH) + ? 
Optional.empty() + : Optional.of(partitionResponse.leaderEpoch()); + + ListOffsetsResult.ListOffsetsResultInfo resultInfo = new ListOffsetsResult.ListOffsetsResultInfo( + partitionResponse.offset(), + partitionResponse.timestamp(), + leaderEpoch + ); + + completed.put(topicPartition, resultInfo); + } + } + return new ApiResult<>(completed, failed, unmapped); + } + + @Override + public Map handleUnsupportedVersion( + AdminApiDriver.RequestSpec spec, + UnsupportedVersionException t + ) { + + if (supportsMaxTimestamp) { + supportsMaxTimestamp = false; + + Map failedMaxTimestamp = new HashMap<>(); + + // fail any unsupported futures and remove partitions from the downgraded retry + boolean foundMaxTimestampPartition = false; + for (Map.Entry entry: topicPartitionOffsets.entrySet()) + if (entry.getValue() == ListOffsetsRequest.MAX_TIMESTAMP) { + foundMaxTimestampPartition = true; + failedMaxTimestamp.put(entry.getKey(), new UnsupportedVersionException( + "Broker " + spec.scope.destinationBrokerId() + " does not support MAX_TIMESTAMP offset spec")); + } + + if (foundMaxTimestampPartition) + return failedMaxTimestamp; + } + + return spec.keys.stream().collect(Collectors.toMap( + Function.identity(), + key -> t + )); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java deleted file mode 100644 index e7f2c07d9de83..0000000000000 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.clients.admin.internals; - -import java.util.Collection; -import java.util.Map; -import java.util.Optional; - -import org.apache.kafka.clients.admin.AbstractOptions; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.InvalidMetadataException; -import org.apache.kafka.common.internals.KafkaFutureImpl; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.MetadataResponse; -import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata; -import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata; - -/** - * Context class to encapsulate parameters of a call to fetch and use cluster metadata. - * Some of the parameters are provided at construction and are immutable whereas others are provided - * as "Call" are completed and values are available. - * - * @param The type of return value of the KafkaFuture - * @param The type of configuration option. 
- */ -public final class MetadataOperationContext> { - final private Collection topics; - final private O options; - final private long deadline; - final private Map> futures; - private Optional response; - - public MetadataOperationContext(Collection topics, - O options, - long deadline, - Map> futures) { - this.topics = topics; - this.options = options; - this.deadline = deadline; - this.futures = futures; - this.response = Optional.empty(); - } - - public void setResponse(Optional response) { - this.response = response; - } - - public Optional response() { - return response; - } - - public O options() { - return options; - } - - public long deadline() { - return deadline; - } - - public Map> futures() { - return futures; - } - - public Collection topics() { - return topics; - } - - public static void handleMetadataErrors(MetadataResponse response) { - for (TopicMetadata tm : response.topicMetadata()) { - if (shouldRefreshMetadata(tm.error())) throw tm.error().exception(); - for (PartitionMetadata pm : tm.partitionMetadata()) { - if (shouldRefreshMetadata(pm.error)) { - throw pm.error.exception(); - } - } - } - } - - public static boolean shouldRefreshMetadata(Errors error) { - return error.exception() instanceof InvalidMetadataException; - } -} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java index 3ebbd913cca7f..d8461fe2fb694 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java @@ -90,4 +90,23 @@ public static Map groupPartitionsByTopic( } return dataByTopic; } + + /** + * Group a collection of partitions by topic + * + * @return The map used to group the partitions + */ + public static Map groupTopicPartitionsByTopic( + Collection partitions, + Function buildGroup, + BiConsumer addToGroup + ) { + Map dataByTopic 
= new HashMap<>(); + for (TopicPartition tp : partitions) { + String topic = tp.topic(); + T topicData = dataByTopic.computeIfAbsent(topic, buildGroup); + addToGroup.accept(topicData, tp); + } + return dataByTopic; + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminApiDriverTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminApiDriverTest.java index 93a4fa84bea3f..b02f856678246 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminApiDriverTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminApiDriverTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.message.MetadataResponseData; import org.apache.kafka.common.protocol.ApiKeys; @@ -45,6 +46,7 @@ import java.util.OptionalInt; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.function.Function; import java.util.stream.Collectors; import static java.util.Collections.emptyMap; @@ -460,6 +462,47 @@ public void testLookupRetryBookkeeping() { assertEquals(ctx.time.milliseconds(), retrySpec.nextAllowedTryMs); } + @Test + public void testFulfillmentUnsupportedVersion() { + TestContext ctx = TestContext.staticMapped(map( + "foo", 0, + "bar", 1 + )); + + Map, ApiResult> fulfillmentResults = map( + mkSet("foo"), failed("foo", new UnsupportedVersionException("unsupported api")), + mkSet("bar"), completed("bar", 30L) + ); + + ctx.poll(emptyMap(), fulfillmentResults); + + ctx.poll(emptyMap(), emptyMap()); + } + + @Test + public void testFulfillmentRetryUnsupportedVersion() { + TestContext ctx = TestContext.staticMapped(map( + "foo", 0, + "bar", 1 + )); + + 
ctx.handler.addRetriableUnsupportedVersionKey("foo"); + + ctx.handler.expectRequest(mkSet("foo"), failed("foo", new UnsupportedVersionException("unsupported api"))); + ctx.handler.expectRequest(mkSet("bar"), failed("foo", new UnsupportedVersionException("unsupported api"))); + + List> requestSpecs = ctx.driver.poll(); + assertEquals(2, requestSpecs.size()); + + requestSpecs.forEach(requestSpec -> { + ctx.driver.onFailure(ctx.time.milliseconds(), requestSpec, new UnsupportedVersionException("unsupported api")); + }); + + ctx.poll(emptyMap(), map( + mkSet("foo"), failed("foo", new UnsupportedVersionException("unsupported api")) + )); + } + @Test public void testFulfillmentRetryBookkeeping() { TestContext ctx = TestContext.staticMapped(map("foo", 0)); @@ -736,6 +779,7 @@ public void reset() { private static class MockAdminApiHandler implements AdminApiHandler { private final Map, ApiResult> expectedRequests = new HashMap<>(); + private final Set retriableUnsupportedVersionKeys = new HashSet<>(); private final MockLookupStrategy lookupStrategy; private MockAdminApiHandler(MockLookupStrategy lookupStrategy) { @@ -756,6 +800,10 @@ public void expectRequest(Set keys, ApiResult result) { expectedRequests.put(keys, result); } + public void addRetriableUnsupportedVersionKey(K key) { + retriableUnsupportedVersionKeys.add(key); + } + @Override public AbstractRequest.Builder buildRequest(int brokerId, Set keys) { // The request is just a placeholder in these tests @@ -770,6 +818,14 @@ public ApiResult handleResponse(Node broker, Set keys, AbstractResponse ); } + @Override + public Map handleUnsupportedVersion(RequestSpec spec, UnsupportedVersionException t) { + return spec.keys.stream().filter(key -> !retriableUnsupportedVersionKeys.contains(key)).collect(Collectors.toMap( + Function.identity(), + key -> t + )); + } + public void reset() { expectedRequests.clear(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandlerTest.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandlerTest.java new file mode 100644 index 0000000000000..1447f5884bb47 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandlerTest.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.internals.AdminApiHandler.ApiResult; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.ListOffsetsRequestData; +import org.apache.kafka.common.message.ListOffsetsResponseData; +import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.utils.CollectionUtils; +import org.apache.kafka.common.utils.LogContext; +import org.junit.jupiter.api.Test; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ListOffsetsHandlerTest { + private ListOffsetsHandler newHandler( + Map topicPartitionOffsets + ) { + return new ListOffsetsHandler( + topicPartitionOffsets, + new LogContext(), + IsolationLevel.READ_COMMITTED + ); + } + + @Test + public void testBuildRequest() { + Set topicPartitions = mkSet( + 
new TopicPartition("foo", 5), + new TopicPartition("bar", 3), + new TopicPartition("foo", 4), + new TopicPartition("bar", 2) + ); + + Map topicPartitionOffsets = mkMap( + mkEntry(new TopicPartition("foo", 5), ListOffsetsRequest.EARLIEST_TIMESTAMP), + mkEntry(new TopicPartition("bar", 3), ListOffsetsRequest.LATEST_TIMESTAMP), + mkEntry(new TopicPartition("foo", 4), 1L), + mkEntry(new TopicPartition("bar", 2), ListOffsetsRequest.MAX_TIMESTAMP) + ); + + ListOffsetsHandler handler = newHandler( + topicPartitionOffsets + ); + + int brokerId = 3; + ListOffsetsRequest request = handler.buildRequest(brokerId, topicPartitions).build(); + + List topics = request.data().topics(); + + assertEquals(mkSet("foo", "bar"), topics.stream() + .map(ListOffsetsRequestData.ListOffsetsTopic::name) + .collect(Collectors.toSet())); + + topics.forEach(topic -> { + Set expectedTopicPartitions = "foo".equals(topic.name()) ? + mkSet( + new ListOffsetsRequestData.ListOffsetsPartition() + .setPartitionIndex(4) + .setTimestamp(1L), + new ListOffsetsRequestData.ListOffsetsPartition() + .setPartitionIndex(5) + .setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP) + ) : mkSet( + new ListOffsetsRequestData.ListOffsetsPartition() + .setPartitionIndex(3) + .setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP), + new ListOffsetsRequestData.ListOffsetsPartition() + .setPartitionIndex(2) + .setTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) + ); + assertEquals(expectedTopicPartitions, new HashSet<>(topic.partitions())); + }); + } + + @Test + public void testUnexpectedError() { + TopicPartition topicPartition = new TopicPartition("foo", 5); + Throwable exception = assertFatalError(topicPartition, Errors.UNKNOWN_SERVER_ERROR); + assertTrue(exception instanceof UnknownServerException); + } + + @Test + public void testRetriableErrors() { + TopicPartition topicPartition = new TopicPartition("foo", 5); + assertRetriableError(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION); + } + + @Test + public void 
testUnmappedAfterNotLeaderError() { + TopicPartition topicPartition = new TopicPartition("foo", 5); + ApiResult result = + handleResponseWithError(topicPartition, Errors.NOT_LEADER_OR_FOLLOWER); + assertEquals(emptyMap(), result.failedKeys); + assertEquals(emptyMap(), result.completedKeys); + assertEquals(singletonList(topicPartition), result.unmappedKeys); + } + + @Test + public void testUnmappedAfterLeaderNotAvailableError() { + TopicPartition topicPartition = new TopicPartition("foo", 5); + ApiResult result = + handleResponseWithError(topicPartition, Errors.LEADER_NOT_AVAILABLE); + assertEquals(emptyMap(), result.failedKeys); + assertEquals(emptyMap(), result.completedKeys); + assertEquals(singletonList(topicPartition), result.unmappedKeys); + } + + @Test + public void testCompletedResult() { + TopicPartition topicPartition = new TopicPartition("foo", 5); + ListOffsetsHandler handler = newHandler(mkMap(mkEntry(topicPartition, ListOffsetsRequest.EARLIEST_TIMESTAMP)) + ); + + int brokerId = 3; + ListOffsetsPartitionResponse partitionResponse = + samplePartitionOffsets(topicPartition); + ListOffsetsResponse response = listOffsetsResponse( + singletonMap(topicPartition, partitionResponse) + ); + + ApiResult result = + handler.handleResponse(new Node(brokerId, "host", 1234), mkSet(topicPartition), response); + + assertEquals(mkSet(topicPartition), result.completedKeys.keySet()); + assertEquals(emptyMap(), result.failedKeys); + assertEquals(emptyList(), result.unmappedKeys); + + ListOffsetsResult.ListOffsetsResultInfo listOffsetsResultInfo = result.completedKeys.get(topicPartition); + assertMatchingOffsets(partitionResponse, listOffsetsResultInfo); + } + + @Test + public void testHandleUnsupportedVersion() { + TopicPartition supported = new TopicPartition("foo", 5); + TopicPartition unsupported = new TopicPartition("bar", 5); + + Set topicPartitions = mkSet(supported, unsupported); + + Map topicPartitionOffsets = mkMap( + mkEntry(supported, 
ListOffsetsRequest.EARLIEST_TIMESTAMP), + mkEntry(unsupported, ListOffsetsRequest.MAX_TIMESTAMP) // unsupported version + ); + + ListOffsetsHandler handler = newHandler( + topicPartitionOffsets + ); + + int brokerId = 3; + ApiRequestScope mockScope = new ApiRequestScope() { + public OptionalInt destinationBrokerId() { + return OptionalInt.of(brokerId); + } + }; + + final Map unsupportedVersion = handler.handleUnsupportedVersion( + new AdminApiDriver.RequestSpec<>(null, mockScope, topicPartitions, null, 0L, 0L, 1), + new UnsupportedVersionException("unsupported version") + ); + + assertEquals(mkSet(unsupported), unsupportedVersion.keySet()); + } + + private void assertRetriableError( + TopicPartition topicPartition, + Errors error + ) { + ApiResult result = + handleResponseWithError(topicPartition, error); + assertEquals(emptyMap(), result.failedKeys); + assertEquals(emptyMap(), result.completedKeys); + assertEquals(emptyList(), result.unmappedKeys); + } + + private Throwable assertFatalError( + TopicPartition topicPartition, + Errors error + ) { + ApiResult result = handleResponseWithError( + topicPartition, error); + assertEquals(emptyMap(), result.completedKeys); + assertEquals(emptyList(), result.unmappedKeys); + assertEquals(mkSet(topicPartition), result.failedKeys.keySet()); + return result.failedKeys.get(topicPartition); + } + + private ApiResult handleResponseWithError( + TopicPartition topicPartition, + Errors error + ) { + ListOffsetsHandler handler = newHandler(mkMap(mkEntry( + topicPartition, ListOffsetsRequest.EARLIEST_TIMESTAMP))); + ListOffsetsResponse response = buildResponseWithError(topicPartition, error); + return handler.handleResponse(new Node(3, "host", 1234), mkSet(topicPartition), response); + } + + private ListOffsetsResponse buildResponseWithError( + TopicPartition topicPartition, + Errors error + ) { + ListOffsetsPartitionResponse partitionResponse = + new ListOffsetsPartitionResponse() + .setPartitionIndex(topicPartition.partition()) + 
.setErrorCode(error.code()); + return listOffsetsResponse(singletonMap(topicPartition, partitionResponse)); + } + + private ListOffsetsPartitionResponse samplePartitionOffsets( + TopicPartition topicPartition + ) { + + return new ListOffsetsPartitionResponse() + .setPartitionIndex(topicPartition.partition()) + .setLeaderEpoch(1) + .setOffset(1L) + .setTimestamp(0L) + .setErrorCode(Errors.NONE.code()); + } + + private void assertMatchingOffsets( + ListOffsetsPartitionResponse expected, + ListOffsetsResult.ListOffsetsResultInfo actual + ) { + Optional leaderEpoch = (expected.leaderEpoch() == ListOffsetsResponse.UNKNOWN_EPOCH) + ? Optional.empty() + : Optional.of(expected.leaderEpoch()); + + assertEquals(leaderEpoch, actual.leaderEpoch()); + assertEquals(expected.offset(), actual.offset()); + assertEquals(expected.timestamp(), actual.timestamp()); + } + + private ListOffsetsResponse listOffsetsResponse( + Map partitionResponses + ) { + ListOffsetsResponseData response = new ListOffsetsResponseData(); + Map> partitionResponsesByTopic = + CollectionUtils.groupPartitionDataByTopic(partitionResponses); + + for (Map.Entry> topicEntry: + partitionResponsesByTopic.entrySet()) { + String topic = topicEntry.getKey(); + Map topicPartitionResponses = topicEntry.getValue(); + + ListOffsetsResponseData.ListOffsetsTopicResponse topicResponse = + new ListOffsetsResponseData.ListOffsetsTopicResponse().setName(topic); + response.topics().add(topicResponse); + + for (Map.Entry partitionEntry: + topicPartitionResponses.entrySet()) { + + Integer partitionId = partitionEntry.getKey(); + ListOffsetsPartitionResponse partitionResponse = partitionEntry.getValue(); + topicResponse.partitions().add(partitionResponse.setPartitionIndex(partitionId)); + } + } + + return new ListOffsetsResponse(response); + } + +} From 8cdaff32f6e89301bc42c73aeafec9f08ee4b773 Mon Sep 17 00:00:00 2001 From: dengziming Date: Fri, 16 Jul 2021 10:10:55 +0800 Subject: [PATCH 2/2] revert listoffsets benchmark since we 
no longer need buildCluster --- .../kafka/clients/admin/KafkaAdminClient.java | 2 - .../clients/admin/AdminClientTestUtils.java | 13 -- .../admin/GetListOffsetsCallsBenchmark.java | 142 ------------------ 3 files changed, 157 deletions(-) delete mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/admin/GetListOffsetsCallsBenchmark.java diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 3291ace310d67..a1157b014d227 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -49,7 +49,6 @@ import org.apache.kafka.clients.admin.internals.ListTransactionsHandler; import org.apache.kafka.clients.admin.internals.RemoveMembersFromConsumerGroupHandler; import org.apache.kafka.clients.admin.internals.ListOffsetsHandler; -import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.common.Cluster; @@ -256,7 +255,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Predicate; -import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java index 3db5739e692d7..d78fca99189cc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java @@ -17,12 +17,10 @@ package org.apache.kafka.clients.admin; import java.util.Collections; -import java.util.List; import java.util.Map; import 
java.util.stream.Collectors; import org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig; -import org.apache.kafka.clients.admin.internals.MetadataOperationContext; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; @@ -105,15 +103,4 @@ public static DescribeTopicsResult describeTopicsResult(Map offsets) { return new ListConsumerGroupOffsetsResult(KafkaFuture.completedFuture(offsets)); } - - /** - * Used for benchmark. KafkaAdminClient.getListOffsetsCalls is only accessible - * from within the admin package. - */ - public static List getListOffsetsCalls(KafkaAdminClient adminClient, - MetadataOperationContext context, - Map topicPartitionOffsets, - Map> futures) { - return adminClient.getListOffsetsCalls(context, topicPartitionOffsets, futures); - } } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/admin/GetListOffsetsCallsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/admin/GetListOffsetsCallsBenchmark.java deleted file mode 100644 index 8da09ed2d9b09..0000000000000 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/admin/GetListOffsetsCallsBenchmark.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.jmh.admin; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.apache.kafka.clients.admin.AdminClientTestUtils; -import org.apache.kafka.clients.admin.AdminClientUnitTestEnv; -import org.apache.kafka.clients.admin.KafkaAdminClient; -import org.apache.kafka.clients.admin.ListOffsetsOptions; -import org.apache.kafka.clients.admin.ListOffsetsResult; -import org.apache.kafka.clients.admin.OffsetSpec; -import org.apache.kafka.clients.admin.internals.MetadataOperationContext; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.internals.KafkaFutureImpl; -import org.apache.kafka.common.message.MetadataResponseData; -import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition; -import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic; -import org.apache.kafka.common.requests.MetadataResponse; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Fork; -import org.openjdk.jmh.annotations.Level; -import org.openjdk.jmh.annotations.Measurement; -import org.openjdk.jmh.annotations.Mode; -import org.openjdk.jmh.annotations.OutputTimeUnit; -import org.openjdk.jmh.annotations.Param; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.Warmup; - -@State(Scope.Benchmark) -@Fork(value = 1) -@Warmup(iterations = 5) 
-@Measurement(iterations = 15) -@BenchmarkMode(Mode.AverageTime) -@OutputTimeUnit(TimeUnit.MICROSECONDS) -public class GetListOffsetsCallsBenchmark { - @Param({"1", "10"}) - private int topicCount; - - @Param({"100", "1000", "10000"}) - private int partitionCount; - - private KafkaAdminClient admin; - private MetadataOperationContext context; - private final Map topicPartitionOffsets = new HashMap<>(); - private final Map> futures = new HashMap<>(); - private final int numNodes = 3; - - @Setup(Level.Trial) - public void setup() { - MetadataResponseData data = new MetadataResponseData(); - List mrTopicList = new ArrayList<>(); - Set topics = new HashSet<>(); - - for (int topicIndex = 0; topicIndex < topicCount; topicIndex++) { - Uuid topicId = Uuid.randomUuid(); - String topicName = "topic-" + topicIndex; - MetadataResponseTopic mrTopic = new MetadataResponseTopic() - .setTopicId(topicId) - .setName(topicName) - .setErrorCode((short) 0) - .setIsInternal(false); - - List mrPartitionList = new ArrayList<>(); - - for (int partition = 0; partition < partitionCount; partition++) { - TopicPartition tp = new TopicPartition(topicName, partition); - topics.add(tp.topic()); - futures.put(tp, new KafkaFutureImpl<>()); - topicPartitionOffsets.put(tp, OffsetSpec.latest()); - - MetadataResponsePartition mrPartition = new MetadataResponsePartition() - .setLeaderId(partition % numNodes) - .setPartitionIndex(partition) - .setIsrNodes(Arrays.asList(0, 1, 2)) - .setReplicaNodes(Arrays.asList(0, 1, 2)) - .setOfflineReplicas(Collections.emptyList()) - .setErrorCode((short) 0); - - mrPartitionList.add(mrPartition); - } - - mrTopic.setPartitions(mrPartitionList); - mrTopicList.add(mrTopic); - } - data.setTopics(new MetadataResponseData.MetadataResponseTopicCollection(mrTopicList.listIterator())); - - long deadline = 0L; - short version = 0; - context = new MetadataOperationContext<>(topics, new ListOffsetsOptions(), deadline, futures); - context.setResponse(Optional.of(new 
MetadataResponse(data, version))); - - AdminClientUnitTestEnv adminEnv = new AdminClientUnitTestEnv(mockCluster()); - admin = (KafkaAdminClient) adminEnv.adminClient(); - } - - @Benchmark - public Object testGetListOffsetsCalls() { - return AdminClientTestUtils.getListOffsetsCalls(admin, context, topicPartitionOffsets, futures); - } - - private Cluster mockCluster() { - final int controllerIndex = 0; - - HashMap nodes = new HashMap<>(); - for (int i = 0; i < numNodes; i++) - nodes.put(i, new Node(i, "localhost", 8121 + i)); - return new Cluster("mockClusterId", nodes.values(), - Collections.emptySet(), Collections.emptySet(), - Collections.emptySet(), nodes.get(controllerIndex)); - } -}