From 04d64928835f363624650b30fb3160df237bc3da Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Fri, 31 May 2024 09:44:49 +0100 Subject: [PATCH 1/2] KAFKA-16047: Use REQUEST_TIMEOUT_MS_CONFIG in AdminClient.fenceProducers Use REQUEST_TIMEOUT_MS_CONFIG in AdminClient.fenceProducers as transaction timeout. No transaction will be started with this timeout, but ReplicaManager.appendRecords uses this value as its timeout. Use REQUEST_TIMEOUT_MS_CONFIG like a regular producer append to allow for replication to take place. Co-Authored-By: Adrian Preston --- .../apache/kafka/clients/admin/KafkaAdminClient.java | 2 +- .../admin/internals/FenceProducersHandler.java | 11 +++++++---- .../admin/internals/FenceProducersHandlerTest.java | 9 +++++---- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 71d39900cd5a8..8496c677a5ac7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4569,7 +4569,7 @@ public ListTransactionsResult listTransactions(ListTransactionsOptions options) public FenceProducersResult fenceProducers(Collection transactionalIds, FenceProducersOptions options) { AdminApiFuture.SimpleAdminApiFuture future = FenceProducersHandler.newFuture(transactionalIds); - FenceProducersHandler handler = new FenceProducersHandler(logContext); + FenceProducersHandler handler = new FenceProducersHandler(logContext, requestTimeoutMs); invokeDriver(handler, future, options.timeoutMs); return new FenceProducersResult(future.all()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java index 23572dd4419ca..bf5e99953c56d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java @@ -38,12 +38,15 @@ public class FenceProducersHandler extends AdminApiHandler.Unbatched { private final Logger log; private final AdminApiLookupStrategy lookupStrategy; + private final int requestTimeoutMs; public FenceProducersHandler( - LogContext logContext + LogContext logContext, + int requestTimeoutMs ) { this.log = logContext.logger(FenceProducersHandler.class); this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.TRANSACTION, logContext); + this.requestTimeoutMs = requestTimeoutMs; } public static AdminApiFuture.SimpleAdminApiFuture newFuture( @@ -82,9 +85,9 @@ InitProducerIdRequest.Builder buildSingleRequest(int brokerId, CoordinatorKey ke .setProducerEpoch(ProducerIdAndEpoch.NONE.epoch) .setProducerId(ProducerIdAndEpoch.NONE.producerId) .setTransactionalId(key.idValue) - // Set transaction timeout to 1 since it's only being initialized to fence out older producers with the same transactional ID, - // and shouldn't be used for any actual record writes - .setTransactionTimeoutMs(1); + // Set transaction timeout to requestTimeoutMs since it's only being initialized to fence out older producers with the same transactional ID. + // This timeout is used to append the record with the new producer epoch to the transaction log. + .setTransactionTimeoutMs(requestTimeoutMs); return new InitProducerIdRequest.Builder(data); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java index 34ed2e6772c2f..0b45d16588cc7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java @@ -39,10 +39,11 @@ public class FenceProducersHandlerTest { private final LogContext logContext = new LogContext(); private final Node node = new Node(1, "host", 1234); + private final int timeoutMs = 30000; @Test public void testBuildRequest() { - FenceProducersHandler handler = new FenceProducersHandler(logContext); + FenceProducersHandler handler = new FenceProducersHandler(logContext, timeoutMs); mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId)); } @@ -51,7 +52,7 @@ public void testHandleSuccessfulResponse() { String transactionalId = "foo"; CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId); - FenceProducersHandler handler = new FenceProducersHandler(logContext); + FenceProducersHandler handler = new FenceProducersHandler(logContext, timeoutMs); short epoch = 57; long producerId = 7; @@ -73,7 +74,7 @@ public void testHandleSuccessfulResponse() { @Test public void testHandleErrorResponse() { String transactionalId = "foo"; - FenceProducersHandler handler = new FenceProducersHandler(logContext); + FenceProducersHandler handler = new FenceProducersHandler(logContext, timeoutMs); assertFatalError(handler, transactionalId, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED); assertFatalError(handler, transactionalId, Errors.CLUSTER_AUTHORIZATION_FAILED); assertFatalError(handler, transactionalId, Errors.UNKNOWN_SERVER_ERROR); @@ -140,6 +141,6 @@ private void assertLookup(FenceProducersHandler handler, String transactionalId) CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId); InitProducerIdRequest.Builder request = handler.buildSingleRequest(1, key); assertEquals(transactionalId, request.data.transactionalId()); - assertEquals(1, request.data.transactionTimeoutMs()); + assertEquals(timeoutMs, request.data.transactionTimeoutMs()); } } From b80bee6e7bb1779f57d1051821a4c82b35eb4cd7 Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Tue, 4 Jun 2024 00:52:44 +0100 Subject: [PATCH 2/2] Use options.timeoutMs if not null --- .../kafka/clients/admin/KafkaAdminClient.java | 2 +- .../internals/FenceProducersHandler.java | 11 +++++---- .../internals/FenceProducersHandlerTest.java | 24 +++++++++++++------ 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 8496c677a5ac7..92ba6ad3d6c1f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4569,7 +4569,7 @@ public ListTransactionsResult listTransactions(ListTransactionsOptions options) public FenceProducersResult fenceProducers(Collection transactionalIds, FenceProducersOptions options) { AdminApiFuture.SimpleAdminApiFuture future = FenceProducersHandler.newFuture(transactionalIds); - FenceProducersHandler handler = new FenceProducersHandler(logContext, requestTimeoutMs); + FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs); invokeDriver(handler, future, options.timeoutMs); return new FenceProducersResult(future.all()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java index bf5e99953c56d..9a12bc1959609 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.admin.internals; +import org.apache.kafka.clients.admin.FenceProducersOptions; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; @@ -38,15 +39,16 @@ public class FenceProducersHandler extends AdminApiHandler.Unbatched { private final Logger log; private final AdminApiLookupStrategy lookupStrategy; - private final int requestTimeoutMs; + private final int txnTimeoutMs; public FenceProducersHandler( + FenceProducersOptions options, LogContext logContext, int requestTimeoutMs ) { this.log = logContext.logger(FenceProducersHandler.class); this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.TRANSACTION, logContext); - this.requestTimeoutMs = requestTimeoutMs; + this.txnTimeoutMs = options.timeoutMs() != null ? options.timeoutMs() : requestTimeoutMs; } public static AdminApiFuture.SimpleAdminApiFuture newFuture( @@ -85,9 +87,8 @@ InitProducerIdRequest.Builder buildSingleRequest(int brokerId, CoordinatorKey ke .setProducerEpoch(ProducerIdAndEpoch.NONE.epoch) .setProducerId(ProducerIdAndEpoch.NONE.producerId) .setTransactionalId(key.idValue) - // Set transaction timeout to requestTimeoutMs since it's only being initialized to fence out older producers with the same transactional ID. - // This timeout is used to append the record with the new producer epoch to the transaction log. - .setTransactionTimeoutMs(requestTimeoutMs); + // This timeout is used by the coordinator to append the record with the new producer epoch to the transaction log. + .setTransactionTimeoutMs(txnTimeoutMs); return new InitProducerIdRequest.Builder(data); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java index 0b45d16588cc7..9665bd0bdf120 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.admin.internals; +import org.apache.kafka.clients.admin.FenceProducersOptions; import org.apache.kafka.clients.admin.internals.AdminApiHandler.ApiResult; import org.apache.kafka.common.Node; import org.apache.kafka.common.message.InitProducerIdResponseData; @@ -39,12 +40,21 @@ public class FenceProducersHandlerTest { private final LogContext logContext = new LogContext(); private final Node node = new Node(1, "host", 1234); - private final int timeoutMs = 30000; + private final int requestTimeoutMs = 30000; + private final FenceProducersOptions options = new FenceProducersOptions(); @Test public void testBuildRequest() { - FenceProducersHandler handler = new FenceProducersHandler(logContext, timeoutMs); - mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId)); + FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs); + mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId, requestTimeoutMs)); + } + + @Test + public void testBuildRequestOptionsTimeout() { + final int optionsTimeoutMs = 50000; + options.timeoutMs(optionsTimeoutMs); + FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs); + mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId, optionsTimeoutMs)); } @Test @@ -52,7 +62,7 @@ public void testHandleSuccessfulResponse() { String transactionalId = "foo"; CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId); - FenceProducersHandler handler = new FenceProducersHandler(logContext, timeoutMs); + FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs); short epoch = 57; long producerId = 7; @@ -74,7 +84,7 @@ public void testHandleSuccessfulResponse() { @Test public void testHandleErrorResponse() { String transactionalId = "foo"; - FenceProducersHandler handler = new FenceProducersHandler(logContext, timeoutMs); + FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs); assertFatalError(handler, transactionalId, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED); assertFatalError(handler, transactionalId, Errors.CLUSTER_AUTHORIZATION_FAILED); assertFatalError(handler, transactionalId, Errors.UNKNOWN_SERVER_ERROR); @@ -137,10 +147,10 @@ private ApiResult handleResponseError( return result; } - private void assertLookup(FenceProducersHandler handler, String transactionalId) { + private void assertLookup(FenceProducersHandler handler, String transactionalId, int txnTimeoutMs) { CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId); InitProducerIdRequest.Builder request = handler.buildSingleRequest(1, key); assertEquals(transactionalId, request.data.transactionalId()); - assertEquals(timeoutMs, request.data.transactionTimeoutMs()); + assertEquals(txnTimeoutMs, request.data.transactionTimeoutMs()); } }