Skip to content

Commit 917d695

Browse files
KAFKA-17019 Producer TimeoutException should include root cause (#20159)
### Changes - Add new Exception class `PotentialCauseException`. - All `org.apache.kafka.common.errors.TimeoutException` in `KafkaProducer` has `PotentialCauseException` as root cause if it cannot catch any exception. ### Describe `TimeoutException` can be thrown for various reasons. However, it is often difficult to identify the root cause, Because there are so many potential factors that can lead to a `TimeoutException`. For example: 1. The `ProducerClient` might be busy, so it may not be able to send the request in time. As a result, some batches may expire, leading to a `TimeoutException`. 2. The `broker` might be unavailable due to network issues or internal failures. 3. A request may be in flight, and although the broker successfully handles and responds to it, the response might arrive slightly late. As shown above, there are many possible causes. In some cases, no `exception` is caught in the `catch` block, and a `TimeoutException` is thrown simply by comparing the `elapsed time`. However, the developer using `TimeoutException` in `KafkaProducer` likely already knows which specific reasons could cause it in that context. Therefore, I think it would be helpful to include a `PotentialCauseException` that reflects the likely reason, based on the developer’s knowledge. Reviewers: TengYao Chi <kitingiao@gmail.com>, Yung <yungyung7654321@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent fd5d923 commit 917d695

File tree

8 files changed

+123
-64
lines changed

8 files changed

+123
-64
lines changed

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,18 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
248248
public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
249249
public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
250250

251+
private static final String INIT_TXN_TIMEOUT_MSG = "InitTransactions timed out — " +
252+
"did not complete coordinator discovery or " +
253+
"receive the InitProducerId response within max.block.ms.";
254+
255+
private static final String SEND_OFFSETS_TIMEOUT_MSG =
256+
"SendOffsetsToTransaction timed out – did not reach the coordinator or " +
257+
"receive the TxnOffsetCommit/AddOffsetsToTxn response within max.block.ms";
258+
private static final String COMMIT_TXN_TIMEOUT_MSG =
259+
"CommitTransaction timed out – did not complete EndTxn with the transaction coordinator within max.block.ms";
260+
private static final String ABORT_TXN_TIMEOUT_MSG =
261+
"AbortTransaction timed out – did not complete EndTxn(abort) with the transaction coordinator within max.block.ms";
262+
251263
private final String clientId;
252264
// Visible for testing
253265
final Metrics metrics;
@@ -672,7 +684,7 @@ public void initTransactions(boolean keepPreparedTxn) {
672684
long now = time.nanoseconds();
673685
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
674686
sender.wakeup();
675-
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
687+
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS, INIT_TXN_TIMEOUT_MSG);
676688
producerMetrics.recordInit(time.nanoseconds() - now);
677689
transactionManager.maybeUpdateTransactionV2Enabled(true);
678690
}
@@ -761,7 +773,7 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
761773
long start = time.nanoseconds();
762774
TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
763775
sender.wakeup();
764-
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
776+
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS, SEND_OFFSETS_TIMEOUT_MSG);
765777
producerMetrics.recordSendOffsets(time.nanoseconds() - start);
766778
}
767779
}
@@ -847,7 +859,7 @@ public void commitTransaction() throws ProducerFencedException {
847859
long commitStart = time.nanoseconds();
848860
TransactionalRequestResult result = transactionManager.beginCommit();
849861
sender.wakeup();
850-
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
862+
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS, COMMIT_TXN_TIMEOUT_MSG);
851863
producerMetrics.recordCommitTxn(time.nanoseconds() - commitStart);
852864
}
853865

@@ -882,7 +894,7 @@ public void abortTransaction() throws ProducerFencedException {
882894
long abortStart = time.nanoseconds();
883895
TransactionalRequestResult result = transactionManager.beginAbort();
884896
sender.wakeup();
885-
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
897+
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS, ABORT_TXN_TIMEOUT_MSG);
886898
producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart);
887899
}
888900

clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ public Uuid clientInstanceId(Duration timeout) {
424424
if (injectTimeoutExceptionCounter > 0) {
425425
--injectTimeoutExceptionCounter;
426426
}
427-
throw new TimeoutException();
427+
throw new TimeoutException("TimeoutExceptions are successfully injected for test.");
428428
}
429429

