From af840cc7294bfcc72e6f8fdf12c63af40ff494f7 Mon Sep 17 00:00:00 2001 From: huxihx Date: Tue, 13 Feb 2018 16:43:01 +0800 Subject: [PATCH 1/8] KAFKA-6446: KafkaProducer with transactionId endless waits when bootstrap server is down https://issues.apache.org/jira/browse/KAFKA-6446 Replaced await() with its timed version to avoid waiting endlessly, and refined the code so that the Sender thread can exit instead of trying to connect to the `bad` broker forever. --- .../kafka/clients/producer/KafkaProducer.java | 10 ++++++++- .../clients/producer/internals/Sender.java | 1 + .../internals/TransactionalRequestResult.java | 5 ++++- .../kafka/api/TransactionsTest.scala | 22 ++++++++++++++++--- 4 files changed, 33 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 5fc9a1b9b38ba..b86da60799771 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -561,12 +561,20 @@ private static int parseAcks(String acksString) { * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured * transactional.id is not authorized. 
See the exception for more details * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error + * @throws TimeoutException if the producer fails to initialize a transaction within the configured time interval + * @throws InterruptException if the thread is interrupted while blocked */ public void initTransactions() { throwIfNoTransactionManager(); TransactionalRequestResult result = transactionManager.initializeTransactions(); sender.wakeup(); - result.await(); + try { + if (!result.await(requestTimeoutMs, TimeUnit.MILLISECONDS)) { + throw new TimeoutException("Timeout expired while initializing the transaction in " + requestTimeoutMs + "ms."); + } + } catch (InterruptedException e) { + throw new InterruptException("Initialize transactions interrupted.", e); + } } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 7eea4992b3315..1651a6d6e49af 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -359,6 +359,7 @@ private boolean maybeSendTransactionalRequest(long now) { client.send(clientRequest, now); return true; } + break; // break the loop if we failed to find a specific node } catch (IOException e) { log.debug("Disconnect from {} while trying to send request {}. 
Going " + "to back off and retry", targetNode, requestBuilder); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java index ff93da872dc97..9c02e94c0458f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java @@ -59,7 +59,10 @@ public void await() { } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { - return latch.await(timeout, unit); + boolean success = latch.await(timeout, unit); + if (!isSuccessful()) + throw error(); + return success; } public RuntimeException error() { diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 911808a49cdb9..f7e02e6f3bacd 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -19,16 +19,16 @@ package kafka.api import java.lang.{Long => JLong} import java.util.Properties -import java.util.concurrent.{ExecutionException, TimeUnit} +import java.util.concurrent.TimeUnit import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig import kafka.utils.TestUtils import kafka.utils.TestUtils.consumeRecords import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.ProducerFencedException +import org.apache.kafka.common.errors.{ProducerFencedException, TimeoutException} import 
org.apache.kafka.common.security.auth.SecurityProtocol import org.junit.{After, Before, Test} import org.junit.Assert._ @@ -532,6 +532,22 @@ class TransactionsTest extends KafkaServerTestHarness { } } + @Test + def testInitTransactionThrowTimeoutExceptionWithBadBrokers() { + val props = new Properties() + props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transaction-id") + val producer = TestUtils.createNewProducer( + brokerList = "192.168.1.1:9092", retries = Integer.MAX_VALUE, requestTimeoutMs = 1000L, props = Some(props)) + try { + producer.initTransactions() + fail("should have raised a TimeoutException since initializing the transaction expired") + } catch { + case _: TimeoutException => // expected + } finally { + producer.close() // should successfully close the producer + } + } + private def sendTransactionalMessagesWithValueRange(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String, start: Int, end: Int, willBeCommitted: Boolean): Unit = { for (i <- start until end) { From ea2c7027dae5b4d5dedaa63f77b1cc4f8618a82e Mon Sep 17 00:00:00 2001 From: huxihx Date: Fri, 2 Mar 2018 12:11:51 +0800 Subject: [PATCH 2/8] address apurvam's comments --- .../kafka/clients/producer/KafkaProducer.java | 6 ++- .../internals/TransactionManager.java | 8 ++-- .../kafka/api/TransactionsTest.scala | 40 ++++++++++++++----- 3 files changed, 38 insertions(+), 16 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index b86da60799771..047eee9c81bf7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -50,6 +50,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.internals.ClusterResourceListeners; +import 
org.apache.kafka.common.internals.FatalExitError; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -561,7 +562,6 @@ private static int parseAcks(String acksString) { * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured * transactional.id is not authorized. See the exception for more details * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error - * @throws TimeoutException if the producer fails to initialize a transaction within the configured time interval * @throws InterruptException if the thread is interrupted while blocked */ public void initTransactions() { @@ -570,7 +570,9 @@ public void initTransactions() { sender.wakeup(); try { if (!result.await(requestTimeoutMs, TimeUnit.MILLISECONDS)) { - throw new TimeoutException("Timeout expired while initializing the transaction in " + requestTimeoutMs + "ms."); + transactionManager.transitionToFatalError( + new TimeoutException("Timeout expired while initializing the transaction in " + requestTimeoutMs + "ms.")); + throw new FatalExitError(); } } catch (InterruptedException e) { throw new InterruptException("Initialize transactions interrupted.", e); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 006a12b1bfd4c..38d09413f3d18 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -306,6 +306,10 @@ public boolean isTransactional() { return transactionalId != null; } + public synchronized void transitionToFatalError(RuntimeException exception) { + transitionTo(State.FATAL_ERROR, exception); + } + 
synchronized boolean hasPartitionsToAdd() { return !newPartitionsInTransaction.isEmpty() || !pendingPartitionsInTransaction.isEmpty(); } @@ -331,10 +335,6 @@ synchronized void transitionToAbortableError(RuntimeException exception) { transitionTo(State.ABORTABLE_ERROR, exception); } - synchronized void transitionToFatalError(RuntimeException exception) { - transitionTo(State.FATAL_ERROR, exception); - } - // visible for testing synchronized boolean isPartitionAdded(TopicPartition partition) { return partitionsInTransaction.contains(partition); diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index f7e02e6f3bacd..ef38aadeb63ec 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -27,8 +27,9 @@ import kafka.utils.TestUtils import kafka.utils.TestUtils.consumeRecords import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{ProducerFencedException, TimeoutException} +import org.apache.kafka.common.{KafkaException, TopicPartition} +import org.apache.kafka.common.errors.ProducerFencedException +import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.security.auth.SecurityProtocol import org.junit.{After, Before, Test} import org.junit.Assert._ @@ -532,22 +533,41 @@ class TransactionsTest extends KafkaServerTestHarness { } } - @Test - def testInitTransactionThrowTimeoutExceptionWithBadBrokers() { - val props = new Properties() - props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transaction-id") - val producer = TestUtils.createNewProducer( - brokerList = "192.168.1.1:9092", retries = Integer.MAX_VALUE, requestTimeoutMs = 1000L, props = 
Some(props)) + @Test(expected = classOf[FatalExitError]) + def testInitTransactionFailWithBadBrokers() { + val producer = createTransactionalProducerToConnectNonExistentBrokers() + try { + producer.initTransactions() + fail("should have raised a FatalExitError error since initializing the transaction expired") + } finally { + producer.close() // should successfully close the producer + } + } + + @Test(expected = classOf[KafkaException]) + def testOnlyCanExecuteCloseAfterFailInitTransaction(): Unit = { + val producer = createTransactionalProducerToConnectNonExistentBrokers() try { producer.initTransactions() - fail("should have raised a TimeoutException since initializing the transaction expired") } catch { - case _: TimeoutException => // expected + case _: FatalExitError => // expected + } + // other transactional operations should not be allowed even when we capture the error after initTransaction failed + try { + producer.beginTransaction() } finally { producer.close() // should successfully close the producer } } + private def createTransactionalProducerToConnectNonExistentBrokers(): KafkaProducer[Array[Byte], Array[Byte]] = { + val props = new Properties() + props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transaction-id") + val producer = TestUtils.createNewProducer(brokerList = "192.168.1.1:9092", retries = Integer.MAX_VALUE, + requestTimeoutMs = 1000L, props = Some(props)) + producer + } + private def sendTransactionalMessagesWithValueRange(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String, start: Int, end: Int, willBeCommitted: Boolean): Unit = { for (i <- start until end) { From 0c99432aeb36384a71c6227b4a87b42a07b6ce1d Mon Sep 17 00:00:00 2001 From: huxihx Date: Mon, 12 Mar 2018 11:26:59 +0800 Subject: [PATCH 3/8] address Jason's comments including: throw TimeoutException instead of FatalExitError and change to a conditional loop --- .../kafka/clients/producer/KafkaProducer.java | 14 +++++++++----- 
.../kafka/clients/producer/internals/Sender.java | 3 +-- .../integration/kafka/api/TransactionsTest.scala | 13 ++++++------- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 047eee9c81bf7..df6350adc2be2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -50,7 +50,6 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.internals.ClusterResourceListeners; -import org.apache.kafka.common.internals.FatalExitError; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -562,6 +561,7 @@ private static int parseAcks(String acksString) { * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured * transactional.id is not authorized. See the exception for more details * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error + * @throws TimeoutException if the time taken for initialize the transaction has surpassed max.block.ms. 
* @throws InterruptException if the thread is interrupted while blocked */ public void initTransactions() { @@ -569,13 +569,17 @@ public void initTransactions() { TransactionalRequestResult result = transactionManager.initializeTransactions(); sender.wakeup(); try { - if (!result.await(requestTimeoutMs, TimeUnit.MILLISECONDS)) { - transactionManager.transitionToFatalError( - new TimeoutException("Timeout expired while initializing the transaction in " + requestTimeoutMs + "ms.")); - throw new FatalExitError(); + if (!result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS)) { + TimeoutException e = new TimeoutException("Timeout expired while initializing transactional state in " + + maxBlockTimeMs + "ms."); + transactionManager.transitionToFatalError(e); + throw e; } } catch (InterruptedException e) { throw new InterruptException("Initialize transactions interrupted.", e); + } catch (KafkaException e) { + transactionManager.transitionToFatalError(e); + throw e; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 1651a6d6e49af..426b273b88538 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -329,7 +329,7 @@ private boolean maybeSendTransactionalRequest(long now) { return false; AbstractRequest.Builder requestBuilder = nextRequestHandler.requestBuilder(); - while (true) { + while (running) { Node targetNode = null; try { if (nextRequestHandler.needsCoordinator()) { @@ -359,7 +359,6 @@ private boolean maybeSendTransactionalRequest(long now) { client.send(clientRequest, now); return true; } - break; // break the loop if we failed to find a specific node } catch (IOException e) { log.debug("Disconnect from {} while trying to send request {}. 
Going " + "to back off and retry", targetNode, requestBuilder); diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index ef38aadeb63ec..ff05ed9d56e70 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -28,8 +28,7 @@ import kafka.utils.TestUtils.consumeRecords import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.{KafkaException, TopicPartition} -import org.apache.kafka.common.errors.ProducerFencedException -import org.apache.kafka.common.internals.FatalExitError +import org.apache.kafka.common.errors.{ProducerFencedException, TimeoutException} import org.apache.kafka.common.security.auth.SecurityProtocol import org.junit.{After, Before, Test} import org.junit.Assert._ @@ -533,12 +532,12 @@ class TransactionsTest extends KafkaServerTestHarness { } } - @Test(expected = classOf[FatalExitError]) + @Test(expected = classOf[TimeoutException]) def testInitTransactionFailWithBadBrokers() { val producer = createTransactionalProducerToConnectNonExistentBrokers() try { producer.initTransactions() - fail("should have raised a FatalExitError error since initializing the transaction expired") + fail("should have raised a TimeoutException since initializing the transaction expired") } finally { producer.close() // should successfully close the producer } @@ -550,7 +549,7 @@ class TransactionsTest extends KafkaServerTestHarness { try { producer.initTransactions() } catch { - case _: FatalExitError => // expected + case _: TimeoutException => // expected } // other transactional operations should not be allowed even when we capture the error after initTransaction failed try { @@ -563,8 +562,8 @@ class TransactionsTest extends 
KafkaServerTestHarness { private def createTransactionalProducerToConnectNonExistentBrokers(): KafkaProducer[Array[Byte], Array[Byte]] = { val props = new Properties() props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transaction-id") - val producer = TestUtils.createNewProducer(brokerList = "192.168.1.1:9092", retries = Integer.MAX_VALUE, - requestTimeoutMs = 1000L, props = Some(props)) + val producer = TestUtils.createNewProducer(brokerList = "192.168.1.1:9092", maxBlockMs = 1000, + retries = Integer.MAX_VALUE, props = Some(props)) producer } From 9802bf34f1e8613164ffe2822ae245cb5cdb79db Mon Sep 17 00:00:00 2001 From: huxihx Date: Tue, 13 Mar 2018 14:05:04 +0800 Subject: [PATCH 4/8] addressed Jason's comments on caching result. --- .../kafka/clients/producer/KafkaProducer.java | 7 +--- .../internals/TransactionManager.java | 33 ++++++++++------- .../internals/TransactionManagerTest.java | 37 +++++++++++++++++++ 3 files changed, 58 insertions(+), 19 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index df6350adc2be2..31741c43ad610 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -570,16 +570,11 @@ public void initTransactions() { sender.wakeup(); try { if (!result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS)) { - TimeoutException e = new TimeoutException("Timeout expired while initializing transactional state in " + + throw new TimeoutException("Timeout expired while initializing transactional state in " + maxBlockTimeMs + "ms."); - transactionManager.transitionToFatalError(e); - throw e; } } catch (InterruptedException e) { throw new InterruptException("Initialize transactions interrupted.", e); - } catch (KafkaException e) { - transactionManager.transitionToFatalError(e); - throw e; } } diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 38d09413f3d18..093286dd98bf7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -51,6 +51,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; import java.util.PriorityQueue; import java.util.Set; @@ -111,6 +112,8 @@ public class TransactionManager { private int inFlightRequestCorrelationId = NO_INFLIGHT_REQUEST_CORRELATION_ID; private Node transactionCoordinator; private Node consumerGroupCoordinator; + // only visible for testing + TransactionalRequestResult transactionalRequestResult; private volatile State currentState = State.UNINITIALIZED; private volatile RuntimeException lastError = null; @@ -200,13 +203,16 @@ public int compare(TxnRequestHandler o1, TxnRequestHandler o2) { public synchronized TransactionalRequestResult initializeTransactions() { ensureTransactional(); - transitionTo(State.INITIALIZING); - setProducerIdAndEpoch(ProducerIdAndEpoch.NONE); - this.nextSequence.clear(); - InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs); - InitProducerIdHandler handler = new InitProducerIdHandler(builder); - enqueueRequest(handler); - return handler.result; + if (transactionalRequestResult == null || transactionalRequestResult.isCompleted()) { + transitionTo(State.INITIALIZING); + setProducerIdAndEpoch(ProducerIdAndEpoch.NONE); + this.nextSequence.clear(); + InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs); + InitProducerIdHandler handler = new InitProducerIdHandler(builder); + enqueueRequest(handler); + 
transactionalRequestResult = handler.result; + } + return transactionalRequestResult; } public synchronized void beginTransaction() { @@ -306,10 +312,6 @@ public boolean isTransactional() { return transactionalId != null; } - public synchronized void transitionToFatalError(RuntimeException exception) { - transitionTo(State.FATAL_ERROR, exception); - } - synchronized boolean hasPartitionsToAdd() { return !newPartitionsInTransaction.isEmpty() || !pendingPartitionsInTransaction.isEmpty(); } @@ -335,6 +337,10 @@ synchronized void transitionToAbortableError(RuntimeException exception) { transitionTo(State.ABORTABLE_ERROR, exception); } + synchronized void transitionToFatalError(RuntimeException exception) { + transitionTo(State.FATAL_ERROR, exception); + } + // visible for testing synchronized boolean isPartitionAdded(TopicPartition partition) { return partitionsInTransaction.contains(partition); @@ -568,14 +574,15 @@ synchronized boolean shouldResetProducerStateAfterResolvingSequences() { if (isTransactional()) // We should not reset producer state if we are transactional. We will transition to a fatal error instead. return false; - for (TopicPartition topicPartition : partitionsWithUnresolvedSequences) { + for (Iterator iter = partitionsWithUnresolvedSequences.iterator(); iter.hasNext(); ) { + TopicPartition topicPartition = iter.next(); if (!hasInflightBatches(topicPartition)) { // The partition has been fully drained. At this point, the last ack'd sequence should be once less than // next sequence destined for the partition. If so, the partition is fully resolved. If not, we should // reset the sequence number if necessary. if (isNextSequence(topicPartition, sequenceNumber(topicPartition))) { // This would happen when a batch was expired, but subsequent batches succeeded. - partitionsWithUnresolvedSequences.remove(topicPartition); + iter.remove(); } else { // We would enter this branch if all in flight batches were ultimately expired in the producer. 
log.info("No inflight batches remaining for {}, last ack'd sequence for partition is {}, next sequence is {}. " + diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index fab139ab816c5..bcdf0eea1519d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -2213,6 +2213,43 @@ public void testTransitionToFatalErrorWhenRetriedBatchIsExpired() throws Interru assertFalse(transactionManager.hasOngoingTransaction()); } + @Test + public void testShouldResetProducerStateAfterResolvingSequences() throws InterruptedException, ExecutionException { + // Create a TransactionManager without a transactionalId to test + // shouldResetProducerStateAfterResolvingSequences. + TransactionManager manager = new TransactionManager(logContext, null, transactionTimeoutMs, + DEFAULT_RETRY_BACKOFF_MS); + assertFalse(manager.shouldResetProducerStateAfterResolvingSequences()); + TopicPartition tp0 = new TopicPartition("foo", 0); + TopicPartition tp1 = new TopicPartition("foo", 1); + assertEquals(Integer.valueOf(0), manager.sequenceNumber(tp0)); + assertEquals(Integer.valueOf(0), manager.sequenceNumber(tp1)); + + manager.incrementSequenceNumber(tp0, 1); + manager.incrementSequenceNumber(tp1, 1); + manager.maybeUpdateLastAckedSequence(tp0, 0); + manager.maybeUpdateLastAckedSequence(tp1, 0); + manager.markSequenceUnresolved(tp0); + manager.markSequenceUnresolved(tp1); + assertFalse(manager.shouldResetProducerStateAfterResolvingSequences()); + + manager.maybeUpdateLastAckedSequence(tp0, 5); + manager.incrementSequenceNumber(tp0, 1); + manager.markSequenceUnresolved(tp0); + manager.markSequenceUnresolved(tp1); + assertTrue(manager.shouldResetProducerStateAfterResolvingSequences()); + } + + @Test + 
public void verifyInitializeTransactionsCacheRequestResult() { + assertNull(transactionManager.transactionalRequestResult); + transactionManager.initializeTransactions(); + assertNotNull(transactionManager.transactionalRequestResult); + assertFalse(transactionManager.transactionalRequestResult.isCompleted()); + TransactionalRequestResult result = transactionManager.initializeTransactions(); // should cache the incomplete result + assertEquals(result, transactionManager.transactionalRequestResult); + } + private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException { final long pid = 1L; final short epoch = 1; From 3269b2613b9682e8a8e4469755bee625cff70d06 Mon Sep 17 00:00:00 2001 From: huxihx Date: Fri, 16 Mar 2018 17:54:41 +0800 Subject: [PATCH 5/8] addressed Jason's comments. --- .../kafka/clients/producer/KafkaProducer.java | 15 ++++++++++----- .../internals/TransactionManager.java | 19 +++++++------------ .../internals/TransactionManagerTest.java | 10 ---------- .../kafka/api/TransactionsTest.scala | 18 ++++++++++++++++-- 4 files changed, 33 insertions(+), 29 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 31741c43ad610..3393d48c0755e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -256,6 +256,7 @@ public class KafkaProducer implements Producer { private final ProducerInterceptors interceptors; private final ApiVersions apiVersions; private final TransactionManager transactionManager; + private TransactionalRequestResult transactionalRequestResult; /** * A producer is instantiated by providing a set of key-value pairs as configuration. 
Valid configuration strings @@ -566,12 +567,16 @@ private static int parseAcks(String acksString) { */ public void initTransactions() { throwIfNoTransactionManager(); - TransactionalRequestResult result = transactionManager.initializeTransactions(); - sender.wakeup(); + if (transactionalRequestResult == null) { + transactionalRequestResult = transactionManager.initializeTransactions(); + sender.wakeup(); + } + try { - if (!result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS)) { - throw new TimeoutException("Timeout expired while initializing transactional state in " + - maxBlockTimeMs + "ms."); + if (transactionalRequestResult.await(maxBlockTimeMs, TimeUnit.MILLISECONDS)) { + transactionalRequestResult = null; + } else { + throw new TimeoutException("Timeout expired while initializing transactional state in " + maxBlockTimeMs + "ms."); } } catch (InterruptedException e) { throw new InterruptException("Initialize transactions interrupted.", e); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 093286dd98bf7..b242d5a65a639 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -112,8 +112,6 @@ public class TransactionManager { private int inFlightRequestCorrelationId = NO_INFLIGHT_REQUEST_CORRELATION_ID; private Node transactionCoordinator; private Node consumerGroupCoordinator; - // only visible for testing - TransactionalRequestResult transactionalRequestResult; private volatile State currentState = State.UNINITIALIZED; private volatile RuntimeException lastError = null; @@ -203,16 +201,13 @@ public int compare(TxnRequestHandler o1, TxnRequestHandler o2) { public synchronized TransactionalRequestResult initializeTransactions() { ensureTransactional(); - if 
(transactionalRequestResult == null || transactionalRequestResult.isCompleted()) { - transitionTo(State.INITIALIZING); - setProducerIdAndEpoch(ProducerIdAndEpoch.NONE); - this.nextSequence.clear(); - InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs); - InitProducerIdHandler handler = new InitProducerIdHandler(builder); - enqueueRequest(handler); - transactionalRequestResult = handler.result; - } - return transactionalRequestResult; + transitionTo(State.INITIALIZING); + setProducerIdAndEpoch(ProducerIdAndEpoch.NONE); + this.nextSequence.clear(); + InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs); + InitProducerIdHandler handler = new InitProducerIdHandler(builder); + enqueueRequest(handler); + return handler.result; } public synchronized void beginTransaction() { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index bcdf0eea1519d..6fcf48059672a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -2240,16 +2240,6 @@ public void testShouldResetProducerStateAfterResolvingSequences() throws Interru assertTrue(manager.shouldResetProducerStateAfterResolvingSequences()); } - @Test - public void verifyInitializeTransactionsCacheRequestResult() { - assertNull(transactionManager.transactionalRequestResult); - transactionManager.initializeTransactions(); - assertNotNull(transactionManager.transactionalRequestResult); - assertFalse(transactionManager.transactionalRequestResult.isCompleted()); - TransactionalRequestResult result = transactionManager.initializeTransactions(); // should cache the incomplete result - assertEquals(result, 
transactionManager.transactionalRequestResult); - } - private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException { final long pid = 1L; final short epoch = 1; diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index ff05ed9d56e70..8076b946d1434 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -535,11 +535,25 @@ class TransactionsTest extends KafkaServerTestHarness { @Test(expected = classOf[TimeoutException]) def testInitTransactionFailWithBadBrokers() { val producer = createTransactionalProducerToConnectNonExistentBrokers() + try { producer.initTransactions() - fail("should have raised a TimeoutException since initializing the transaction expired") + producer.beginTransaction() } finally { - producer.close() // should successfully close the producer + producer.close() + } + } + + @Test(expected = classOf[KafkaException]) + def testConsecutivelyRunInitTransactions(): Unit = { + val producer = createTransactionalProducer(transactionalId = "normalProducer") + + try { + producer.initTransactions() + producer.initTransactions() + fail("Should have raised a KafkaException") + } finally { + producer.close() } } From e0efabaf9b97dfd8a90496d9da96fdc96cf4c947 Mon Sep 17 00:00:00 2001 From: huxihx Date: Fri, 23 Mar 2018 09:53:03 +0800 Subject: [PATCH 6/8] addressed Jason's comments. 
--- .../kafka/clients/producer/KafkaProducer.java | 11 ++--- .../clients/producer/KafkaProducerTest.java | 35 ++++++++++++++++ .../kafka/api/TransactionsTest.scala | 40 +------------------ 3 files changed, 43 insertions(+), 43 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 3393d48c0755e..a0c307d20ba9b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -256,7 +256,7 @@ public class KafkaProducer implements Producer { private final ProducerInterceptors interceptors; private final ApiVersions apiVersions; private final TransactionManager transactionManager; - private TransactionalRequestResult transactionalRequestResult; + private TransactionalRequestResult initTransactionsResult; /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings @@ -547,6 +547,7 @@ private static int parseAcks(String acksString) { /** * Needs to be called before any other methods when the transactional.id is set in the configuration. + * This method could be retried if a {@link TimeoutException} or {@link InterruptException} is raised. * * This method does the following: * 1. 
Ensures any transactions initiated by previous instances of the producer with the same @@ -567,14 +568,14 @@ private static int parseAcks(String acksString) { */ public void initTransactions() { throwIfNoTransactionManager(); - if (transactionalRequestResult == null) { - transactionalRequestResult = transactionManager.initializeTransactions(); + if (initTransactionsResult == null) { + initTransactionsResult = transactionManager.initializeTransactions(); sender.wakeup(); } try { - if (transactionalRequestResult.await(maxBlockTimeMs, TimeUnit.MILLISECONDS)) { - transactionalRequestResult = null; + if (initTransactionsResult.await(maxBlockTimeMs, TimeUnit.MILLISECONDS)) { + initTransactionsResult = null; } else { throw new TimeoutException("Timeout expired while initializing transactional state in " + maxBlockTimeMs + "ms."); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 9f70fd7e70834..4e9d0d4ee9b8a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -548,4 +548,39 @@ public void testPartitionsForWithNullTopic() { // expected } } + + @Test(expected = TimeoutException.class) + public void testInitTransactionTimeout() { + Properties props = new Properties(); + props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction"); + props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + + try (Producer producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) { + producer.initTransactions(); + producer.beginTransaction(); + } + } + + @Test(expected = KafkaException.class) + public void testOnlyCanExecuteCloseAfterFailInitTransaction() { + Properties props = new Properties(); + 
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction"); + props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + + Producer producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer()); + + try { + producer.initTransactions(); + } catch (TimeoutException e) { + // expected + } + // other transactional operations should not be allowed even when we capture the error after initTransaction failed + try { + producer.beginTransaction(); + } finally { + producer.close(); + } + } } diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 8076b946d1434..8435e5a3a6c15 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -26,9 +26,9 @@ import kafka.server.KafkaConfig import kafka.utils.TestUtils import kafka.utils.TestUtils.consumeRecords import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.{KafkaException, TopicPartition} -import org.apache.kafka.common.errors.{ProducerFencedException, TimeoutException} +import org.apache.kafka.common.errors.ProducerFencedException import org.apache.kafka.common.security.auth.SecurityProtocol import org.junit.{After, Before, Test} import org.junit.Assert._ @@ -532,18 +532,6 @@ class TransactionsTest extends KafkaServerTestHarness { } } - @Test(expected = classOf[TimeoutException]) - def testInitTransactionFailWithBadBrokers() { - val producer = createTransactionalProducerToConnectNonExistentBrokers() - - try { - producer.initTransactions() - producer.beginTransaction() - } finally { - producer.close() - } - 
} - @Test(expected = classOf[KafkaException]) def testConsecutivelyRunInitTransactions(): Unit = { val producer = createTransactionalProducer(transactionalId = "normalProducer") @@ -557,30 +545,6 @@ class TransactionsTest extends KafkaServerTestHarness { } } - @Test(expected = classOf[KafkaException]) - def testOnlyCanExecuteCloseAfterFailInitTransaction(): Unit = { - val producer = createTransactionalProducerToConnectNonExistentBrokers() - try { - producer.initTransactions() - } catch { - case _: TimeoutException => // expected - } - // other transactional operations should not be allowed even when we capture the error after initTransaction failed - try { - producer.beginTransaction() - } finally { - producer.close() // should successfully close the producer - } - } - - private def createTransactionalProducerToConnectNonExistentBrokers(): KafkaProducer[Array[Byte], Array[Byte]] = { - val props = new Properties() - props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transaction-id") - val producer = TestUtils.createNewProducer(brokerList = "192.168.1.1:9092", maxBlockMs = 1000, - retries = Integer.MAX_VALUE, props = Some(props)) - producer - } - private def sendTransactionalMessagesWithValueRange(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String, start: Int, end: Int, willBeCommitted: Boolean): Unit = { for (i <- start until end) { From dc2633de52871c86a57e979212b2dd9f23576708 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Mon, 26 Mar 2018 23:41:33 -0700 Subject: [PATCH 7/8] Minor tweaks of test cases in KafkaProducerTest --- .../clients/producer/KafkaProducerTest.java | 42 +++++++++++++++---- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 4e9d0d4ee9b8a..8bfc5e7d28a4c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -553,34 +553,60 @@ public void testPartitionsForWithNullTopic() { public void testInitTransactionTimeout() { Properties props = new Properties(); props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction"); - props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000); + props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); - try (Producer producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) { + Time time = new MockTime(); + Cluster cluster = TestUtils.singletonCluster("topic", 1); + Node node = cluster.nodes().get(0); + + Metadata metadata = new Metadata(0, Long.MAX_VALUE, true); + metadata.update(cluster, Collections.emptySet(), time.milliseconds()); + + MockClient client = new MockClient(time, metadata); + client.setNode(node); + + Producer producer = new KafkaProducer<>( + new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())), + new StringSerializer(), new StringSerializer(), metadata, client); + try { producer.initTransactions(); - producer.beginTransaction(); + fail("initTransactions() should have raised TimeoutException"); + } finally { + producer.close(0, TimeUnit.MILLISECONDS); } } @Test(expected = KafkaException.class) - public void testOnlyCanExecuteCloseAfterFailInitTransaction() { + public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() { Properties props = new Properties(); props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction"); - props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000); + props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); - Producer producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer()); + Time time = new MockTime(); + Cluster cluster = 
TestUtils.singletonCluster("topic", 1); + Node node = cluster.nodes().get(0); + Metadata metadata = new Metadata(0, Long.MAX_VALUE, true); + metadata.update(cluster, Collections.emptySet(), time.milliseconds()); + + MockClient client = new MockClient(time, metadata); + client.setNode(node); + + Producer producer = new KafkaProducer<>( + new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())), + new StringSerializer(), new StringSerializer(), metadata, client); try { producer.initTransactions(); } catch (TimeoutException e) { // expected } - // other transactional operations should not be allowed even when we capture the error after initTransaction failed + // other transactional operations should not be allowed if we catch the error after initTransactions failed try { producer.beginTransaction(); } finally { - producer.close(); + producer.close(0, TimeUnit.MILLISECONDS); } } } From 754249e243260d32faea0ca9d27ff043563988c2 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 27 Mar 2018 08:59:17 -0700 Subject: [PATCH 8/8] Improve javadoc for initTransactions --- .../org/apache/kafka/clients/producer/KafkaProducer.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index a0c307d20ba9b..a5af5b60093d9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -547,7 +547,6 @@ private static int parseAcks(String acksString) { /** * Needs to be called before any other methods when the transactional.id is set in the configuration. - * This method could be retried if a {@link TimeoutException} or {@link InterruptException} is raised. * * This method does the following: * 1. 
Ensures any transactions initiated by previous instances of the producer with the same @@ -557,6 +556,11 @@ private static int parseAcks(String acksString) { * 2. Gets the internal producer id and epoch, used in all future transactional * messages issued by the producer. * + * Note that this method will raise {@link TimeoutException} if the transactional state cannot + * be initialized before expiration of {@code max.block.ms}. Additionally, it will raise {@link InterruptException} + * if interrupted. It is safe to retry in either case, but once the transactional state has been successfully + * initialized, this method should no longer be used. + * * @throws IllegalStateException if no transactional.id has been configured * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker * does not support transactions (i.e. if its version is lower than 0.11.0.0)