diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 71d39900cd5a8..92ba6ad3d6c1f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4569,7 +4569,7 @@ public ListTransactionsResult listTransactions(ListTransactionsOptions options) public FenceProducersResult fenceProducers(Collection transactionalIds, FenceProducersOptions options) { AdminApiFuture.SimpleAdminApiFuture future = FenceProducersHandler.newFuture(transactionalIds); - FenceProducersHandler handler = new FenceProducersHandler(logContext); + FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs); invokeDriver(handler, future, options.timeoutMs); return new FenceProducersResult(future.all()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java index 23572dd4419ca..9a12bc1959609 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.admin.internals; +import org.apache.kafka.clients.admin.FenceProducersOptions; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; @@ -38,12 +39,16 @@ public class FenceProducersHandler extends AdminApiHandler.Unbatched { private final Logger log; private final AdminApiLookupStrategy lookupStrategy; + private final int txnTimeoutMs; public FenceProducersHandler( - LogContext logContext + FenceProducersOptions options, + LogContext logContext, + int requestTimeoutMs ) { this.log = logContext.logger(FenceProducersHandler.class); this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.TRANSACTION, logContext); + this.txnTimeoutMs = options.timeoutMs() != null ? options.timeoutMs() : requestTimeoutMs; } public static AdminApiFuture.SimpleAdminApiFuture newFuture( @@ -82,9 +87,8 @@ InitProducerIdRequest.Builder buildSingleRequest(int brokerId, CoordinatorKey ke .setProducerEpoch(ProducerIdAndEpoch.NONE.epoch) .setProducerId(ProducerIdAndEpoch.NONE.producerId) .setTransactionalId(key.idValue) - // Set transaction timeout to 1 since it's only being initialized to fence out older producers with the same transactional ID, - // and shouldn't be used for any actual record writes - .setTransactionTimeoutMs(1); + // This timeout is used by the coordinator to append the record with the new producer epoch to the transaction log. + .setTransactionTimeoutMs(txnTimeoutMs); return new InitProducerIdRequest.Builder(data); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java index 34ed2e6772c2f..9665bd0bdf120 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.admin.internals; +import org.apache.kafka.clients.admin.FenceProducersOptions; import org.apache.kafka.clients.admin.internals.AdminApiHandler.ApiResult; import org.apache.kafka.common.Node; import org.apache.kafka.common.message.InitProducerIdResponseData; @@ -39,11 +40,21 @@ public class FenceProducersHandlerTest { private final LogContext logContext = new LogContext(); private final Node node = new Node(1, "host", 1234); + private final int requestTimeoutMs = 30000; + private final FenceProducersOptions options = new FenceProducersOptions(); @Test public void testBuildRequest() { - FenceProducersHandler handler = new FenceProducersHandler(logContext); - mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId)); + FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs); + mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId, requestTimeoutMs)); + } + + @Test + public void testBuildRequestOptionsTimeout() { + final int optionsTimeoutMs = 50000; + options.timeoutMs(optionsTimeoutMs); + FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs); + mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId, optionsTimeoutMs)); } @Test @@ -51,7 +62,7 @@ public void testHandleSuccessfulResponse() { String transactionalId = "foo"; CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId); - FenceProducersHandler handler = new FenceProducersHandler(logContext); + FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs); short epoch = 57; long producerId = 7; @@ -73,7 +84,7 @@ public void testHandleSuccessfulResponse() { @Test public void testHandleErrorResponse() { String transactionalId = "foo"; - FenceProducersHandler handler = new FenceProducersHandler(logContext); + FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs); assertFatalError(handler, transactionalId, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED); assertFatalError(handler, transactionalId, Errors.CLUSTER_AUTHORIZATION_FAILED); assertFatalError(handler, transactionalId, Errors.UNKNOWN_SERVER_ERROR); @@ -136,10 +147,10 @@ private ApiResult handleResponseError( return result; } - private void assertLookup(FenceProducersHandler handler, String transactionalId) { + private void assertLookup(FenceProducersHandler handler, String transactionalId, int txnTimeoutMs) { CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId); InitProducerIdRequest.Builder request = handler.buildSingleRequest(1, key); assertEquals(transactionalId, request.data.transactionalId()); - assertEquals(1, request.data.transactionTimeoutMs()); + assertEquals(txnTimeoutMs, request.data.transactionTimeoutMs()); } }