diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 377f009a958df..0c795bc5206dc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1575,6 +1575,29 @@ default ListTransactionsResult listTransactions() {
*/
ListTransactionsResult listTransactions(ListTransactionsOptions options);
+ /**
+ * Fence out all active producers that use any of the provided transactional IDs, with the default options.
+ *
+ * This is a convenience method for {@link #fenceProducers(Collection, FenceProducersOptions)}
+ * with default options. See the overload for more details.
+ *
+ * @param transactionalIds The IDs of the producers to fence.
+ * @return The FenceProducersResult.
+ */
+    default FenceProducersResult fenceProducers(Collection<String> transactionalIds) {
+ return fenceProducers(transactionalIds, new FenceProducersOptions());
+ }
+
+ /**
+ * Fence out all active producers that use any of the provided transactional IDs.
+ *
+ * @param transactionalIds The IDs of the producers to fence.
+ * @param options The options to use when fencing the producers.
+ * @return The FenceProducersResult.
+ */
+    FenceProducersResult fenceProducers(Collection<String> transactionalIds,
+                                        FenceProducersOptions options);
+
/**
* Get the metrics kept by the adminClient
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/FenceProducersOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/FenceProducersOptions.java
new file mode 100644
index 0000000000000..4e38281809b83
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/FenceProducersOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+
+/**
+ * Options for {@link Admin#fenceProducers(Collection, FenceProducersOptions)}
+ *
+ * The API of this class is evolving. See {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class FenceProducersOptions extends AbstractOptions<FenceProducersOptions> {
+
+ @Override
+ public String toString() {
+ return "FenceProducersOptions{" +
+ "timeoutMs=" + timeoutMs +
+ '}';
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/FenceProducersResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/FenceProducersResult.java
new file mode 100644
index 0000000000000..a1a8e66c115c5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/FenceProducersResult.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * The result of the {@link Admin#fenceProducers(Collection)} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class FenceProducersResult {
+
+    private final Map<CoordinatorKey, KafkaFuture<ProducerIdAndEpoch>> futures;
+
+    FenceProducersResult(Map<CoordinatorKey, KafkaFuture<ProducerIdAndEpoch>> futures) {
+        this.futures = futures;
+    }
+
+ /**
+ * Return a map from transactional ID to futures which can be used to check the status of
+ * individual fencings.
+ */
+    public Map<String, KafkaFuture<Void>> fencedProducers() {
+ return futures.entrySet().stream().collect(Collectors.toMap(
+ e -> e.getKey().idValue,
+ e -> e.getValue().thenApply(p -> null)
+ ));
+ }
+
+ /**
+ * Returns a future that provides the producer ID generated while initializing the given transaction when the request completes.
+ */
+    public KafkaFuture<Long> producerId(String transactionalId) {
+ return findAndApply(transactionalId, p -> p.producerId);
+ }
+
+ /**
+ * Returns a future that provides the epoch ID generated while initializing the given transaction when the request completes.
+ */
+    public KafkaFuture<Short> epochId(String transactionalId) {
+ return findAndApply(transactionalId, p -> p.epoch);
+ }
+
+ /**
+ * Return a future which succeeds only if all the producer fencings succeed.
+ */
+    public KafkaFuture<Void> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+ }
+
+    private <T> KafkaFuture<T> findAndApply(String transactionalId, KafkaFuture.BaseFunction<ProducerIdAndEpoch, T> followup) {
+        CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId);
+        KafkaFuture<ProducerIdAndEpoch> future = futures.get(key);
+ if (future == null) {
+ throw new IllegalArgumentException("TransactionalId " +
+ "`" + transactionalId + "` was not included in the request");
+ }
+ return future.thenApply(followup);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index ea4e2d7e5ffb0..03322fdcf1dc8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -47,6 +47,7 @@
import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
import org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler;
+import org.apache.kafka.clients.admin.internals.FenceProducersHandler;
import org.apache.kafka.clients.admin.internals.ListConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.ListTransactionsHandler;
import org.apache.kafka.clients.admin.internals.MetadataOperationContext;
@@ -234,6 +235,7 @@
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@@ -4390,6 +4392,15 @@ public ListTransactionsResult listTransactions(ListTransactionsOptions options)
return new ListTransactionsResult(future.all());
}
+ @Override
+    public FenceProducersResult fenceProducers(Collection<String> transactionalIds, FenceProducersOptions options) {
+        AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, ProducerIdAndEpoch> future =
+ FenceProducersHandler.newFuture(transactionalIds);
+ FenceProducersHandler handler = new FenceProducersHandler(logContext);
+ invokeDriver(handler, future, options.timeoutMs);
+ return new FenceProducersResult(future.all());
+ }
+
private void invokeDriver(
AdminApiHandler handler,
AdminApiFuture future,
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandler.java
index c25e4d8d3f479..4963a49c351b0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandler.java
@@ -38,7 +38,7 @@
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
-public class AbortTransactionHandler implements AdminApiHandler<TopicPartition, Void> {
+public class AbortTransactionHandler extends AdminApiHandler.Batched<TopicPartition, Void> {
private final Logger log;
private final AbortTransactionSpec abortSpec;
private final PartitionLeaderStrategy lookupStrategy;
@@ -69,7 +69,7 @@ public AdminApiLookupStrategy lookupStrategy() {
}
@Override
- public WriteTxnMarkersRequest.Builder buildRequest(
+ public WriteTxnMarkersRequest.Builder buildBatchedRequest(
int brokerId,
Set topicPartitions
) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
index b5c9ff32f264b..d00db4b18c694 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
@@ -288,7 +288,7 @@ private void clearInflightRequest(long currentTimeMs, RequestSpec spec) {
private void collectRequests(
        List<RequestSpec<K>> requests,
        BiMultimap<T, K> multimap,
-        BiFunction<Set<K>, T, AbstractRequest.Builder<?>> buildRequest
+        BiFunction<Set<K>, T, Collection<AdminApiHandler.RequestAndKeys<K>>> buildRequest
) {
for (Map.Entry> entry : multimap.entrySet()) {
T scope = entry.getKey();
@@ -306,12 +306,19 @@ private void collectRequests(
// Copy the keys to avoid exposing the underlying mutable set
Set copyKeys = Collections.unmodifiableSet(new HashSet<>(keys));
- AbstractRequest.Builder> request = buildRequest.apply(copyKeys, scope);
+            Collection<AdminApiHandler.RequestAndKeys<K>> newRequests = buildRequest.apply(copyKeys, scope);
+ if (newRequests.isEmpty()) {
+ return;
+ }
+
+ // Only process the first request; all the remaining requests will be targeted at the same broker
+ // and we don't want to issue more than one fulfillment request per broker at a time
+            AdminApiHandler.RequestAndKeys<K> newRequest = newRequests.iterator().next();
RequestSpec spec = new RequestSpec<>(
- handler.apiName() + "(api=" + request.apiKey() + ")",
+ handler.apiName() + "(api=" + newRequest.request.apiKey() + ")",
scope,
- copyKeys,
- request,
+ newRequest.keys,
+ newRequest.request,
requestState.nextAllowedRetryMs,
deadlineMs,
requestState.tries
@@ -326,7 +333,7 @@ private void collectLookupRequests(List> requests) {
collectRequests(
requests,
lookupMap,
- (keys, scope) -> handler.lookupStrategy().buildRequest(keys)
+ (keys, scope) -> Collections.singletonList(new AdminApiHandler.RequestAndKeys<>(handler.lookupStrategy().buildRequest(keys), keys))
);
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java
index 9f8d0ac5f07f2..561c22b104cd3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java
@@ -20,10 +20,12 @@
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
public interface AdminApiHandler<K, V> {
@@ -33,16 +35,18 @@ public interface AdminApiHandler {
String apiName();
/**
- * Build the request. The set of keys are derived by {@link AdminApiDriver}
- * during the lookup stage as the set of keys which all map to the same
- * destination broker.
+     * Build the requests necessary for the given keys. The set of keys is derived by
+     * {@link AdminApiDriver} during the lookup stage as the set of keys which all map
+     * to the same destination broker. Handlers can choose to issue a single request for
+     * all of the provided keys (see {@link Batched}), issue one request per key (see
+     * {@link Unbatched}), or implement their own custom grouping logic if necessary.
*
* @param brokerId the target brokerId for the request
* @param keys the set of keys that should be handled by this request
*
- * @return a builder for the request containing the given keys
+ * @return a collection of {@link RequestAndKeys} for the requests containing the given keys
*/
-    AbstractRequest.Builder<?> buildRequest(int brokerId, Set<K> keys);
+    Collection<RequestAndKeys<K>> buildRequest(int brokerId, Set<K> keys);
/**
* Callback that is invoked when a request returns successfully.
@@ -122,4 +126,54 @@ public static ApiResult empty() {
}
}
+    class RequestAndKeys<K> {
+        public final AbstractRequest.Builder<?> request;
+        public final Set<K> keys;
+
+        public RequestAndKeys(AbstractRequest.Builder<?> request, Set<K> keys) {
+            this.request = request;
+            this.keys = keys;
+        }
+    }
+
+ /**
+ * An {@link AdminApiHandler} that will group multiple keys into a single request when possible.
+ * Keys will be grouped together whenever they target the same broker. This type of handler
+ * should be used when interacting with broker APIs that can act on multiple keys at once, such
+ * as describing or listing transactions.
+ */
+    abstract class Batched<K, V> implements AdminApiHandler<K, V> {
+        abstract AbstractRequest.Builder<?> buildBatchedRequest(int brokerId, Set<K> keys);
+
+        @Override
+        public final Collection<RequestAndKeys<K>> buildRequest(int brokerId, Set<K> keys) {
+            return Collections.singleton(new RequestAndKeys<>(buildBatchedRequest(brokerId, keys), keys));
+        }
+    }
+
+ /**
+ * An {@link AdminApiHandler} that will create one request per key, not performing any grouping based
+ * on the targeted broker. This type of handler should only be used for broker APIs that do not accept
+ * multiple keys at once, such as initializing a transactional producer.
+ */
+    abstract class Unbatched<K, V> implements AdminApiHandler<K, V> {
+        abstract AbstractRequest.Builder<?> buildSingleRequest(int brokerId, K key);
+        abstract ApiResult<K, V> handleSingleResponse(Node broker, K key, AbstractResponse response);
+
+        @Override
+        public final Collection<RequestAndKeys<K>> buildRequest(int brokerId, Set<K> keys) {
+            return keys.stream()
+                .map(key -> new RequestAndKeys<>(buildSingleRequest(brokerId, key), Collections.singleton(key)))
+                .collect(Collectors.toSet());
+        }
+
+        @Override
+        public final ApiResult<K, V> handleResponse(Node broker, Set<K> keys, AbstractResponse response) {
+            if (keys.size() != 1) {
+                throw new IllegalArgumentException("Unbatched admin handler should only be required to handle responses for a single key at a time");
+            }
+            K key = keys.iterator().next();
+            return handleSingleResponse(broker, key, response);
+        }
+    }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
index cb7551e55ed21..eab2e2bb73a40 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
@@ -41,7 +41,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
-public class AlterConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, Errors>> {
+public class AlterConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Errors>> {
private final CoordinatorKey groupId;
private final Map offsets;
@@ -83,7 +83,7 @@ private void validateKeys(Set groupIds) {
}
@Override
- public OffsetCommitRequest.Builder buildRequest(
+ public OffsetCommitRequest.Builder buildBatchedRequest(
int coordinatorId,
Set groupIds
) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
index a853eddcb31b1..b68334b55c8ae 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
@@ -38,7 +38,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
-public class DeleteConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, Errors>> {
+public class DeleteConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Errors>> {
private final CoordinatorKey groupId;
private final Set partitions;
@@ -80,7 +80,7 @@ private void validateKeys(Set groupIds) {
}
@Override
- public OffsetDeleteRequest.Builder buildRequest(int coordinatorId, Set groupIds) {
+ public OffsetDeleteRequest.Builder buildBatchedRequest(int coordinatorId, Set groupIds) {
validateKeys(groupIds);
final OffsetDeleteRequestTopicCollection topics = new OffsetDeleteRequestTopicCollection();
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
index 693d23625e8d3..c05a11b5cfb40 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
@@ -36,7 +36,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
-public class DeleteConsumerGroupsHandler implements AdminApiHandler<CoordinatorKey, Void> {
+public class DeleteConsumerGroupsHandler extends AdminApiHandler.Batched<CoordinatorKey, Void> {
private final Logger log;
private final AdminApiLookupStrategy lookupStrategy;
@@ -71,7 +71,7 @@ private static Set buildKeySet(Collection groupIds) {
}
@Override
- public DeleteGroupsRequest.Builder buildRequest(
+ public DeleteGroupsRequest.Builder buildBatchedRequest(
int coordinatorId,
Set keys
) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
index 5c5022a37eccc..09a2a7b4cdcdf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
@@ -51,7 +51,7 @@
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
-public class DescribeConsumerGroupsHandler implements AdminApiHandler<CoordinatorKey, ConsumerGroupDescription> {
+public class DescribeConsumerGroupsHandler extends AdminApiHandler.Batched<CoordinatorKey, ConsumerGroupDescription> {
private final boolean includeAuthorizedOperations;
private final Logger log;
@@ -89,7 +89,7 @@ public AdminApiLookupStrategy lookupStrategy() {
}
@Override
- public DescribeGroupsRequest.Builder buildRequest(int coordinatorId, Set keys) {
+ public DescribeGroupsRequest.Builder buildBatchedRequest(int coordinatorId, Set keys) {
List groupIds = keys.stream().map(key -> {
if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) {
throw new IllegalArgumentException("Invalid transaction coordinator key " + key +
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java
index 4b279d5c90c78..8555bf59ab4ed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java
@@ -46,7 +46,7 @@
import java.util.Set;
import java.util.stream.Collectors;
-public class DescribeProducersHandler implements AdminApiHandler<TopicPartition, PartitionProducerState> {
+public class DescribeProducersHandler extends AdminApiHandler.Batched<TopicPartition, PartitionProducerState> {
private final Logger log;
private final DescribeProducersOptions options;
private final AdminApiLookupStrategy lookupStrategy;
@@ -82,7 +82,7 @@ public AdminApiLookupStrategy lookupStrategy() {
}
@Override
- public DescribeProducersRequest.Builder buildRequest(
+ public DescribeProducersRequest.Builder buildBatchedRequest(
int brokerId,
Set topicPartitions
) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java
index d270145a423da..4a3d7c0a05272 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java
@@ -43,7 +43,7 @@
import java.util.Set;
import java.util.stream.Collectors;
-public class DescribeTransactionsHandler implements AdminApiHandler<CoordinatorKey, TransactionDescription> {
+public class DescribeTransactionsHandler extends AdminApiHandler.Batched<CoordinatorKey, TransactionDescription> {
private final Logger log;
private final AdminApiLookupStrategy lookupStrategy;
@@ -77,7 +77,7 @@ public AdminApiLookupStrategy lookupStrategy() {
}
@Override
- public DescribeTransactionsRequest.Builder buildRequest(
+ public DescribeTransactionsRequest.Builder buildBatchedRequest(
int brokerId,
Set keys
) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java
new file mode 100644
index 0000000000000..225c6f4e75139
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.message.InitProducerIdRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class FenceProducersHandler extends AdminApiHandler.Unbatched<CoordinatorKey, ProducerIdAndEpoch> {
+    private final Logger log;
+    private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+
+ public FenceProducersHandler(
+ LogContext logContext
+ ) {
+ this.log = logContext.logger(FenceProducersHandler.class);
+ this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.TRANSACTION, logContext);
+ }
+
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, ProducerIdAndEpoch> newFuture(
+        Collection<String> transactionalIds
+    ) {
+        return AdminApiFuture.forKeys(buildKeySet(transactionalIds));
+    }
+
+    private static Set<CoordinatorKey> buildKeySet(Collection<String> transactionalIds) {
+        return transactionalIds.stream()
+            .map(CoordinatorKey::byTransactionalId)
+            .collect(Collectors.toSet());
+    }
+
+ @Override
+ public String apiName() {
+ return "fenceProducer";
+ }
+
+ @Override
+    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+ return lookupStrategy;
+ }
+
+ @Override
+ InitProducerIdRequest.Builder buildSingleRequest(int brokerId, CoordinatorKey key) {
+ if (key.type != FindCoordinatorRequest.CoordinatorType.TRANSACTION) {
+ throw new IllegalArgumentException("Invalid group coordinator key " + key +
+ " when building `InitProducerId` request");
+ }
+ InitProducerIdRequestData data = new InitProducerIdRequestData()
+ // Because we never include a producer epoch or ID in this request, we expect that some errors
+ // (such as PRODUCER_FENCED) will never be returned in the corresponding broker response.
+ // If we ever modify this logic to include an epoch or producer ID, we will need to update the
+ // error handling logic for this handler to accommodate these new errors.
+ .setProducerEpoch(ProducerIdAndEpoch.NONE.epoch)
+ .setProducerId(ProducerIdAndEpoch.NONE.producerId)
+ .setTransactionalId(key.idValue)
+ // Set transaction timeout to 1 since it's only being initialized to fence out older producers with the same transactional ID,
+ // and shouldn't be used for any actual record writes
+ .setTransactionTimeoutMs(1);
+ return new InitProducerIdRequest.Builder(data);
+ }
+
+ @Override
+    public ApiResult<CoordinatorKey, ProducerIdAndEpoch> handleSingleResponse(
+ Node broker,
+ CoordinatorKey key,
+ AbstractResponse abstractResponse
+ ) {
+ InitProducerIdResponse response = (InitProducerIdResponse) abstractResponse;
+
+ Errors error = Errors.forCode(response.data().errorCode());
+ if (error != Errors.NONE) {
+ return handleError(key, error);
+ }
+
+        Map<CoordinatorKey, ProducerIdAndEpoch> completed = Collections.singletonMap(key, new ProducerIdAndEpoch(
+ response.data().producerId(),
+ response.data().producerEpoch()
+ ));
+
+ return new ApiResult<>(completed, Collections.emptyMap(), Collections.emptyList());
+ }
+
+    private ApiResult<CoordinatorKey, ProducerIdAndEpoch> handleError(CoordinatorKey transactionalIdKey, Errors error) {
+ switch (error) {
+ case CLUSTER_AUTHORIZATION_FAILED:
+ return ApiResult.failed(transactionalIdKey, new ClusterAuthorizationException(
+ "InitProducerId request for transactionalId `" + transactionalIdKey.idValue + "` " +
+ "failed due to cluster authorization failure"));
+
+ case TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
+ return ApiResult.failed(transactionalIdKey, new TransactionalIdAuthorizationException(
+ "InitProducerId request for transactionalId `" + transactionalIdKey.idValue + "` " +
+ "failed due to transactional ID authorization failure"));
+
+ case COORDINATOR_LOAD_IN_PROGRESS:
+ // If the coordinator is in the middle of loading, then we just need to retry
+ log.debug("InitProducerId request for transactionalId `{}` failed because the " +
+ "coordinator is still in the process of loading state. Will retry",
+ transactionalIdKey.idValue);
+ return ApiResult.empty();
+
+ case NOT_COORDINATOR:
+ case COORDINATOR_NOT_AVAILABLE:
+ // If the coordinator is unavailable or there was a coordinator change, then we unmap
+ // the key so that we retry the `FindCoordinator` request
+ log.debug("InitProducerId request for transactionalId `{}` returned error {}. Will attempt " +
+ "to find the coordinator again and retry", transactionalIdKey.idValue, error);
+ return ApiResult.unmapped(Collections.singletonList(transactionalIdKey));
+
+ // We intentionally omit cases for PRODUCER_FENCED, TRANSACTIONAL_ID_NOT_FOUND, and INVALID_PRODUCER_EPOCH
+ // since those errors should never happen when our InitProducerIdRequest doesn't include a producer epoch or ID
+ // and should therefore fall under the "unexpected error" catch-all case below
+
+ default:
+ return ApiResult.failed(transactionalIdKey, error.exception("InitProducerId request for " +
+ "transactionalId `" + transactionalIdKey.idValue + "` failed due to unexpected error"));
+ }
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
index b1d2e9dc69665..b591548954b96 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
@@ -36,7 +36,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
-public class ListConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> {
+public class ListConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> {
private final CoordinatorKey groupId;
private final List partitions;
@@ -78,7 +78,7 @@ private void validateKeys(Set groupIds) {
}
@Override
- public OffsetFetchRequest.Builder buildRequest(int coordinatorId, Set groupIds) {
+ public OffsetFetchRequest.Builder buildBatchedRequest(int coordinatorId, Set groupIds) {
validateKeys(groupIds);
// Set the flag to false as for admin client request,
// we don't need to wait for any pending offset state to clear.
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java
index d60580c85bf22..ca249bca7f4a4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java
@@ -35,7 +35,7 @@
import java.util.Set;
import java.util.stream.Collectors;
-public class ListTransactionsHandler implements AdminApiHandler<AllBrokersStrategy.BrokerKey, Collection<TransactionListing>> {
+public class ListTransactionsHandler extends AdminApiHandler.Batched<AllBrokersStrategy.BrokerKey, Collection<TransactionListing>> {
private final Logger log;
private final ListTransactionsOptions options;
private final AllBrokersStrategy lookupStrategy;
@@ -64,7 +64,7 @@ public AdminApiLookupStrategy lookupStrategy() {
}
@Override
- public ListTransactionsRequest.Builder buildRequest(
+ public ListTransactionsRequest.Builder buildBatchedRequest(
int brokerId,
Set keys
) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
index 90b3865d0bd65..83aee36587d3a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
@@ -35,7 +35,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
-public class RemoveMembersFromConsumerGroupHandler implements AdminApiHandler<CoordinatorKey, Map<MemberIdentity, Errors>> {
+public class RemoveMembersFromConsumerGroupHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<MemberIdentity, Errors>> {
private final CoordinatorKey groupId;
private final List members;
@@ -79,7 +79,7 @@ private void validateKeys(
}
@Override
- public LeaveGroupRequest.Builder buildRequest(int coordinatorId, Set groupIds) {
+ public LeaveGroupRequest.Builder buildBatchedRequest(int coordinatorId, Set groupIds) {
validateKeys(groupIds);
return new LeaveGroupRequest.Builder(groupId.idValue, members);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
index 5c24b41b351df..9d92f0e5351dd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
@@ -27,7 +27,7 @@
public class InitProducerIdRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<InitProducerIdRequest> {
- private final InitProducerIdRequestData data;
+ public final InitProducerIdRequestData data;
public Builder(InitProducerIdRequestData data) {
super(ApiKeys.INIT_PRODUCER_ID);
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 6ec6bfd829af2..3e3faeaadf794 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -1416,7 +1416,7 @@ public static Map initializeMap(Collection keys, Supplier val
}
/**
- * Get an array containing all of the {@link Object#toString names} of a given enumerable type.
+ * Get an array containing all of the {@link Object#toString string representations} of a given enumerable type.
* @param enumClass the enum class; may not be null
* @return an array with the names of every value for the enum class; never null, but may be empty
* if there are no values defined for the enum
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index fcd93a96b3989..97d8ed4d8fb44 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -109,6 +109,7 @@
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
@@ -167,6 +168,8 @@
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
@@ -6281,6 +6284,47 @@ public void testClientSideTimeoutAfterFailureToReceiveResponse() throws Exceptio
}
}
+ @Test
+ public void testFenceProducers() throws Exception {
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ String transactionalId = "copyCat";
+ Node transactionCoordinator = env.cluster().nodes().iterator().next();
+
+ // fail to find the coordinator at first with a retriable error
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, transactionalId, transactionCoordinator));
+ // and then succeed in the attempt to find the transaction coordinator
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, transactionalId, transactionCoordinator));
+ // unfortunately, a coordinator load is in progress and we need to retry our init PID request
+ env.kafkaClient().prepareResponseFrom(
+ request -> request instanceof InitProducerIdRequest,
+ new InitProducerIdResponse(new InitProducerIdResponseData().setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())),
+ transactionCoordinator
+ );
+ // then find out that the coordinator has changed since then
+ env.kafkaClient().prepareResponseFrom(
+ request -> request instanceof InitProducerIdRequest,
+ new InitProducerIdResponse(new InitProducerIdResponseData().setErrorCode(Errors.NOT_COORDINATOR.code())),
+ transactionCoordinator
+ );
+ // and as a result, try once more to locate the coordinator (this time succeeding on the first try)
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, transactionalId, transactionCoordinator));
+ // and finally, complete the init PID request
+ InitProducerIdResponseData initProducerIdResponseData = new InitProducerIdResponseData()
+ .setProducerId(4761)
+ .setProducerEpoch((short) 489);
+ env.kafkaClient().prepareResponseFrom(
+ request -> request instanceof InitProducerIdRequest,
+ new InitProducerIdResponse(initProducerIdResponseData),
+ transactionCoordinator
+ );
+
+ FenceProducersResult result = env.adminClient().fenceProducers(Collections.singleton(transactionalId));
+ assertNull(result.all().get());
+ assertEquals(4761, result.producerId(transactionalId).get());
+ assertEquals((short) 489, result.epochId(transactionalId).get());
+ }
+ }
+
private UnregisterBrokerResponse prepareUnregisterBrokerResponse(Errors error, int throttleTimeMs) {
return new UnregisterBrokerResponse(new UnregisterBrokerResponseData()
.setErrorCode(error.code())
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 1593b6328df74..15cdc5ccc4116 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -1003,6 +1003,11 @@ public ListTransactionsResult listTransactions(ListTransactionsOptions options)
throw new UnsupportedOperationException("Not implemented yet");
}
+ @Override
+ public FenceProducersResult fenceProducers(Collection<String> transactionalIds, FenceProducersOptions options) {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
@Override
synchronized public void close(Duration timeout) {}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandlerTest.java
index 78b33a7f59dff..68c850583d6ae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandlerTest.java
@@ -63,7 +63,7 @@ public void testInvalidBuildRequestCall() {
@Test
public void testValidBuildRequestCall() {
AbortTransactionHandler handler = new AbortTransactionHandler(abortSpec, logContext);
- WriteTxnMarkersRequest.Builder request = handler.buildRequest(1, singleton(topicPartition));
+ WriteTxnMarkersRequest.Builder request = handler.buildBatchedRequest(1, singleton(topicPartition));
assertEquals(1, request.data.markers().size());
WriteTxnMarkersRequestData.WritableTxnMarker markerRequest = request.data.markers().get(0);
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminApiDriverTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminApiDriverTest.java
index 6ff393fddd65e..2c664e857fced 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminApiDriverTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminApiDriverTest.java
@@ -734,7 +734,7 @@ public void reset() {
}
}
- private static class MockAdminApiHandler<K, V> implements AdminApiHandler<K, V> {
+ private static class MockAdminApiHandler<K, V> extends AdminApiHandler.Batched<K, V> {
private final Map<Set<K>, ApiResult<K, V>> expectedRequests = new HashMap<>();
private final MockLookupStrategy lookupStrategy;
@@ -757,7 +757,7 @@ public void expectRequest(Set keys, ApiResult result) {
}
@Override
- public AbstractRequest.Builder<?> buildRequest(int brokerId, Set<K> keys) {
+ public AbstractRequest.Builder<?> buildBatchedRequest(int brokerId, Set<K> keys) {
// The request is just a placeholder in these tests
assertTrue(expectedRequests.containsKey(keys), "Unexpected fulfillment request for keys " + keys);
return new MetadataRequest.Builder(Collections.emptyList(), false);
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategyIntegrationTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategyIntegrationTest.java
index 2b989058082fb..b2e0a0d285b06 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategyIntegrationTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategyIntegrationTest.java
@@ -215,7 +215,7 @@ private MetadataResponse responseWithBrokers(Set brokerIds) {
return new MetadataResponse(response, ApiKeys.METADATA.latestVersion());
}
- private class MockApiHandler implements AdminApiHandler<AllBrokersStrategy.BrokerKey, Integer> {
+ private class MockApiHandler extends AdminApiHandler.Batched<AllBrokersStrategy.BrokerKey, Integer> {
private final AllBrokersStrategy allBrokersStrategy = new AllBrokersStrategy(logContext);
@Override
@@ -224,7 +224,7 @@ public String apiName() {
}
@Override
- public AbstractRequest.Builder<?> buildRequest(
+ public AbstractRequest.Builder<?> buildBatchedRequest(
int brokerId,
Set<AllBrokersStrategy.BrokerKey> keys
) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java
index c0ea2ba9f0e7e..80453cc9b85d4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java
@@ -62,7 +62,7 @@ public void setUp() {
@Test
public void testBuildRequest() {
AlterConsumerGroupOffsetsHandler handler = new AlterConsumerGroupOffsetsHandler(groupId, partitions, logContext);
- OffsetCommitRequest request = handler.buildRequest(-1, singleton(CoordinatorKey.byGroupId(groupId))).build();
+ OffsetCommitRequest request = handler.buildBatchedRequest(-1, singleton(CoordinatorKey.byGroupId(groupId))).build();
assertEquals(groupId, request.data().groupId());
assertEquals(2, request.data().topics().size());
assertEquals(2, request.data().topics().get(0).partitions().size());
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java
index b4aea93c3f3f6..629ca89801559 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java
@@ -59,7 +59,7 @@ public class DeleteConsumerGroupOffsetsHandlerTest {
@Test
public void testBuildRequest() {
DeleteConsumerGroupOffsetsHandler handler = new DeleteConsumerGroupOffsetsHandler(groupId, tps, logContext);
- OffsetDeleteRequest request = handler.buildRequest(1, singleton(CoordinatorKey.byGroupId(groupId))).build();
+ OffsetDeleteRequest request = handler.buildBatchedRequest(1, singleton(CoordinatorKey.byGroupId(groupId))).build();
assertEquals(groupId, request.data().groupId());
assertEquals(2, request.data().topics().size());
assertEquals(2, request.data().topics().find("t0").partitions().size());
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java
index 8d3a2376fde9a..3e7cead6f5d40 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java
@@ -45,7 +45,7 @@ public class DeleteConsumerGroupsHandlerTest {
@Test
public void testBuildRequest() {
DeleteConsumerGroupsHandler handler = new DeleteConsumerGroupsHandler(logContext);
- DeleteGroupsRequest request = handler.buildRequest(1, singleton(CoordinatorKey.byGroupId(groupId1))).build();
+ DeleteGroupsRequest request = handler.buildBatchedRequest(1, singleton(CoordinatorKey.byGroupId(groupId1))).build();
assertEquals(1, request.data().groupsNames().size());
assertEquals(groupId1, request.data().groupsNames().get(0));
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java
index aef207aca6a75..892a81e8c0e11 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java
@@ -69,12 +69,12 @@ public class DescribeConsumerGroupsHandlerTest {
@Test
public void testBuildRequest() {
DescribeConsumerGroupsHandler handler = new DescribeConsumerGroupsHandler(false, logContext);
- DescribeGroupsRequest request = handler.buildRequest(1, keys).build();
+ DescribeGroupsRequest request = handler.buildBatchedRequest(1, keys).build();
assertEquals(2, request.data().groups().size());
assertFalse(request.data().includeAuthorizedOperations());
handler = new DescribeConsumerGroupsHandler(true, logContext);
- request = handler.buildRequest(1, keys).build();
+ request = handler.buildBatchedRequest(1, keys).build();
assertEquals(2, request.data().groups().size());
assertTrue(request.data().includeAuthorizedOperations());
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java
index 8daed06ddf133..0f39b4dd01633 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java
@@ -118,7 +118,7 @@ public void testBuildRequest() {
);
int brokerId = 3;
- DescribeProducersRequest.Builder request = handler.buildRequest(brokerId, topicPartitions);
+ DescribeProducersRequest.Builder request = handler.buildBatchedRequest(brokerId, topicPartitions);
List topics = request.data.topics();
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandlerTest.java
index 04eac89a373ac..7ffda2b00e014 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandlerTest.java
@@ -156,7 +156,7 @@ private void assertLookup(
Set transactionalIds
) {
Set keys = coordinatorKeys(transactionalIds);
- DescribeTransactionsRequest.Builder request = handler.buildRequest(1, keys);
+ DescribeTransactionsRequest.Builder request = handler.buildBatchedRequest(1, keys);
assertEquals(transactionalIds, new HashSet<>(request.data.transactionalIds()));
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java
new file mode 100644
index 0000000000000..c4151ebb0e15e
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.internals.AdminApiHandler.ApiResult;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
+import org.junit.jupiter.api.Test;
+
+import java.util.Set;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class FenceProducersHandlerTest {
+ private final LogContext logContext = new LogContext();
+ private final Node node = new Node(1, "host", 1234);
+
+ @Test
+ public void testBuildRequest() {
+ FenceProducersHandler handler = new FenceProducersHandler(logContext);
+ mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId));
+ }
+
+ @Test
+ public void testHandleSuccessfulResponse() {
+ String transactionalId = "foo";
+ CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId);
+
+ FenceProducersHandler handler = new FenceProducersHandler(logContext);
+
+ short epoch = 57;
+ long producerId = 7;
+ InitProducerIdResponse response = new InitProducerIdResponse(new InitProducerIdResponseData()
+ .setProducerEpoch(epoch)
+ .setProducerId(producerId));
+
+ ApiResult<CoordinatorKey, ProducerIdAndEpoch> result = handler.handleSingleResponse(
+ node, key, response);
+
+ assertEquals(emptyList(), result.unmappedKeys);
+ assertEquals(emptyMap(), result.failedKeys);
+ assertEquals(singleton(key), result.completedKeys.keySet());
+
+ ProducerIdAndEpoch expected = new ProducerIdAndEpoch(producerId, epoch);
+ assertEquals(expected, result.completedKeys.get(key));
+ }
+
+ @Test
+ public void testHandleErrorResponse() {
+ String transactionalId = "foo";
+ FenceProducersHandler handler = new FenceProducersHandler(logContext);
+ assertFatalError(handler, transactionalId, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
+ assertFatalError(handler, transactionalId, Errors.CLUSTER_AUTHORIZATION_FAILED);
+ assertFatalError(handler, transactionalId, Errors.UNKNOWN_SERVER_ERROR);
+ assertFatalError(handler, transactionalId, Errors.PRODUCER_FENCED);
+ assertFatalError(handler, transactionalId, Errors.TRANSACTIONAL_ID_NOT_FOUND);
+ assertFatalError(handler, transactionalId, Errors.INVALID_PRODUCER_EPOCH);
+ assertRetriableError(handler, transactionalId, Errors.COORDINATOR_LOAD_IN_PROGRESS);
+ assertUnmappedKey(handler, transactionalId, Errors.NOT_COORDINATOR);
+ assertUnmappedKey(handler, transactionalId, Errors.COORDINATOR_NOT_AVAILABLE);
+ }
+
+ private void assertFatalError(
+ FenceProducersHandler handler,
+ String transactionalId,
+ Errors error
+ ) {
+ CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId);
+ ApiResult<CoordinatorKey, ProducerIdAndEpoch> result = handleResponseError(handler, transactionalId, error);
+ assertEquals(emptyList(), result.unmappedKeys);
+ assertEquals(mkSet(key), result.failedKeys.keySet());
+
+ Throwable throwable = result.failedKeys.get(key);
+ assertTrue(error.exception().getClass().isInstance(throwable));
+ }
+
+ private void assertRetriableError(
+ FenceProducersHandler handler,
+ String transactionalId,
+ Errors error
+ ) {
+ ApiResult<CoordinatorKey, ProducerIdAndEpoch> result = handleResponseError(handler, transactionalId, error);
+ assertEquals(emptyList(), result.unmappedKeys);
+ assertEquals(emptyMap(), result.failedKeys);
+ }
+
+ private void assertUnmappedKey(
+ FenceProducersHandler handler,
+ String transactionalId,
+ Errors error
+ ) {
+ CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId);
+ ApiResult<CoordinatorKey, ProducerIdAndEpoch> result = handleResponseError(handler, transactionalId, error);
+ assertEquals(emptyMap(), result.failedKeys);
+ assertEquals(singletonList(key), result.unmappedKeys);
+ }
+
+ private ApiResult<CoordinatorKey, ProducerIdAndEpoch> handleResponseError(
+ FenceProducersHandler handler,
+ String transactionalId,
+ Errors error
+ ) {
+ int brokerId = 1;
+
+ CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId);
+ Set<CoordinatorKey> keys = mkSet(key);
+
+ InitProducerIdResponse response = new InitProducerIdResponse(new InitProducerIdResponseData()
+ .setErrorCode(error.code()));
+
+ ApiResult<CoordinatorKey, ProducerIdAndEpoch> result = handler.handleResponse(node, keys, response);
+ assertEquals(emptyMap(), result.completedKeys);
+ return result;
+ }
+
+ private void assertLookup(FenceProducersHandler handler, String transactionalId) {
+ CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId);
+ InitProducerIdRequest.Builder request = handler.buildSingleRequest(1, key);
+ assertEquals(transactionalId, request.data.transactionalId());
+ assertEquals(1, request.data.transactionTimeoutMs());
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
index 9c9bb1e58adb5..27597ce035b00 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
@@ -56,7 +56,7 @@ public class ListConsumerGroupOffsetsHandlerTest {
@Test
public void testBuildRequest() {
ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, tps, logContext);
- OffsetFetchRequest request = handler.buildRequest(1, singleton(CoordinatorKey.byGroupId(groupId))).build();
+ OffsetFetchRequest request = handler.buildBatchedRequest(1, singleton(CoordinatorKey.byGroupId(groupId))).build();
assertEquals(groupId, request.data().groups().get(0).groupId());
assertEquals(2, request.data().groups().get(0).topics().size());
assertEquals(2, request.data().groups().get(0).topics().get(0).partitionIndexes().size());
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandlerTest.java
index a8923d1aa2c9c..3c54fad65e8f9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandlerTest.java
@@ -52,7 +52,7 @@ public void testBuildRequestWithoutFilters() {
BrokerKey brokerKey = new BrokerKey(OptionalInt.of(brokerId));
ListTransactionsOptions options = new ListTransactionsOptions();
ListTransactionsHandler handler = new ListTransactionsHandler(options, logContext);
- ListTransactionsRequest request = handler.buildRequest(brokerId, singleton(brokerKey)).build();
+ ListTransactionsRequest request = handler.buildBatchedRequest(brokerId, singleton(brokerKey)).build();
assertEquals(Collections.emptyList(), request.data().producerIdFilters());
assertEquals(Collections.emptyList(), request.data().stateFilters());
}
@@ -65,7 +65,7 @@ public void testBuildRequestWithFilteredProducerId() {
ListTransactionsOptions options = new ListTransactionsOptions()
.filterProducerIds(singleton(filteredProducerId));
ListTransactionsHandler handler = new ListTransactionsHandler(options, logContext);
- ListTransactionsRequest request = handler.buildRequest(brokerId, singleton(brokerKey)).build();
+ ListTransactionsRequest request = handler.buildBatchedRequest(brokerId, singleton(brokerKey)).build();
assertEquals(Collections.singletonList(filteredProducerId), request.data().producerIdFilters());
assertEquals(Collections.emptyList(), request.data().stateFilters());
}
@@ -78,7 +78,7 @@ public void testBuildRequestWithFilteredState() {
ListTransactionsOptions options = new ListTransactionsOptions()
.filterStates(singleton(filteredState));
ListTransactionsHandler handler = new ListTransactionsHandler(options, logContext);
- ListTransactionsRequest request = handler.buildRequest(brokerId, singleton(brokerKey)).build();
+ ListTransactionsRequest request = handler.buildBatchedRequest(brokerId, singleton(brokerKey)).build();
assertEquals(Collections.singletonList(filteredState.toString()), request.data().stateFilters());
assertEquals(Collections.emptyList(), request.data().producerIdFilters());
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java
index 6f5dfda5bc307..3ecd1f10ee2f6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java
@@ -55,7 +55,7 @@ public class RemoveMembersFromConsumerGroupHandlerTest {
@Test
public void testBuildRequest() {
RemoveMembersFromConsumerGroupHandler handler = new RemoveMembersFromConsumerGroupHandler(groupId, members, logContext);
- LeaveGroupRequest request = handler.buildRequest(1, singleton(CoordinatorKey.byGroupId(groupId))).build();
+ LeaveGroupRequest request = handler.buildBatchedRequest(1, singleton(CoordinatorKey.byGroupId(groupId))).build();
assertEquals(groupId, request.data().groupId());
assertEquals(2, request.data().members().size());
}