Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private final ProducerInterceptors<K, V> interceptors;
private final ApiVersions apiVersions;
private final TransactionManager transactionManager;
private TransactionalRequestResult initTransactionsResult;

/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
Expand Down Expand Up @@ -555,18 +556,36 @@ private static int parseAcks(String acksString) {
* 2. Gets the internal producer id and epoch, used in all future transactional
* messages issued by the producer.
*
* Note that this method will raise {@link TimeoutException} if the transactional state cannot
* be initialized before expiration of {@code max.block.ms}. Additionally, it will raise {@link InterruptException}
* if interrupted. It is safe to retry in either case, but once the transactional state has been successfully
* initialized, this method should no longer be used.
*
* @throws IllegalStateException if no transactional.id has been configured
* @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
* does not support transactions (i.e. if its version is lower than 0.11.0.0)
* @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
* transactional.id is not authorized. See the exception for more details
* @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
* @throws TimeoutException if the time taken for initialize the transaction has surpassed <code>max.block.ms</code>.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment to the javadoc mentioning that this method may be retried if a TimeoutException or an InterruptException is raised?

* @throws InterruptException if the thread is interrupted while blocked
*/
public void initTransactions() {
throwIfNoTransactionManager();
TransactionalRequestResult result = transactionManager.initializeTransactions();
sender.wakeup();
result.await();
if (initTransactionsResult == null) {
initTransactionsResult = transactionManager.initializeTransactions();
sender.wakeup();
}

try {
if (initTransactionsResult.await(maxBlockTimeMs, TimeUnit.MILLISECONDS)) {
initTransactionsResult = null;
} else {
throw new TimeoutException("Timeout expired while initializing transactional state in " + maxBlockTimeMs + "ms.");
}
} catch (InterruptedException e) {
throw new InterruptException("Initialize transactions interrupted.", e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ private boolean maybeSendTransactionalRequest(long now) {
return false;

AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder();
while (true) {
while (running) {
Node targetNode = null;
try {
if (nextRequestHandler.needsCoordinator()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ public void await() {
}

public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return latch.await(timeout, unit);
boolean success = latch.await(timeout, unit);
if (!isSuccessful())
throw error();
return success;
}

public RuntimeException error() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,4 +548,65 @@ public void testPartitionsForWithNullTopic() {
// expected
}
}

@Test(expected = TimeoutException.class)
public void testInitTransactionTimeout() {
Properties props = new Properties();
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");

Time time = new MockTime();
Cluster cluster = TestUtils.singletonCluster("topic", 1);
Node node = cluster.nodes().get(0);

Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());

MockClient client = new MockClient(time, metadata);
client.setNode(node);

Producer<String, String> producer = new KafkaProducer<>(
new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
new StringSerializer(), new StringSerializer(), metadata, client);
try {
producer.initTransactions();
fail("initTransactions() should have raised TimeoutException");
} finally {
producer.close(0, TimeUnit.MILLISECONDS);
}
}

@Test(expected = KafkaException.class)
public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
Properties props = new Properties();
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");

Time time = new MockTime();
Cluster cluster = TestUtils.singletonCluster("topic", 1);
Node node = cluster.nodes().get(0);

Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());

MockClient client = new MockClient(time, metadata);
client.setNode(node);

Producer<String, String> producer = new KafkaProducer<>(
new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
new StringSerializer(), new StringSerializer(), metadata, client);
try {
producer.initTransactions();
} catch (TimeoutException e) {
// expected
}
// other transactional operations should not be allowed if we catch the error after initTransactions failed
try {
producer.beginTransaction();
} finally {
producer.close(0, TimeUnit.MILLISECONDS);
}
}
}
17 changes: 15 additions & 2 deletions core/src/test/scala/integration/kafka/api/TransactionsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ package kafka.api

import java.lang.{Long => JLong}
import java.util.Properties
import java.util.concurrent.{ExecutionException, TimeUnit}
import java.util.concurrent.TimeUnit

import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import kafka.utils.TestUtils.consumeRecords
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.ProducerFencedException
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.{After, Before, Test}
Expand Down Expand Up @@ -532,6 +532,19 @@ class TransactionsTest extends KafkaServerTestHarness {
}
}

@Test(expected = classOf[KafkaException])
def testConsecutivelyRunInitTransactions(): Unit = {
val producer = createTransactionalProducer(transactionalId = "normalProducer")

try {
producer.initTransactions()
producer.initTransactions()
fail("Should have raised a KafkaException")
} finally {
producer.close()
}
}

private def sendTransactionalMessagesWithValueRange(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String,
start: Int, end: Int, willBeCommitted: Boolean): Unit = {
for (i <- start until end) {
Expand Down