Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
*/
package org.apache.kafka.clients.producer;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.TimeoutException;

/**
* This exception is thrown if the producer is in non-blocking mode and the rate of data production exceeds the rate at
* which data can be sent for long enough for the allocated buffer to be exhausted.
* This exception is thrown if the producer cannot allocate memory for a record within max.block.ms due to the buffer
* being too full.
*
* In earlier versions, a TimeoutException was thrown instead of this exception. To keep existing catch clauses
* working, this class extends TimeoutException.
*
*/
public class BufferExhaustedException extends KafkaException {
public class BufferExhaustedException extends TimeoutException {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment for this inheritance? This change is for keeping backwards compatibility.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this for backwards compatibility? It seems to me that part of the reason is to allow the users to handle all timeout related type of exceptions with one catch clause.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this for backwards compatibility?

Client code catching TimeoutException to handle memory issues would break if BufferExhaustedException did not extend TimeoutException.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a bit of a chicken-and-egg situation, I guess. The main reason for adding the inheritance in this case was compatibility (at least to my mind), but what you say is a very welcome side effect.
Happy to add a clarification to the comment, of course, if we feel this makes sense.


private static final long serialVersionUID = 1L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -956,11 +956,6 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (BufferExhaustedException e) {
this.errors.record();
this.metrics.sensor("buffer-exhausted-records").record();
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (KafkaException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Meter;
Expand Down Expand Up @@ -83,6 +83,12 @@ public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, Str
MetricName totalMetricName = metrics.metricName("bufferpool-wait-time-total",
metricGrpName,
"The total time an appender waits for space allocation.");

Sensor bufferExhaustedRecordSensor = metrics.sensor("buffer-exhausted-records");
MetricName bufferExhaustedRateMetricName = metrics.metricName("buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion");
MetricName bufferExhaustedTotalMetricName = metrics.metricName("buffer-exhausted-total", metricGrpName, "The total number of record sends that are dropped due to buffer exhaustion");
bufferExhaustedRecordSensor.add(new Meter(bufferExhaustedRateMetricName, bufferExhaustedTotalMetricName));

this.waitTime.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName));
this.closed = false;
}
Expand Down Expand Up @@ -151,7 +157,8 @@ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedEx
throw new KafkaException("Producer closed while allocating memory");

if (waitingTimeElapsed) {
throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
this.metrics.sensor("buffer-exhausted-records").record();
throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
Comment thread
soenkeliebau marked this conversation as resolved.
}

remainingTimeToBlockNs -= timeNs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
Expand Down Expand Up @@ -160,11 +158,6 @@ public double measure(MetricConfig config, long now) {
}
};
metrics.addMetric(metricName, availableBytes);

Sensor bufferExhaustedRecordSensor = metrics.sensor("buffer-exhausted-records");
MetricName rateMetricName = metrics.metricName("buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion");
MetricName totalMetricName = metrics.metricName("buffer-exhausted-total", metricGrpName, "The total number of record sends that are dropped due to buffer exhaustion");
bufferExhaustedRecordSensor.add(new Meter(rateMetricName, totalMetricName));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.kafka.clients.producer.internals;

import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
Expand Down Expand Up @@ -152,8 +152,18 @@ private CountDownLatch asyncAllocate(final BufferPool pool, final int size) {
}

/**
* Test if Timeout exception is thrown when there is not enough memory to allocate and the elapsed time is greater than the max specified block time.
* And verify that the allocation attempt finishes soon after the maxBlockTimeMs.
* Test if BufferExhausted exception is thrown when there is not enough memory to allocate and the elapsed
* time is greater than the max specified block time.
*/
@Test(expected = BufferExhaustedException.class)
public void testBufferExhaustedExceptionIsThrown() throws Exception {
BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup);
pool.allocate(1, maxBlockTimeMs);
pool.allocate(2, maxBlockTimeMs);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This waits on real Timer. So waiting 2 secs in a unit test is too long. Perhaps try 10ms?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to do that of course. The maxBlockTimeMs affects the entire class of tests though, I'd like to move that to a separate PR to make the change easier to trace in case tests become unstable due to this.

I ran a couple thousand tests with different values, and for me testBlockTimeout became unstable with a value of 10ms — as far as I can tell, we are betting on a race condition in line 188. We allocate three bytes, start delayed deallocations, wait a little, and then hope that at least one deallocation has taken place by the time we check. That worked for 2000 ms, but apparently breaks sometimes for 10 ms.
It is an easy fix by changing the condition to 7 instead of 8, but I'm not sure how much actual worth that assertion has after that.

