diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java b/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java index be840db282fac..292bb4eef1c19 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java @@ -16,13 +16,17 @@ */ package org.apache.kafka.clients.producer; -import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.TimeoutException; /** - * This exception is thrown if the producer is in non-blocking mode and the rate of data production exceeds the rate at - * which data can be sent for long enough for the allocated buffer to be exhausted. + * This exception is thrown if the producer cannot allocate memory for a record within max.block.ms due to the buffer + * being too full. + * + * In earlier versions a TimeoutException was thrown instead of this. To keep existing catch-clauses working + * this class extends TimeoutException. + * */ -public class BufferExhaustedException extends KafkaException { +public class BufferExhaustedException extends TimeoutException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index b68330f32e0ff..24fe98b178e7e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -956,11 +956,6 @@ private Future doSend(ProducerRecord record, Callback call this.errors.record(); this.interceptors.onSendError(record, tp, e); throw new InterruptException(e); - } catch (BufferExhaustedException e) { - this.errors.record(); - this.metrics.sensor("buffer-exhausted-records").record(); - this.interceptors.onSendError(record, tp, e); - throw e; } catch (KafkaException e) { this.errors.record(); this.interceptors.onSendError(record, tp, e); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index b49a7e2215f42..ee84c7c168a4a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -23,9 +23,9 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import org.apache.kafka.clients.producer.BufferExhaustedException; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Meter; @@ -83,6 +83,12 @@ public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, Str MetricName totalMetricName = metrics.metricName("bufferpool-wait-time-total", metricGrpName, "The total time an appender waits for space allocation."); + + Sensor bufferExhaustedRecordSensor = metrics.sensor("buffer-exhausted-records"); + MetricName bufferExhaustedRateMetricName = metrics.metricName("buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion"); + MetricName bufferExhaustedTotalMetricName = metrics.metricName("buffer-exhausted-total", metricGrpName, "The total number of record sends that are dropped due to buffer exhaustion"); + bufferExhaustedRecordSensor.add(new Meter(bufferExhaustedRateMetricName, bufferExhaustedTotalMetricName)); + this.waitTime.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName)); this.closed = false; } @@ -151,7 +157,8 @@ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedEx throw new KafkaException("Producer closed while allocating memory"); if (waitingTimeElapsed) { - throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); + this.metrics.sensor("buffer-exhausted-records").record(); + throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); } remainingTimeToBlockNs -= timeNs; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 41e8d58635563..67b50c1718122 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -43,8 +43,6 @@ import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.Meter; import org.apache.kafka.common.record.AbstractRecords; import org.apache.kafka.common.record.CompressionRatioEstimator; import org.apache.kafka.common.record.CompressionType; @@ -160,11 +158,6 @@ public double measure(MetricConfig config, long now) { } }; metrics.addMetric(metricName, availableBytes); - - Sensor bufferExhaustedRecordSensor = metrics.sensor("buffer-exhausted-records"); - MetricName rateMetricName = metrics.metricName("buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion"); - MetricName totalMetricName = metrics.metricName("buffer-exhausted-total", metricGrpName, "The total number of record sends that are dropped due to buffer exhaustion"); - bufferExhaustedRecordSensor.add(new Meter(rateMetricName, totalMetricName)); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java index 9eee28468ac3c..68c70a4b03593 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java @@ -16,8 +16,8 @@ */ package org.apache.kafka.clients.producer.internals; +import org.apache.kafka.clients.producer.BufferExhaustedException; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -152,8 +152,18 @@ private CountDownLatch asyncAllocate(final BufferPool pool, final int size) { } /** - * Test if Timeout exception is thrown when there is not enough memory to allocate and the elapsed time is greater than the max specified block time. - * And verify that the allocation attempt finishes soon after the maxBlockTimeMs. + * Test if BufferExhausted exception is thrown when there is not enough memory to allocate and the elapsed + * time is greater than the max specified block time. + */ + @Test(expected = BufferExhaustedException.class) + public void testBufferExhaustedExceptionIsThrown() throws Exception { + BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup); + pool.allocate(1, maxBlockTimeMs); + pool.allocate(2, maxBlockTimeMs); + } + + /** + * Verify that a failed allocation attempt due to not enough memory finishes soon after the maxBlockTimeMs. */ @Test public void testBlockTimeout() throws Exception { @@ -171,14 +181,14 @@ public void testBlockTimeout() throws Exception { try { pool.allocate(10, maxBlockTimeMs); fail("The buffer allocated more memory than its maximum value 10"); - } catch (TimeoutException e) { + } catch (BufferExhaustedException e) { // this is good } // Thread scheduling sometimes means that deallocation varies by this point assertTrue("available memory " + pool.availableMemory(), pool.availableMemory() >= 8 && pool.availableMemory() <= 10); long durationMs = Time.SYSTEM.milliseconds() - beginTimeMs; - assertTrue("TimeoutException should not throw before maxBlockTimeMs", durationMs >= maxBlockTimeMs); - assertTrue("TimeoutException should throw soon after maxBlockTimeMs", durationMs < maxBlockTimeMs + 1000); + assertTrue("BufferExhaustedException should not throw before maxBlockTimeMs", durationMs >= maxBlockTimeMs); + assertTrue("BufferExhaustedException should throw soon after maxBlockTimeMs", durationMs < maxBlockTimeMs + 1000); } /** @@ -191,7 +201,7 @@ public void testCleanupMemoryAvailabilityWaiterOnBlockTimeout() throws Exception try { pool.allocate(2, maxBlockTimeMs); fail("The buffer allocated more memory than its maximum value 2"); - } catch (TimeoutException e) { + } catch (BufferExhaustedException e) { // this is good } assertEquals(0, pool.queued()); @@ -266,7 +276,7 @@ public void run() { try { pool.allocate(2, maxBlockTimeMs); fail("The buffer allocated more memory than its maximum value 2"); - } catch (TimeoutException e) { + } catch (BufferExhaustedException e) { // this is good } catch (InterruptedException e) { // this can be neglected diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala index bc2e2939f3579..961b0f3ad3825 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala @@ -23,7 +23,7 @@ import java.util.concurrent.{ExecutionException, Future, TimeUnit} import kafka.log.LogConfig import kafka.server.Defaults import kafka.utils.TestUtils -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} +import org.apache.kafka.clients.producer.{BufferExhaustedException, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} import org.apache.kafka.common.errors.{InvalidTimestampException, RecordTooLargeException, SerializationException, TimeoutException} import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch, Records, TimestampType} import org.apache.kafka.common.serialization.ByteArraySerializer @@ -150,14 +150,19 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { assertTrue(s"Invalid offset $recordMetadata", recordMetadata.offset >= 0) } - def verifySendFailure(future: Future[RecordMetadata]): Unit = { + def verifyMetadataNotAvailable(future: Future[RecordMetadata]): Unit = { assertTrue(future.isDone) // verify future was completed immediately assertEquals(classOf[TimeoutException], intercept[ExecutionException](future.get).getCause.getClass) } + def verifyBufferExhausted(future: Future[RecordMetadata]): Unit = { + assertTrue(future.isDone) // verify future was completed immediately + assertEquals(classOf[BufferExhaustedException], intercept[ExecutionException](future.get).getCause.getClass) + } + // Topic metadata not available, send should fail without blocking val producer = createProducer(brokerList = brokerList, maxBlockMs = 0) - verifySendFailure(send(producer)) + verifyMetadataNotAvailable(send(producer)) // Test that send starts succeeding once metadata is available val future = sendUntilQueued(producer) @@ -167,7 +172,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { val producer2 = createProducer(brokerList = brokerList, maxBlockMs = 0, lingerMs = 15000, batchSize = 1100, bufferSize = 1500) val future2 = sendUntilQueued(producer2) // wait until metadata is available and one record is queued - verifySendFailure(send(producer2)) // should fail send since buffer is full + verifyBufferExhausted(send(producer2)) // should fail send since buffer is full verifySendSuccess(future2) // previous batch should be completed and sent now }