Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -1575,6 +1575,29 @@ default ListTransactionsResult listTransactions() {
*/
ListTransactionsResult listTransactions(ListTransactionsOptions options);

/**
 * Fence out all active producers that use any of the provided transactional IDs, with the default options.
 * <p>
 * This is a convenience method for {@link #fenceProducers(Collection, FenceProducersOptions)}
 * with default options (a freshly-constructed {@link FenceProducersOptions}).
 * See the overload for more details.
 *
 * @param transactionalIds The IDs of the producers to fence.
 * @return The FenceProducersResult, which exposes per-transactional-ID futures.
 */
default FenceProducersResult fenceProducers(Collection<String> transactionalIds) {
    return fenceProducers(transactionalIds, new FenceProducersOptions());
}

/**
 * Fence out all active producers that use any of the provided transactional IDs.
 *
 * @param transactionalIds The IDs of the producers to fence.
 * @param options          The options to use when fencing the producers (e.g. request timeout).
 * @return The FenceProducersResult, which exposes per-transactional-ID futures.
 */
FenceProducersResult fenceProducers(Collection<String> transactionalIds,
                                    FenceProducersOptions options);

/**
* Get the metrics kept by the adminClient
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Collection;

/**
 * Options for {@link Admin#fenceProducers(Collection, FenceProducersOptions)}.
 *
 * The API of this class is evolving. See {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class FenceProducersOptions extends AbstractOptions<FenceProducersOptions> {

    /**
     * Render this options object, including the inherited {@code timeoutMs} value.
     */
    @Override
    public String toString() {
        return String.format("FenceProducersOptions{timeoutMs=%s}", timeoutMs);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;

import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;

/**
* The result of the {@link Admin#fenceProducers(Collection)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class FenceProducersResult {

private final Map<CoordinatorKey, KafkaFuture<ProducerIdAndEpoch>> futures;

FenceProducersResult(Map<CoordinatorKey, KafkaFuture<ProducerIdAndEpoch>> futures) {
this.futures = futures;
}

/**
* Return a map from transactional ID to futures which can be used to check the status of
* individual fencings.
*/
public Map<String, KafkaFuture<Void>> fencedProducers() {
Comment thread
tombentley marked this conversation as resolved.
Outdated
return futures.entrySet().stream().collect(Collectors.toMap(
e -> e.getKey().idValue,
e -> e.getValue().thenApply(p -> null)
));
}

/**
* Returns a future that provides the producer ID generated while initializing the given transaction when the request completes.
*/
public KafkaFuture<Long> producerId(String transactionalId) {
return findAndApply(transactionalId, p -> p.producerId);
}

/**
* Returns a future that provides the epoch ID generated while initializing the given transaction when the request completes.
*/
public KafkaFuture<Short> epochId(String transactionalId) {
return findAndApply(transactionalId, p -> p.epoch);
}

/**
* Return a future which succeeds only if all the producer fencings succeed.
*/
public KafkaFuture<Void> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
}

private <T> KafkaFuture<T> findAndApply(String transactionalId, KafkaFuture.BaseFunction<ProducerIdAndEpoch, T> followup) {
CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId);
KafkaFuture<ProducerIdAndEpoch> future = futures.get(key);
if (future == null) {
throw new IllegalArgumentException("TransactionalId " +
"`" + transactionalId + "` was not included in the request");
}
return future.thenApply(followup);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
import org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler;
import org.apache.kafka.clients.admin.internals.FenceProducersHandler;
import org.apache.kafka.clients.admin.internals.ListConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.ListTransactionsHandler;
import org.apache.kafka.clients.admin.internals.MetadataOperationContext;
Expand Down Expand Up @@ -234,6 +235,7 @@
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -4390,6 +4392,15 @@ public ListTransactionsResult listTransactions(ListTransactionsOptions options)
return new ListTransactionsResult(future.all());
}

/**
 * Fences out producers for the given transactional IDs by driving a
 * {@link FenceProducersHandler} through the admin API driver machinery.
 */
@Override
public FenceProducersResult fenceProducers(Collection<String> transactionalIds, FenceProducersOptions options) {
    // One pending future per transactional ID, keyed by its coordinator lookup key.
    final AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, ProducerIdAndEpoch> pendingResults =
        FenceProducersHandler.newFuture(transactionalIds);
    final FenceProducersHandler fenceHandler = new FenceProducersHandler(logContext);
    invokeDriver(fenceHandler, pendingResults, options.timeoutMs);
    return new FenceProducersResult(pendingResults.all());
}

private <K, V> void invokeDriver(
AdminApiHandler<K, V> handler,
AdminApiFuture<K, V> future,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;

public class AbortTransactionHandler implements AdminApiHandler<TopicPartition, Void> {
public class AbortTransactionHandler extends AdminApiHandler.Batched<TopicPartition, Void> {
private final Logger log;
private final AbortTransactionSpec abortSpec;
private final PartitionLeaderStrategy lookupStrategy;
Expand Down Expand Up @@ -69,7 +69,7 @@ public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {
}

@Override
public WriteTxnMarkersRequest.Builder buildRequest(
public WriteTxnMarkersRequest.Builder buildBatchedRequest(
int brokerId,
Set<TopicPartition> topicPartitions
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ private void clearInflightRequest(long currentTimeMs, RequestSpec<K> spec) {
private <T extends ApiRequestScope> void collectRequests(
List<RequestSpec<K>> requests,
BiMultimap<T, K> multimap,
BiFunction<Set<K>, T, AbstractRequest.Builder<?>> buildRequest
BiFunction<Set<K>, T, Collection<AdminApiHandler.RequestAndKeys<K>>> buildRequest
) {
for (Map.Entry<T, Set<K>> entry : multimap.entrySet()) {
T scope = entry.getKey();
Expand All @@ -306,12 +306,19 @@ private <T extends ApiRequestScope> void collectRequests(
// Copy the keys to avoid exposing the underlying mutable set
Set<K> copyKeys = Collections.unmodifiableSet(new HashSet<>(keys));

AbstractRequest.Builder<?> request = buildRequest.apply(copyKeys, scope);
Collection<AdminApiHandler.RequestAndKeys<K>> newRequests = buildRequest.apply(copyKeys, scope);
if (newRequests.isEmpty()) {
return;
}

// Only process the first request; all the remaining requests will be targeted at the same broker
// and we don't want to issue more than one fulfillment request per broker at a time
AdminApiHandler.RequestAndKeys<K> newRequest = newRequests.iterator().next();
RequestSpec<K> spec = new RequestSpec<>(
handler.apiName() + "(api=" + request.apiKey() + ")",
handler.apiName() + "(api=" + newRequest.request.apiKey() + ")",
scope,
copyKeys,
request,
newRequest.keys,
newRequest.request,
requestState.nextAllowedRetryMs,
deadlineMs,
requestState.tries
Expand All @@ -326,7 +333,7 @@ private void collectLookupRequests(List<RequestSpec<K>> requests) {
collectRequests(
requests,
lookupMap,
(keys, scope) -> handler.lookupStrategy().buildRequest(keys)
(keys, scope) -> Collections.singletonList(new AdminApiHandler.RequestAndKeys<>(handler.lookupStrategy().buildRequest(keys), keys))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public interface AdminApiHandler<K, V> {

Expand All @@ -33,16 +35,18 @@ public interface AdminApiHandler<K, V> {
String apiName();

/**
* Build the request. The set of keys are derived by {@link AdminApiDriver}
* during the lookup stage as the set of keys which all map to the same
* destination broker.
* Build the requests necessary for the given keys. The set of keys is derived by
* {@link AdminApiDriver} during the lookup stage as the set of keys which all map
* to the same destination broker. Handlers can choose to issue a single request for
* all of the provided keys (see {@link Batched}), issue one request per key (see
* {@link Unbatched}), or implement their own custom grouping logic if necessary.
*
* @param brokerId the target brokerId for the request
* @param keys the set of keys that should be handled by this request
*
* @return a builder for the request containing the given keys
* @return a collection of {@link RequestAndKeys} for the requests containing the given keys
*/
AbstractRequest.Builder<?> buildRequest(int brokerId, Set<K> keys);
Collection<RequestAndKeys<K>> buildRequest(int brokerId, Set<K> keys);

/**
* Callback that is invoked when a request returns successfully.
Expand Down Expand Up @@ -122,4 +126,54 @@ public static <K, V> ApiResult<K, V> empty() {
}
}

/**
 * A single outbound request builder paired with the subset of keys it will fulfill.
 */
class RequestAndKeys<K> {
    public final AbstractRequest.Builder<?> request;
    public final Set<K> keys;

    public RequestAndKeys(AbstractRequest.Builder<?> request, Set<K> keys) {
        this.request = request;
        this.keys = keys;
    }
}

/**
 * An {@link AdminApiHandler} that groups every key targeting the same broker into one
 * request. Use this for broker APIs that can act on multiple keys at once, such as
 * describing or listing transactions.
 */
abstract class Batched<K, V> implements AdminApiHandler<K, V> {

    // Subclasses produce one request covering the whole key set for the broker.
    abstract AbstractRequest.Builder<?> buildBatchedRequest(int brokerId, Set<K> keys);

    @Override
    public final Collection<RequestAndKeys<K>> buildRequest(int brokerId, Set<K> keys) {
        AbstractRequest.Builder<?> batchedRequest = buildBatchedRequest(brokerId, keys);
        return Collections.singleton(new RequestAndKeys<>(batchedRequest, keys));
    }
}

/**
 * An {@link AdminApiHandler} that issues one request per key, with no per-broker grouping.
 * Only use this for broker APIs that cannot accept multiple keys at once, such as
 * initializing a transactional producer.
 */
abstract class Unbatched<K, V> implements AdminApiHandler<K, V> {

    // Subclasses produce a request for exactly one key.
    abstract AbstractRequest.Builder<?> buildSingleRequest(int brokerId, K key);

    // Subclasses handle the response for exactly one key.
    abstract ApiResult<K, V> handleSingleResponse(Node broker, K key, AbstractResponse response);

    @Override
    public final Collection<RequestAndKeys<K>> buildRequest(int brokerId, Set<K> keys) {
        return keys.stream()
            .map(singleKey -> {
                AbstractRequest.Builder<?> singleRequest = buildSingleRequest(brokerId, singleKey);
                return new RequestAndKeys<>(singleRequest, Collections.singleton(singleKey));
            })
            .collect(Collectors.toSet());
    }

    @Override
    public final ApiResult<K, V> handleResponse(Node broker, Set<K> keys, AbstractResponse response) {
        if (keys.size() == 1) {
            K soleKey = keys.iterator().next();
            return handleSingleResponse(broker, soleKey, response);
        }
        throw new IllegalArgumentException("Unbatched admin handler should only be required to handle responses for a single key at a time");
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public class AlterConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, Errors>> {
public class AlterConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Errors>> {

private final CoordinatorKey groupId;
private final Map<TopicPartition, OffsetAndMetadata> offsets;
Expand Down Expand Up @@ -83,7 +83,7 @@ private void validateKeys(Set<CoordinatorKey> groupIds) {
}

@Override
public OffsetCommitRequest.Builder buildRequest(
public OffsetCommitRequest.Builder buildBatchedRequest(
int coordinatorId,
Set<CoordinatorKey> groupIds
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public class DeleteConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, Errors>> {
public class DeleteConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Errors>> {

private final CoordinatorKey groupId;
private final Set<TopicPartition> partitions;
Expand Down Expand Up @@ -80,7 +80,7 @@ private void validateKeys(Set<CoordinatorKey> groupIds) {
}

@Override
public OffsetDeleteRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> groupIds) {
public OffsetDeleteRequest.Builder buildBatchedRequest(int coordinatorId, Set<CoordinatorKey> groupIds) {
validateKeys(groupIds);

final OffsetDeleteRequestTopicCollection topics = new OffsetDeleteRequestTopicCollection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public class DeleteConsumerGroupsHandler implements AdminApiHandler<CoordinatorKey, Void> {
public class DeleteConsumerGroupsHandler extends AdminApiHandler.Batched<CoordinatorKey, Void> {

private final Logger log;
private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
Expand Down Expand Up @@ -71,7 +71,7 @@ private static Set<CoordinatorKey> buildKeySet(Collection<String> groupIds) {
}

@Override
public DeleteGroupsRequest.Builder buildRequest(
public DeleteGroupsRequest.Builder buildBatchedRequest(
int coordinatorId,
Set<CoordinatorKey> keys
) {
Expand Down
Loading