Happy to discuss this further, but maybe we can first agree on if a new PR makes sense. I think it makes sense to separate this out tbh.

}

/**
* Verify that a failed allocation attempt due to not enough memory finishes soon after the maxBlockTimeMs.
*/
@Test
public void testBlockTimeout() throws Exception {
Expand All @@ -171,14 +181,14 @@ public void testBlockTimeout() throws Exception {
try {
pool.allocate(10, maxBlockTimeMs);
fail("The buffer allocated more memory than its maximum value 10");
} catch (TimeoutException e) {
} catch (BufferExhaustedException e) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the above, perhaps reduce maxBlockTimeMs to 10ms?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @junrao - I answered in your related comment above.

// this is good
}
// Thread scheduling sometimes means that deallocation varies by this point
assertTrue("available memory " + pool.availableMemory(), pool.availableMemory() >= 8 && pool.availableMemory() <= 10);
long durationMs = Time.SYSTEM.milliseconds() - beginTimeMs;
assertTrue("TimeoutException should not throw before maxBlockTimeMs", durationMs >= maxBlockTimeMs);
assertTrue("TimeoutException should throw soon after maxBlockTimeMs", durationMs < maxBlockTimeMs + 1000);
assertTrue("BufferExhaustedException should not throw before maxBlockTimeMs", durationMs >= maxBlockTimeMs);
assertTrue("BufferExhaustedException should throw soon after maxBlockTimeMs", durationMs < maxBlockTimeMs + 1000);
}

/**
Expand All @@ -191,7 +201,7 @@ public void testCleanupMemoryAvailabilityWaiterOnBlockTimeout() throws Exception
try {
pool.allocate(2, maxBlockTimeMs);
fail("The buffer allocated more memory than its maximum value 2");
} catch (TimeoutException e) {
} catch (BufferExhaustedException e) {
// this is good
}
assertEquals(0, pool.queued());
Expand Down Expand Up @@ -266,7 +276,7 @@ public void run() {
try {
pool.allocate(2, maxBlockTimeMs);
fail("The buffer allocated more memory than its maximum value 2");
} catch (TimeoutException e) {
} catch (BufferExhaustedException e) {
// this is good
} catch (InterruptedException e) {
// this can be neglected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.concurrent.{ExecutionException, Future, TimeUnit}
import kafka.log.LogConfig
import kafka.server.Defaults
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.clients.producer.{BufferExhaustedException, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.errors.{InvalidTimestampException, RecordTooLargeException, SerializationException, TimeoutException}
import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch, Records, TimestampType}
import org.apache.kafka.common.serialization.ByteArraySerializer
Expand Down Expand Up @@ -150,14 +150,19 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
assertTrue(s"Invalid offset $recordMetadata", recordMetadata.offset >= 0)
}

def verifySendFailure(future: Future[RecordMetadata]): Unit = {
def verifyMetadataNotAvailable(future: Future[RecordMetadata]): Unit = {
assertTrue(future.isDone) // verify future was completed immediately
assertEquals(classOf[TimeoutException], intercept[ExecutionException](future.get).getCause.getClass)
}

def verifyBufferExhausted(future: Future[RecordMetadata]): Unit = {
assertTrue(future.isDone) // verify future was completed immediately
assertEquals(classOf[BufferExhaustedException], intercept[ExecutionException](future.get).getCause.getClass)
}

// Topic metadata not available, send should fail without blocking
val producer = createProducer(brokerList = brokerList, maxBlockMs = 0)
verifySendFailure(send(producer))
verifyMetadataNotAvailable(send(producer))

// Test that send starts succeeding once metadata is available
val future = sendUntilQueued(producer)
Expand All @@ -167,7 +172,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
val producer2 = createProducer(brokerList = brokerList, maxBlockMs = 0,
lingerMs = 15000, batchSize = 1100, bufferSize = 1500)
val future2 = sendUntilQueued(producer2) // wait until metadata is available and one record is queued
verifySendFailure(send(producer2)) // should fail send since buffer is full
verifyBufferExhausted(send(producer2)) // should fail send since buffer is full
verifySendSuccess(future2) // previous batch should be completed and sent now
}

Expand Down