430430
return clientInstanceId;

clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,8 @@ private long sendProducerData(long now) {
414414
log.trace("Expired {} batches in accumulator", expiredBatches.size());
415415
for (ProducerBatch expiredBatch : expiredBatches) {
416416
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
417-
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
417+
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation. "
418+
+ "The request has not been sent, or no server response has been received yet.";
418419
failBatch(expiredBatch, new TimeoutException(errorMessage), false);
419420
if (transactionManager != null && expiredBatch.inRetry()) {
420421
// This ensures that no new batches are drained until the current in flight batches are fully resolved.

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,12 @@ public void done() {
4747
this.latch.countDown();
4848
}
4949

50-
public void await() {
51-
this.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
52-
}
53-
54-
public void await(long timeout, TimeUnit unit) {
50+
public void await(long timeout, TimeUnit unit, String expectedTimeoutReason) {
5551
try {
5652
boolean success = latch.await(timeout, unit);
5753
if (!success) {
5854
throw new TimeoutException("Timeout expired after " + unit.toMillis(timeout) +
59-
"ms while awaiting " + operation);
55+
"ms while awaiting " + operation + ". " + expectedTimeoutReason);
6056
}
6157

6258
isAcked = true;

clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,12 @@
174174
import static org.mockito.Mockito.when;
175175

176176
public class KafkaProducerTest {
177+
178+
private static final String INIT_TXN_TIMEOUT_MSG =
179+
"InitTransactions timed out — " +
180+
"did not complete coordinator discovery or " +
181+
"receive the InitProducerId response within max.block.ms.";
182+
177183
private final String topic = "topic";
178184
private final Collection<Node> nodes = Collections.singletonList(NODE);
179185
private final Cluster emptyCluster = new Cluster(
@@ -1322,7 +1328,7 @@ public void testInitTransactionsResponseAfterTimeout() throws Exception {
13221328
"Timed out while waiting for expected `InitProducerId` request to be sent");
13231329

13241330
time.sleep(maxBlockMs);
1325-
TestUtils.assertFutureThrows(TimeoutException.class, future);
1331+
TestUtils.assertFutureThrowsWithMessageContaining(TimeoutException.class, future, INIT_TXN_TIMEOUT_MSG);
13261332

13271333
client.respond(initProducerIdResponse(1L, (short) 5, Errors.NONE));
13281334

@@ -1352,7 +1358,8 @@ public void testInitTransactionTimeout() {
13521358
((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
13531359
FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", NODE));
13541360

1355-
assertThrows(TimeoutException.class, producer::initTransactions);
1361+
var timeoutEx = assertThrows(TimeoutException.class, producer::initTransactions);
1362+
assertTrue(timeoutEx.getMessage().contains(INIT_TXN_TIMEOUT_MSG));
13561363

13571364
client.prepareResponse(
13581365
request -> request instanceof FindCoordinatorRequest &&
@@ -2364,7 +2371,8 @@ public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
23642371

23652372
Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(), new StringSerializer(),
23662373
metadata, client, null, time);
2367-
assertThrows(TimeoutException.class, producer::initTransactions);
2374+
var timeoutEx1 = assertThrows(TimeoutException.class, producer::initTransactions);
2375+
assertTrue(timeoutEx1.getMessage().contains(INIT_TXN_TIMEOUT_MSG));
23682376
// other transactional operations should not be allowed if we catch the error after initTransactions failed
23692377
try {
23702378
assertThrows(IllegalStateException.class, producer::beginTransaction);

clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ public class SenderTest {
163163
TOPIC_NAME, TOPIC_ID,
164164
"testSplitBatchAndSend", Uuid.fromString("2J9hK8m1wHMKjXfIkQyXx1")
165165
);
166+
private static final String SENDER_TIMEOUT_MSG = "The request has not been sent, or no server response has been received yet.";
166167
private final TopicPartition tp0 = new TopicPartition(TOPIC_NAME, 0);
167168
private final TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 1);
168169
private final TopicPartition tp2 = new TopicPartition(TOPIC_NAME, 2);
@@ -425,6 +426,7 @@ public void setPartition(int partition) {
425426
@Override
426427
public void onCompletion(RecordMetadata metadata, Exception exception) {
427428
if (exception instanceof TimeoutException) {
429+
assertTrue(exception.getMessage().contains(SENDER_TIMEOUT_MSG));
428430
expiryCallbackCount.incrementAndGet();
429431
try {
430432
accumulator.append(tp1.topic(), tp1.partition(), 0L, key, value,
@@ -2792,7 +2794,7 @@ public void testRecordsFlushedImmediatelyOnTransactionCompletion() throws Except
27922794
runUntil(sender, txnManager::isReady);
27932795

27942796
assertTrue(commitResult.isSuccessful());
2795-
commitResult.await();
2797+
commitResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, "Unexpected Timed out for transaction commit to completed during the test.");
27962798

27972799
// Finally, we want to assert that the linger time is still effective
27982800
// when the new transaction begins.
@@ -2942,8 +2944,9 @@ public void testForceShutdownWithIncompleteTransaction() {
29422944

29432945
sender.forceClose();
29442946
sender.run();
2945-
assertThrows(KafkaException.class, commitResult::await,
2946-
"The test expected to throw a KafkaException for forcefully closing the sender");
2947+
assertThrows(KafkaException.class, () -> commitResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
2948+
"The test expected to throw a KafkaException for forcefully closing the sender")
2949+
);
29472950
} finally {
29482951
m.close();
29492952
}
@@ -3154,7 +3157,7 @@ public void testReceiveFailedBatchTwiceWithTransactions() throws Exception {
31543157
assertTrue(txnManager::isReady);
31553158

31563159
assertTrue(result.isSuccessful());
3157-
result.await();
3160+
result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, "Unexpected time out during the test.");
31583161

31593162
txnManager.beginTransaction();
31603163
}
@@ -3193,7 +3196,7 @@ public void testInvalidTxnStateIsAnAbortableError() throws Exception {
31933196
assertTrue(txnManager::isReady);
31943197

31953198
assertTrue(result.isSuccessful());
3196-
result.await();
3199+
result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, "Unexpected time out during the test.");
31973200

31983201
txnManager.beginTransaction();
31993202
}
@@ -3232,7 +3235,7 @@ public void testTransactionAbortableExceptionIsAnAbortableError() throws Excepti
32323235
assertTrue(txnManager::isReady);
32333236

32343237
assertTrue(result.isSuccessful());
3235-
result.await();
3238+
result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, "Unexpected time out during the test.");
32363239

32373240
txnManager.beginTransaction();
32383241
}
@@ -3264,7 +3267,7 @@ public void testAbortableErrorIsConvertedToFatalErrorDuringAbort() throws Except
32643267
TransactionalRequestResult commitResult = transactionManager.beginCommit();
32653268
sender.runOnce();
32663269
try {
3267-
commitResult.await(1000, TimeUnit.MILLISECONDS);
3270+
commitResult.await(1000, TimeUnit.MILLISECONDS, "Unexpected time out during the test.");
32683271
fail("Expected abortable error to be thrown for commit");
32693272
} catch (KafkaException e) {
32703273
assertTrue(transactionManager.hasAbortableError());
@@ -3281,7 +3284,7 @@ public void testAbortableErrorIsConvertedToFatalErrorDuringAbort() throws Except
32813284

32823285
// Verify the error is converted to KafkaException (not TransactionAbortableException)
32833286
try {
3284-
abortResult.await(1000, TimeUnit.MILLISECONDS);
3287+
abortResult.await(1000, TimeUnit.MILLISECONDS, "Unexpected time out during the test.");
32853288
fail("Expected KafkaException to be thrown");
32863289
} catch (KafkaException e) {
32873290
// Verify TM is in FATAL_ERROR state
@@ -3884,7 +3887,7 @@ private void doInitTransactions(TransactionManager transactionManager, ProducerI
38843887
prepareInitProducerResponse(Errors.NONE, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
38853888
sender.runOnce();
38863889
assertTrue(transactionManager.hasProducerId());
3887-
result.await();
3890+
result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, "Unexpected time out during the test.");
38883891
}
38893892

38903893
private void prepareFindCoordinatorResponse(Errors error, String txnid) {

0 commit comments

Comments
 (0)