diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index f663d6efc60f4..a66c610e4f5fa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4394,7 +4394,10 @@ public ListTransactionsResult listTransactions(ListTransactionsOptions options) public FenceProducersResult fenceProducers(Collection transactionalIds, FenceProducersOptions options) { AdminApiFuture.SimpleAdminApiFuture future = FenceProducersHandler.newFuture(transactionalIds); - FenceProducersHandler handler = new FenceProducersHandler(logContext); + if (options.timeoutMs() == null) { + options.timeoutMs(defaultApiTimeoutMs); + } + FenceProducersHandler handler = new FenceProducersHandler(options, logContext); invokeDriver(handler, future, options.timeoutMs); return new FenceProducersResult(future.all()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java index 23572dd4419ca..3035549efb106 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.admin.internals; +import org.apache.kafka.clients.admin.FenceProducersOptions; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; @@ -38,12 +39,15 @@ public class FenceProducersHandler extends AdminApiHandler.Unbatched { private final Logger log; private final AdminApiLookupStrategy lookupStrategy; + private final FenceProducersOptions options; public FenceProducersHandler( + FenceProducersOptions options, LogContext logContext ) { this.log = logContext.logger(FenceProducersHandler.class); this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.TRANSACTION, logContext); + this.options = options; } public static AdminApiFuture.SimpleAdminApiFuture newFuture( @@ -82,9 +86,10 @@ InitProducerIdRequest.Builder buildSingleRequest(int brokerId, CoordinatorKey ke .setProducerEpoch(ProducerIdAndEpoch.NONE.epoch) .setProducerId(ProducerIdAndEpoch.NONE.producerId) .setTransactionalId(key.idValue) - // Set transaction timeout to 1 since it's only being initialized to fence out older producers with the same transactional ID, - // and shouldn't be used for any actual record writes - .setTransactionTimeoutMs(1); + // Set transaction timeout to the equivalent as the fenceProducers request since it's only being initialized to fence out older producers with the same transactional ID, + // and shouldn't be used for any actual record writes. This has been changed to match the fenceProducers request timeout from one as some brokers may be slower than expected + // and we need a safe timeout that allows the transaction init to finish. + .setTransactionTimeoutMs(this.options.timeoutMs()); return new InitProducerIdRequest.Builder(data); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 19bff2ea5e87a..b0922c45915d1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -7031,6 +7031,7 @@ public void testFenceProducers() throws Exception { try (AdminClientUnitTestEnv env = mockClientEnv()) { String transactionalId = "copyCat"; Node transactionCoordinator = env.cluster().nodes().iterator().next(); + final FenceProducersOptions options = new FenceProducersOptions().timeoutMs(10000); // fail to find the coordinator at first with a retriable error env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, transactionalId, transactionCoordinator)); @@ -7060,7 +7061,7 @@ public void testFenceProducers() throws Exception { transactionCoordinator ); - FenceProducersResult result = env.adminClient().fenceProducers(Collections.singleton(transactionalId)); + FenceProducersResult result = env.adminClient().fenceProducers(Collections.singleton(transactionalId), options); assertNull(result.all().get()); assertEquals(4761, result.producerId(transactionalId).get()); assertEquals((short) 489, result.epochId(transactionalId).get()); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java index c4151ebb0e15e..798974117ffb7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java @@ -35,14 +35,16 @@ import static org.apache.kafka.common.utils.Utils.mkSet; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import org.apache.kafka.clients.admin.FenceProducersOptions; public class FenceProducersHandlerTest { private final LogContext logContext = new LogContext(); private final Node node = new Node(1, "host", 1234); + private final FenceProducersOptions options = new FenceProducersOptions().timeoutMs(10000); @Test public void testBuildRequest() { - FenceProducersHandler handler = new FenceProducersHandler(logContext); + FenceProducersHandler handler = new FenceProducersHandler(options, logContext); mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId)); } @@ -51,7 +53,7 @@ public void testHandleSuccessfulResponse() { String transactionalId = "foo"; CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId); - FenceProducersHandler handler = new FenceProducersHandler(logContext); + FenceProducersHandler handler = new FenceProducersHandler(options, logContext); short epoch = 57; long producerId = 7; @@ -73,7 +75,7 @@ public void testHandleSuccessfulResponse() { @Test public void testHandleErrorResponse() { String transactionalId = "foo"; - FenceProducersHandler handler = new FenceProducersHandler(logContext); + FenceProducersHandler handler = new FenceProducersHandler(options, logContext); assertFatalError(handler, transactionalId, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED); assertFatalError(handler, transactionalId, Errors.CLUSTER_AUTHORIZATION_FAILED); assertFatalError(handler, transactionalId, Errors.UNKNOWN_SERVER_ERROR); @@ -142,6 +144,6 @@ private void assertLookup(FenceProducersHandler handler, String transactionalId) CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId); InitProducerIdRequest.Builder request = handler.buildSingleRequest(1, key); assertEquals(transactionalId, request.data.transactionalId()); - assertEquals(1, request.data.transactionTimeoutMs()); + assertEquals(this.options.timeoutMs(), request.data.transactionTimeoutMs()); } }