diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 915bd728e18c5..a6458b1268a1a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -25,6 +25,8 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; @@ -1080,6 +1082,55 @@ ListPartitionReassignmentsResult listPartitionReassignments(OptionalAlters offsets for the specified group. In order to succeed, the group must be empty. + * + *

This is a convenience method for {@link #alterConsumerGroupOffsets(String, Map, AlterConsumerGroupOffsetsOptions)} with default options. + * See the overload for more details. + * + * @param groupId The group for which to alter offsets. + * @param offsets A map of offsets by partition with associated metadata. + * @return The AlterOffsetsResult. + */ + default AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, Map offsets) { + return alterConsumerGroupOffsets(groupId, offsets, new AlterConsumerGroupOffsetsOptions()); + } + + /** + *

Alters offsets for the specified group. In order to succeed, the group must be empty. + * + *

This operation is not transactional so it may succeed for some partitions while fail for others. + * + * @param groupId The group for which to alter offsets. + * @param offsets A map of offsets by partition with associated metadata. Partitions not specified in the map are ignored. + * @param options The options to use when altering the offsets. + * @return The AlterOffsetsResult. + */ + AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, Map offsets, AlterConsumerGroupOffsetsOptions options); + + /** + *

List offset for the specified partitions and OffsetSpec. This operation enables to find + * the beginning offset, end offset as well as the offset matching a timestamp in partitions. + * + *

This is a convenience method for {@link #listOffsets(Map, ListOffsetsOptions)} + * + * @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up. + * @return The ListOffsetsResult. + */ + default ListOffsetsResult listOffsets(Map topicPartitionOffsets) { + return listOffsets(topicPartitionOffsets, new ListOffsetsOptions()); + } + + /** + *

List offset for the specified partitions. This operation enables to find + * the beginning offset, end offset as well as the offset matching a timestamp in partitions. + * + * @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up. + * @param options The options to use when retrieving the offsets + * @return The ListOffsetsResult. + */ + ListOffsetsResult listOffsets(Map topicPartitionOffsets, ListOffsetsOptions options); + /** * Get the metrics kept by the adminClient */ diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsOptions.java new file mode 100644 index 0000000000000..eb8b8ec377112 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsOptions.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Options for the {@link AdminClient#alterConsumerGroupOffsets(String, Map)} call. + * + * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class AlterConsumerGroupOffsetsOptions extends AbstractOptions { +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsResult.java new file mode 100644 index 0000000000000..38ee14a15e60a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsResult.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.KafkaFuture.BaseFunction; +import org.apache.kafka.common.KafkaFuture.BiConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.protocol.Errors; + +/** + * The result of the {@link AdminClient#alterConsumerGroupOffsets(String, Map)} call. + * + * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class AlterConsumerGroupOffsetsResult { + + private final KafkaFuture> future; + + AlterConsumerGroupOffsetsResult(KafkaFuture> future) { + this.future = future; + } + + /** + * Return a future which can be used to check the result for a given partition. + */ + public KafkaFuture partitionResult(final TopicPartition partition) { + final KafkaFutureImpl result = new KafkaFutureImpl<>(); + + this.future.whenComplete(new BiConsumer, Throwable>() { + @Override + public void accept(final Map topicPartitions, final Throwable throwable) { + if (throwable != null) { + result.completeExceptionally(throwable); + } else if (!topicPartitions.containsKey(partition)) { + result.completeExceptionally(new IllegalArgumentException( + "Alter offset for partition \"" + partition + "\" was not attempted")); + } else { + final Errors error = topicPartitions.get(partition); + if (error == Errors.NONE) { + result.complete(null); + } else { + result.completeExceptionally(error.exception()); + } + } + + } + }); + + return result; + } + + /** + * Return a future which succeeds if all the alter offsets succeed. + */ + public KafkaFuture all() { + return this.future.thenApply(new BaseFunction, Void>() { + @Override + public Void apply(final Map topicPartitionErrorsMap) { + List partitionsFailed = topicPartitionErrorsMap.entrySet() + .stream() + .filter(e -> e.getValue() != Errors.NONE) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + for (Errors error : topicPartitionErrorsMap.values()) { + if (error != Errors.NONE) { + throw error.exception( + "Failed altering consumer group offsets for the following partitions: " + partitionsFailed); + } + } + return null; + } + }); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 0850cedd71037..984e863f723e7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -29,7 +29,11 @@ import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult; import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults; import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.clients.admin.OffsetSpec.TimestampSpec; import org.apache.kafka.clients.admin.internals.AdminMetadataManager; +import org.apache.kafka.clients.admin.internals.ConsumerGroupOperationContext; +import org.apache.kafka.clients.admin.internals.MetadataOperationContext; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; @@ -87,6 +91,11 @@ import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.MetadataRequestData; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition; +import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic; +import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition; +import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic; import org.apache.kafka.common.message.OffsetDeleteRequestData; import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestPartition; import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestTopic; @@ -153,10 +162,15 @@ import org.apache.kafka.common.requests.LeaveGroupResponse; import org.apache.kafka.common.requests.ListGroupsRequest; import org.apache.kafka.common.requests.ListGroupsResponse; +import org.apache.kafka.common.requests.ListOffsetRequest; +import org.apache.kafka.common.requests.ListOffsetResponse; +import org.apache.kafka.common.requests.ListOffsetResponse.PartitionData; import org.apache.kafka.common.requests.ListPartitionReassignmentsRequest; import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.OffsetCommitResponse; import org.apache.kafka.common.requests.OffsetDeleteRequest; import org.apache.kafka.common.requests.OffsetDeleteResponse; import org.apache.kafka.common.requests.OffsetFetchRequest; @@ -187,6 +201,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -714,6 +729,7 @@ final void fail(long now, Throwable throwable) { * * @return The AbstractRequest builder. */ + @SuppressWarnings("rawtypes") abstract AbstractRequest.Builder createRequest(int timeoutMs); /** @@ -1271,7 +1287,7 @@ private Call makeMetadataCall(long now) { return new Call(true, "fetchMetadata", calcDeadlineMs(now, defaultTimeoutMs), new MetadataUpdateNodeIdProvider()) { @Override - public AbstractRequest.Builder createRequest(int timeoutMs) { + public MetadataRequest.Builder createRequest(int timeoutMs) { // Since this only requests node information, it's safe to pass true // for allowAutoTopicCreation (and it simplifies communication with // older brokers) @@ -1338,7 +1354,7 @@ public CreateTopicsResult createTopics(final Collection newTopics, new ControllerNodeProvider()) { @Override - public AbstractRequest.Builder createRequest(int timeoutMs) { + public CreateTopicsRequest.Builder createRequest(int timeoutMs) { return new CreateTopicsRequest.Builder( new CreateTopicsRequestData(). setTopics(topics). @@ -1435,7 +1451,7 @@ public DeleteTopicsResult deleteTopics(Collection topicNames, new ControllerNodeProvider()) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { + DeleteTopicsRequest.Builder createRequest(int timeoutMs) { return new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData() .setTopicNames(validTopicNames) .setTimeoutMs(timeoutMs)); @@ -1495,7 +1511,7 @@ public ListTopicsResult listTopics(final ListTopicsOptions options) { new LeastLoadedNodeProvider()) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { + MetadataRequest.Builder createRequest(int timeoutMs) { return MetadataRequest.Builder.allTopics(); } @@ -1542,7 +1558,7 @@ public DescribeTopicsResult describeTopics(final Collection topicNames, private boolean supportsDisablingTopicCreation = true; @Override - AbstractRequest.Builder createRequest(int timeoutMs) { + MetadataRequest.Builder createRequest(int timeoutMs) { if (supportsDisablingTopicCreation) return new MetadataRequest.Builder(new MetadataRequestData() .setTopics(convertToMetadataRequestTopic(topicNamesList)) @@ -1624,7 +1640,7 @@ public DescribeClusterResult describeCluster(DescribeClusterOptions options) { new LeastLoadedNodeProvider()) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { + MetadataRequest.Builder createRequest(int timeoutMs) { // Since this only requests node information, it's safe to pass true for allowAutoTopicCreation (and it // simplifies communication with older brokers) return new MetadataRequest.Builder(new MetadataRequestData() @@ -1676,7 +1692,7 @@ public DescribeAclsResult describeAcls(final AclBindingFilter filter, DescribeAc new LeastLoadedNodeProvider()) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { + DescribeAclsRequest.Builder createRequest(int timeoutMs) { return new DescribeAclsRequest.Builder(filter); } @@ -1720,7 +1736,7 @@ public CreateAclsResult createAcls(Collection acls, CreateAclsOption new LeastLoadedNodeProvider()) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { + CreateAclsRequest.Builder createRequest(int timeoutMs) { return new CreateAclsRequest.Builder(aclCreations); } @@ -1768,7 +1784,7 @@ public DeleteAclsResult deleteAcls(Collection filters, DeleteA new LeastLoadedNodeProvider()) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { + DeleteAclsRequest.Builder createRequest(int timeoutMs) { return new DeleteAclsRequest.Builder(filterList); } @@ -1834,7 +1850,7 @@ public DescribeConfigsResult describeConfigs(Collection configRe new LeastLoadedNodeProvider()) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { + DescribeConfigsRequest.Builder createRequest(int timeoutMs) { return new DescribeConfigsRequest.Builder(unifiedRequestResources) .includeSynonyms(options.includeSynonyms()); } @@ -1881,7 +1897,7 @@ void handleFailure(Throwable throwable) { new ConstantNodeIdProvider(nodeId)) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { + DescribeConfigsRequest.Builder createRequest(int timeoutMs) { return new DescribeConfigsRequest.Builder(Collections.singleton(resource)) .includeSynonyms(options.includeSynonyms()); } @@ -1995,7 +2011,7 @@ private Map> alterConfigs(Map> incrementalAlterConfigs(Map brokers, Descri new ConstantNodeIdProvider(brokerId)) { @Override - public AbstractRequest.Builder createRequest(int timeoutMs) { + public DescribeLogDirsRequest.Builder createRequest(int timeoutMs) { // Query selected partitions in all log directories return new DescribeLogDirsRequest.Builder(null); } @@ -2231,7 +2247,7 @@ public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection newPar new ControllerNodeProvider()) { @Override - public AbstractRequest.Builder createRequest(int timeoutMs) { + public CreatePartitionsRequest.Builder createRequest(int timeoutMs) { return new CreatePartitionsRequest.Builder(requestMap, timeoutMs, options.validateOnly()); } @@ -2360,7 +2376,7 @@ public DeleteRecordsResult deleteRecords(final Map createRequest(int timeoutMs) { + CreateDelegationTokenRequest.Builder createRequest(int timeoutMs) { return new CreateDelegationTokenRequest.Builder( new CreateDelegationTokenRequestData() .setRenewers(renewers) @@ -2493,7 +2509,7 @@ public RenewDelegationTokenResult renewDelegationToken(final byte[] hmac, final new LeastLoadedNodeProvider()) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { + RenewDelegationTokenRequest.Builder createRequest(int timeoutMs) { return new RenewDelegationTokenRequest.Builder( new RenewDelegationTokenRequestData() .setHmac(hmac) @@ -2527,7 +2543,7 @@ public ExpireDelegationTokenResult expireDelegationToken(final byte[] hmac, fina new LeastLoadedNodeProvider()) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { + ExpireDelegationTokenRequest.Builder createRequest(int timeoutMs) { return new ExpireDelegationTokenRequest.Builder( new ExpireDelegationTokenRequestData() .setHmac(hmac) @@ -2561,7 +2577,7 @@ public DescribeDelegationTokenResult describeDelegationToken(final DescribeDeleg new LeastLoadedNodeProvider()) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { + DescribeDelegationTokenRequest.Builder createRequest(int timeoutMs) { return new DescribeDelegationTokenRequest.Builder(options.owners()); } @@ -2584,72 +2600,23 @@ void handleFailure(Throwable throwable) { return new DescribeDelegationTokenResult(tokensFuture); } - /** - * Context class to encapsulate parameters of a call to find and use a consumer group coordinator. - * Some of the parameters are provided at construction and are immutable whereas others are provided - * as "Call" are completed and values are available, like node id of the coordinator. - * - * @param The type of return value of the KafkaFuture - * @param The type of configuration option. Different for different consumer group commands. - */ - private final static class ConsumerGroupOperationContext> { - final private String groupId; - final private O options; - final private long deadline; - final private KafkaFutureImpl future; - private Optional node; - - public ConsumerGroupOperationContext(String groupId, - O options, - long deadline, - KafkaFutureImpl future) { - this.groupId = groupId; - this.options = options; - this.deadline = deadline; - this.future = future; - this.node = Optional.empty(); - } - - public String getGroupId() { - return groupId; - } - - public O getOptions() { - return options; - } - - public long getDeadline() { - return deadline; - } - - public KafkaFutureImpl getFuture() { - return future; - } - - public Optional getNode() { - return node; - } - - public void setNode(Node node) { - this.node = Optional.ofNullable(node); - } - - public boolean hasCoordinatorMoved(AbstractResponse response) { - return response.errorCounts().keySet() - .stream() - .anyMatch(error -> error == Errors.NOT_COORDINATOR); - } - } - - private void rescheduleTask(ConsumerGroupOperationContext context, Supplier nextCall) { + private void rescheduleFindCoordinatorTask(ConsumerGroupOperationContext context, Supplier nextCall) { log.info("Node {} is no longer the Coordinator. Retrying with new coordinator.", - context.getNode().orElse(null)); + context.node().orElse(null)); // Requeue the task so that we can try with new coordinator context.setNode(null); Call findCoordinatorCall = getFindCoordinatorCall(context, nextCall); runnable.call(findCoordinatorCall, time.milliseconds()); } + private void rescheduleMetadataTask(MetadataOperationContext context, Supplier> nextCalls) { + log.info("Retrying to fetch metadata."); + // Requeue the task so that we can re-attempt fetching metadata + context.setResponse(Optional.empty()); + Call metadataCall = getMetadataCall(context, nextCalls); + runnable.call(metadataCall, time.milliseconds()); + } + private static Map> createFutures(Collection groupIds) { return new HashSet<>(groupIds).stream().collect( Collectors.toMap(groupId -> groupId, @@ -2703,21 +2670,21 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection The type of configuration option, like DescribeConsumerGroupsOptions, ListConsumerGroupsOptions etc */ private > Call getFindCoordinatorCall(ConsumerGroupOperationContext context, - Supplier nextCall) { - return new Call("findCoordinator", context.getDeadline(), new LeastLoadedNodeProvider()) { + Supplier nextCall) { + return new Call("findCoordinator", context.deadline(), new LeastLoadedNodeProvider()) { @Override FindCoordinatorRequest.Builder createRequest(int timeoutMs) { return new FindCoordinatorRequest.Builder( new FindCoordinatorRequestData() .setKeyType(CoordinatorType.GROUP.id()) - .setKey(context.getGroupId())); + .setKey(context.groupId())); } @Override void handleResponse(AbstractResponse abstractResponse) { final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse; - if (handleGroupRequestError(response.error(), context.getFuture())) + if (handleGroupRequestError(response.error(), context.future())) return; context.setNode(response.node()); @@ -2727,7 +2694,7 @@ void handleResponse(AbstractResponse abstractResponse) { @Override void handleFailure(Throwable throwable) { - context.getFuture().completeExceptionally(throwable); + context.future().completeExceptionally(throwable); } }; } @@ -2735,14 +2702,14 @@ void handleFailure(Throwable throwable) { private Call getDescribeConsumerGroupsCall( ConsumerGroupOperationContext context) { return new Call("describeConsumerGroups", - context.getDeadline(), - new ConstantNodeIdProvider(context.getNode().get().id())) { + context.deadline(), + new ConstantNodeIdProvider(context.node().get().id())) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { + DescribeGroupsRequest.Builder createRequest(int timeoutMs) { return new DescribeGroupsRequest.Builder( new DescribeGroupsRequestData() - .setGroups(Collections.singletonList(context.getGroupId())) - .setIncludeAuthorizedOperations(context.getOptions().includeAuthorizedOperations())); + .setGroups(Collections.singletonList(context.groupId())) + .setIncludeAuthorizedOperations(context.options().includeAuthorizedOperations())); } @Override @@ -2751,29 +2718,29 @@ void handleResponse(AbstractResponse abstractResponse) { List describedGroups = response.data().groups(); if (describedGroups.isEmpty()) { - context.getFuture().completeExceptionally( - new InvalidGroupIdException("No consumer group found for GroupId: " + context.getGroupId())); + context.future().completeExceptionally( + new InvalidGroupIdException("No consumer group found for GroupId: " + context.groupId())); return; } if (describedGroups.size() > 1 || - !describedGroups.get(0).groupId().equals(context.getGroupId())) { + !describedGroups.get(0).groupId().equals(context.groupId())) { String ids = Arrays.toString(describedGroups.stream().map(DescribedGroup::groupId).toArray()); - context.getFuture().completeExceptionally(new InvalidGroupIdException( - "DescribeConsumerGroup request for GroupId: " + context.getGroupId() + " returned " + ids)); + context.future().completeExceptionally(new InvalidGroupIdException( + "DescribeConsumerGroup request for GroupId: " + context.groupId() + " returned " + ids)); return; } final DescribedGroup describedGroup = describedGroups.get(0); // If coordinator changed since we fetched it, retry - if (context.hasCoordinatorMoved(response)) { - rescheduleTask(context, () -> getDescribeConsumerGroupsCall(context)); + if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) { + rescheduleFindCoordinatorTask(context, () -> getDescribeConsumerGroupsCall(context)); return; } final Errors groupError = Errors.forCode(describedGroup.errorCode()); - if (handleGroupRequestError(groupError, context.getFuture())) + if (handleGroupRequestError(groupError, context.future())) return; final String protocolType = describedGroup.protocolType(); @@ -2797,19 +2764,59 @@ void handleResponse(AbstractResponse abstractResponse) { memberDescriptions.add(memberDescription); } final ConsumerGroupDescription consumerGroupDescription = - new ConsumerGroupDescription(context.getGroupId(), protocolType.isEmpty(), + new ConsumerGroupDescription(context.groupId(), protocolType.isEmpty(), memberDescriptions, describedGroup.protocolData(), ConsumerGroupState.parse(describedGroup.groupState()), - context.getNode().get(), + context.node().get(), authorizedOperations); - context.getFuture().complete(consumerGroupDescription); + context.future().complete(consumerGroupDescription); } } @Override void handleFailure(Throwable throwable) { - context.getFuture().completeExceptionally(throwable); + context.future().completeExceptionally(throwable); + } + }; + } + + /** + * Returns a {@code Call} object to fetch the cluster metadata. Takes a List of Calls + * parameter to schedule actions that need to be taken using the metadata. The param is a Supplier + * so that it can be lazily created, so that it can use the results of the metadata call in its + * construction. + * + * @param The type of return value of the KafkaFuture, like ListOffsetsResultInfo, etc. + * @param The type of configuration option, like ListOffsetsOptions, etc + */ + private > Call getMetadataCall(MetadataOperationContext context, + Supplier> nextCalls) { + return new Call("metadata", context.deadline(), new LeastLoadedNodeProvider()) { + @Override + MetadataRequest.Builder createRequest(int timeoutMs) { + return new MetadataRequest.Builder(new MetadataRequestData() + .setTopics(convertToMetadataRequestTopic(context.topics())) + .setAllowAutoTopicCreation(false)); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + MetadataResponse response = (MetadataResponse) abstractResponse; + MetadataOperationContext.handleMetadataErrors(response); + + context.setResponse(Optional.of(response)); + + for (Call call : nextCalls.get()) { + runnable.call(call, time.milliseconds()); + } + } + + @Override + void handleFailure(Throwable throwable) { + for (KafkaFutureImpl future : context.futures().values()) { + future.completeExceptionally(throwable); + } } }; } @@ -2890,7 +2897,7 @@ public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions opt final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs()); runnable.call(new Call("findAllBrokers", deadline, new LeastLoadedNodeProvider()) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { + MetadataRequest.Builder createRequest(int timeoutMs) { return new MetadataRequest.Builder(new MetadataRequestData() .setTopics(Collections.emptyList()) .setAllowAutoTopicCreation(true)); @@ -2910,7 +2917,7 @@ void handleResponse(AbstractResponse abstractResponse) { final long nowList = time.milliseconds(); runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(node.id())) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { + ListGroupsRequest.Builder createRequest(int timeoutMs) { return new ListGroupsRequest.Builder(new ListGroupsRequestData()); } @@ -2981,11 +2988,11 @@ public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String grou private Call getListConsumerGroupOffsetsCall(ConsumerGroupOperationContext, ListConsumerGroupOffsetsOptions> context) { - return new Call("listConsumerGroupOffsets", context.getDeadline(), - new ConstantNodeIdProvider(context.getNode().get().id())) { + return new Call("listConsumerGroupOffsets", context.deadline(), + new ConstantNodeIdProvider(context.node().get().id())) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { - return new OffsetFetchRequest.Builder(context.getGroupId(), context.getOptions().topicPartitions()); + OffsetFetchRequest.Builder createRequest(int timeoutMs) { + return new OffsetFetchRequest.Builder(context.groupId(), context.options().topicPartitions()); } @Override @@ -2994,12 +3001,12 @@ void handleResponse(AbstractResponse abstractResponse) { final Map groupOffsetsListing = new HashMap<>(); // If coordinator changed since we fetched it, retry - if (context.hasCoordinatorMoved(response)) { - rescheduleTask(context, () -> getListConsumerGroupOffsetsCall(context)); + if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) { + rescheduleFindCoordinatorTask(context, () -> getListConsumerGroupOffsetsCall(context)); return; } - if (handleGroupRequestError(response.error(), context.getFuture())) + if (handleGroupRequestError(response.error(), context.future())) return; for (Map.Entry entry : @@ -3017,12 +3024,12 @@ void handleResponse(AbstractResponse abstractResponse) { log.warn("Skipping return offset for {} due to error {}.", topicPartition, error); } } - context.getFuture().complete(groupOffsetsListing); + context.future().complete(groupOffsetsListing); } @Override void handleFailure(Throwable throwable) { - context.getFuture().completeExceptionally(throwable); + context.future().completeExceptionally(throwable); } }; } @@ -3053,13 +3060,13 @@ public DeleteConsumerGroupsResult deleteConsumerGroups(Collection groupI } private Call getDeleteConsumerGroupsCall(ConsumerGroupOperationContext context) { - return new Call("deleteConsumerGroups", context.getDeadline(), new ConstantNodeIdProvider(context.getNode().get().id())) { + return new Call("deleteConsumerGroups", context.deadline(), new ConstantNodeIdProvider(context.node().get().id())) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { + DeleteGroupsRequest.Builder createRequest(int timeoutMs) { return new DeleteGroupsRequest.Builder( new DeleteGroupsRequestData() - .setGroupsNames(Collections.singletonList(context.getGroupId())) + .setGroupsNames(Collections.singletonList(context.groupId())) ); } @@ -3068,21 +3075,21 @@ void handleResponse(AbstractResponse abstractResponse) { final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse; // If coordinator changed since we fetched it, retry - if (context.hasCoordinatorMoved(response)) { - rescheduleTask(context, () -> getDeleteConsumerGroupsCall(context)); + if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) { + rescheduleFindCoordinatorTask(context, () -> getDeleteConsumerGroupsCall(context)); return; } - final Errors groupError = response.get(context.getGroupId()); - if (handleGroupRequestError(groupError, context.getFuture())) + final Errors groupError = response.get(context.groupId()); + if (handleGroupRequestError(groupError, context.future())) return; - context.getFuture().complete(null); + context.future().complete(null); } @Override void handleFailure(Throwable throwable) { - context.getFuture().completeExceptionally(throwable); + context.future().completeExceptionally(throwable); } }; } @@ -3115,10 +3122,10 @@ public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets( private Call getDeleteConsumerGroupOffsetsCall( ConsumerGroupOperationContext, DeleteConsumerGroupOffsetsOptions> context, Set partitions) { - return new Call("deleteConsumerGroupOffsets", context.getDeadline(), new ConstantNodeIdProvider(context.getNode().get().id())) { + return new Call("deleteConsumerGroupOffsets", context.deadline(), new ConstantNodeIdProvider(context.node().get().id())) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { + OffsetDeleteRequest.Builder createRequest(int timeoutMs) { final OffsetDeleteRequestTopicCollection topics = new OffsetDeleteRequestTopicCollection(); partitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic, topicPartitions) -> { @@ -3134,7 +3141,7 @@ AbstractRequest.Builder createRequest(int timeoutMs) { return new OffsetDeleteRequest.Builder( new OffsetDeleteRequestData() - .setGroupId(context.groupId) + .setGroupId(context.groupId()) .setTopics(topics) ); } @@ -3144,14 +3151,14 @@ void handleResponse(AbstractResponse abstractResponse) { final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse; // If coordinator changed since we fetched it, retry - if (context.hasCoordinatorMoved(response)) { - rescheduleTask(context, () -> getDeleteConsumerGroupOffsetsCall(context, partitions)); + if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) { + rescheduleFindCoordinatorTask(context, () -> getDeleteConsumerGroupOffsetsCall(context, partitions)); return; } // If the error is an error at the group level, the future is failed with it final Errors groupError = Errors.forCode(response.data.errorCode()); - if (handleGroupRequestError(groupError, context.getFuture())) + if (handleGroupRequestError(groupError, context.future())) return; final Map partitions = new HashMap<>(); @@ -3163,12 +3170,12 @@ void handleResponse(AbstractResponse abstractResponse) { }); }); - context.getFuture().complete(partitions); + context.future().complete(partitions); } @Override void handleFailure(Throwable throwable) { - context.getFuture().completeExceptionally(throwable); + context.future().completeExceptionally(throwable); } }; } @@ -3189,7 +3196,7 @@ public ElectLeadersResult electLeaders( new ControllerNodeProvider()) { @Override - public AbstractRequest.Builder createRequest(int timeoutMs) { + public ElectLeadersRequest.Builder createRequest(int timeoutMs) { return new ElectLeadersRequest.Builder(electionType, topicPartitions, timeoutMs); } @@ -3254,7 +3261,7 @@ public AlterPartitionReassignmentsResult alterPartitionReassignments( new ControllerNodeProvider()) { @Override - public AbstractRequest.Builder createRequest(int timeoutMs) { + public AlterPartitionReassignmentsRequest.Builder createRequest(int timeoutMs) { AlterPartitionReassignmentsRequestData data = new AlterPartitionReassignmentsRequestData(); for (Map.Entry>> entry : @@ -3389,7 +3396,7 @@ public ListPartitionReassignmentsResult listPartitionReassignments(Optional context) { return new Call("leaveGroup", - context.getDeadline(), - new ConstantNodeIdProvider(context.getNode().get().id())) { + context.deadline(), + new ConstantNodeIdProvider(context.node().get().id())) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { - return new LeaveGroupRequest.Builder(context.getGroupId(), - context.getOptions().getMembers()); + LeaveGroupRequest.Builder createRequest(int timeoutMs) { + return new LeaveGroupRequest.Builder(context.groupId(), + context.options().getMembers()); } @Override @@ -3495,8 +3502,8 @@ void handleResponse(AbstractResponse abstractResponse) { final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse; // If coordinator changed since we fetched it, retry - if (context.hasCoordinatorMoved(response)) { - rescheduleTask(context, () -> getRemoveMembersFromGroupCall(context)); + if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) { + rescheduleFindCoordinatorTask(context, () -> getRemoveMembersFromGroupCall(context)); return; } @@ -3507,15 +3514,226 @@ void handleResponse(AbstractResponse abstractResponse) { } final RemoveMemberFromGroupResult membershipChangeResult = - new RemoveMemberFromGroupResult(response, context.getOptions().getMembers()); + new RemoveMemberFromGroupResult(response, context.options().getMembers()); + + context.future().complete(membershipChangeResult); + } + + @Override + void handleFailure(Throwable throwable) { + context.future().completeExceptionally(throwable); + } + }; + } + + @Override + public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, + Map offsets, + AlterConsumerGroupOffsetsOptions options) { + final KafkaFutureImpl> future = new KafkaFutureImpl<>(); + + final long startFindCoordinatorMs = time.milliseconds(); + final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs()); + + ConsumerGroupOperationContext, AlterConsumerGroupOffsetsOptions> context = + new ConsumerGroupOperationContext<>(groupId, options, deadline, future); + + Call findCoordinatorCall = getFindCoordinatorCall(context, + () -> KafkaAdminClient.this.getAlterConsumerGroupOffsetsCall(context, offsets)); + runnable.call(findCoordinatorCall, startFindCoordinatorMs); + + return new AlterConsumerGroupOffsetsResult(future); + } + + private Call getAlterConsumerGroupOffsetsCall(ConsumerGroupOperationContext, + AlterConsumerGroupOffsetsOptions> context, + Map offsets) { + + return new Call("commitOffsets", context.deadline(), new ConstantNodeIdProvider(context.node().get().id())) { + + @Override + OffsetCommitRequest.Builder createRequest(int timeoutMs) { + List topics = new ArrayList<>(); + Map> offsetData = new HashMap<>(); + for (Map.Entry entry : offsets.entrySet()) { + String topic = entry.getKey().topic(); + OffsetAndMetadata oam = entry.getValue(); + offsetData.compute(topic, (key, value) -> { + if (value == null) { + value = new ArrayList<>(); + } + OffsetCommitRequestPartition partition = new OffsetCommitRequestPartition() + .setCommittedOffset(oam.offset()) + .setCommittedLeaderEpoch(oam.leaderEpoch().orElse(-1)) + .setCommittedMetadata(oam.metadata()) + .setPartitionIndex(entry.getKey().partition()); + value.add(partition); + return value; + }); + } + for (Map.Entry> entry : offsetData.entrySet()) { + OffsetCommitRequestTopic topic = new OffsetCommitRequestTopic() + .setName(entry.getKey()) + .setPartitions(entry.getValue()); + topics.add(topic); + } + OffsetCommitRequestData data = new OffsetCommitRequestData() + .setGroupId(context.groupId()) + .setTopics(topics); + return new OffsetCommitRequest.Builder(data); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + final OffsetCommitResponse response = (OffsetCommitResponse) abstractResponse; - context.getFuture().complete(membershipChangeResult); + // If coordinator changed since we fetched it, retry + if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) { + rescheduleFindCoordinatorTask(context, + () -> getAlterConsumerGroupOffsetsCall(context, offsets)); + return; + } + + // If there is a coordinator error, retry + for (OffsetCommitResponseTopic topic : response.data().topics()) { + for (OffsetCommitResponsePartition partition : topic.partitions()) { + Errors error = Errors.forCode(partition.errorCode()); + if (ConsumerGroupOperationContext.shouldRefreshCoordinator(error)) { + rescheduleFindCoordinatorTask(context, + () -> getAlterConsumerGroupOffsetsCall(context, offsets)); + return; + } + } + } + + final Map partitions = new HashMap<>(); + for (OffsetCommitResponseTopic topic : response.data().topics()) { + for (OffsetCommitResponsePartition partition : topic.partitions()) { + TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + Errors error = Errors.forCode(partition.errorCode()); + partitions.put(tp, error); + } + } + context.future().complete(partitions); } @Override void handleFailure(Throwable throwable) { - context.getFuture().completeExceptionally(throwable); + context.future().completeExceptionally(throwable); } }; } + + @Override + public ListOffsetsResult listOffsets(Map topicPartitionOffsets, + ListOffsetsOptions options) { + + // preparing topics list for asking metadata about them + final Map> futures = new HashMap<>(topicPartitionOffsets.size()); + final Set topics = new HashSet<>(); + for (TopicPartition topicPartition : topicPartitionOffsets.keySet()) { + topics.add(topicPartition.topic()); + futures.put(topicPartition, new KafkaFutureImpl<>()); + } + + final long nowMetadata = time.milliseconds(); + final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs()); + + MetadataOperationContext context = + new MetadataOperationContext<>(topics, options, deadline, futures); + + Call metadataCall = getMetadataCall(context, + () -> KafkaAdminClient.this.getListOffsetsCalls(context, topicPartitionOffsets, futures)); + runnable.call(metadataCall, nowMetadata); + + return new ListOffsetsResult(new HashMap<>(futures)); + } + + private List getListOffsetsCalls(MetadataOperationContext context, + Map topicPartitionOffsets, + Map> futures) { + + MetadataResponse mr = context.response().orElseThrow(() -> new IllegalStateException("No Metadata response")); + List calls = new ArrayList<>(); + // grouping topic partitions per leader + Map> leaders = new HashMap<>(); + + for (Map.Entry entry: topicPartitionOffsets.entrySet()) { + + OffsetSpec offsetSpec = entry.getValue(); + TopicPartition tp = entry.getKey(); + KafkaFutureImpl future = futures.get(tp); + long offsetQuery = (offsetSpec instanceof TimestampSpec) + ? ((TimestampSpec) offsetSpec).timestamp() + : (offsetSpec instanceof OffsetSpec.EarliestSpec) + ? ListOffsetRequest.EARLIEST_TIMESTAMP + : ListOffsetRequest.LATEST_TIMESTAMP; + // avoid sending listOffsets request for topics with errors + if (!mr.errors().containsKey(tp.topic())) { + Node node = mr.cluster().leaderFor(tp); + if (node != null) { + Map leadersOnNode = leaders.computeIfAbsent(node, k -> new HashMap<>()); + leadersOnNode.put(tp, new ListOffsetRequest.PartitionData(offsetQuery, Optional.empty())); + } else { + future.completeExceptionally(Errors.LEADER_NOT_AVAILABLE.exception()); + } + } else { + future.completeExceptionally(mr.errors().get(tp.topic()).exception()); + } + } + + for (final Map.Entry> entry: leaders.entrySet()) { + final int brokerId = entry.getKey().id(); + final Map partitionsToQuery = entry.getValue(); + + calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) { + + @Override + ListOffsetRequest.Builder createRequest(int timeoutMs) { + return ListOffsetRequest.Builder + .forConsumer(true, context.options().isolationLevel()) + .setTargetTimes(partitionsToQuery); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + ListOffsetResponse response = (ListOffsetResponse) abstractResponse; + Set partitionsWithErrors = new HashSet<>(); + + for (Entry result : response.responseData().entrySet()) { + TopicPartition tp = result.getKey(); + PartitionData partitionData = result.getValue(); + + KafkaFutureImpl future = futures.get(tp); + Errors error = partitionData.error; + if (MetadataOperationContext.shouldRefreshMetadata(error)) { + partitionsWithErrors.add(tp); + } else if (error == Errors.NONE) { + future.complete(new ListOffsetsResultInfo(partitionData.offset, partitionData.timestamp, partitionData.leaderEpoch)); + } else { + future.completeExceptionally(error.exception()); + } + } + + if (!partitionsWithErrors.isEmpty()) { + partitionsToQuery.keySet().retainAll(partitionsWithErrors); + Set retryTopics = partitionsWithErrors.stream().map(tp -> tp.topic()).collect(Collectors.toSet()); + MetadataOperationContext retryContext = + new MetadataOperationContext<>(retryTopics, context.options(), context.deadline(), futures); + rescheduleMetadataTask(retryContext, () -> Collections.singletonList(this)); + } + } + + @Override + void handleFailure(Throwable throwable) { + for (TopicPartition tp : entry.getValue().keySet()) { + KafkaFutureImpl future = futures.get(tp); + future.completeExceptionally(throwable); + } + } + }); + } + return calls; + } + } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsOptions.java new file mode 100644 index 0000000000000..0e116e2f7d04e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsOptions.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Options for {@link AdminClient#listOffsets(Map)}. + * + * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class ListOffsetsOptions extends AbstractOptions { + + private final IsolationLevel isolationLevel; + + public ListOffsetsOptions() { + this(IsolationLevel.READ_UNCOMMITTED); + } + + public ListOffsetsOptions(IsolationLevel isolationLevel) { + this.isolationLevel = isolationLevel; + } + + public IsolationLevel isolationLevel() { + return isolationLevel; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java new file mode 100644 index 0000000000000..d830ef2aadc8c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * The result of the {@link AdminClient#listOffsets(Map)} call. + * + * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class ListOffsetsResult { + + private final Map> futures; + + ListOffsetsResult(Map> futures) { + this.futures = futures; + } + + /** + * Return a future which can be used to check the result for a given partition. + */ + public KafkaFuture partitionResult(final TopicPartition partition) { + KafkaFuture future = futures.get(partition); + if (future == null) { + throw new IllegalArgumentException( + "List Offsets for partition \"" + partition + "\" was not attempted"); + } + return future; + } + + /** + * Return a future which succeeds only if offsets for all specified partitions have been successfully + * retrieved. + */ + public KafkaFuture> all() { + return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])) + .thenApply(new KafkaFuture.BaseFunction>() { + @Override + public Map apply(Void v) { + Map offsets = new HashMap<>(futures.size()); + for (Map.Entry> entry : futures.entrySet()) { + try { + offsets.put(entry.getKey(), entry.getValue().get()); + } catch (InterruptedException | ExecutionException e) { + // This should be unreachable, because allOf ensured that all the futures completed successfully. + throw new RuntimeException(e); + } + } + return offsets; + } + }); + } + + public static class ListOffsetsResultInfo { + + private final long offset; + private final long timestamp; + private final Optional leaderEpoch; + + ListOffsetsResultInfo(long offset, long timestamp, Optional leaderEpoch) { + this.offset = offset; + this.timestamp = timestamp; + this.leaderEpoch = leaderEpoch; + } + + public long offset() { + return offset; + } + + public long timestamp() { + return timestamp; + } + + public Optional leaderEpoch() { + return leaderEpoch; + } + + @Override + public String toString() { + return "ListOffsetsResultInfo(offset=" + offset + ", timestamp=" + timestamp + ", leaderEpoch=" + + leaderEpoch + ")"; + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java new file mode 100644 index 0000000000000..8955b41328603 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + + +/** + * This class allows to specify the desired offsets when using {@link KafkaAdminClient#listOffsets(Map, ListOffsetsOptions)} + */ +public class OffsetSpec { + + static class EarliestSpec extends OffsetSpec { } + static class LatestSpec extends OffsetSpec { } + static class TimestampSpec extends OffsetSpec { + private final long timestamp; + + TimestampSpec(long timestamp) { + this.timestamp = timestamp; + } + + long timestamp() { + return timestamp; + } + } + + /** + * Used to retrieve the latest offset of a partition + */ + public static OffsetSpec latest() { + return new LatestSpec(); + } + + /** + * Used to retrieve the earliest offset of a partition + */ + public static OffsetSpec earliest() { + return new EarliestSpec(); + } + + /** + * Used to retrieve the the earliest offset whose timestamp is greater than + * or equal to the given timestamp in the corresponding partition + * @param timestamp in milliseconds + */ + public static OffsetSpec forTimestamp(long timestamp) { + return new TimestampSpec(timestamp); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ConsumerGroupOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ConsumerGroupOperationContext.java new file mode 100644 index 0000000000000..bd4415ceb7452 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ConsumerGroupOperationContext.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin.internals; + +import java.util.Optional; + +import org.apache.kafka.clients.admin.AbstractOptions; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; + +/** + * Context class to encapsulate parameters of a call to find and use a consumer group coordinator. + * Some of the parameters are provided at construction and are immutable whereas others are provided + * as "Call" are completed and values are available, like node id of the coordinator. + * + * @param The type of return value of the KafkaFuture + * @param The type of configuration option. Different for different consumer group commands. + */ +public final class ConsumerGroupOperationContext> { + final private String groupId; + final private O options; + final private long deadline; + final private KafkaFutureImpl future; + private Optional node; + + public ConsumerGroupOperationContext(String groupId, + O options, + long deadline, + KafkaFutureImpl future) { + this.groupId = groupId; + this.options = options; + this.deadline = deadline; + this.future = future; + this.node = Optional.empty(); + } + + public String groupId() { + return groupId; + } + + public O options() { + return options; + } + + public long deadline() { + return deadline; + } + + public KafkaFutureImpl future() { + return future; + } + + public Optional node() { + return node; + } + + public void setNode(Node node) { + this.node = Optional.ofNullable(node); + } + + public static boolean hasCoordinatorMoved(AbstractResponse response) { + return response.errorCounts().keySet() + .stream() + .anyMatch(error -> error == Errors.NOT_COORDINATOR); + } + + public static boolean shouldRefreshCoordinator(Errors error) { + return error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java new file mode 100644 index 0000000000000..e6f405436645e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin.internals; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; + +import org.apache.kafka.clients.admin.AbstractOptions; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidMetadataException; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata; +import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata; + +/** + * Context class to encapsulate parameters of a call to fetch and use cluster metadata. + * Some of the parameters are provided at construction and are immutable whereas others are provided + * as "Call" are completed and values are available. + * + * @param The type of return value of the KafkaFuture + * @param The type of configuration option. + */ +public final class MetadataOperationContext> { + final private Collection topics; + final private O options; + final private long deadline; + final private Map> futures; + private Optional response; + + public MetadataOperationContext(Collection topics, + O options, + long deadline, + Map> futures) { + this.topics = topics; + this.options = options; + this.deadline = deadline; + this.futures = futures; + this.response = Optional.empty(); + } + + public void setResponse(Optional response) { + this.response = response; + } + + public Optional response() { + return response; + } + + public O options() { + return options; + } + + public long deadline() { + return deadline; + } + + public Map> futures() { + return futures; + } + + public Collection topics() { + return topics; + } + + public static void handleMetadataErrors(MetadataResponse response) { + for (TopicMetadata tm : response.topicMetadata()) { + for (PartitionMetadata pm : tm.partitionMetadata()) { + if (shouldRefreshMetadata(pm.error())) { + throw pm.error().exception(); + } + } + } + } + + public static boolean shouldRefreshMetadata(Errors error) { + return error.exception() instanceof InvalidMetadataException; + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index e0f7c4d160d5e..67897cfd54fb3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -18,13 +18,13 @@ import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.SecurityConfig; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.requests.IsolationLevel; import org.apache.kafka.common.serialization.Deserializer; import java.util.Collections; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index f12beaf8d5391..e9e91bc28ae6b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -35,6 +35,7 @@ import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; @@ -52,7 +53,6 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.Selector; -import org.apache.kafka.common.requests.IsolationLevel; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.AppInfoParser; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 384da315e70f4..a162ddbf12ae8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; @@ -64,7 +65,6 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; -import org.apache.kafka.common.requests.IsolationLevel; import org.apache.kafka.common.requests.ListOffsetRequest; import org.apache.kafka.common.requests.ListOffsetResponse; import org.apache.kafka.common.requests.MetadataRequest; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 953505f0b3e35..bd05909269ce7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -22,11 +22,11 @@ import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.PartitionStates; import org.apache.kafka.common.requests.EpochEndOffset; -import org.apache.kafka.common.requests.IsolationLevel; import org.apache.kafka.common.utils.LogContext; import org.slf4j.Logger; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java b/clients/src/main/java/org/apache/kafka/common/IsolationLevel.java similarity index 96% rename from clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java rename to clients/src/main/java/org/apache/kafka/common/IsolationLevel.java index a09b625c050a3..79f0a92954bf1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java +++ b/clients/src/main/java/org/apache/kafka/common/IsolationLevel.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.common.requests; +package org.apache.kafka.common; public enum IsolationLevel { READ_UNCOMMITTED((byte) 0), READ_COMMITTED((byte) 1); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java index 963ad065e4389..3fbf13d765c96 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java @@ -102,7 +102,7 @@ public String value() { } - public static class Builder extends AbstractRequest.Builder { + public static class Builder extends AbstractRequest.Builder { private final Map configs; private final boolean validateOnly; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java index 0ee256ff53408..99bbdd772c63e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java @@ -65,7 +65,7 @@ public static Schema[] schemaVersions() { return new Schema[]{DESCRIBE_CONFIGS_REQUEST_V0, DESCRIBE_CONFIGS_REQUEST_V1, DESCRIBE_CONFIGS_REQUEST_V2}; } - public static class Builder extends AbstractRequest.Builder { + public static class Builder extends AbstractRequest.Builder { private final Map> resourceToConfigNames; private boolean includeSynonyms; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index 2c0455ae4ae1e..f99409eee6666 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java index e9fe942e8581a..094d0d2f2de47 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -217,7 +218,6 @@ public String toString() { /** * Private constructor with a specified version. */ - @SuppressWarnings("unchecked") private ListOffsetRequest(int replicaId, Map targetTimes, IsolationLevel isolationLevel, diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 6d870fcd8b35d..4d677bfc2760a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; @@ -102,9 +103,14 @@ import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse; import org.apache.kafka.common.requests.LeaveGroupResponse; import org.apache.kafka.common.requests.ListGroupsResponse; +import org.apache.kafka.common.requests.ListOffsetResponse; +import org.apache.kafka.common.requests.ListOffsetResponse.PartitionData; +import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata; +import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata; import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetCommitResponse; import org.apache.kafka.common.requests.OffsetDeleteResponse; import org.apache.kafka.common.requests.OffsetFetchResponse; import org.apache.kafka.common.resource.PatternType; @@ -276,6 +282,12 @@ private static OffsetDeleteResponse prepareOffsetDeleteResponse(String topic, in ); } + private static OffsetCommitResponse prepareOffsetCommitResponse(TopicPartition tp, Errors error) { + Map responseData = new HashMap<>(); + responseData.put(tp, error); + return new OffsetCommitResponse(0, responseData); + } + private static CreateTopicsResponse prepareCreateTopicsResponse(String topicName, Errors error) { CreateTopicsResponseData data = new CreateTopicsResponseData(); data.topics().add(new CreatableTopicResult(). @@ -295,6 +307,31 @@ private static FindCoordinatorResponse prepareFindCoordinatorResponse(Errors err return FindCoordinatorResponse.prepareResponse(error, node); } + private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) { + List metadata = new ArrayList<>(); + for (String topic : cluster.topics()) { + List pms = new ArrayList<>(); + for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) { + PartitionMetadata pm = new PartitionMetadata(error, + pInfo.partition(), + pInfo.leader(), + Optional.of(234), + Arrays.asList(pInfo.replicas()), + Arrays.asList(pInfo.inSyncReplicas()), + Arrays.asList(pInfo.offlineReplicas())); + pms.add(pm); + } + TopicMetadata tm = new TopicMetadata(error, topic, false, pms); + metadata.add(tm); + } + return MetadataResponse.prepareResponse(0, + cluster.nodes(), + cluster.clusterResource().clusterId(), + cluster.controller().id(), + metadata, + MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED); + } + /** * Test that the client properly times out when we don't receive any metadata. */ @@ -1451,7 +1488,8 @@ public void testDescribeConsumerGroupsWithAuthorizedOperationsOmitted() throws E try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); DescribeGroupsResponseData data = new DescribeGroupsResponseData(); data.groups().add(DescribeGroupsResponse.groupMetadata( @@ -1567,13 +1605,15 @@ public void testDeleteConsumerGroups() throws Exception { assertNull(results.get()); //should throw error for non-retriable errors - env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode())); + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode())); final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds); TestUtils.assertFutureError(errorResult.deletedGroups().get("group-0"), GroupAuthorizationException.class); //Retriable errors should be retried - env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); final DeletableGroupResultCollection errorResponse1 = new DeletableGroupResultCollection(); errorResponse1.add(new DeletableGroupResult() @@ -1698,7 +1738,7 @@ public void testDeleteConsumerGroupOffsetsRetriableErrors() throws Exception { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareResponse( - FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller())); + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); env.kafkaClient().prepareResponse( prepareOffsetDeleteResponse(Errors.COORDINATOR_NOT_AVAILABLE)); @@ -1752,8 +1792,8 @@ public void testDeleteConsumerGroupOffsetsNonRetriableErrors() throws Exception env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); for (Errors error : retriableErrors) { - env.kafkaClient().prepareResponse(FindCoordinatorResponse - .prepareResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); env.kafkaClient().prepareResponse( prepareOffsetDeleteResponse(error)); @@ -2022,8 +2062,8 @@ public void testAlterPartitionReassignments() throws Exception { .setPartitions(Collections.singletonList(normalPartitionResponse)))); env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(responseData1)); AlterPartitionReassignmentsResult result1 = env.adminClient().alterPartitionReassignments(reassignments); - Future future1 = result1.all(); - Future future2 = result1.values().get(tp1); + Future future1 = result1.all(); + Future future2 = result1.values().get(tp1); TestUtils.assertFutureError(future1, UnknownServerException.class); TestUtils.assertFutureError(future2, UnknownServerException.class); @@ -2208,6 +2248,466 @@ public void testListPartitionReassignments() throws Exception { } } + @Test + public void testAlterConsumerGroupOffsets() throws Exception { + // Happy path + + final Map nodes = new HashMap<>(); + nodes.put(0, new Node(0, "localhost", 8121)); + + final Cluster cluster = + new Cluster( + "mockClusterId", + nodes.values(), + Collections.emptyList(), + Collections.emptySet(), + Collections.emptySet(), nodes.get(0)); + + final String groupId = "group-0"; + final TopicPartition tp1 = new TopicPartition("foo", 0); + final TopicPartition tp2 = new TopicPartition("bar", 0); + final TopicPartition tp3 = new TopicPartition("foobar", 0); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + Map responseData = new HashMap<>(); + responseData.put(tp1, Errors.NONE); + responseData.put(tp2, Errors.NONE); + env.kafkaClient().prepareResponse(new OffsetCommitResponse(0, responseData)); + + Map offsets = new HashMap<>(); + offsets.put(tp1, new OffsetAndMetadata(123L)); + offsets.put(tp2, new OffsetAndMetadata(456L)); + final AlterConsumerGroupOffsetsResult result = env.adminClient().alterConsumerGroupOffsets( + groupId, offsets); + + assertNull(result.all().get()); + assertNull(result.partitionResult(tp1).get()); + assertNull(result.partitionResult(tp2).get()); + TestUtils.assertFutureError(result.partitionResult(tp3), IllegalArgumentException.class); + } + } + + @Test + public void testAlterConsumerGroupOffsetsRetriableErrors() throws Exception { + // Retriable errors should be retried + + final Map nodes = new HashMap<>(); + nodes.put(0, new Node(0, "localhost", 8121)); + + final Cluster cluster = + new Cluster( + "mockClusterId", + nodes.values(), + Collections.emptyList(), + Collections.emptySet(), + Collections.emptySet(), nodes.get(0)); + + final String groupId = "group-0"; + final TopicPartition tp1 = new TopicPartition("foo", 0); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + env.kafkaClient().prepareResponse( + prepareOffsetCommitResponse(tp1, Errors.COORDINATOR_NOT_AVAILABLE)); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + env.kafkaClient().prepareResponse( + prepareOffsetCommitResponse(tp1, Errors.COORDINATOR_LOAD_IN_PROGRESS)); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + env.kafkaClient().prepareResponse( + prepareOffsetCommitResponse(tp1, Errors.NOT_COORDINATOR)); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + env.kafkaClient().prepareResponse( + prepareOffsetCommitResponse(tp1, Errors.NONE)); + + Map offsets = new HashMap<>(); + offsets.put(tp1, new OffsetAndMetadata(123L)); + final AlterConsumerGroupOffsetsResult result1 = env.adminClient() + .alterConsumerGroupOffsets(groupId, offsets); + + assertNull(result1.all().get()); + assertNull(result1.partitionResult(tp1).get()); + } + } + + @Test + public void testAlterConsumerGroupOffsetsNonRetriableErrors() throws Exception { + // Non-retriable errors throw an exception + + final Map nodes = new HashMap<>(); + nodes.put(0, new Node(0, "localhost", 8121)); + + final Cluster cluster = + new Cluster( + "mockClusterId", + nodes.values(), + Collections.emptyList(), + Collections.emptySet(), + Collections.emptySet(), nodes.get(0)); + + final String groupId = "group-0"; + final TopicPartition tp1 = new TopicPartition("foo", 0); + final List nonRetriableErrors = Arrays.asList( + Errors.GROUP_AUTHORIZATION_FAILED, Errors.INVALID_GROUP_ID, Errors.GROUP_ID_NOT_FOUND); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + for (Errors error : nonRetriableErrors) { + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + env.kafkaClient().prepareResponse(prepareOffsetCommitResponse(tp1, error)); + + Map offsets = new HashMap<>(); + offsets.put(tp1, new OffsetAndMetadata(123L)); + AlterConsumerGroupOffsetsResult errorResult = env.adminClient() + .alterConsumerGroupOffsets(groupId, offsets); + + TestUtils.assertFutureError(errorResult.all(), error.exception().getClass()); + TestUtils.assertFutureError(errorResult.partitionResult(tp1), error.exception().getClass()); + } + } + } + + @Test + public void testAlterConsumerGroupOffsetsFindCoordinatorRetriableErrors() throws Exception { + // Retriable FindCoordinatorResponse errors should be retried + + final Map nodes = new HashMap<>(); + nodes.put(0, new Node(0, "localhost", 8121)); + + final Cluster cluster = + new Cluster( + "mockClusterId", + nodes.values(), + Collections.emptyList(), + Collections.emptySet(), + Collections.emptySet(), nodes.get(0)); + + final String groupId = "group-0"; + final TopicPartition tp1 = new TopicPartition("foo", 0); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + env.kafkaClient().prepareResponse( + prepareOffsetCommitResponse(tp1, Errors.NONE)); + + Map offsets = new HashMap<>(); + offsets.put(tp1, new OffsetAndMetadata(123L)); + final AlterConsumerGroupOffsetsResult result = env.adminClient() + .alterConsumerGroupOffsets(groupId, offsets); + + assertNull(result.all().get()); + assertNull(result.partitionResult(tp1).get()); + } + } + + @Test + public void testAlterConsumerGroupOffsetsFindCoordinatorNonRetriableErrors() throws Exception { + // Non-retriable FindCoordinatorResponse errors throw an exception + + final Map nodes = new HashMap<>(); + nodes.put(0, new Node(0, "localhost", 8121)); + + final Cluster cluster = + new Cluster( + "mockClusterId", + nodes.values(), + Collections.emptyList(), + Collections.emptySet(), + Collections.emptySet(), nodes.get(0)); + + final String groupId = "group-0"; + final TopicPartition tp1 = new TopicPartition("foo", 0); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode())); + + Map offsets = new HashMap<>(); + offsets.put(tp1, new OffsetAndMetadata(123L)); + final AlterConsumerGroupOffsetsResult errorResult = env.adminClient() + .alterConsumerGroupOffsets(groupId, offsets); + + TestUtils.assertFutureError(errorResult.all(), GroupAuthorizationException.class); + TestUtils.assertFutureError(errorResult.partitionResult(tp1), GroupAuthorizationException.class); + } + } + + @Test + public void testListOffsets() throws Exception { + // Happy path + + Node node0 = new Node(0, "localhost", 8120); + List pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0})); + pInfos.add(new PartitionInfo("bar", 0, node0, new Node[]{node0}, new Node[]{node0})); + pInfos.add(new PartitionInfo("baz", 0, node0, new Node[]{node0}, new Node[]{node0})); + final Cluster cluster = + new Cluster( + "mockClusterId", + Arrays.asList(node0), + pInfos, + Collections.emptySet(), + Collections.emptySet(), + node0); + + final TopicPartition tp1 = new TopicPartition("foo", 0); + final TopicPartition tp2 = new TopicPartition("bar", 0); + final TopicPartition tp3 = new TopicPartition("baz", 0); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + Map responseData = new HashMap<>(); + responseData.put(tp1, new PartitionData(Errors.NONE, -1L, 123L, Optional.of(321))); + responseData.put(tp2, new PartitionData(Errors.NONE, -1L, 234L, Optional.of(432))); + responseData.put(tp3, new PartitionData(Errors.NONE, 123456789L, 345L, Optional.of(543))); + env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData)); + + Map partitions = new HashMap<>(); + partitions.put(tp1, OffsetSpec.latest()); + partitions.put(tp2, OffsetSpec.earliest()); + partitions.put(tp3, OffsetSpec.forTimestamp(System.currentTimeMillis())); + ListOffsetsResult result = env.adminClient().listOffsets(partitions); + + Map offsets = result.all().get(); + assertFalse(offsets.isEmpty()); + assertEquals(123L, offsets.get(tp1).offset()); + assertEquals(321, offsets.get(tp1).leaderEpoch().get().intValue()); + assertEquals(-1L, offsets.get(tp1).timestamp()); + assertEquals(234L, offsets.get(tp2).offset()); + assertEquals(432, offsets.get(tp2).leaderEpoch().get().intValue()); + assertEquals(-1L, offsets.get(tp2).timestamp()); + assertEquals(345L, offsets.get(tp3).offset()); + assertEquals(543, offsets.get(tp3).leaderEpoch().get().intValue()); + assertEquals(123456789L, offsets.get(tp3).timestamp()); + assertEquals(offsets.get(tp1), result.partitionResult(tp1).get()); + assertEquals(offsets.get(tp2), result.partitionResult(tp2).get()); + assertEquals(offsets.get(tp3), result.partitionResult(tp3).get()); + try { + result.partitionResult(new TopicPartition("unknown", 0)).get(); + fail("should have thrown IllegalArgumentException"); + } catch (IllegalArgumentException expected) { } + } + } + + @Test + public void testListOffsetsRetriableErrors() throws Exception { + + Node node0 = new Node(0, "localhost", 8120); + Node node1 = new Node(1, "localhost", 8121); + List nodes = Arrays.asList(node0, node1); + List pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0, node1}, new Node[]{node0, node1})); + pInfos.add(new PartitionInfo("foo", 1, node0, new Node[]{node0, node1}, new Node[]{node0, node1})); + pInfos.add(new PartitionInfo("bar", 0, node1, new Node[]{node1, node0}, new Node[]{node1, node0})); + final Cluster cluster = + new Cluster( + "mockClusterId", + nodes, + pInfos, + Collections.emptySet(), + Collections.emptySet(), + node0); + + final TopicPartition tp1 = new TopicPartition("foo", 0); + final TopicPartition tp2 = new TopicPartition("foo", 1); + final TopicPartition tp3 = new TopicPartition("bar", 0); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + // listoffsets response from broker 0 + Map responseData = new HashMap<>(); + responseData.put(tp1, new PartitionData(Errors.LEADER_NOT_AVAILABLE, -1L, 123L, Optional.of(321))); + responseData.put(tp3, new PartitionData(Errors.NONE, -1L, 987L, Optional.of(789))); + env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData)); + // listoffsets response from broker 1 + responseData = new HashMap<>(); + responseData.put(tp2, new PartitionData(Errors.NONE, -1L, 456L, Optional.of(654))); + env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData)); + + // metadata refresh because of LEADER_NOT_AVAILABLE + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + // listoffsets response from broker 0 + responseData = new HashMap<>(); + responseData.put(tp1, new PartitionData(Errors.NONE, -1L, 345L, Optional.of(543))); + env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData)); + + Map partitions = new HashMap<>(); + partitions.put(tp1, OffsetSpec.latest()); + partitions.put(tp2, OffsetSpec.latest()); + partitions.put(tp3, OffsetSpec.latest()); + ListOffsetsResult result = env.adminClient().listOffsets(partitions); + + Map offsets = result.all().get(); + assertFalse(offsets.isEmpty()); + assertEquals(345L, offsets.get(tp1).offset()); + assertEquals(543, offsets.get(tp1).leaderEpoch().get().intValue()); + assertEquals(-1L, offsets.get(tp1).timestamp()); + assertEquals(456, offsets.get(tp2).offset()); + assertEquals(654, offsets.get(tp2).leaderEpoch().get().intValue()); + assertEquals(-1L, offsets.get(tp2).timestamp()); + assertEquals(987, offsets.get(tp3).offset()); + assertEquals(789, offsets.get(tp3).leaderEpoch().get().intValue()); + assertEquals(-1L, offsets.get(tp3).timestamp()); + } + } + + @Test + public void testListOffsetsNonRetriableErrors() throws Exception { + + Node node0 = new Node(0, "localhost", 8120); + Node node1 = new Node(1, "localhost", 8121); + List nodes = Arrays.asList(node0, node1); + List pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0, node1}, new Node[]{node0, node1})); + final Cluster cluster = + new Cluster( + "mockClusterId", + nodes, + pInfos, + Collections.emptySet(), + Collections.emptySet(), + node0); + + final TopicPartition tp1 = new TopicPartition("foo", 0); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + Map responseData = new HashMap<>(); + responseData.put(tp1, new PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, -1L, -1, Optional.empty())); + env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData)); + + Map partitions = new HashMap<>(); + partitions.put(tp1, OffsetSpec.latest()); + ListOffsetsResult result = env.adminClient().listOffsets(partitions); + + TestUtils.assertFutureError(result.all(), TopicAuthorizationException.class); + } + } + + @Test + public void testListOffsetsMetadataRetriableErrors() throws Exception { + + Node node0 = new Node(0, "localhost", 8120); + Node node1 = new Node(1, "localhost", 8121); + List nodes = Arrays.asList(node0, node1); + List pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0})); + pInfos.add(new PartitionInfo("foo", 1, node1, new Node[]{node1}, new Node[]{node1})); + final Cluster cluster = + new Cluster( + "mockClusterId", + nodes, + pInfos, + Collections.emptySet(), + Collections.emptySet(), + node0); + + final TopicPartition tp0 = new TopicPartition("foo", 0); + final TopicPartition tp1 = new TopicPartition("foo", 1); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.LEADER_NOT_AVAILABLE)); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION)); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + // listoffsets response from broker 0 + Map responseData = new HashMap<>(); + responseData.put(tp0, new PartitionData(Errors.NONE, -1L, 345L, Optional.of(543))); + env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData)); + // listoffsets response from broker 1 + responseData = new HashMap<>(); + responseData.put(tp1, new PartitionData(Errors.NONE, -1L, 789L, Optional.of(987))); + env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData)); + + Map partitions = new HashMap<>(); + partitions.put(tp0, OffsetSpec.latest()); + partitions.put(tp1, OffsetSpec.latest()); + ListOffsetsResult result = env.adminClient().listOffsets(partitions); + + Map offsets = result.all().get(); + assertFalse(offsets.isEmpty()); + assertEquals(345L, offsets.get(tp0).offset()); + assertEquals(543, offsets.get(tp0).leaderEpoch().get().intValue()); + assertEquals(-1L, offsets.get(tp0).timestamp()); + assertEquals(789L, offsets.get(tp1).offset()); + assertEquals(987, offsets.get(tp1).leaderEpoch().get().intValue()); + assertEquals(-1L, offsets.get(tp1).timestamp()); + } + } + + @Test + public void testListOffsetsMetadataNonRetriableErrors() throws Exception { + + Node node0 = new Node(0, "localhost", 8120); + Node node1 = new Node(1, "localhost", 8121); + List nodes = Arrays.asList(node0, node1); + List pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0, node1}, new Node[]{node0, node1})); + final Cluster cluster = + new Cluster( + "mockClusterId", + nodes, + pInfos, + Collections.emptySet(), + Collections.emptySet(), + node0); + + final TopicPartition tp1 = new TopicPartition("foo", 0); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.TOPIC_AUTHORIZATION_FAILED)); + + Map partitions = new HashMap<>(); + partitions.put(tp1, OffsetSpec.latest()); + ListOffsetsResult result = env.adminClient().listOffsets(partitions); + + TestUtils.assertFutureError(result.all(), TopicAuthorizationException.class); + } + } + private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member, MemberAssignment assignment) { return new MemberDescription(member.memberId(), diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 017e447b74219..3ec3b307732cb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.TopicPartitionReplica; @@ -442,6 +443,16 @@ public ListPartitionReassignmentsResult listPartitionReassignments(Optional offsets, AlterConsumerGroupOffsetsOptions options) { + throw new UnsupportedOperationException("Not implement yet"); + } + + @Override + public ListOffsetsResult listOffsets(Map topicPartitionOffsets, ListOffsetsOptions options) { + throw new UnsupportedOperationException("Not implement yet"); + } + @Override public void close(Duration timeout) {} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 68fac2b7e6764..79477b7419d8e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.clients.consumer.internals.Fetcher; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; @@ -61,7 +62,6 @@ import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatResponse; -import org.apache.kafka.common.requests.IsolationLevel; import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.JoinGroupResponse; import org.apache.kafka.common.requests.LeaveGroupResponse; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index fd1be374b7eea..ae02241403c11 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricNameTemplate; @@ -70,7 +71,6 @@ import org.apache.kafka.common.requests.EpochEndOffset; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; -import org.apache.kafka.common.requests.IsolationLevel; import org.apache.kafka.common.requests.ListOffsetRequest; import org.apache.kafka.common.requests.ListOffsetResponse; import org.apache.kafka.common.requests.MetadataRequest; diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index c8bd7518c67c9..c8563e72c79d2 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AccessControlEntryFilter; import org.apache.kafka.common.acl.AclBinding; diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 2d176591542d8..139335aa8f814 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -27,9 +27,8 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper import kafka.utils._ import org.apache.kafka.clients.admin._ -import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} +import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.clients.{CommonClientConfigs, admin} -import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.{KafkaException, Node, TopicPartition} @@ -42,6 +41,7 @@ import org.apache.kafka.common.protocol.Errors import scala.collection.immutable.TreeMap import scala.reflect.ClassTag +import org.apache.kafka.common.requests.ListOffsetResponse object ConsumerGroupCommand extends Logging { @@ -169,9 +169,6 @@ object ConsumerGroupCommand extends Logging { private val adminClient = createAdminClient(configOverrides) - // `consumers` are only needed for `describe`, so we instantiate them lazily - private lazy val consumers: mutable.Map[String, KafkaConsumer[String, String]] = mutable.Map.empty - // We have to make sure it is evaluated once and available private lazy val resetPlanFromFile: Option[Map[String, Map[TopicPartition, OffsetAndMetadata]]] = { if (opts.options.has(opts.resetFromFileOpt)) { @@ -390,8 +387,13 @@ object ConsumerGroupCommand extends Logging { // Dry-run is the default behavior if --execute is not specified val dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt) - if (!dryRun) - getConsumer(groupId).commitSync(preparedOffsets.asJava) + if (!dryRun) { + adminClient.alterConsumerGroupOffsets( + groupId, + preparedOffsets.asJava, + withTimeoutMs(new AlterConsumerGroupOffsetsOptions) + ).all.get + } acc.updated(groupId, preparedOffsets) case currentState => printError(s"Assignments can only be reset if the group '$groupId' is inactive, but the current state is $currentState.") @@ -574,34 +576,50 @@ object ConsumerGroupCommand extends Logging { } private def getLogEndOffsets(groupId: String, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = { - val offsets = getConsumer(groupId).endOffsets(topicPartitions.asJava) + val endOffsets = topicPartitions.map { topicPartition => + topicPartition -> OffsetSpec.latest + }.toMap + val offsets = adminClient.listOffsets( + endOffsets.asJava, + withTimeoutMs(new ListOffsetsOptions) + ).all.get topicPartitions.map { topicPartition => Option(offsets.get(topicPartition)) match { - case Some(logEndOffset) => topicPartition -> LogOffsetResult.LogOffset(logEndOffset) + case Some(listOffsetsResultInfo) => topicPartition -> LogOffsetResult.LogOffset(listOffsetsResultInfo.offset) case _ => topicPartition -> LogOffsetResult.Unknown } }.toMap } private def getLogStartOffsets(groupId: String, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = { - val offsets = getConsumer(groupId).beginningOffsets(topicPartitions.asJava) + val startOffsets = topicPartitions.map { topicPartition => + topicPartition -> OffsetSpec.earliest + }.toMap + val offsets = adminClient.listOffsets( + startOffsets.asJava, + withTimeoutMs(new ListOffsetsOptions) + ).all.get topicPartitions.map { topicPartition => Option(offsets.get(topicPartition)) match { - case Some(logStartOffset) => topicPartition -> LogOffsetResult.LogOffset(logStartOffset) + case Some(listOffsetsResultInfo) => topicPartition -> LogOffsetResult.LogOffset(listOffsetsResultInfo.offset) case _ => topicPartition -> LogOffsetResult.Unknown } }.toMap } private def getLogTimestampOffsets(groupId: String, topicPartitions: Seq[TopicPartition], timestamp: java.lang.Long): Map[TopicPartition, LogOffsetResult] = { - val consumer = getConsumer(groupId) - consumer.assign(topicPartitions.asJava) - + val timestampOffsets = topicPartitions.map { topicPartition => + topicPartition -> OffsetSpec.forTimestamp(timestamp) + }.toMap + val offsets = adminClient.listOffsets( + timestampOffsets.asJava, + withTimeoutMs(new ListOffsetsOptions) + ).all.get val (successfulOffsetsForTimes, unsuccessfulOffsetsForTimes) = - consumer.offsetsForTimes(topicPartitions.map(_ -> timestamp).toMap.asJava).asScala.partition(_._2 != null) + offsets.asScala.partition(_._2.offset != ListOffsetResponse.UNKNOWN_OFFSET) val successfulLogTimestampOffsets = successfulOffsetsForTimes.map { - case (topicPartition, offsetAndTimestamp) => topicPartition -> LogOffsetResult.LogOffset(offsetAndTimestamp.offset) + case (topicPartition, listOffsetsResultInfo) => topicPartition -> LogOffsetResult.LogOffset(listOffsetsResultInfo.offset) }.toMap successfulLogTimestampOffsets ++ getLogEndOffsets(groupId, unsuccessfulOffsetsForTimes.keySet.toSeq) @@ -609,9 +627,6 @@ object ConsumerGroupCommand extends Logging { def close(): Unit = { adminClient.close() - consumers.values.foreach(consumer => - Option(consumer).foreach(_.close()) - ) } private def createAdminClient(configOverrides: Map[String, String]): Admin = { @@ -621,32 +636,6 @@ object ConsumerGroupCommand extends Logging { admin.AdminClient.create(props) } - private def getConsumer(groupId: String) = { - if (consumers.get(groupId).isEmpty) - consumers.update(groupId, createConsumer(groupId)) - consumers(groupId) - } - - private def createConsumer(groupId: String): KafkaConsumer[String, String] = { - val properties = new Properties() - val deserializer = (new StringDeserializer).getClass.getName - val brokerUrl = opts.options.valueOf(opts.bootstrapServerOpt) - properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl) - properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) - properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") - properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000") - properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer) - properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer) - - if (opts.options.has(opts.commandConfigOpt)) { - Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)).asScala.foreach { - case (k,v) => properties.put(k, v) - } - } - - new KafkaConsumer(properties) - } - private def withTimeoutMs [T <: AbstractOptions[T]] (options : T) = { val t = opts.options.valueOf(opts.timeoutMsOpt).intValue() options.timeoutMs(t) @@ -657,8 +646,17 @@ object ConsumerGroupCommand extends Logging { val topicPartitions = topicArg.split(":") val topic = topicPartitions(0) topicPartitions(1).split(",").map(partition => new TopicPartition(topic, partition.toInt)) - case topic => getConsumer(groupId).partitionsFor(topic).asScala - .map(partitionInfo => new TopicPartition(topic, partitionInfo.partition)) + case topic => + val descriptionMap = adminClient.describeTopics( + Seq(topic).asJava, + withTimeoutMs(new DescribeTopicsOptions) + ).all().get.asScala + val r = descriptionMap.flatMap{ case(topic, description) => + description.partitions().asScala.map{ tpInfo => + new TopicPartition(topic, tpInfo.partition) + } + } + r } private def getPartitionsToReset(groupId: String): Seq[TopicPartition] = { diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 43bc04272c254..2089e5c74a75d 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -30,7 +30,7 @@ import kafka.server.checkpoints.OffsetCheckpoints import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ import kafka.zk.{AdminZkClient, KafkaZkClient} -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{IsolationLevel, TopicPartition} import org.apache.kafka.common.errors._ import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.protocol.Errors diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 3995bec32645b..4e7ceda75b51e 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -35,7 +35,7 @@ import kafka.server.QuotaFactory.QuotaManagers import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints} import kafka.utils._ import kafka.zk.KafkaZkClient -import org.apache.kafka.common.{ElectionType, Node, TopicPartition} +import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition} import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 38fe16598f704..38073dc79313c 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -30,7 +30,7 @@ import org.apache.kafka.clients.admin.{Admin, AdminClient, AdminClientConfig, Al import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.clients.producer._ -import org.apache.kafka.common.ElectionType +import org.apache.kafka.common.{ElectionType, IsolationLevel} import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation} import org.apache.kafka.common.acl.AclOperation._ import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY} diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index eb50e969d0e6a..f57dd859cfabd 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -28,14 +28,14 @@ import kafka.common.UnexpectedAppendOffsetException import kafka.log.{Defaults => _, _} import kafka.server._ import kafka.utils._ -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{IsolationLevel, TopicPartition} import org.apache.kafka.common.errors.{ApiException, OffsetNotAvailableException, ReplicaNotAvailableException} import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.utils.SystemTime import org.apache.kafka.common.record._ -import org.apache.kafka.common.requests.{EpochEndOffset, IsolationLevel, ListOffsetRequest} +import org.apache.kafka.common.requests.{EpochEndOffset, ListOffsetRequest} import org.junit.Test import org.junit.Assert._ import org.mockito.Mockito._ diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala index 895d9e55401e3..07acfdec275f9 100644 --- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala @@ -25,10 +25,10 @@ import kafka.log.LogConfig import kafka.message.{GZIPCompressionCodec, ProducerCompressionCodec, ZStdCompressionCodec} import kafka.utils.TestUtils import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{IsolationLevel, TopicPartition} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.{MemoryRecords, Record, RecordBatch} -import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, IsolationLevel, FetchMetadata => JFetchMetadata} +import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata} import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer} import org.junit.Assert._ import org.junit.Test diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 6f30c11b321e3..852349148d34d 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -33,7 +33,7 @@ import kafka.network.RequestChannel.SendResponse import kafka.server.QuotaFactory.QuotaManagers import kafka.utils.{MockTime, TestUtils} import kafka.zk.KafkaZkClient -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{IsolationLevel, TopicPartition} import org.apache.kafka.common.errors.UnsupportedVersionException import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala index 9c97c1a448da0..a298d8401e078 100644 --- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala @@ -19,9 +19,9 @@ package kafka.server import java.util.Optional import kafka.utils.TestUtils -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{IsolationLevel, TopicPartition} import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest, ListOffsetResponse} +import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse} import org.junit.Assert._ import org.junit.Test diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index d88540d0a415d..85d2dede58db5 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -24,10 +24,10 @@ import java.util.{Optional, Properties, Random} import kafka.log.{Log, LogSegment} import kafka.network.SocketServer import kafka.utils.{MockTime, TestUtils} -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{IsolationLevel, TopicPartition} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.MemoryRecords -import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, IsolationLevel, ListOffsetRequest, ListOffsetResponse} +import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse} import org.easymock.{EasyMock, IAnswer} import org.junit.Assert._ import org.junit.Test diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 9aaae9267e6b3..01d6bf0731b16 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -46,7 +46,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.Time -import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition} import org.easymock.EasyMock import org.junit.Assert._ import org.junit.{After, Before, Test} diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index f34eae45b1e4d..6b1d3c1e7388b 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -22,7 +22,7 @@ import kafka.log.LogConfig import kafka.network.RequestChannel.Session import kafka.security.authorizer.AclAuthorizer import kafka.utils.TestUtils -import org.apache.kafka.common.{ElectionType, Node, TopicPartition} +import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition} import org.apache.kafka.common.acl._ import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection} diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index eb6faffd45b03..4799966883b98 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -50,10 +50,10 @@ import java.util.Properties; import java.util.Set; +import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.Range.between; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; -import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED; /** * Configuration for a {@link KafkaStreams} instance. diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index ad607086f2c96..b1fbd2474b1ce 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -44,8 +44,8 @@ import java.util.Map; import java.util.Properties; -import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED; -import static org.apache.kafka.common.requests.IsolationLevel.READ_UNCOMMITTED; +import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED; +import static org.apache.kafka.common.IsolationLevel.READ_UNCOMMITTED; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION; import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 15d6789798dee..c611dc2809e1b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.integration; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.requests.IsolationLevel; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java index 0c9889a1e3de3..255afc886723b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java @@ -23,7 +23,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.requests.IsolationLevel; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java index 5daf066f386f8..e98d1527e144b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java @@ -27,11 +27,11 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.requests.IsolationLevel; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer;