diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index ee84c7c168a4a..6085f1ca3f58d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -56,7 +56,7 @@ public class BufferPool { private final Metrics metrics; private final Time time; private final Sensor waitTime; - private boolean closed; + private volatile boolean closed; /** * Create a new buffer pool @@ -212,10 +212,15 @@ protected void recordWaitTime(long timeNs) { /** * Allocate a buffer. If buffer allocation fails (e.g. because of OOM) then return the size count back to * available memory and signal the next waiter if it exists. + * + * if producer is closing, will throw close-exception */ private ByteBuffer safeAllocateByteBuffer(int size) { boolean error = true; try { + if (this.closed) { + throw new KafkaException("Producer closed while allocating memory"); + } ByteBuffer buffer = allocateByteBuffer(size); error = false; return buffer;