From 3c92c5786a0a1548607579a85b892412aeb90734 Mon Sep 17 00:00:00 2001 From: Brian Byrne Date: Wed, 15 Jan 2020 10:57:37 -0800 Subject: [PATCH 1/4] MINOR: Adds support for closing the producer's BufferPool. --- .../producer/internals/BufferPool.java | 27 +++++++++ .../producer/internals/RecordAccumulator.java | 1 + .../producer/internals/BufferPoolTest.java | 57 ++++++++++++++++++- 3 files changed, 84 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index 22d747265d618..b49a7e2215f42 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -23,6 +23,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Metrics; @@ -55,6 +56,7 @@ public class BufferPool { private final Metrics metrics; private final Time time; private final Sensor waitTime; + private boolean closed; /** * Create a new buffer pool @@ -82,6 +84,7 @@ public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, Str metricGrpName, "The total time an appender waits for space allocation."); this.waitTime.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName)); + this.closed = false; } /** @@ -104,6 +107,12 @@ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedEx ByteBuffer buffer = null; this.lock.lock(); + + if (this.closed) { + this.lock.unlock(); + throw new KafkaException("Producer closed while allocating memory"); + } + try { // check if we have a free buffer of the right size pooled if (size == poolableSize && !this.free.isEmpty()) @@ -138,6 +147,9 @@ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedEx recordWaitTime(timeNs); } + if (this.closed) + throw new KafkaException("Producer closed while allocating memory"); + if (waitingTimeElapsed) { throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); } @@ -316,4 +328,19 @@ public long totalMemory() { Deque waiters() { return this.waiters; } + + /** + * Closes the buffer pool. Memory will be prevented from being allocated, but may be deallocated. All allocations + * awaiting available memory will be notified to abort. + */ + public void close() { + this.lock.lock(); + this.closed = true; + try { + for (Condition waiter : this.waiters) + waiter.signal(); + } finally { + this.lock.unlock(); + } + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 80d0c5fcb9c65..58a5b3fdc8136 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -802,6 +802,7 @@ public void unmutePartition(TopicPartition tp, long throttleUntilTimeMs) { */ public void close() { this.closed = true; + this.free.close(); } /* diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java index 8e44fa18146d4..2420166262d2d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.producer.internals; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; @@ -232,7 +233,7 @@ public void testCleanupMemoryAvailabilityWaiterOnInterruption() throws Exception // both the allocate() called by threads t1 and t2 should have been interrupted and the waiters queue should be empty assertEquals(pool.queued(), 0); } - + @Test public void testCleanupMemoryAvailabilityOnMetricsException() throws Exception { BufferPool bufferPool = spy(new BufferPool(2, 1, new Metrics(), time, metricGroup)); @@ -377,4 +378,58 @@ public void run() { } } + @Test + public void testCloseAllocations() throws Exception { + BufferPool pool = new BufferPool(10, 1, metrics, Time.SYSTEM, metricGroup); + ByteBuffer buffer = pool.allocate(1, maxBlockTimeMs); + + // Close the buffer pool. This should prevent any further allocations. + pool.close(); + + try { + pool.allocate(1, maxBlockTimeMs); + fail("Should have thrown KafkaException"); + } catch (KafkaException e) { + // Expected. + } + + // Ensure deallocation still works. + pool.deallocate(buffer); + } + + @Test + public void testCloseNotifyWaiters() throws Exception { + BufferPool pool = new BufferPool(10, 1, metrics, Time.SYSTEM, metricGroup); + ByteBuffer buffer = pool.allocate(10, Long.MAX_VALUE); + + CountDownLatch waiter1 = asyncAllocateClose(pool, 10); + CountDownLatch waiter2 = asyncAllocateClose(pool, 10); + + assertEquals("Allocation shouldn't have happened yet, waiting on memory", 2L, waiter1.getCount() + waiter2.getCount()); + + // Close the buffer pool. This should notify all waiters. + pool.close(); + + assertTrue("Allocation should fail soon after close", waiter1.await(1, TimeUnit.SECONDS) && waiter2.await(1, TimeUnit.SECONDS)); + + pool.deallocate(buffer); + } + + private CountDownLatch asyncAllocateClose(final BufferPool pool, final int size) { + final CountDownLatch completed = new CountDownLatch(1); + Thread thread = new Thread() { + public void run() { + try { + pool.allocate(size, maxBlockTimeMs); + fail("Unexpected allocation"); + } catch (KafkaException e) { + completed.countDown(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + thread.start(); + return completed; + } } From caed6c2ff8713f875e805ba1b947fdb37083ffb8 Mon Sep 17 00:00:00 2001 From: Brian Byrne Date: Fri, 17 Jan 2020 09:57:42 -0800 Subject: [PATCH 2/4] Address review comments. --- .../producer/internals/BufferPoolTest.java | 53 ++++++++----------- 1 file changed, 23 insertions(+), 30 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java index 2420166262d2d..7408288a92940 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java @@ -29,7 +29,11 @@ import java.util.ArrayList; import java.util.Deque; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -37,6 +41,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.anyLong; @@ -386,12 +391,7 @@ public void testCloseAllocations() throws Exception { // Close the buffer pool. This should prevent any further allocations. pool.close(); - try { - pool.allocate(1, maxBlockTimeMs); - fail("Should have thrown KafkaException"); - } catch (KafkaException e) { - // Expected. - } + assertThrows(KafkaException.class, () -> pool.allocate(1, maxBlockTimeMs)); // Ensure deallocation still works. pool.deallocate(buffer); @@ -399,37 +399,30 @@ public void testCloseAllocations() throws Exception { @Test public void testCloseNotifyWaiters() throws Exception { - BufferPool pool = new BufferPool(10, 1, metrics, Time.SYSTEM, metricGroup); - ByteBuffer buffer = pool.allocate(10, Long.MAX_VALUE); - - CountDownLatch waiter1 = asyncAllocateClose(pool, 10); - CountDownLatch waiter2 = asyncAllocateClose(pool, 10); + BufferPool pool = new BufferPool(1, 1, metrics, Time.SYSTEM, metricGroup); + ByteBuffer buffer = pool.allocate(1, Long.MAX_VALUE); + + CountDownLatch completed = new CountDownLatch(2); + ExecutorService executor = Executors.newFixedThreadPool(2); + Callable work = new Callable() { + public Void call() throws Exception { + assertThrows(KafkaException.class, () -> pool.allocate(1, maxBlockTimeMs)); + completed.countDown(); + return null; + } + }; + Future waiter1 = executor.submit(work); + Future waiter2 = executor.submit(work); - assertEquals("Allocation shouldn't have happened yet, waiting on memory", 2L, waiter1.getCount() + waiter2.getCount()); + assertEquals("Allocation shouldn't have happened yet, waiting on memory", 2L, completed.getCount()); // Close the buffer pool. This should notify all waiters. pool.close(); - assertTrue("Allocation should fail soon after close", waiter1.await(1, TimeUnit.SECONDS) && waiter2.await(1, TimeUnit.SECONDS)); + waiter1.get(200, TimeUnit.MILLISECONDS); + waiter2.get(200, TimeUnit.MILLISECONDS); pool.deallocate(buffer); } - private CountDownLatch asyncAllocateClose(final BufferPool pool, final int size) { - final CountDownLatch completed = new CountDownLatch(1); - Thread thread = new Thread() { - public void run() { - try { - pool.allocate(size, maxBlockTimeMs); - fail("Unexpected allocation"); - } catch (KafkaException e) { - completed.countDown(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - }; - thread.start(); - return completed; - } } From 5c7f6760cc9180f207858200656c902c164954ed Mon Sep 17 00:00:00 2001 From: Brian Byrne Date: Fri, 17 Jan 2020 10:04:29 -0800 Subject: [PATCH 3/4] Simplify test. --- .../producer/internals/BufferPoolTest.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java index 7408288a92940..6c18965496d1d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java @@ -33,7 +33,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -399,11 +398,13 @@ public void testCloseAllocations() throws Exception { @Test public void testCloseNotifyWaiters() throws Exception { + final int numWorkers = 2; + BufferPool pool = new BufferPool(1, 1, metrics, Time.SYSTEM, metricGroup); ByteBuffer buffer = pool.allocate(1, Long.MAX_VALUE); - CountDownLatch completed = new CountDownLatch(2); - ExecutorService executor = Executors.newFixedThreadPool(2); + CountDownLatch completed = new CountDownLatch(numWorkers); + ExecutorService executor = Executors.newFixedThreadPool(numWorkers); Callable work = new Callable() { public Void call() throws Exception { assertThrows(KafkaException.class, () -> pool.allocate(1, maxBlockTimeMs)); @@ -411,16 +412,16 @@ public Void call() throws Exception { return null; } }; - Future waiter1 = executor.submit(work); - Future waiter2 = executor.submit(work); + for (int i = 0; i < numWorkers; ++i) { + executor.submit(work); + } - assertEquals("Allocation shouldn't have happened yet, waiting on memory", 2L, completed.getCount()); + assertEquals("Allocation shouldn't have happened yet, waiting on memory", numWorkers, completed.getCount()); // Close the buffer pool. This should notify all waiters. pool.close(); - waiter1.get(200, TimeUnit.MILLISECONDS); - waiter2.get(200, TimeUnit.MILLISECONDS); + completed.await(200, TimeUnit.MILLISECONDS); pool.deallocate(buffer); } From 4c63d11596d51b9ddd8907fc23eefdb1f0588306 Mon Sep 17 00:00:00 2001 From: Brian Byrne Date: Fri, 17 Jan 2020 10:23:35 -0800 Subject: [PATCH 4/4] Increase timeouts. --- .../kafka/clients/producer/internals/BufferPoolTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java index 6c18965496d1d..724d9d4df10f4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java @@ -407,7 +407,7 @@ public void testCloseNotifyWaiters() throws Exception { ExecutorService executor = Executors.newFixedThreadPool(numWorkers); Callable work = new Callable() { public Void call() throws Exception { - assertThrows(KafkaException.class, () -> pool.allocate(1, maxBlockTimeMs)); + assertThrows(KafkaException.class, () -> pool.allocate(1, Long.MAX_VALUE)); completed.countDown(); return null; } @@ -421,7 +421,7 @@ public Void call() throws Exception { // Close the buffer pool. This should notify all waiters. pool.close(); - completed.await(200, TimeUnit.MILLISECONDS); + completed.await(15, TimeUnit.SECONDS); pool.deallocate(buffer); }