From e6fd25ee9378cc47f0cd0e9e5633abb2fbb33fc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Liebau?= Date: Tue, 31 Mar 2020 15:08:51 +0200 Subject: [PATCH 1/4] KAFKA-3720: Change TimeoutException to BufferExhaustedException and increase buffer-exhausted-records metric when no memory can be allocated for a record within max.block.ms. --- .../producer/BufferExhaustedException.java | 4 +-- .../kafka/clients/producer/KafkaProducer.java | 7 ++--- .../producer/internals/BufferPool.java | 4 +-- .../producer/internals/BufferPoolTest.java | 26 +++++++++++++------ 4 files changed, 24 insertions(+), 17 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java b/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java index be840db282fac..862ede716f070 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java @@ -16,13 +16,13 @@ */ package org.apache.kafka.clients.producer; -import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.TimeoutException; /** * This exception is thrown if the producer is in non-blocking mode and the rate of data production exceeds the rate at * which data can be sent for long enough for the allocated buffer to be exhausted. 
*/ -public class BufferExhaustedException extends KafkaException { +public class BufferExhaustedException extends TimeoutException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index b68330f32e0ff..3041320a26852 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -947,6 +947,8 @@ private Future doSend(ProducerRecord record, Callback call // for other exceptions throw directly } catch (ApiException e) { log.debug("Exception occurred during message send:", e); + if (e instanceof BufferExhaustedException) + this.metrics.sensor("buffer-exhausted-records").record(); if (callback != null) callback.onCompletion(null, e); this.errors.record(); @@ -956,11 +958,6 @@ private Future doSend(ProducerRecord record, Callback call this.errors.record(); this.interceptors.onSendError(record, tp, e); throw new InterruptException(e); - } catch (BufferExhaustedException e) { - this.errors.record(); - this.metrics.sensor("buffer-exhausted-records").record(); - this.interceptors.onSendError(record, tp, e); - throw e; } catch (KafkaException e) { this.errors.record(); this.interceptors.onSendError(record, tp, e); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index b49a7e2215f42..87b0d9e82775c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -23,9 +23,9 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import org.apache.kafka.clients.producer.BufferExhaustedException; import 
org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Meter; @@ -151,7 +151,7 @@ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedEx throw new KafkaException("Producer closed while allocating memory"); if (waitingTimeElapsed) { - throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); + throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); } remainingTimeToBlockNs -= timeNs; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java index 9eee28468ac3c..68c70a4b03593 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java @@ -16,8 +16,8 @@ */ package org.apache.kafka.clients.producer.internals; +import org.apache.kafka.clients.producer.BufferExhaustedException; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -152,8 +152,18 @@ private CountDownLatch asyncAllocate(final BufferPool pool, final int size) { } /** - * Test if Timeout exception is thrown when there is not enough memory to allocate and the elapsed time is greater than the max specified block time. - * And verify that the allocation attempt finishes soon after the maxBlockTimeMs. 
+ * Test if BufferExhausted exception is thrown when there is not enough memory to allocate and the elapsed + * time is greater than the max specified block time. + */ + @Test(expected = BufferExhaustedException.class) + public void testBufferExhaustedExceptionIsThrown() throws Exception { + BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup); + pool.allocate(1, maxBlockTimeMs); + pool.allocate(2, maxBlockTimeMs); + } + + /** + * Verify that a failed allocation attempt due to not enough memory finishes soon after the maxBlockTimeMs. */ @Test public void testBlockTimeout() throws Exception { @@ -171,14 +181,14 @@ public void testBlockTimeout() throws Exception { try { pool.allocate(10, maxBlockTimeMs); fail("The buffer allocated more memory than its maximum value 10"); - } catch (TimeoutException e) { + } catch (BufferExhaustedException e) { // this is good } // Thread scheduling sometimes means that deallocation varies by this point assertTrue("available memory " + pool.availableMemory(), pool.availableMemory() >= 8 && pool.availableMemory() <= 10); long durationMs = Time.SYSTEM.milliseconds() - beginTimeMs; - assertTrue("TimeoutException should not throw before maxBlockTimeMs", durationMs >= maxBlockTimeMs); - assertTrue("TimeoutException should throw soon after maxBlockTimeMs", durationMs < maxBlockTimeMs + 1000); + assertTrue("BufferExhaustedException should not throw before maxBlockTimeMs", durationMs >= maxBlockTimeMs); + assertTrue("BufferExhaustedException should throw soon after maxBlockTimeMs", durationMs < maxBlockTimeMs + 1000); } /** @@ -191,7 +201,7 @@ public void testCleanupMemoryAvailabilityWaiterOnBlockTimeout() throws Exception try { pool.allocate(2, maxBlockTimeMs); fail("The buffer allocated more memory than its maximum value 2"); - } catch (TimeoutException e) { + } catch (BufferExhaustedException e) { // this is good } assertEquals(0, pool.queued()); @@ -266,7 +276,7 @@ public void run() { try { pool.allocate(2, maxBlockTimeMs); 
fail("The buffer allocated more memory than its maximum value 2"); - } catch (TimeoutException e) { + } catch (BufferExhaustedException e) { // this is good } catch (InterruptedException e) { // this can be neglected From 928b45d326e9e5bedc0efed48cb2fa905adef8d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Liebau?= Date: Wed, 1 Apr 2020 10:32:28 +0200 Subject: [PATCH 2/4] Added comment on why BufferExhaustedException subclasses TimeoutException. --- .../kafka/clients/producer/BufferExhaustedException.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java b/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java index 862ede716f070..292bb4eef1c19 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java @@ -19,8 +19,12 @@ import org.apache.kafka.common.errors.TimeoutException; /** - * This exception is thrown if the producer is in non-blocking mode and the rate of data production exceeds the rate at - * which data can be sent for long enough for the allocated buffer to be exhausted. + * This exception is thrown if the producer cannot allocate memory for a record within max.block.ms due to the buffer + * being too full. + * + * In earlier versions a TimeoutException was thrown instead of this. To keep existing catch-clauses working + * this class extends TimeoutException. + * */ public class BufferExhaustedException extends TimeoutException { From 714731e8d401ef2f1bc12952a5856a534d3d7089 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Liebau?= Date: Wed, 1 Apr 2020 13:22:14 +0200 Subject: [PATCH 3/4] Fixed broken test. 
Had to refactor slightly, as verifySendFailure was used to test two different failure scenarios that both used to throw TimeoutException, but now one of them throws BufferExhaustedException. --- .../kafka/api/PlaintextProducerSendTest.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala index bc2e2939f3579..bcd2f7f8737b6 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala @@ -23,7 +23,7 @@ import java.util.concurrent.{ExecutionException, Future, TimeUnit} import kafka.log.LogConfig import kafka.server.Defaults import kafka.utils.TestUtils -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata, BufferExhaustedException} import org.apache.kafka.common.errors.{InvalidTimestampException, RecordTooLargeException, SerializationException, TimeoutException} import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch, Records, TimestampType} import org.apache.kafka.common.serialization.ByteArraySerializer @@ -150,14 +150,19 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { assertTrue(s"Invalid offset $recordMetadata", recordMetadata.offset >= 0) } - def verifySendFailure(future: Future[RecordMetadata]): Unit = { + def verifyMetadataNotAvailable(future: Future[RecordMetadata]): Unit = { assertTrue(future.isDone) // verify future was completed immediately assertEquals(classOf[TimeoutException], intercept[ExecutionException](future.get).getCause.getClass) } + def verifyBufferExhausted(future: Future[RecordMetadata]): Unit = { + assertTrue(future.isDone) // verify future was completed immediately + 
assertEquals(classOf[BufferExhaustedException], intercept[ExecutionException](future.get).getCause.getClass) + } + // Topic metadata not available, send should fail without blocking val producer = createProducer(brokerList = brokerList, maxBlockMs = 0) - verifySendFailure(send(producer)) + verifyMetadataNotAvailable(send(producer)) // Test that send starts succeeding once metadata is available val future = sendUntilQueued(producer) @@ -167,7 +172,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { val producer2 = createProducer(brokerList = brokerList, maxBlockMs = 0, lingerMs = 15000, batchSize = 1100, bufferSize = 1500) val future2 = sendUntilQueued(producer2) // wait until metadata is available and one record is queued - verifySendFailure(send(producer2)) // should fail send since buffer is full + verifyBufferExhausted(send(producer2)) // should fail send since buffer is full verifySendSuccess(future2) // previous batch should be completed and sent now } From 8fba83628f3c0f1f094d11ec2f8dfa38d0f7c615 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Liebau?= Date: Sat, 4 Apr 2020 00:26:41 +0200 Subject: [PATCH 4/4] Addressed comments from Jun: moved metrics recording to BufferPool, where the exception originates. Reordered imports in PlaintextProducerSendTest. 
--- .../org/apache/kafka/clients/producer/KafkaProducer.java | 2 -- .../kafka/clients/producer/internals/BufferPool.java | 7 +++++++ .../clients/producer/internals/RecordAccumulator.java | 7 ------- .../integration/kafka/api/PlaintextProducerSendTest.scala | 2 +- 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 3041320a26852..24fe98b178e7e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -947,8 +947,6 @@ private Future doSend(ProducerRecord record, Callback call // for other exceptions throw directly } catch (ApiException e) { log.debug("Exception occurred during message send:", e); - if (e instanceof BufferExhaustedException) - this.metrics.sensor("buffer-exhausted-records").record(); if (callback != null) callback.onCompletion(null, e); this.errors.record(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index 87b0d9e82775c..ee84c7c168a4a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -83,6 +83,12 @@ public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, Str MetricName totalMetricName = metrics.metricName("bufferpool-wait-time-total", metricGrpName, "The total time an appender waits for space allocation."); + + Sensor bufferExhaustedRecordSensor = metrics.sensor("buffer-exhausted-records"); + MetricName bufferExhaustedRateMetricName = metrics.metricName("buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion"); + 
MetricName bufferExhaustedTotalMetricName = metrics.metricName("buffer-exhausted-total", metricGrpName, "The total number of record sends that are dropped due to buffer exhaustion"); + bufferExhaustedRecordSensor.add(new Meter(bufferExhaustedRateMetricName, bufferExhaustedTotalMetricName)); + this.waitTime.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName)); this.closed = false; } @@ -151,6 +157,7 @@ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedEx throw new KafkaException("Producer closed while allocating memory"); if (waitingTimeElapsed) { + this.metrics.sensor("buffer-exhausted-records").record(); throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 41e8d58635563..67b50c1718122 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -43,8 +43,6 @@ import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.Meter; import org.apache.kafka.common.record.AbstractRecords; import org.apache.kafka.common.record.CompressionRatioEstimator; import org.apache.kafka.common.record.CompressionType; @@ -160,11 +158,6 @@ public double measure(MetricConfig config, long now) { } }; metrics.addMetric(metricName, availableBytes); - - Sensor bufferExhaustedRecordSensor = metrics.sensor("buffer-exhausted-records"); - MetricName rateMetricName = metrics.metricName("buffer-exhausted-rate", metricGrpName, "The average per-second number 
of record sends that are dropped due to buffer exhaustion"); - MetricName totalMetricName = metrics.metricName("buffer-exhausted-total", metricGrpName, "The total number of record sends that are dropped due to buffer exhaustion"); - bufferExhaustedRecordSensor.add(new Meter(rateMetricName, totalMetricName)); } /** diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala index bcd2f7f8737b6..961b0f3ad3825 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala @@ -23,7 +23,7 @@ import java.util.concurrent.{ExecutionException, Future, TimeUnit} import kafka.log.LogConfig import kafka.server.Defaults import kafka.utils.TestUtils -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata, BufferExhaustedException} +import org.apache.kafka.clients.producer.{BufferExhaustedException, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} import org.apache.kafka.common.errors.{InvalidTimestampException, RecordTooLargeException, SerializationException, TimeoutException} import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch, Records, TimestampType} import org.apache.kafka.common.serialization.ByteArraySerializer