diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 5fc9a1b9b38ba..a5af5b60093d9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -256,6 +256,7 @@ public class KafkaProducer implements Producer { private final ProducerInterceptors interceptors; private final ApiVersions apiVersions; private final TransactionManager transactionManager; + private TransactionalRequestResult initTransactionsResult; /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings @@ -555,18 +556,36 @@ private static int parseAcks(String acksString) { * 2. Gets the internal producer id and epoch, used in all future transactional * messages issued by the producer. * + * Note that this method will raise {@link TimeoutException} if the transactional state cannot + * be initialized before expiration of {@code max.block.ms}. Additionally, it will raise {@link InterruptException} + * if interrupted. It is safe to retry in either case, but once the transactional state has been successfully + * initialized, this method should no longer be used. + * * @throws IllegalStateException if no transactional.id has been configured * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker * does not support transactions (i.e. if its version is lower than 0.11.0.0) * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured * transactional.id is not authorized. See the exception for more details * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error + * @throws TimeoutException if the time taken for initialize the transaction has surpassed max.block.ms. + * @throws InterruptException if the thread is interrupted while blocked */ public void initTransactions() { throwIfNoTransactionManager(); - TransactionalRequestResult result = transactionManager.initializeTransactions(); - sender.wakeup(); - result.await(); + if (initTransactionsResult == null) { + initTransactionsResult = transactionManager.initializeTransactions(); + sender.wakeup(); + } + + try { + if (initTransactionsResult.await(maxBlockTimeMs, TimeUnit.MILLISECONDS)) { + initTransactionsResult = null; + } else { + throw new TimeoutException("Timeout expired while initializing transactional state in " + maxBlockTimeMs + "ms."); + } + } catch (InterruptedException e) { + throw new InterruptException("Initialize transactions interrupted.", e); + } } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 7eea4992b3315..426b273b88538 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -329,7 +329,7 @@ private boolean maybeSendTransactionalRequest(long now) { return false; AbstractRequest.Builder requestBuilder = nextRequestHandler.requestBuilder(); - while (true) { + while (running) { Node targetNode = null; try { if (nextRequestHandler.needsCoordinator()) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java index ff93da872dc97..9c02e94c0458f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java @@ -59,7 +59,10 @@ public void await() { } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { - return latch.await(timeout, unit); + boolean success = latch.await(timeout, unit); + if (!isSuccessful()) + throw error(); + return success; } public RuntimeException error() { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 9f70fd7e70834..8bfc5e7d28a4c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -548,4 +548,65 @@ public void testPartitionsForWithNullTopic() { // expected } } + + @Test(expected = TimeoutException.class) + public void testInitTransactionTimeout() { + Properties props = new Properties(); + props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction"); + props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + + Time time = new MockTime(); + Cluster cluster = TestUtils.singletonCluster("topic", 1); + Node node = cluster.nodes().get(0); + + Metadata metadata = new Metadata(0, Long.MAX_VALUE, true); + metadata.update(cluster, Collections.emptySet(), time.milliseconds()); + + MockClient client = new MockClient(time, metadata); + client.setNode(node); + + Producer producer = new KafkaProducer<>( + new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())), + new StringSerializer(), new StringSerializer(), metadata, client); + try { + producer.initTransactions(); + fail("initTransactions() should have raised TimeoutException"); + } finally { + producer.close(0, TimeUnit.MILLISECONDS); + } + } + + @Test(expected = KafkaException.class) + public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() { + Properties props = new Properties(); + props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction"); + props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + + Time time = new MockTime(); + Cluster cluster = TestUtils.singletonCluster("topic", 1); + Node node = cluster.nodes().get(0); + + Metadata metadata = new Metadata(0, Long.MAX_VALUE, true); + metadata.update(cluster, Collections.emptySet(), time.milliseconds()); + + MockClient client = new MockClient(time, metadata); + client.setNode(node); + + Producer producer = new KafkaProducer<>( + new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())), + new StringSerializer(), new StringSerializer(), metadata, client); + try { + producer.initTransactions(); + } catch (TimeoutException e) { + // expected + } + // other transactional operations should not be allowed if we catch the error after initTransactions failed + try { + producer.beginTransaction(); + } finally { + producer.close(0, TimeUnit.MILLISECONDS); + } + } } diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 911808a49cdb9..8435e5a3a6c15 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -19,7 +19,7 @@ package kafka.api import java.lang.{Long => JLong} import java.util.Properties -import java.util.concurrent.{ExecutionException, TimeUnit} +import java.util.concurrent.TimeUnit import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig @@ -27,7 +27,7 @@ import kafka.utils.TestUtils import kafka.utils.TestUtils.consumeRecords import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors.ProducerFencedException import org.apache.kafka.common.security.auth.SecurityProtocol import org.junit.{After, Before, Test} @@ -532,6 +532,19 @@ class TransactionsTest extends KafkaServerTestHarness { } } + @Test(expected = classOf[KafkaException]) + def testConsecutivelyRunInitTransactions(): Unit = { + val producer = createTransactionalProducer(transactionalId = "normalProducer") + + try { + producer.initTransactions() + producer.initTransactions() + fail("Should have raised a KafkaException") + } finally { + producer.close() + } + } + private def sendTransactionalMessagesWithValueRange(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String, start: Int, end: Int, willBeCommitted: Boolean): Unit = { for (i <- start until end) {