Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions common/src/main/java/io/druid/collections/BlockingPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package io.druid.collections;

import javax.annotation.Nullable;
import java.util.List;

public interface BlockingPool<T>
Expand All @@ -33,6 +34,7 @@ public interface BlockingPool<T>
*
* @return a resource, or null if the timeout was reached
*/
@Nullable
ReferenceCountingResourceHolder<T> take(long timeoutMs);

/**
Expand All @@ -49,16 +51,16 @@ public interface BlockingPool<T>
* @param elementNum number of resources to take
* @param timeoutMs maximum time to wait for resources, in milliseconds.
*
* @return a resource, or null if the timeout was reached
* @return a list of resource holders. An empty list is returned if {@code elementNum} resources aren't available.
*/
ReferenceCountingResourceHolder<List<T>> takeBatch(int elementNum, long timeoutMs);
List<ReferenceCountingResourceHolder<T>> takeBatch(int elementNum, long timeoutMs);

/**
* Take resources from the pool, waiting if necessary until the elements of the given number become available.
*
* @param elementNum number of resources to take
*
* @return a resource
* @return a list of resource holders. An empty list is returned if {@code elementNum} resources aren't available.
*/
ReferenceCountingResourceHolder<List<T>> takeBatch(int elementNum);
List<ReferenceCountingResourceHolder<T>> takeBatch(int elementNum);
}
81 changes: 23 additions & 58 deletions common/src/main/java/io/druid/collections/DefaultBlockingPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.java.util.common.ISE;

import java.io.Closeable;
import javax.annotation.Nullable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

