Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class BufferPool {
private final Metrics metrics;
private final Time time;
private final Sensor waitTime;
private boolean closed;
private volatile boolean closed;

/**
* Create a new buffer pool
Expand Down Expand Up @@ -212,10 +212,15 @@ protected void recordWaitTime(long timeNs) {
/**
* Allocate a buffer. If buffer allocation fails (e.g. because of OOM) then return the size count back to
* available memory and signal the next waiter if it exists.
*
* if producer is closing, will throw close-exception
*/
private ByteBuffer safeAllocateByteBuffer(int size) {
boolean error = true;
try {
if (this.closed) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not holding the lock so this pool can be closed anytime. How we know the "best" place to check the close flag?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chia7712 thanks for your reviewed. This is what I thought when I wrote this code

In the PR of #3053, for efficiency, the operation of allocating memory is moved out of the locked code block.
Subsequent modifications should be based on this consensus, and do some time-consuming operations without locking.

So what we can do is to try our best to check whether the memory can be allocated before allocating, and reduce the operation of allocating memory, just like the 'fail-fast' mechanism.

In this scenario, both BufferPool#allocate() and BufferPool#close() compete for locks.
Here we can think that lock blocking is more likely to occur in concurrent cases.

Finally, even if the memory is allocated in the close state, there is a plan to release the memory. Therefore, this COMMIT is like an optimization rather than bug-fix

throw new KafkaException("Producer closed while allocating memory");
}
ByteBuffer buffer = allocateByteBuffer(size);
error = false;
return buffer;
Expand Down