diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index 22d747265d618..b49a7e2215f42 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -23,6 +23,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Metrics; @@ -55,6 +56,7 @@ public class BufferPool { private final Metrics metrics; private final Time time; private final Sensor waitTime; + private boolean closed; /** * Create a new buffer pool @@ -82,6 +84,7 @@ public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, Str metricGrpName, "The total time an appender waits for space allocation."); this.waitTime.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName)); + this.closed = false; } /** @@ -104,6 +107,12 @@ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedEx ByteBuffer buffer = null; this.lock.lock(); + + if (this.closed) { + this.lock.unlock(); + throw new KafkaException("Producer closed while allocating memory"); + } + try { // check if we have a free buffer of the right size pooled if (size == poolableSize && !this.free.isEmpty()) @@ -138,6 +147,9 @@ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedEx recordWaitTime(timeNs); } + if (this.closed) + throw new KafkaException("Producer closed while allocating memory"); + if (waitingTimeElapsed) { throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); } @@ -316,4 +328,19 @@ public long totalMemory() { Deque waiters() { return this.waiters; } + + /** + * Closes the buffer pool. Memory will be prevented from being allocated, but may be deallocated. All allocations + * awaiting available memory will be notified to abort. + */ + public void close() { + this.lock.lock(); + this.closed = true; + try { + for (Condition waiter : this.waiters) + waiter.signal(); + } finally { + this.lock.unlock(); + } + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 80d0c5fcb9c65..58a5b3fdc8136 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -802,6 +802,7 @@ public void unmutePartition(TopicPartition tp, long throttleUntilTimeMs) { */ public void close() { this.closed = true; + this.free.close(); } /* diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java index 8e44fa18146d4..724d9d4df10f4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.producer.internals; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; @@ -28,7 +29,10 @@ import java.util.ArrayList; import java.util.Deque; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -36,6 +40,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.anyLong; @@ -232,7 +237,7 @@ public void testCleanupMemoryAvailabilityWaiterOnInterruption() throws Exception // both the allocate() called by threads t1 and t2 should have been interrupted and the waiters queue should be empty assertEquals(pool.queued(), 0); } - + @Test public void testCleanupMemoryAvailabilityOnMetricsException() throws Exception { BufferPool bufferPool = spy(new BufferPool(2, 1, new Metrics(), time, metricGroup)); @@ -377,4 +382,48 @@ public void run() { } } + @Test + public void testCloseAllocations() throws Exception { + BufferPool pool = new BufferPool(10, 1, metrics, Time.SYSTEM, metricGroup); + ByteBuffer buffer = pool.allocate(1, maxBlockTimeMs); + + // Close the buffer pool. This should prevent any further allocations. + pool.close(); + + assertThrows(KafkaException.class, () -> pool.allocate(1, maxBlockTimeMs)); + + // Ensure deallocation still works. + pool.deallocate(buffer); + } + + @Test + public void testCloseNotifyWaiters() throws Exception { + final int numWorkers = 2; + + BufferPool pool = new BufferPool(1, 1, metrics, Time.SYSTEM, metricGroup); + ByteBuffer buffer = pool.allocate(1, Long.MAX_VALUE); + + CountDownLatch completed = new CountDownLatch(numWorkers); + ExecutorService executor = Executors.newFixedThreadPool(numWorkers); + Callable work = new Callable() { + public Void call() throws Exception { + assertThrows(KafkaException.class, () -> pool.allocate(1, Long.MAX_VALUE)); + completed.countDown(); + return null; + } + }; + for (int i = 0; i < numWorkers; ++i) { + executor.submit(work); + } + + assertEquals("Allocation shouldn't have happened yet, waiting on memory", numWorkers, completed.getCount()); + + // Close the buffer pool. This should notify all waiters. + pool.close(); + + completed.await(15, TimeUnit.SECONDS); + + pool.deallocate(buffer); + } + }