From c47e504f5550d37060f59fbb4cde356d55b383a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=92=8A?= <3060507060@qq.com> Date: Sun, 3 Jan 2021 14:49:18 +0800 Subject: [PATCH] MINOR: Producer's BufferPool closing check According to the idea of #7967, it made BufferPool support closing Now, there are two places in the BufferPool#allocate() method to judge the 'closed' flag. One is when 'lock' is acquired, and the other is when 'condition' is awakened However, if the memory is allocated outside the lock code block after freeup, Therefore, it is possible for another thread to modify the 'closed' flag, causing this part of memory to be allocated in vain So two modifications have been made, One is to add the 'volatile' modifier before 'closed', One is to determine whether to 'closed' before allocating memory --- .../kafka/clients/producer/internals/BufferPool.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index ee84c7c168a4a..6085f1ca3f58d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -56,7 +56,7 @@ public class BufferPool { private final Metrics metrics; private final Time time; private final Sensor waitTime; - private boolean closed; + private volatile boolean closed; /** * Create a new buffer pool @@ -212,10 +212,15 @@ protected void recordWaitTime(long timeNs) { /** * Allocate a buffer. If buffer allocation fails (e.g. because of OOM) then return the size count back to * available memory and signal the next waiter if it exists. + * + * if producer is closing, will throw close-exception */ private ByteBuffer safeAllocateByteBuffer(int size) { boolean error = true; try { + if (this.closed) { + throw new KafkaException("Producer closed while allocating memory"); + } ByteBuffer buffer = allocateByteBuffer(size); error = false; return buffer;