/**
* Pool that pre-generates objects up to a limit, then permits possibly-blocking "take" operations.
Expand Down Expand Up @@ -74,6 +75,7 @@ public int getPoolSize()
}

@Override
@Nullable
public ReferenceCountingResourceHolder<T> take(final long timeoutMs)
{
Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs);
Expand All @@ -82,7 +84,7 @@ public ReferenceCountingResourceHolder<T> take(final long timeoutMs)
return wrapObject(timeoutMs > 0 ? pollObject(timeoutMs) : pollObject());
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}

Expand All @@ -94,25 +96,20 @@ public ReferenceCountingResourceHolder<T> take()
return wrapObject(takeObject());
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}

@Nullable
private ReferenceCountingResourceHolder<T> wrapObject(T theObject)
{
return theObject == null ? null : new ReferenceCountingResourceHolder<>(
theObject,
new Closeable()
{
@Override
public void close()
{
offer(theObject);
}
}
() -> offer(theObject)
);
}

@Nullable
private T pollObject()
{
final ReentrantLock lock = this.lock;
Expand All @@ -125,6 +122,7 @@ private T pollObject()
}
}

@Nullable
private T pollObject(long timeoutMs) throws InterruptedException
{
long nanos = TIME_UNIT.toNanos(timeoutMs);
Expand Down Expand Up @@ -160,53 +158,39 @@ private T takeObject() throws InterruptedException
}

@Override
public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum, final long timeoutMs)
public List<ReferenceCountingResourceHolder<T>> takeBatch(final int elementNum, final long timeoutMs)
{
Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs);
checkInitialized();
try {
return wrapObjects(timeoutMs > 0 ? pollObjects(elementNum, timeoutMs) : pollObjects(elementNum));
final List<T> objects = timeoutMs > 0 ? pollObjects(elementNum, timeoutMs) : pollObjects(elementNum);
return objects.stream().map(this::wrapObject).collect(Collectors.toList());
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}

@Override
public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum)
public List<ReferenceCountingResourceHolder<T>> takeBatch(final int elementNum)
{
checkInitialized();
try {
return wrapObjects(takeObjects(elementNum));
return takeObjects(elementNum).stream().map(this::wrapObject).collect(Collectors.toList());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this has a buffer leak (and it looks like the old code had the leak too, and so does pollObjects). If either pollObjects or takeObjects is interrupted while it's waiting for more objects to become available, then the objects popped from objects are not returned to the pool - they are lost.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you elaborate more on how resource leak occurs on interruption?

The implementation of takeBatch is

  private List<T> takeObjects(int elementNum) throws InterruptedException
  {
    final List<T> list = new ArrayList<>(elementNum);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
      while (objects.size() < elementNum) {
        notEnough.await();
      }
      for (int i = 0; i < elementNum; i++) {
        list.add(objects.pop());
      }
      return list;
    }
    finally {
      lock.unlock();
    }
  }

and InterruptedException can be thrown at lock.lockInterruptibly() and notEnough.await(). list.add(objects.pop()) is called only when there are enough number of available objects.

wrapObject() also doesn't check the interruption state, so objects should be wrapped once takeObjects() returns them.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you're right, as long as nothing involved checks interrupts: list.add, objects.pop, wrapObject, etc. It looks like that is the case so there is no leak. Nevermind.

}
catch (InterruptedException e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}

private ReferenceCountingResourceHolder<List<T>> wrapObjects(List<T> theObjects)
{
return theObjects == null ? null : new ReferenceCountingResourceHolder<>(
theObjects,
new Closeable()
{
@Override
public void close()
{
offerBatch(theObjects);
}
}
);
}

private List<T> pollObjects(int elementNum) throws InterruptedException
{
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final List<T> list = new ArrayList<>(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
if (objects.size() < elementNum) {
return null;
return Collections.emptyList();
} else {
for (int i = 0; i < elementNum; i++) {
list.add(objects.pop());
Expand All @@ -222,13 +206,13 @@ private List<T> pollObjects(int elementNum) throws InterruptedException
private List<T> pollObjects(int elementNum, long timeoutMs) throws InterruptedException
{
long nanos = TIME_UNIT.toNanos(timeoutMs);
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final List<T> list = new ArrayList<>(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.size() < elementNum) {
if (nanos <= 0) {
return null;
return Collections.emptyList();
}
nanos = notEnough.awaitNanos(nanos);
}
Expand All @@ -244,7 +228,7 @@ private List<T> pollObjects(int elementNum, long timeoutMs) throws InterruptedEx

private List<T> takeObjects(int elementNum) throws InterruptedException
{
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final List<T> list = new ArrayList<>(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
Expand Down Expand Up @@ -282,23 +266,4 @@ private void offer(T theObject)
lock.unlock();
}
}

private void offerBatch(List<T> offers)
{
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (objects.size() + offers.size() <= maxSize) {
for (T offer : offers) {
objects.push(offer);
}
notEnough.signal();
} else {
throw new ISE("Cannot exceed pre-configured maximum size");
}
}
finally {
lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ public ReferenceCountingResourceHolder<T> take()
}

@Override
public ReferenceCountingResourceHolder<List<T>> takeBatch(int elementNum, long timeoutMs)
public List<ReferenceCountingResourceHolder<T>> takeBatch(int elementNum, long timeoutMs)
{
throw new UnsupportedOperationException();
}

@Override
public ReferenceCountingResourceHolder<List<T>> takeBatch(int elementNum)
public List<ReferenceCountingResourceHolder<T>> takeBatch(int elementNum)
{
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public static long leakedResources()
@SuppressWarnings("unused")
private final Cleaner cleaner;

ReferenceCountingResourceHolder(final T object, final Closeable closer)
public ReferenceCountingResourceHolder(final T object, final Closeable closer)
{
this.object = object;
this.closer = closer;
Expand All @@ -64,6 +64,10 @@ public static <T extends Closeable> ReferenceCountingResourceHolder<T> fromClose
return new ReferenceCountingResourceHolder<>(object, object);
}

/**
* Returns the resource with an initial reference count of 1. More references can be added by
* calling {@link #increment()}.
*/
@Override
public T get()
{
Expand All @@ -73,6 +77,13 @@ public T get()
return object;
}

/**
* Increments the reference count by 1 and returns a {@link Releaser}. The returned {@link Releaser} is used to
* decrement the reference count when the caller no longer needs the resource.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should include wording like:

Releasers are not thread-safe. If multiple threads need references to the same holder, they should
each acquire their own Releaser.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

*
* {@link Releaser}s are not thread-safe. If multiple threads need references to the same holder, they should
* each acquire their own {@link Releaser}.
*/
public Releaser increment()
{
while (true) {
Expand Down Expand Up @@ -103,6 +114,9 @@ public void close()
};
}

/**
* Decrements the reference count by 1. If it reaches to 0, then closes {@link #closer}.
*/
@Override
public void close()
{
Expand Down
Loading